๐ ๋ณธ ํฌ์คํ ์ PySpark๋ฅผ ํ์ฉํ Kaggle Notebook์ ํ์ฌํ๋ฉด์ ๋ฐฐ์ฐ๊ฒ ๋ ์ฝ๋ ๋ด์ฉ์ ๊ธฐ๋ฐ์ผ๋ก ํฌ์คํ ํ์์ ์๋ ค๋๋ฆฝ๋๋ค. ๋ํ ์์ผ๋ก ์๊ฐ๋ PySpark์ ๋ฌธ๋ฒ์ ๋ํด์ ์์ํ์๋ค๋ฉด ์ฌ๊ธฐ๋ฅผ ์ฐธ๊ณ ํด ๊ฐ๋จํ ์์๋ฅผ ํตํด ์ดํด๋ฅผ ํ ์ ์์ต๋๋ค.
์ด๋ฒ ํฌ์คํ ์์๋ PySpark ๊ทธ ์ค์์๋ Spark SQL ๊ณผ Spark MLlib์ ํ์ฉํด ์ ์ฌ์์ธ ๊ธฐ๋ฐ์ ์ํ ์ถ์ฒ ์์คํ ์ ๊ตฌํํด๋ณด๋ ค ํ๋ค. ํ์ฉํ ๋ฐ์ดํฐ๋ Kaggle์ ๋ฌด๋น๋ ์ฆ ๋ฐ์ดํฐ์ movies, ratings ๋ฐ์ดํฐ๋ฅผ ํ์๊ฐ ์ง์ ์ ์ฒ๋ฆฌํ ๋ฐ์ดํฐ๋ฅผ ํ์ฉํ๋ค. ์ ์ฒ๋ฆฌ๋ ๋ฐ์ดํฐ๋ฅผ ํ์ฉํ๋ ค๋ฉด ์ฌ๊ธฐ๋ฅผ ํด๋ฆญํด ๋ค์ด๋ก๋ ๋ฐ์.
์ถ์ฒ ์์คํ ์ค์๋ ์ปจํ ์ธ ๊ธฐ๋ฐ ์ถ์ฒ, ํ์ ๊ธฐ๋ฐ ์ถ์ฒ ๋ฑ์ ์ฌ๋ฌ ์ข ๋ฅ๊ฐ ์์ง๋ง ์ฌ๊ธฐ์๋ ๋ค๋ฅธ ์ฌ์ฉ์๋ค์ ์ ํ์ ๋ํ ํ์ ์ ๊ธฐ๋ฐ์ผ๋ก ํ์ฌ ๋ค๋ฅธ ์ ํ์ ์ถ์ฒํด์ฃผ๋ ํ์ ํํฐ๋ง ๋ฐฉ๋ฒ์ ์ ์ฌ์ ์์ธ ๊ธฐ๋ฐ ๋ชจ๋ธ์ ์ฌ์ฉํ๋ ค๊ณ ํ๋ค.
1. ํ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ๋ฐ์ดํฐ ๋ก๋
ํ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ค์ ์ํฌํธํ๊ณ PySpark๋ฅผ ํ์ฉํ๊ธฐ ์ํ ์คํํฌ ์ธ์ ์ ๋ง๋ค์ด๋ณด์. ๊ทธ๋ฆฌ๊ณ ๋ก๋ํ ๋ฐ์ดํฐ๋ฅผ Pandas ํํ๋ก ๋ฏธ๋ฆฌ๋ณด๊ธฐํด์ ์ด๋ค ํผ์ฒ๋ค์ด ์กด์ฌํ๋์ง ์ดํด๋ณด์.
import sklearn
import random
# Pyspark Library #
# SQL
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
# ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import os
import pandas as pd
import numpy as np
os.chdir('/Users/younghun/Desktop/gitrepo/data/movie_dataset/')
# ์คํํฌ ์ธ์
๋ง๋ค๊ธฐ
spark = SparkSession\
.builder\
.appName('recommender_system')\
.getOrCreate()
df = spark.read.csv(os.getcwd() + '/movie_ratings_df.csv',
inferSchema=True, header=True)
df.limit(3).toPandas()
์ ํ๋ฉด์์ ๋ณด๋ ๊ฒ์ฒ๋ผ ์ ์ ๋ฅผ ๋ํ๋ด๋ userId
, ์ํ์ ์ ๋ชฉ์ธ title
, ๊ทธ ํด๋น ์ํ์ ๋ํ ํ์ ์ธ rating
๋ณ์๋ค์ด ์กด์ฌํ๋ค.
2. ๋ฐ์ดํฐ ์ ์ฒ๋ฆฌ
๋ฌธ์์ด ํํ๋ก ์ด๋ฃจ์ด์ ธ์๋ ์ํ ์ ๋ชฉ์ ๊ฐ๋ค์ ์์นํ์ผ๋ก ๋ฐ๊พธ์ด์ฃผ์.
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
stringIndexer = StringIndexer(inputCol='title',
outputCol='title_new')
model = stringIndexer.fit(df)
indexed = model.transform(df)
indexed.limit(5).toPandas()
3. ์ ์ฌ ์์ธ ๊ธฐ๋ฐ ์ถ์ฒ ์์คํ ALS ๋ชจ๋ธ ๋ง๋ค๊ณ ํ๊ฐํด๋ณด๊ธฐ
๊ตฌ์ถํ ์ถ์ฒ ์์คํ ์ ์ ์ฌ ์์ธ์ ์ฌ์ฉํด ๋ง๋๋ ๊ทธ ์ค์์๋ SGD(Stochastic Gradient Descent)๊ฐ ์๋ ALS(Alternate Least Squares) ๋ชจ๋ธ์ ์ฌ์ฉํด๋ณด๋ ค ํ๋ค. ํธ๋ฆฌํ๊ฒ๋ PySpark์์๋ ALS ๋ชจ๋ธ์ ์ฝ๊ฒ ๊ตฌ์ถํ ์ ์๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ ๊ณตํ๋ค. ALS ๋ฉ์๋์ ์ฃผ์ ํ๋ผ๋ฏธํฐ๋ฅผ ์ ๊น ์ดํด๋ณด๊ณ ์ฝ๋๋ก ๊ตฌํํด๋ณด์.
- regParam : ๋ฐ์ดํฐ์ ํฌ๊ธฐ์ ์์กดํ์ง ์๊ณ ์ถ์ฒ ์์คํ ์ ์ผ๋ฐํ๋ฅผ ์์ผ์ฃผ๊ธฐ์ํ ์ ๊ทํ ํญ์ด๋ค.
- coldStartStrategy = 'drop' : ์์ง ํ๊ฐ๋์ง ์์ ์ฆ, ๊ฒฐ์ธก์น ๊ฐ์ ๊ฐ๋ ๋ฐ์ดํฐ๋ฅผ ์ ์ธํ๊ณ ๋ชจ๋ธ์ ์ฑ๋ฅ์ด ํ๊ฐ๋๋ค.(์ฐธ๊ณ ๋ก 'nan' ๊ฐ์ผ๋ก ํ ๋นํ๋ฉด ๊ฒฐ์ธก์น๋ฅผ ํฌํจํด ๋ชจ๋ธ ์ฑ๋ฅ์ ํ๊ฐํ๋ค.)
- nonnegative = True : Least Squares๋ฅผ ํ ๋ ์์๊ฐ์ด ๋ฏธํฌํจ๋์ด ์๋ ์ ํ์ฌํญ์ ์ค ๊ฒ์ธ์ง๋ฅผ ์ ํํ๋ค. ์ด๋ฒ์ ํ์ฉ๋ ์ํ ๋ฐ์ดํฐ์ ํ์ ๊ณผ ์ํ์ ๋ชฉ, ์ ์ ์์ด๋๋ ๋ชจ๋ ์์๊ฐ์ด ์๊ธฐ ๋๋ฌธ์ True๋ก ํ๋ ๊ฒ์ด ๋ฐ๋์งํ๋ค. ์ด NNF(Non-Negative Factorization ๊ฐ๋ ์ ๋ํด์๋ ์ฌ๊ธฐ๋ฅผ ์ฐธ๊ณ ํด๋ณด์)
์ฐธ๊ณ ๋ก Cold Start๋, ์๋ก์ด ์ ์ ๊ฐ ๋ํ๋ฌ์ ๋, ์๋ก์ด ์ ์ ์ ๋ํ ํ์คํ ๋ฆฌ๊ฐ ์๊ธฐ ๋๋ฌธ์ ์ด ์ ์ ์๊ฒ ์ ํ์ธ ์ถ์ฒํด์ค ์ ์๋ ํ์์ ๋งํ๋ค.
# ALS recommender algorithm
from pyspark.ml.recommendation import ALS
train, test = indexed.randomSplit([0.75, 0.25])
rec = ALS(maxIter=10,
regParam=0.01,
userCol='userId',
itemCol='title_new',
ratingCol='rating', # label -> predictํ ๋๋ ํ์ ์์!
nonnegative=True,
coldStartStrategy='drop')
# ALS๋ชจ๋ธ ํ์ต -> dataframe์ ๋ฃ์ด์ฃผ๊ธฐ
rec_model = rec.fit(train)
# transform์ ์ด์ฉํด ์์ธก -> dataframe์ ๋ฃ์ด์ฃผ๊ธฐ
pred_ratings = rec_model.transform(test)
pred_ratings.limit(5).toPandas()
์ด์ ์์ธก๊ฐ์ ๊ณ์ฐํ์ผ๋ ์ค์ ํ์ ๊ณผ ์์ธก ํ์ ์ฌ์ด์ ์ฐจ์ด๊ฐ์ผ๋ก RMSE์ MAE๊ฐ์ ์ธก์ ํด๋ณด์.
# Get metric for training
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='rating',
predictionCol='prediction',
metricName='rmse')
# evaluate ๋ฉ์๋์ ์์ธก๊ฐ ๋ด๊ฒจ์๋ dataframe ๋ฃ์ด์ฃผ๊ธฐ
rmse = evaluator.evaluate(pred_ratings)
mae_eval = RegressionEvaluator(labelCol='rating',
predictionCol='prediction',
metricName='mae')
mae = mae_eval.evaluate(pred_ratings)
print("RMSE:", rmse)
print("MAE:", mae)
4. ํน์ ์ ์ ๊ฐ ์ข์ํ ๋งํ ์ํ๋ฅผ ์ถ์ฒํด์ฃผ๊ธฐ
์ด์ ํน์ ์ ์ ๋ฅผ ํ๋ผ๋ฏธํฐ๋ก ์ ๋ ฅ์์ผฐ์ ๋ ALS ์ถ์ฒ ๋ชจ๋ธ์ ์ ์ฉํด ๊ทธ ํน์ ์ ์ ๊ฐ ์ข์ํ ๋งํ ์ํ๋ฅผ ์๋์ผ๋ก ์ถ์ฒํด์ฃผ๋ ํจ์๋ฅผ ๋ง๋ค์ด๋ณด์. ํจ์๋ฅผ ์ ์ํ ๋ ํ์๊ฐ ์ฃผ์์ผ๋ก ์์ธํ๊ฒ ์ฝ๋ ํ์คํ์ค ์ค๋ช ์ ์จ๋์๋ค.
import sklearn
import random
import os
import pandas as pd
import numpy as np
os.chdir('/Users/younghun/Desktop/gitrepo/data/movie_dataset/')
# Pyspark Library #
# SQL
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
# ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.recommendation import ALS
df = spark.read.csv(os.getcwd() + '/movie_ratings_df.csv',
inferSchema=True, header=True)
stringIndexer = StringIndexer(inputCol='title',
outputCol='title_new')
model = stringIndexer.fit(df)
indexed = model.transform(df)
# ์ซ์๋ก ๋ฐ๊พผ ์ํ์ ๋ชฉ๋ค ์ค Uniqueํ ๊ฐ๋ค๋ง ๋ด์ ์ถ์ถํ๊ธฐ -> Dataframe ๋ฐํ
unique_movies = indexed.select("title_new").distinct()
def top_movies(user_id, n):
"""
ํน์ user_id๊ฐ ์ข์ํ ๋งํ n๊ฐ์ ์ํ ์ถ์ฒํด์ฃผ๋ ํจ์
"""
# unique_movies ๋ฐ์ดํฐํ๋ ์์ 'a'๋ผ๋ ๋ฐ์ดํฐํ๋ ์์ผ๋ก alias์ํค๊ธฐ
a = unique_movies.alias('a')
# ํน์ user_id๊ฐ ๋ณธ ์ํ๋ค๋ง ๋ด์ ์๋ก์ด ๋ฐ์ดํฐํ๋ ์ ์์ฑ
watched_movies = indexed.filter(indexed['userId'] == user_id)\
.select('title_new')
# ํน์ user_id๊ฐ ๋ณธ ์ํ๋ค์ 'b'๋ผ๋ ๋ฐ์ดํฐํ๋ ์์ผ๋ก alias์ํค๊ธฐ
b = watched_movies.alias('b')
# unique_movies๋ฅผ ๊ธฐ์ค์ผ๋ก watched_movies๋ฅผ ์กฐ์ธ์์ผ์ user_id๊ฐ ๋ณด์ง ๋ชปํ ์ํ๋ค ํ์
๊ฐ๋ฅ
total_movies = a.join(b, a['title_new'] == b['title_new'],
how='left')
# b ๋ฐ์ดํฐํ๋ ์์ title_new๊ฐ์ด ๊ฒฐ์ธก์น๋ฅผ ๊ฐ๊ณ ์๋ ํ์ a.title_new๋ฅผ ๋ฝ์๋์ผ๋ก์จ user_id๊ฐ ์์ง ๋ชป๋ณธ ์ํ๋ค ์ถ์ถ
# col('b.title_new') => b ๋ฐ์ดํฐํ๋ ์์ title_new์นผ๋ผ ์๋ฏธ(SQL์ฒ๋ผ ๊ฐ๋ฅ!)
remaining_movies = total_movies\
.where(col('b.title_new').isNull())\
.select('a.title_new').distinct()
# remaining_movies ๋ฐ์ดํฐํ๋ ์์ ํน์ user_id๊ฐ์ ๋์ผํ๊ฒ ์๋ก์ด ๋ณ์๋ก ์ถ๊ฐํด์ฃผ๊ธฐ
remaining_movies = remaining_movies.withColumn('userId',
lit(int(user_id)))
# ์์์ ๋ง๋ ALS ๋ชจ๋ธ์ ์ฌ์ฉํ์ฌ ์ถ์ฒ ํ์ ์์ธก ํ n๊ฐ ๋งํผ view ->
recommender = rec_model.transform(remaining_movies)\
.orderBy('prediction', ascending=False)\
.limit(n)
# StringIndexer๋ก ๋ง๋ ๊ฒ์ ์ญ์ผ๋ก ๋ฐ๊พธ๊ธฐ ์ํด IndexToString์ฌ์ฉ(์ํ์ ๋ชฉ์ ์ซ์->ํ๊ธ์ ๋ชฉ)
movie_title = IndexToString(inputCol='title_new',
outputCol='title',
labels=model.labels) #์ฌ๊ธฐ์ model.labels๋ StringIndexer์์ fit์์ผฐ์ ๋ ์๊ธด ๋ ์ด๋ธ. ์ฆ, ์ํ ์ ๋ชฉ๋ค
# transformํด์ ์ํ์ ๋ชฉ์ ์ซ์->ํ๊ธ๋ก ๋ณํ! => dataframe์ผ๋ก ๋ฐํ
final_recommendations = movie_title.transform(recommender)
return final_recommendations.show(n, truncate=False)
# userid๊ฐ 1817๋ฒ์ธ ์ ์ ๊ฐ ๋ณผ๋งํ ์ํ ์์ 5๊ฐ ์ถ์ฒํด์ฃผ๊ธฐ
top_movies(1817, 5)
'Apache Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Infra] ๋ฐ์ดํฐ ์ธํ๋ผ - Ingestion&Transformation (0) | 2021.04.23 |
---|---|
[Infra] ๋ฐ์ดํฐ ์ธํ๋ผ ๊ตฌ์กฐ์ Sources (0) | 2021.04.23 |
[PySpark] PySpark๋ก Regression ๋ชจ๋ธ ๋ง๋ค๊ธฐ (0) | 2021.02.04 |
[PySpark] ํ์ดํ๋ ๋ฐ์ดํฐ๋ก ๋ถ๋ฅ ๋ชจ๋ธ ๋ง๋ค๊ธฐ (2) | 2021.02.03 |
[PySpark] Spark SQL ํํ ๋ฆฌ์ผ (0) | 2021.02.01 |