๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

Apache Spark

[PySpark] ์ปจํ…์ธ  ๊ธฐ๋ฐ˜ ์˜ํ™” ์ถ”์ฒœ ์‹œ์Šคํ…œ ๋งŒ๋“ค์–ด๋ณด๊ธฐ

๋ฐ˜์‘ํ˜•

๐Ÿ”Š ๋ณธ ํฌ์ŠคํŒ…์€ PySpark๋ฅผ ํ™œ์šฉํ•œ Kaggle Notebook์„ ํ•„์‚ฌํ•˜๋ฉด์„œ ๋ฐฐ์šฐ๊ฒŒ ๋œ ์ฝ”๋“œ ๋‚ด์šฉ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํฌ์ŠคํŒ…ํ–ˆ์Œ์„ ์•Œ๋ ค๋“œ๋ฆฝ๋‹ˆ๋‹ค. ๋˜ํ•œ ์•ž์œผ๋กœ ์†Œ๊ฐœ๋  PySpark์˜ ๋ฌธ๋ฒ•์— ๋Œ€ํ•ด์„œ ์ƒ์†Œํ•˜์‹œ๋‹ค๋ฉด ์—ฌ๊ธฐ๋ฅผ ์ฐธ๊ณ ํ•ด ๊ฐ„๋‹จํ•œ ์˜ˆ์‹œ๋ฅผ ํ†ตํ•ด ์ดํ•ด๋ฅผ ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” PySpark ๊ทธ ์ค‘์—์„œ๋„ Spark SQL ๊ณผ Spark MLlib์„ ํ™œ์šฉํ•ด ์ž ์žฌ์š”์ธ ๊ธฐ๋ฐ˜์˜ ์˜ํ™” ์ถ”์ฒœ ์‹œ์Šคํ…œ์„ ๊ตฌํ˜„ํ•ด๋ณด๋ ค ํ•œ๋‹ค. ํ™œ์šฉํ•œ ๋ฐ์ดํ„ฐ๋Š” Kaggle์˜ ๋ฌด๋น„๋ Œ์ฆˆ ๋ฐ์ดํ„ฐ์˜ movies, ratings ๋ฐ์ดํ„ฐ๋ฅผ ํ•„์ž๊ฐ€ ์ง์ ‘ ์ „์ฒ˜๋ฆฌํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ํ™œ์šฉํ–ˆ๋‹ค. ์ „์ฒ˜๋ฆฌ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ํ™œ์šฉํ•˜๋ ค๋ฉด ์—ฌ๊ธฐ๋ฅผ ํด๋ฆญํ•ด ๋‹ค์šด๋กœ๋“œ ๋ฐ›์ž.

 

์˜ํ™”๋ฅผ ์ถ”์ฒœํ•ด์ฃผ๋Š” ์‹œ์Šคํ…œ์„ ๋งŒ๋“ค์–ด๋ณด์ž

 

์ถ”์ฒœ ์‹œ์Šคํ…œ ์ค‘์—๋Š” ์ปจํ…์ธ  ๊ธฐ๋ฐ˜ ์ถ”์ฒœ, ํ˜‘์—… ๊ธฐ๋ฐ˜ ์ถ”์ฒœ ๋“ฑ์˜ ์—ฌ๋Ÿฌ ์ข…๋ฅ˜๊ฐ€ ์žˆ์ง€๋งŒ ์—ฌ๊ธฐ์„œ๋Š” ๋‹ค๋ฅธ ์‚ฌ์šฉ์ž๋“ค์˜ ์ œํ’ˆ์— ๋Œ€ํ•œ ํ‰์ ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•˜์—ฌ ๋‹ค๋ฅธ ์ œํ’ˆ์„ ์ถ”์ฒœํ•ด์ฃผ๋Š” ํ˜‘์—… ํ•„ํ„ฐ๋ง ๋ฐฉ๋ฒ•์˜ ์ž ์žฌ์  ์š”์ธ ๊ธฐ๋ฐ˜ ๋ชจ๋ธ์„ ์‚ฌ์šฉํ•˜๋ ค๊ณ  ํ•œ๋‹ค.

1. ํ•„์š”ํ•œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์™€ ๋ฐ์ดํ„ฐ ๋กœ๋“œ

ํ•„์š”ํ•œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋“ค์„ ์ž„ํฌํŠธํ•˜๊ณ  PySpark๋ฅผ ํ™œ์šฉํ•˜๊ธฐ ์œ„ํ•œ ์ŠคํŒŒํฌ ์„ธ์…˜์„ ๋งŒ๋“ค์–ด๋ณด์ž. ๊ทธ๋ฆฌ๊ณ  ๋กœ๋“œํ•œ ๋ฐ์ดํ„ฐ๋ฅผ Pandas ํ˜•ํƒœ๋กœ ๋ฏธ๋ฆฌ๋ณด๊ธฐํ•ด์„œ ์–ด๋–ค ํ”ผ์ฒ˜๋“ค์ด ์กด์žฌํ•˜๋Š”์ง€ ์‚ดํŽด๋ณด์ž.

 

import sklearn
import random 

# Pyspark Library #
# SQL
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
# ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import os
import pandas as pd
import numpy as np
os.chdir('/Users/younghun/Desktop/gitrepo/data/movie_dataset/')

# ์ŠคํŒŒํฌ ์„ธ์…˜ ๋งŒ๋“ค๊ธฐ
spark = SparkSession\
        .builder\
        .appName('recommender_system')\
        .getOrCreate()

df = spark.read.csv(os.getcwd() + '/movie_ratings_df.csv',
                   inferSchema=True, header=True)
df.limit(3).toPandas()

 

๋ฐ์ดํ„ฐ ๋กœ๋“œ ํ™”๋ฉด

 

์œ„ ํ™”๋ฉด์—์„œ ๋ณด๋Š” ๊ฒƒ์ฒ˜๋Ÿผ ์œ ์ €๋ฅผ ๋‚˜ํƒ€๋‚ด๋Š” userId, ์˜ํ™”์˜ ์ œ๋ชฉ์ธ title, ๊ทธ ํ•ด๋‹น ์˜ํ™”์— ๋Œ€ํ•œ ํ‰์ ์ธ rating ๋ณ€์ˆ˜๋“ค์ด ์กด์žฌํ•œ๋‹ค.

2. ๋ฐ์ดํ„ฐ ์ „์ฒ˜๋ฆฌ

๋ฌธ์ž์—ด ํ˜•ํƒœ๋กœ ์ด๋ฃจ์–ด์ ธ์žˆ๋Š” ์˜ํ™” ์ œ๋ชฉ์˜ ๊ฐ’๋“ค์„ ์ˆ˜์น˜ํ˜•์œผ๋กœ ๋ฐ”๊พธ์–ด์ฃผ์ž.

 

from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString

stringIndexer = StringIndexer(inputCol='title',
                             outputCol='title_new')
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.limit(5).toPandas()

 

์˜ํ™” ์ œ๋ชฉ์„ ์ˆซ์ž์˜ ๊ฐ’์œผ๋กœ ์ธ์ฝ”๋”ฉ ํ•˜์ž

3. ์ž ์žฌ ์š”์ธ ๊ธฐ๋ฐ˜ ์ถ”์ฒœ ์‹œ์Šคํ…œ ALS ๋ชจ๋ธ ๋งŒ๋“ค๊ณ  ํ‰๊ฐ€ํ•ด๋ณด๊ธฐ

๊ตฌ์ถ•ํ•  ์ถ”์ฒœ ์‹œ์Šคํ…œ์€ ์ž ์žฌ ์š”์ธ์„ ์‚ฌ์šฉํ•ด ๋งŒ๋“œ๋Š” ๊ทธ ์ค‘์—์„œ๋„ SGD(Stochastic Gradient Descent)๊ฐ€ ์•„๋‹Œ ALS(Alternate Least Squares) ๋ชจ๋ธ์„ ์‚ฌ์šฉํ•ด๋ณด๋ ค ํ•œ๋‹ค. ํŽธ๋ฆฌํ•˜๊ฒŒ๋„ PySpark์—์„œ๋„ ALS ๋ชจ๋ธ์„ ์‰ฝ๊ฒŒ ๊ตฌ์ถ•ํ•  ์ˆ˜ ์žˆ๋Š” ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์ œ๊ณตํ•œ๋‹ค. ALS ๋ฉ”์†Œ๋“œ์˜ ์ฃผ์š” ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ์ž ๊น ์‚ดํŽด๋ณด๊ณ  ์ฝ”๋“œ๋กœ ๊ตฌํ˜„ํ•ด๋ณด์ž.

 

  • regParam : a regularization term that helps the recommender generalize without depending on the size of the dataset.
  • coldStartStrategy = 'drop' : rows with missing values, i.e., items not yet rated, are excluded when evaluating model performance. (For reference, if set to 'nan', the missing values are included in the evaluation.)
  • nonnegative = True : whether to constrain the least-squares solution to exclude negative values. Since the ratings, movie titles, and user IDs in this dataset are all non-negative, True is the sensible choice. (For the concept of NMF, Non-negative Matrix Factorization, see here.)

์ฐธ๊ณ ๋กœ Cold Start๋ž€, ์ƒˆ๋กœ์šด ์œ ์ €๊ฐ€ ๋‚˜ํƒ€๋‚ฌ์„ ๋•Œ, ์ƒˆ๋กœ์šด ์œ ์ €์— ๋Œ€ํ•œ ํžˆ์Šคํ† ๋ฆฌ๊ฐ€ ์—†๊ธฐ ๋•Œ๋ฌธ์— ์ด ์œ ์ €์—๊ฒŒ ์ œํ’ˆ์šธ ์ถ”์ฒœํ•ด์ค„ ์ˆ˜ ์—†๋Š” ํ˜„์ƒ์„ ๋งํ•œ๋‹ค.

 

# ALS recommender algorithm
from pyspark.ml.recommendation import ALS

train, test = indexed.randomSplit([0.75, 0.25])

rec = ALS(maxIter=10,
         regParam=0.01,
         userCol='userId',
         itemCol='title_new',
         ratingCol='rating', # label -> not needed at predict time!
         nonnegative=True,
         coldStartStrategy='drop')
# ALS๋ชจ๋ธ ํ•™์Šต -> dataframe์„ ๋„ฃ์–ด์ฃผ๊ธฐ
rec_model = rec.fit(train)

# transform์„ ์ด์šฉํ•ด ์˜ˆ์ธก -> dataframe์„ ๋„ฃ์–ด์ฃผ๊ธฐ
pred_ratings = rec_model.transform(test)
pred_ratings.limit(5).toPandas()

 

ALS ๋ชจ๋ธ๋กœ ์˜ˆ์ธกํ•œ ๊ฒฐ๊ณผ

 

์ด์ œ ์˜ˆ์ธก๊ฐ’์„ ๊ณ„์‚ฐํ–ˆ์œผ๋‹ˆ ์‹ค์ œ ํ‰์ ๊ณผ ์˜ˆ์ธก ํ‰์  ์‚ฌ์ด์˜ ์ฐจ์ด๊ฐ’์œผ๋กœ RMSE์™€ MAE๊ฐ’์„ ์ธก์ •ํ•ด๋ณด์ž.

 

# Get metric for training
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol='rating',
                               predictionCol='prediction',
                               metricName='rmse')
# evaluate ๋ฉ”์†Œ๋“œ์— ์˜ˆ์ธก๊ฐ’ ๋‹ด๊ฒจ์žˆ๋Š” dataframe ๋„ฃ์–ด์ฃผ๊ธฐ
rmse = evaluator.evaluate(pred_ratings)

mae_eval = RegressionEvaluator(labelCol='rating',
                              predictionCol='prediction',
                              metricName='mae')
mae = mae_eval.evaluate(pred_ratings)

print("RMSE:", rmse)
print("MAE:", mae)

 

ํ‰๊ฐ€ ๊ฒฐ๊ณผ

4. ํŠน์ • ์œ ์ €๊ฐ€ ์ข‹์•„ํ• ๋งŒํ•œ ์˜ํ™”๋ฅผ ์ถ”์ฒœํ•ด์ฃผ๊ธฐ

์ด์ œ ํŠน์ • ์œ ์ €๋ฅผ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ์ž…๋ ฅ์‹œ์ผฐ์„ ๋•Œ ALS ์ถ”์ฒœ ๋ชจ๋ธ์„ ์ ์šฉํ•ด ๊ทธ ํŠน์ • ์œ ์ €๊ฐ€ ์ข‹์•„ํ• ๋งŒํ•œ ์˜ํ™”๋ฅผ ์ž๋™์œผ๋กœ ์ถ”์ฒœํ•ด์ฃผ๋Š” ํ•จ์ˆ˜๋ฅผ ๋งŒ๋“ค์–ด๋ณด์ž. ํ•จ์ˆ˜๋ฅผ ์ •์˜ํ•  ๋•Œ ํ•„์ž๊ฐ€ ์ฃผ์„์œผ๋กœ ์ž์„ธํ•˜๊ฒŒ ์ฝ”๋“œ ํ•œ์ค„ํ•œ์ค„ ์„ค๋ช…์„ ์จ๋†“์•˜๋‹ค.

 

import sklearn
import random
import os
import pandas as pd
import numpy as np
os.chdir('/Users/younghun/Desktop/gitrepo/data/movie_dataset/')

# Pyspark Library #
# SQL
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
# ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.recommendation import ALS

# Create a Spark session (needed if this block is run on its own)
spark = SparkSession\
        .builder\
        .appName('recommender_system')\
        .getOrCreate()

df = spark.read.csv(os.getcwd() + '/movie_ratings_df.csv',
                   inferSchema=True, header=True)

stringIndexer = StringIndexer(inputCol='title',
                             outputCol='title_new')
model = stringIndexer.fit(df)
indexed = model.transform(df)

# Extract only the unique values among the numerically encoded movie titles -> returns a DataFrame
unique_movies = indexed.select("title_new").distinct()

def top_movies(user_id, n):
    """
    Recommend the top n movies a given user_id is likely to enjoy
    """
    # Alias the unique_movies DataFrame as 'a'
    a = unique_movies.alias('a')

    # New DataFrame containing only the movies this user_id has watched
    watched_movies = indexed.filter(indexed['userId'] == user_id)\
                            .select('title_new')

    # Alias the user's watched movies as DataFrame 'b'
    b = watched_movies.alias('b')

    # Left-join watched_movies onto unique_movies so we can identify
    # the movies this user_id has not seen
    total_movies = a.join(b, a['title_new'] == b['title_new'],
                         how='left')

    # Rows where b.title_new is null are movies the user has not watched yet;
    # selecting a.title_new from those rows extracts them
    # col('b.title_new') => the title_new column of DataFrame b (SQL-style references work!)
    remaining_movies = total_movies\
                       .where(col('b.title_new').isNull())\
                       .select('a.title_new').distinct()
    # Add the given user_id as a new constant column on remaining_movies
    remaining_movies = remaining_movies.withColumn('userId',
                                                  lit(int(user_id)))
    # Predict ratings with the ALS model built above, then keep the top n
    recommender = rec_model.transform(remaining_movies)\
                           .orderBy('prediction', ascending=False)\
                           .limit(n)
    # Use IndexToString to reverse the StringIndexer mapping (numeric index -> movie title)
    movie_title = IndexToString(inputCol='title_new',
                               outputCol='title',
                               labels=model.labels) # model.labels are the labels created when fitting the StringIndexer, i.e., the movie titles
    # transform converts the numeric indices back to titles => returns a DataFrame
    final_recommendations = movie_title.transform(recommender)

    return final_recommendations.show(n, truncate=False)

# Recommend the top 5 movies for the user with userId 1817
top_movies(1817, 5)

 

1817๋ฒˆ ์œ ์ €๊ฐ€ ๋ณผ๋งŒํ•œ 5๊ฐœ์˜ ์˜ํ™”

๋ฐ˜์‘ํ˜•