๐ ๋ณธ ํฌ์คํ ์ PySpark๋ฅผ ํ์ฉํ Kaggle Notebook์ ํ์ฌํ๋ฉด์ ๋ฐฐ์ฐ๊ฒ ๋ ์ฝ๋ ๋ด์ฉ์ ๊ธฐ๋ฐ์ผ๋ก ํฌ์คํ ํ์์ ์๋ ค๋๋ฆฝ๋๋ค. ๋ํ ์์ผ๋ก ์๊ฐ๋ PySpark์ ๋ฌธ๋ฒ์ ๋ํด์ ์์ํ์๋ค๋ฉด ์ฌ๊ธฐ๋ฅผ ์ฐธ๊ณ ํด ๊ฐ๋จํ ์์๋ฅผ ํตํด ์ดํด๋ฅผ ํ ์ ์์ต๋๋ค.
์ด๋ฒ ํฌ์คํ ์์๋ PySpark ๊ทธ ์ค์์๋ Spark SQL ๊ณผ Spark MLlib์ ํ์ฉํ ๋จธ์ ๋ฌ๋ ํ๊ท ๋ชจ๋ธ์ ๋ง๋๋ ๋ฐฉ๋ฒ์ ๋ํด ์๊ฐํ๋ ค๊ณ ํ๋ค. ํ์ฉํ ๋ฐ์ดํฐ๋ California Housing ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํ๋ค. ๋ฐ์ดํฐ๋ ์ฌ๊ธฐ๋ฅผ ๋๋ฌ ํ๋จ์ Input ๋ชฉ์ฐจ๋ฅผ ๋ณด๋ฉด ๋ค์ด๋ก๋ ๋ฐ์ ์ ์๋ค.
์ด์ ํฌ์คํ ์์ ์ค์ตํด๋ดค๋ ๋ถ๋ฅ ๋ชจ๋ธ๊ณผ ํฐ ํ๋ ์์ด ๋ค๋ฅด์ง ์๋ค. ๋ฐ๋ผ์ ์ด๋ฒ ํฌ์คํ ์์๋ ์์นํ ๋ณ์์ ์ฃผ๋ก ์ ์ฉํ๋ ๊ธฐ์ ํต๊ณ๋ ๊ฐ์ ๋ณด๋ ๋ฐฉ๋ฒ์ด๋ Scaling ๋ฐฉ๋ฒ์ PySpark๋ก ์ด๋ป๊ฒ ํ๋์ง์ ๋ํด ์ด์ ์ ๋ง์ถ์ด ์ค์ตํด ๋๊ฐ๋ฉด ์ข์ ๊ฒ ๊ฐ๋ค.
1. ํ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ๋ฐ์ดํฐ ๋ก๋
ํญ์ ๊ทธ๋์๋ฏ์ด ๋ถ์ ๋ฐ ๋ชจ๋ธ๋ง์ ํ๊ธฐ ์ํด ํ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ค๊ณผ ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํด๋ณด์. ๋จผ์ ํ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ค์ด๋ค.
import os
import pandas as pd
import numpy as np
# pyspark for objecy, sql
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
# pyspark for ML
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
import seaborn as sns
import matplotlib.pyplot as plt
# Setting for visualization
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'
pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)
from matplotlib import rcParams
sns.set(context='notebook', style='whitegrid',
rc={'figure.figsize': (18, 4)})
rcParams['figure.figsize'] = 18, 4
# ๋
ธํธ๋ถ ์ฌ์คํ์ ๋๋นํ๊ธฐ ์ํด ๋๋ค ์๋ ์ค์ ํด๋๊ธฐ
rnd_seed = 23
np.random.seed = rnd_seed
np.random.set_state = rnd_seed
๊ทธ๋ฐ๋ฐ ์ด๋ฒ ๋ฐ์ดํฐ๋ ํน์ดํ๊ฒ ๋ฐ์ดํฐ๊ฐ data ํ์ฅ๋ช ์ผ๋ก ๋ ํ์ผ์ด๋ฉฐ ๋ฐ์ดํฐ์ header ์ฆ, ์นผ๋ผ ๋ค์๋ค์ด ์๊ธฐ ๋๋ฌธ์ ์ฌ์ฉ์๊ฐ ์ง์ ์คํค๋ง๋ฅผ ์ ์ํด์ฃผ๊ณ ํ๋ก๊ทธ๋๋ฐ์ผ๋ก ๋ช ์ํด์ฃผ์ด์ผ ํ๋ค. ์ด์ ๋ํ ๋ถ๋ถ์ ์ด์ ํฌ์คํ 4๋ฒ ๋ชฉ์ฐจ์์ ๋ค๋ฃจ์์ผ๋ ์ฐธ๊ณ ๋ฐ๋๋ค.
## ๋ฐ์ดํฐ ์ค๋ฏธ์นด ์ง์ ๋ช
์ํด์ฃผ๊ธฐ ##
data_path = '/Users/younghun/Desktop/gitrepo/data/cal_housing.data'
# data์ ์นผ๋ผ๋ช
์ ์คํค๋ง๋ก ์ ์ํด์ฃผ๊ธฐ
schema_string = 'long lat medage totrooms totbdrms pop houshlds medinc medhv'
fields = [StructField(field, FloatType(), True)for field in schema_string.split()]
schema = StructType(fields)
## ๋ฐ์ดํฐ ๋ก๋๋ฅผ ์ํ SparkSession ๋ง๋ค์ด์ฃผ๊ธฐ ##
spark = SparkSession.builder.master('local[2]')\
.appName('Linear-Regression-California-Housing')\
.getOrCreate()
# ๋ฐ์ดํฐํ์ผ ๋ก๋
# cache ๋ฉ์๋๋ฅผ ์ด์ฉํด์ ๋ฉ๋ชจ๋ฆฌ์ keepํด๋๊ธฐ
housing_df = spark.read.csv(path=data_path, schema=schema).cache()
# ์์ 5๊ฐ ํ ๋ฏธ๋ฆฌ ๋ณด๊ธฐ -> ํ๋์ Row๊ฐ namedTuple ํํ๋ก ๋์ด ์์..!
housing_df.take(5)
์ ์ฝ๋์์ ํน์ดํ ๋ถ๋ถ์ take()
๋ผ๋ ๋ถ๋ถ์ ์ฌ์ฉํ ๊ฒ์ธ๋ฐ, ์ด์ ์ ์ฌ์ฉํ๋ limit
๊ณผ show
์๋ ๋ฌ๋ฆฌ ๋ฐ์ดํฐ ๋ฏธ๋ฆฌ๋ณด๊ธฐ๋ฅผ Python named tuple ํํ์ธ DataFrame์ Row ํ์์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ณด์ฌ์ค๋ค.
๋ฐ๋ฉด์ show
๋ฉ์๋๋ ๋ค์๊ณผ ๊ฐ์ด ๋ณด์ฌ์ค๋ค.
2. ๋ฐ์ดํฐ ํ์
์ด์ ์ค์ต์์ ์ด๋ฏธ ์ฌ์ฉํด๋ณด์๋ ๋ฉ์๋๋ค์ ์ค๋ช ์ ์๋ตํ๊ณ ์๋กญ๊ฒ ๋ฑ์ฅํ๋ ๋ฉ์๋์ ๋ํด์๋ง ์ค๋ช ํ๋ ค๊ณ ํ๋ค.
df.sort('columnA', ascending=True)
: columnA ๊ฐ์ ๊ธฐ์ค์ผ๋ก ์ค๋ฆ์ฐจ์์ผ๋ก ์ ๋ ฌํ๋ค. ๋ํดํธ๊ฐ์ True๋ก False๋ฉด ๋ด๋ฆผ์ฐจ์์ผ๋ก ์ ๋ ฌํ๋ค. ์ด๋ Pandas API์df.sort_values(by='columnA', ascending=True)
์ ๋น์ทํ๋ค.df.describe()
: ์์นํ ๋ณ์๋ค์ ๊ธฐ์ ํต๊ณ๋์ ๋ณด์ฌ์ฃผ๋ ๋ฉ์๋๋ก Pandas API ํํ์ ๋งค์ฐ ๋น์ทํ๋ค. ๋จ, PySpark์์๋ ๊ธฐ์ ํต๊ณ๋ ๊ฐ์ ์ง์ ๊ด์ฐฐํ๊ณ ์ถ๋ค๋ฉดshow()
์ ๊ฐ์ ๋ฏธ๋ฆฌ๋ณด๊ธฐ ๋ฉ์๋๋ฅผ ๋ถ์ฌdf.describe().show(10)
์ด๋ฐ ์์ผ๋ก ํด์ฃผ์ด์ผ ํ๋ค.
# ๊ทธ๋ฃนํํด์ ์ง๊ณํด๋ณด๊ธฐ
result_df = housing_df.groupBy('medage').count()
# ๊ฐ ์ ๋ ฌ ๊ธฐ์ค ์ค์ ํด์ฃผ๊ณ ๋ด๋ฆผ์ฐจ์์ผ๋ก ์ ๋ ฌ
result_df.sort('medage', ascending=False).show(5)
# ์์นํ ๋ณ์๋ค ๊ธฐ์ ํต๊ณ๋ ์ดํด๋ณด๊ธฐ
housing_df.describe().show(10)
๋ค์์ ์์นํ ๋ณ์ ๊ฐ๋ค์ ์๋ฆฟ์๋ฅผ ์ค์ฌ์ฃผ๋ ๊ฒ๊ณผ ๊ฐ์ด SQL์์ ์ฌ์ฉํ ์ ์๋ ์ฌ๋ฌ๊ฐ์ง ํจ์๊ฐ ๋ด๊ฒจ์๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํด๋ณด์. ํ๋จ์ ์ฝ๋์์ F
๋ pyspark.sql.functions
๋ฅผ F
๋ก alias ์ํจ ๊ฒ์ด๋ค.
# ๋ฐํ๋๋ ํต๊ณ๋ ๊ฐ๋ค์ ์์์ ์ ๊ฑฐํ๊ฑฐ๋ ํ๋ ๋ฑ ์ปค์คํฐ๋ง์ด์งํด์ ์ถ๋ ฅ
(housing_df.describe()).select('summary',
F.round('medage', 4).alias('medage'),
F.round('totrooms', 4).alias('totrooms'),
F.round('totbdrms', 4).alias('totbdrms'),
F.round('pop', 4).alias('pop'),
F.round('houshlds', 4).alias('houshlds'),
F.round('medinc', 4).alias('medinc'),
F.round('medhv', 4).alias('medhv')).show(10)
3. Feature Engineering
๋ค์์ ๊ธฐ์กด ๋ณ์์์ ํ์๋ณ์๋ฅผ ์์ฑํ๋ ์์
์ธ๋ฐ, ์ด์ ์ ํ๋ ๊ฒ์ฒ๋ผ withColumn
๊ตฌ๋ฌธ์ ์ฌ์ฉํ์ฌ ์๋ก์ด ํ์๋ณ์๋ฅผ ์์ฑํด์ฃผ๋ฉด ๋๋ค.
# ํ์๋ณ์ ์์ฑํ๊ธฐ
housing_df = (housing_df.withColumn('rmsperhh',
F.round(col('totrooms')/col('houshlds'), 2))\
.withColumn('popperhh',
F.round(col('pop')/col('houshlds'), 2))\
.withColumn('bdrmsperrm',
F.round(col('totbdrms')/col('totrooms'), 2)))
housing_df.show(5)
์ด์ ํ์ํ ๋ณ์๋ค๋ง ๊ณจ๋ผ์ ๋ฐ์ดํฐํ๋ ์์ ์๋ก ํ ๋นํ ํ ์์นํ ๋ณ์๋ค์ Scaling์ ์ ์ฉ์์ผ๋ณด์. ํน์ดํ ์ ์ Scaling์ ์ ์ฉ์ํค๊ธฐ ์ด์ ์ ํ์ํ Feature๋ค์ Vector๋ก ๋ณํํ๋ ์์ ์ด ์ ํ๋์ด์ผ ํ๋ค๋ ๊ฒ์ด๋ค.
# ์ฌ์ฉํ์ง ์์ ๋ณ์ ์ ์ธํ๊ณ ํ์ํ ๋ณ์๋ค๋ง select
housing_df = housing_df.select('medhv',
'totbdrms',
'pop',
'houshlds',
'medinc',
'rmsperhh',
'popperhh',
'bdrmsperrm')
featureCols = ['totbdrms', 'pop', 'houshlds', 'medinc',
'rmsperhh', 'popperhh', 'bdrmsperrm']
# VectorAssembler๋ก feature vector๋ก ๋ณํ
assembler = VectorAssembler(inputCols=featureCols, outputCol='features')
assembled_df = assembler.transform(housing_df)
assembled_df.show(10, truncate=True)
์ด์ ์ ์ฌ์ง์ ๋นจ๊ฐ์ ๋ฐ์ค์์ ์๋ ์์นํ Feature๋ค์ Scaling ์์ผ๋ณด์.
# ์์์ ๋ง๋ Feature vector์ธ 'features' ๋ฃ๊ธฐ
standardScaler = StandardScaler(inputCol='features',
outputCol='features_scaled')
# fit, transform
scaled_df = standardScaler.fit(assembled_df).transform(assembled_df)
# scaling๋ ํผ์ฒ๋ค๋ง ์ถ์ถํด๋ณด๊ธฐ
scaled_df.select('features', 'features_scaled').show(10,
truncate=True)
Scaling ํ๋ API๋ PySpark์์ ์ ๊ณตํ๋๋ฐ ์ฌ๋ฌ๊ฐ์ง ์ค์ผ์ผ๋ง ๋ฐฉ๋ฒ์ด ์์ง๋ง ์ฌ๊ธฐ์๋ ํ์คํ์ธ StandardScaler
๋ฅผ ์ฌ์ฉํด๋ณด์. ๋ฉ์๋ ์ฌ์ฉ ๋ฐฉ๋ฒ์ ๋ค์๊ณผ ๊ฐ๋ค. StandardScaler(inputCol='Raw Features', outputCol='Scaled Features')
๋ก ์ค์ผ์ผ๋ง์ ์ ์ํด์ฃผ๊ณ fit(dataframe)
๊ณผ transform(dataframe)
์ผ๋ก ์ค์ผ์ผ๋ง์ ์ํํด์ฃผ์.
4. ๋ฐ์ดํฐ ๋ถํ ๊ณผ ํ๊ท ๋ชจ๋ธ๋ง
๋ฐ์ดํฐ๋ฅผ ํ์ต 8, ํ ์คํธ 2์ ๋น์จ๋ก ๋๋์ด์ฃผ๊ณ ํ๊ท ๋ชจ๋ธ์ ๋ง๋ค์ด ํ์ต์์ผ๋ณด์. ํ๊ท ๋ชจ๋ธ์ ์ฌ๋ฌ๊ฐ์ง๊ฐ ์์ง๋ง ์ฌ๊ธฐ์๋ ์ค๋ฒํผํ ์ ์๋ฐฉํ๊ธฐ ์ํ ์ ๊ทํ ํญ์ด ํฌํจ๋์ด ์๋ Elastic Net ํ๊ท ๋ชจ๋ธ์ ์ฌ์ฉํด๋ณด์.
# ๋ฐ์ดํฐ ๋ถํ
train_data, test_data = scaled_df.randomSplit([0.8, 0.2], seed=rnd_seed)
# Elastic Net ๋ชจ๋ธ ์ ์
lr = LinearRegression(featuresCol='features_scaled',
labelCol='medhv',
predictionCol='predmedhv',
maxIter=10,
regParam=0.3,
elasticNetParam=0.8,
standardization=False)
# ๋ชจ๋ธ ํ์ต
linearModel = lr.fit(train_data)
5. ํ ์คํธ ๋ฐ์ดํฐ์ ๋ํ ์์ธก
์ด์ ํ์ต์ ๋ชจ๋ ๋ง์ณค์ผ๋ ํ
์คํธ ๋ฐ์ดํฐ๋ก ์์ธก์ ํ๊ณ RMSE, MAE, R2 Score ๋ฉํธ๋ฆญ๋ค๋ก ์ฑ๋ฅ ํ๊ฐ๋ฅผ ํด๋ณด์. PySpark์์ ํ
์คํธ ๋ฐ์ดํฐ์ ๋ํ ํ๊ฐ๋ transform(test_dataframe)
์ผ๋ก ์ํํด์ค๋ค.
# transoform ์ฌ์ฉํ๋ฉด ๋ชจ๋ธ ์ ์ํ ๋ ์ค์ ํ "์์ธก ๊ฐ ๋ณ์"๋ฅผ ์๋ก ๋ง๋ค์ด ์์ฑํ ๋ฐ์ดํฐํ๋ ์ ๋ฐํ
predictions = linearModel.transform(test_data)
print(type(predictions)) # ์์ธก ๊ฒฐ๊ณผ๊ฐ์ด PySpark์ ๋ฐ์ดํฐํ๋ ์ ํํ๋ก ๋ฐํ๋จ!
๋ณ์ prediction
์ด ๋ฐ์ดํฐํ๋ ์ ํํ๋ก ๋ฐํ๋๋๋ฐ ์ด ๋ฐ์ดํฐํ๋ ์ ์ค์ ์์ธก๊ฐ์ธ predmedhv
์ ์ ๋ต์ธ medhv
์นผ๋ผ๋ง์ ์ถ์ถํด๋ณด์. ๊ฒฐ๊ณผํ๋ฉด์ ๋ณด๋ฉด ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ์ ์์ธก ์ฑ๋ฅ์ด ๊ทธ๋ฆฌ ๋์๋ณด์ด์ง ์๋๋ค..
# predictions ๋ฐ์ดํฐํ๋ ์์์ y๊ฐ๊ณผ ์์ธก๊ฐ๋ง ์ถ์ถํด ๋น๊ต
pred_labels = predictions.select('predmedhv', 'medhv')
pred_labels.show()
๊ทธ๋ ๋ค๋ฉด ์์ธก๊ฐ๊ณผ ์ค์ ๊ฐ์ด ์ผ๋ง๋ ์ฐจ์ด๋๋์ง RMSE, MAE๋ฅผ ๊ณ์ฐํด๋ณด๊ณ ๋ง๋ค์ด์ง ํ๊ท์์ด ์ฃผ์ด์ง ๋ฐ์ดํฐ๋ฅผ ์ผ๋ง๋ ์ ์ค๋ช ํ๋์ง ๋ํ๋ด๋ R2 Score(๊ฒฐ์ ๊ณ์)๊ฐ์ ์ดํด๋ณด์.
print(f"RMSE:{linearModel.summary.rootMeanSquaredError}")
print(f"MAE: {linearModel.summary.meanAbsoluteError}")
print(f"R2 score: {linearModel.summary.r2}")
์ถ๊ฐ์ ์ผ๋ก ๋ฉํธ๋ฆญ์ ์ป์ ์ ์๋ ๋ฐฉ๋ฒ์ ์ ์ฝ๋ ๋ง๊ณ ๋ RegressionEvaluator
๋ก๋ ๊ณ์ฐํ ์ ์๋ค. ํด๋น ๋ฉ์๋์ ์ฌ์ฉ๋ฐฉ๋ฒ์ ๋ค์๊ณผ ๊ฐ๋ค.
RegressionEvaluator(predictionCol, labelCol, metricName)
predictionCol
: ์์ธก๊ฐlabelCol
: ์ค์ ๊ฐmetricName
: ์ธก์ ํ ๋ฉํธ๋ฆญ ์ด๋ฆ(ex. RMSE, MAE ๋ฑ๋ฑ)
# RMSE
evaluator = RegressionEvaluator(predictionCol='predmedhv',
labelCol='medhv',
metricName='rmse')
# MAE
evaluator = RegressionEvaluator(predictionCol='predmedhv',
labelCol='medhv',
metricName='mae')
# R2 Score
evaluator = RegressionEvaluator(predictionCol='predmedhv',
labelCol='medhv',
metricName='r2')
print(f"RMSE: {evaluator.evaluate(pred_labels)}")
print(f"MAE: {evaluator.evaluate(pred_labels)}")
print(f"R2 score: {evaluator.evaluate(pred_labels)}")
๋ ๋ค๋ฅธ ๋ฐฉ๋ฒ์ RegressionMetrics
๋ฉ์๋์ธ๋ฐ ์ด๋ ํน์ดํ๊ฒ ์์ธก๊ฐ์ RDD ์๋ฃ๊ตฌ์กฐ๋ก ๋ณํํ๊ณ ๋ฃ์ด์ฃผ์ด์ผ ํ๋ค.
# ์์ธก๊ฐ์ธ pred_labels๋ฅผ RDD ์๋ฃ๊ตฌ์กฐ๋ก ๋ณํํ๊ณ ๋ฃ์ด์ผ ํจ
metrics = RegressionMetrics(pred_labels.rdd)
print("RMSE:", metrics.rootMeanSquaredError)
print("MAE:", metrics.meanAbsoluteError)
print("R2 score:", metrics.r2)
'Apache Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Infra] ๋ฐ์ดํฐ ์ธํ๋ผ ๊ตฌ์กฐ์ Sources (0) | 2021.04.23 |
---|---|
[PySpark] ์ปจํ ์ธ ๊ธฐ๋ฐ ์ํ ์ถ์ฒ ์์คํ ๋ง๋ค์ด๋ณด๊ธฐ (18) | 2021.02.15 |
[PySpark] ํ์ดํ๋ ๋ฐ์ดํฐ๋ก ๋ถ๋ฅ ๋ชจ๋ธ ๋ง๋ค๊ธฐ (2) | 2021.02.03 |
[PySpark] Spark SQL ํํ ๋ฆฌ์ผ (0) | 2021.02.01 |
[PySpark] Apache Spark ์ RDD ์๋ฃ๊ตฌ์กฐ (0) | 2021.01.30 |