
[PySpark] Let's do cross-validation and parameter tuning with Spark!


🔊 This post reorganizes, in my own way, what I learned from the book Spark: The Definitive Guide and the Inflearn course "Spark Machine Learning Complete Guide". In particular, please note that I did not reuse the course's lecture materials as-is!

Learning PySpark: using Apache Spark from Python


In this post, we look at how to use Spark to cross-validate a model while tuning its hyperparameters at the same time. Going further, we also look at how to run cross-validation and parameter tuning with the Pipeline covered in the previous post.

1. ๊ต์ฐจ๊ฒ€์ฆ๊ณผ ํŒŒ๋ผ๋ฏธํ„ฐ ํŠœ๋‹์„ ๋™์‹œ์—, CrossValidator

์ŠคํŒŒํฌ์˜ CrossValidator ํด๋ž˜์Šค๋Š” ๊ต์ฐจ๊ฒ€์ฆ๊ณผ ํŒŒ๋ผ๋ฏธํ„ฐ ํŠœ๋‹์„ ๋™์‹œ์— ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•œ๋‹ค. ์—ฌ๊ธฐ์„œ '๊ต์ฐจ๊ฒ€์ฆ'์ด๋ž€ K-fold ๊ต์ฐจ๊ฒ€์ฆ์„ ์˜๋ฏธํ•œ๋‹ค. K-fold ๊ต์ฐจ๊ฒ€์ฆ์— ๋Œ€ํ•ด ๋ชจ๋ฅธ๋‹ค๋ฉด ์˜ˆ์ „ ํฌ์ŠคํŒ…์„ ์ฐธ๊ณ ํ•˜๋„๋ก ํ•˜์ž.

 

Parameter tuning uses grid search, which explores every possible combination of the candidate parameter values. Note that for classification problems, cross-validation is not done as stratified K-fold cross-validation, which would account for class imbalance. In other words, Spark's CrossValidator splits the data into training and validation sets at random, without considering the class distribution. This is a real limitation.
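To make "not stratified" concrete, here is a minimal pure-Python sketch. It is not Spark's actual code. Each row is sent to one of K folds uniformly at random, regardless of its label, which is roughly the spirit of how the folds are drawn here. The sketch then counts the classes in each fold. The helper names `random_fold_ids` and `class_counts_per_fold` are hypothetical.

```python
import random
from collections import Counter

def random_fold_ids(n_rows, num_folds, seed=42):
    """Assign each row a fold id in [0, num_folds) uniformly at random.
    Labels are ignored, so this is NOT a stratified split."""
    rng = random.Random(seed)
    return [int(rng.random() * num_folds) for _ in range(n_rows)]

def class_counts_per_fold(labels, fold_ids, num_folds):
    """Count how many rows of each class ended up in each fold."""
    counts = [Counter() for _ in range(num_folds)]
    for label, fold in zip(labels, fold_ids):
        counts[fold][label] += 1
    return counts

# Iris-like labels: 50 rows each of classes 0, 1 and 2
labels = [0] * 50 + [1] * 50 + [2] * 50
folds = random_fold_ids(len(labels), num_folds=3)
for i, c in enumerate(class_counts_per_fold(labels, folds, 3)):
    print(f"fold {i}: {dict(sorted(c.items()))}")
```

With a random assignment like this, each fold usually ends up with somewhat unequal fold sizes and class counts. A stratified split would instead keep every class's share the same in every fold.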

 

CrossValidator ํด๋ž˜์Šค๋Š” ์ธ์ž๋กœ ํฌ๊ฒŒ ์‚ฌ์šฉํ•  ๋ชจ๋ธ ๊ฐ์ฒด(Estimator), ํŠœ๋‹ํ•  ํŒŒ๋ผ๋ฏธํ„ฐ ๋ฒ”์œ„(์ด๋ฅผ Parameter Grid ๋ผ๊ณ  ๋ณดํ†ต ์ผ์ปซ๋Š”๋‹ค), ๊ต์ฐจ๊ฒ€์ฆ์„ ์ˆ˜ํ–‰ํ•˜๋ฉด์„œ ๋ชจ๋ธ์˜ ์„ฑ๋Šฅ์„ ํ‰๊ฐ€ํ•  Evaluator, ๊ต์ฐจ๊ฒ€์ฆํ•  ๋•Œ ์‚ฌ์šฉํ•  Fold ๊ฐœ์ˆ˜ ์ด 4๊ฐœ๋ฅผ ๋„ฃ์–ด์ค€๋‹ค. ๊ทธ๋ž˜์„œ CrossValidator ๋ฅผ ์‹ค์งˆ์ ์œผ๋กœ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด์„œ๋Š” ๋ฐฉ๊ธˆ ์–ธ๊ธ‰ํ•œ 4๊ฐœ์˜ ์ธ์ž์— ๋„ฃ์–ด์ค„ ๊ฐ์ฒด ๋˜๋Š” ๊ฐ’์„ ์„ ์–ธํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค. ์ด์ œ ์ฝ”๋“œ๋ฅผ ํ†ตํ•ด ์ฐจ๋ก€์ฐจ๋ก€ ์‚ดํŽด๋ณด๋„๋ก ํ•˜์ž.

 

We use the iris dataset. First, load it into a pandas DataFrame with scikit-learn, then convert it into a Spark DataFrame.

from pyspark.sql import SparkSession
from sklearn.datasets import load_iris
import pandas as pd

# Reuse the active SparkSession (e.g. in a notebook) or start a local one
spark = SparkSession.builder.getOrCreate()

iris = load_iris()
iris_data = iris.data
iris_label = iris.target

iris_columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
iris_pdf = pd.DataFrame(iris_data, columns=iris_columns)
iris_pdf['label'] = iris_label

# pandas DataFrame -> Spark DataFrame
iris_sdf = spark.createDataFrame(iris_pdf)

 

๋‹ค์Œ์€ ๋กœ๋“œํ•œ ๋ถ“๊ฝƒ ๋ฐ์ดํ„ฐ๋ฅผ Train, Test ์šฉ์œผ๋กœ ๋‚˜๋ˆ„๊ณ  ๋ณ€์ˆ˜๋“ค์€ Feature Vectorization์„ ์‹œํ‚ค์ž. ๊ทธ๋ฆฌ๊ณ  ์‚ฌ์šฉํ•  ๋ชจ๋ธ์ธ Decision Tree๋ฅผ ์ •์˜ํ•ด๋ณด์ž.

 

# Vectorize the features and create the model (estimator) object
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

feature_cols = iris_sdf.columns[:-1]

# ๋ฐ์ดํ„ฐ ๋ถ„ํ• 
train, test = iris_sdf.randomSplit(weights=[0.7, 0.3], seed=42)
# Feature Vectorization
vec_assembler = VectorAssembler(inputCols=feature_cols, 
                                outputCol='feature')
train_vec = vec_assembler.transform(train)
test_vec = vec_assembler.transform(test)

# ๋ชจ๋ธ ์ •์˜
dt_clf = DecisionTreeClassifier(featuresCol='feature', labelCol='label')
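VectorAssembler is easy to picture without Spark. It appends a column that packs the chosen input columns, in order, into one feature vector per row, and it keeps the original columns. The sketch below mimics this with plain dictionaries and tuples. `assemble` is a hypothetical helper, and Spark's real output column holds an ML `Vector`, not a tuple.

```python
def assemble(rows, input_cols, output_col="feature"):
    """Return new rows where the values of input_cols are packed, in order,
    into one tuple stored under output_col (a plain-Python stand-in for a vector)."""
    out = []
    for row in rows:
        new_row = dict(row)  # keep the original columns, as transform() does
        new_row[output_col] = tuple(float(row[c]) for c in input_cols)
        out.append(new_row)
    return out

rows = [
    {"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2, "label": 0},
    {"sepal_length": 7.0, "sepal_width": 3.2, "petal_length": 4.7, "petal_width": 1.4, "label": 1},
]
feature_cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
assembled = assemble(rows, feature_cols)
print(assembled[0]["feature"])  # (5.1, 3.5, 1.4, 0.2)
```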

 

์ด์ œ ๋‹ค์Œ์œผ๋กœ๋Š” ์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ ํ•ต์‹ฌ์ธ ํŠœ๋‹ํ•  ํŒŒ๋ผ๋ฏธํ„ฐ์˜ ๋ฒ”์œ„๋ฅผ ์ •์˜ํ•ด์ฃผ๋„๋ก ํ•˜์ž. ์ด ๋•Œ, ํŒŒ๋ผ๋ฏธํ„ฐ ๋ฒ”์œ„๋ฅผ ์ง€์ •ํ•  ๋•Œ๋Š” ๋ฐฉ๊ธˆ ์œ„์—์„œ ์ •์˜ํ•œ Decision Tree ๋ชจ๋ธ ๊ฐ์ฒด์˜ ์†์„ฑ๊ฐ’์„ ์ด์–ด๋ฐ›๋„๋ก ํ•œ๋‹ค.

 

from pyspark.ml.tuning import ParamGridBuilder

# ํŒŒ๋ผ๋ฏธํ„ฐ ํŠœ๋‹ํ•  ๋ฒ”์œ„๋ฅผ ์ง€์ •ํ•˜๋Š” param grid
param_grid = ParamGridBuilder().addGrid(dt_clf.maxDepth, [5, 20]) \
                               .addGrid(dt_clf.minInstancesPerNode, [2, 50])\
                               .build()

 

The code above may look a little unusual at first. It uses method chaining, a builder pattern. Each addGrid() call returns the same ParamGridBuilder, so the calls can be chained together. The final build() call then expands the grid into a list of every parameter combination, which here is 2 x 2 = 4 combinations. Use the same pattern to specify the parameter ranges for whatever model you use.
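The chaining and the "every combination" expansion can be mimicked in a few lines of plain Python. `MiniParamGridBuilder` below is a hypothetical toy, not PySpark. It keys the grid by parameter name, whereas the real `build()` returns a list of dicts keyed by `Param` objects. Its ordering of the combinations may also differ from Spark's.

```python
from itertools import product

class MiniParamGridBuilder:
    """Toy stand-in for ParamGridBuilder: addGrid() returns self so calls can be chained."""

    def __init__(self):
        self._grid = {}

    def addGrid(self, param_name, values):
        self._grid[param_name] = list(values)
        return self  # returning self is what makes method chaining possible

    def build(self):
        """Expand the grid into every combination (the Cartesian product)."""
        names = list(self._grid)
        return [dict(zip(names, combo))
                for combo in product(*(self._grid[n] for n in names))]

param_maps = (MiniParamGridBuilder()
              .addGrid("maxDepth", [5, 20])
              .addGrid("minInstancesPerNode", [2, 50])
              .build())
for pm in param_maps:
    print(pm)
```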

 

๋‹ค์Œ์œผ๋กœ๋Š” ๊ต์ฐจ๊ฒ€์ฆ์„ ์ˆ˜ํ–‰ํ•  ๋•Œ ๋ชจ๋ธ์˜ ์„ฑ๋Šฅ์„ ํ‰๊ฐ€ํ•  Evaluator๋ฅผ ์ •์˜ํ•ด์ฃผ๊ณ  ์‹ค์งˆ์ ์ธ '๊ต์ฐจ๊ฒ€์ฆ'์„ ์ˆ˜ํ–‰ํ•  ํด๋ž˜์Šค์ธ CrossValidator ํด๋ž˜์Šค๋ฅผ ์ •์˜ํ•˜์ž.

 

from pyspark.ml.tuning import CrossValidator

# Metric evaluator used to score each parameter combination during tuning
evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                              predictionCol='prediction',
                                              metricName='accuracy')
# Create the CrossValidator: pass [model, param grid, evaluator, number of folds]
cv = CrossValidator(estimator=dt_clf,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=3)

์œ„์—์„œ CrossValidator ํด๋ž˜์Šค๋ฅผ ํ™œ์šฉํ•ด ์ •์˜ํ•œ ๊ฐ์ฒด๋ฅผ ์‚ฌ์šฉํ•ด์„œ ํ•™์Šต ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด ํ•™์Šต, ํŒŒ๋ผ๋ฏธํ„ฐ ํŠœ๋‹, ๊ต์ฐจ๊ฒ€์ฆ์„ ์ˆ˜ํ–‰์‹œ์ผœ๋ณด์ž. fit() ๋ฉ”์†Œ๋“œ๋ฅผ ํ™œ์šฉํ•˜๋ฉด ๋œ๋‹ค.

 

cv_model = cv.fit(train_vec)

Note that fit() first tunes the parameters. It then refits a model on the whole training set using the best-performing parameter combination, and returns that fitted model object (`cv_model` above, a CrossValidatorModel). The refit model is available as `cv_model.bestModel`, and the averaged cross-validation metric for each parameter combination is in `cv_model.avgMetrics`. This means we can call the model's transform() method to make the final predictions on the test data.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Predict on the test data with cv_model.transform()
prediction = cv_model.transform(test_vec)  # returns a Spark DataFrame

# Evaluator for the test set
test_evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                                  predictionCol='prediction',
                                                  metricName='accuracy')
test_acc = test_evaluator.evaluate(prediction)
print('Test_acc:', round(test_acc, 3))
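The search-then-refit behavior described above can be sketched in plain Python. This is a conceptual model, not Spark's implementation. The fold split, the toy "shrunken mean" model and the helper names (`grid_search_cv`, `fit`, `score`) are all hypothetical. Like CrossValidator, the sketch keeps one averaged metric per parameter combination (compare `cv_model.avgMetrics`) and refits the winning combination on the full training set (compare `cv_model.bestModel`).

```python
import random
from statistics import mean

def k_fold_split(rows, k, seed=0):
    """Shuffle rows and deal them into k folds (round-robin)."""
    shuffled = rows[:]
    random.Random(seed).shuffle(shuffled)
    return [shuffled[i::k] for i in range(k)]

def grid_search_cv(rows, param_maps, k, fit, score):
    """For every param map: train on k-1 folds, score on the held-out fold,
    and average the k scores. Then refit the best param map on ALL rows."""
    folds = k_fold_split(rows, k)
    avg_metrics = []
    for params in param_maps:
        scores = []
        for i in range(k):
            valid = folds[i]
            train = [r for j, f in enumerate(folds) if j != i for r in f]
            scores.append(score(fit(train, params), valid))
        avg_metrics.append(mean(scores))
    best_idx = max(range(len(param_maps)), key=lambda i: avg_metrics[i])
    best_model = fit(rows, param_maps[best_idx])  # final refit on the full training data
    return best_model, param_maps[best_idx], avg_metrics

# Toy model (hypothetical): predict a shrunken mean, y_hat = (1 - alpha) * mean(y_train)
def fit(train, params):
    return (1 - params["alpha"]) * mean(train)

def score(model, valid):
    # Higher is better, like accuracy: negative mean squared error
    return -mean((y - model) ** 2 for y in valid)

ys = [9.5, 10.2, 10.0, 9.8, 10.4, 10.1, 9.9, 10.3, 9.7]
best_model, best_params, avg_metrics = grid_search_cv(
    ys, [{"alpha": 0.0}, {"alpha": 0.5}, {"alpha": 0.9}], k=3, fit=fit, score=score)
print(best_params, round(best_model, 3), [round(m, 3) for m in avg_metrics])
```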

2. Validating only once on a single split: TrainValidationSplit

๋ชจ๋ธ์˜ ๊ต์ฐจ๊ฒ€์ฆ์„ ์ˆ˜ํ–‰ํ•˜๋Š” ๋˜ ๋‹ค๋ฅธ ๋ฐฉ๋ฒ•์œผ๋กœ TrainValidationSplit ํด๋ž˜์Šค๊ฐ€ ์กด์žฌํ•œ๋‹ค. TrainValidationSplit๋Š” CrossValidator ์™€๋Š” ๋‹ฌ๋ฆฌ ์ „์ฒด ๋ฐ์ดํ„ฐ ์ค‘ ํŠน์ • ๋น„์œจ์˜ ๊ฐ€์žฅ ๋งˆ์ง€๋ง‰ ์ˆœ์„œ์— ์žˆ๋Š” ๋ฐ์ดํ„ฐ๋งŒ ๊ฒ€์ฆ ๋ฐ์ดํ„ฐ๋กœ ํ™œ์šฉํ•˜๋Š” ๊ฒƒ์ด๋‹ค. ๋‘ ๊ฐ€์ง€์˜ ์ฐจ์ด์ ์„ ์•Œ์•„๋ณด๊ธฐ ์œ„ํ•œ ๋„์‹ํ™” ๊ทธ๋ฆผ์€ ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

 

Figure: How CrossValidator and TrainValidationSplit differ in the way they perform validation

 

์œ„ ๊ทธ๋ฆผ์„ ๋ณด๋ฉด ์ž์—ฐ์Šค๋Ÿฝ๊ฒŒ ๊ฒ€์ฆ์„ ์ˆ˜ํ–‰ํ•˜๋Š” ํšŸ์ˆ˜์—์„œ๋„ ์ฐจ์ด๊ฐ€ ๋‚˜๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜๊ฐ€ ์žˆ๋‹ค. ์™ผ์ชฝ์€ 5๋ฒˆ ์ˆ˜ํ–‰ํ•˜๋Š” ๋ฐ˜๋ฉด, ์˜ค๋ฅธ์ชฝ์€ 1๋ฒˆ ๋ฐ–์— ์ˆ˜ํ–‰ํ•˜์ง€ ์•Š๋Š”๋‹ค. ๊ทธ๋ ‡๋‹ค๋ฉด ๊ตณ์ด ์˜ค๋ฅธ์ชฝ์˜ ๋ฐฉ๋ฒ•์„ ์ด์šฉํ•  ํ•„์š”๊ฐ€ ์žˆ์„๊นŒ? ์˜ค๋ฅธ์ชฝ ๋ฐฉ๋ฒ•์„ ์ด์šฉํ•˜๋Š” ๊ฒฝ์šฐ๋ฅผ ๋“ค์ž๋ฉด, ํŠœ๋‹ํ•  ํŒŒ๋ผ๋ฏธํ„ฐ ๋ฒ”์œ„๊ฐ€ ๋„ˆ๋ฌด ๋งŽ์„ ๋•Œ ์ด๋‹ค. ๋งŒ์•ฝ K-fold ๊ต์ฐจ๊ฒ€์ฆ ๋ฐฉ๋ฒ•์„ ์ด์šฉํ•œ๋‹ค๋ฉด ๊ทธ๋ฆฌ๋“œ ์„œ์น˜๋ฅผ ์ˆ˜ํ–‰ํ•˜๋Š” ๋ฐ ๋งค์šฐ ๋งŽ์€ ์‹œ๊ฐ„์ด ๊ฑธ๋ฆด ๊ฒƒ์ด๋‹ค. ๊ทธ๋ž˜์„œ ์ด์™€ ๊ฐ™์ด ํŠœ๋‹ํ•  ํŒŒ๋ผ๋ฏธํ„ฐ ๋ฒ”์œ„๊ฐ€ ๊ทน๋„๋กœ ๋งŽ์„ ๋•Œ๋Š” ๋‹จ์ˆœํžˆ 1๋ฒˆ๋งŒ ๊ฒ€์ฆ์„ ์ˆ˜ํ–‰ํ•˜๋Š” ์˜ค๋ฅธ์ชฝ์˜ ๋ฐฉ๋ฒ•์„ ์ด์šฉํ•˜๋Š” ๊ฒƒ์ด ์‹œ๊ฐ„ํšจ์œจ์ ์ธ ์ธก๋ฉด์—์„œ๋Š” ์œ ๋ฆฌํ•  ์ˆ˜๋„ ์žˆ๊ฒ ๋‹ค. ๋ฌผ๋ก  ์™ผ์ชฝ ๋ฐฉ๋ฒ•์„ ์ด์šฉํ–ˆ์„ ๋•Œ๋ณด๋‹ค ๋ชจ๋ธ ์„ฑ๋Šฅ์˜ ๊ฐ๊ด€์„ฑ์€ ๋‹น์—ฐํžˆ ๋‚ฎ์„ ์ˆ˜ ๋ฐ–์— ์—†๋‹ค๋Š” ๊ฒƒ์€ ๊ฐ์•ˆํ•ด์•ผ ํ•œ๋‹ค.

 

Now let's look at the code. Only the validation method differs, which in practice means only the class name. The Spark syntax is almost identical. The one new argument is trainRatio, the fraction of the data used for training (0.75 below). Read through the code below carefully.

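# Using TrainValidationSplit
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

vec_assembler = VectorAssembler(inputCols=train.columns[:-1],
                                outputCol='feature')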
train_vec = vec_assembler.transform(train)
test_vec = vec_assembler.transform(test)

dt_clf = DecisionTreeClassifier(featuresCol='feature', labelCol='label')

param_grid = ParamGridBuilder().addGrid(dt_clf.maxDepth, [5, 10]) \
                               .addGrid(dt_clf.minInstancesPerNode, [7, 20]) \
                               .build()
evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                             predictionCol='prediction',
                                             metricName='accuracy')
tvs = TrainValidationSplit(estimator=dt_clf, estimatorParamMaps=param_grid,
                           evaluator=evaluator, trainRatio=0.75, seed=49)

# ํ•™์Šต ๋ฐ์ดํ„ฐ
tvs_model = tvs.fit(train_vec)

test_pred = tvs_model.transform(test_vec)
test_eval = MulticlassClassificationEvaluator(labelCol='label',
                                             predictionCol='prediction',
                                             metricName='accuracy')
test_acc = test_eval.evaluate(test_pred)
print('Test Accuracy:', round(test_acc, 3))
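Under the hood, TrainValidationSplit makes a single random split controlled by `trainRatio` and `seed`. The pure-Python sketch below is a hypothetical helper, not Spark's code. It sends each row to the training side with probability `trainRatio`, so the validation rows can come from anywhere in the data. It also means the validation share is only approximately `1 - trainRatio`.

```python
import random

def train_validation_split(rows, train_ratio=0.75, seed=49):
    """Send each row to the training side with probability train_ratio,
    otherwise to the validation side (a single random split)."""
    rng = random.Random(seed)
    train, valid = [], []
    for row in rows:
        (train if rng.random() < train_ratio else valid).append(row)
    return train, valid

rows = list(range(20))  # stand-ins for 20 rows, in their original order
train_rows, valid_rows = train_validation_split(rows)
print("validation rows:", valid_rows)
```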

3. Using a Pipeline

The parameter tuning and cross-validation steps covered so far can also be wrapped in a pipeline. This simplifies the ML workflow and makes the code less verbose. There are two main ways to combine the Pipeline class with parameter tuning and cross-validation.

3-1. Using a Pipeline inside CrossValidator

์ •์˜ํ•œ Pipeline์„ CrossValidator์˜ ๋ชจ๋ธ ๊ฐ์ฒด์— ๋„ฃ์–ด์ฃผ์–ด ํ™œ์šฉํ•  ์ˆ˜๊ฐ€ ์žˆ๋‹ค. ์ด์— ๋Œ€ํ•ด์„œ๋Š” ์‹ค์งˆ์ ์ธ ์ฝ”๋“œ๋ฅผ ๋ด์•ผ ์ดํ•ด๊ฐ€ ์ˆ˜์›”ํ•  ๊ฒƒ์ด๋‹ค. ๋จผ์ € ํŒŒ์ดํ”„๋ผ์ธ์„ ์ •์˜ํ•ด์•ผ ํ•˜๋Š”๋ฐ, ์—ฌ๊ธฐ์„œ๋Š” Feature Vectorization ํ•˜๋Š” ๊ณผ์ •๊ณผ ๋ชจ๋ธ์˜ ์ •์˜ํ•˜๋Š” ๋ถ€๋ถ„์„ ํ•˜๋‚˜์˜ ํŒŒ์ดํ”„๋ผ์ธ์œผ๋กœ ๋งŒ๋“ค์—ˆ๋‹ค.

 

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

#====================================
# 1. Apply the pipeline inside CrossValidator
#====================================
train, test = iris_sdf.randomSplit(weights=[0.7, 0.3], seed=43)
vec_stage = VectorAssembler(inputCols=train.columns[:-1],
                            outputCol='feature')
dt_stage = DecisionTreeClassifier(featuresCol='feature',
                                 labelCol='label')

# [Feature Vectorization -> ๋ชจ๋ธ]์„ ํ•˜๋‚˜์˜ ํŒŒ์ดํ”„๋ผ์ธ์œผ๋กœ ์ •์˜
pipeline_01 = Pipeline(stages=[vec_stage, dt_stage])
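A Pipeline is itself an Estimator. Its fit() walks the stages in order and fits each estimator on the output of the stages before it. It then returns a model whose transform() replays all the fitted stages. That is why a whole pipeline can be passed anywhere a single model is expected, including CrossValidator. The toy classes below are all hypothetical and mimic this with plain lists. Spark's real Pipeline also skips some unnecessary transforms during fit(), which this sketch does not do.

```python
class Scaler:
    """Toy estimator: learns the mean in fit() and returns a model that subtracts it."""
    def fit(self, xs):
        return Shift(sum(xs) / len(xs))

class Shift:
    """Toy transformer (the fitted model produced by Scaler)."""
    def __init__(self, mu):
        self.mu = mu

    def transform(self, xs):
        return [x - self.mu for x in xs]

class Doubler:
    """Toy transformer with no fitting step (like VectorAssembler)."""
    def transform(self, xs):
        return [2 * x for x in xs]

class MiniPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the output of the previous stages;
            # plain transformers are used as-is
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            data = model.transform(data)
            fitted.append(model)
        return MiniPipelineModel(fitted)

class MiniPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data

pipe = MiniPipeline([Doubler(), Scaler()])
model = pipe.fit([1, 2, 3])          # Doubler -> [2, 4, 6]; Scaler learns mean 4.0
print(model.transform([1, 2, 3]))    # [-2.0, 0.0, 2.0]
```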

 

์ด์ œ ํŠœ๋‹ํ•  ํŒŒ๋ผ๋ฏธํ„ฐ ๋ฒ”์œ„์™€ ๊ต์ฐจ๊ฒ€์ฆ์„ ์ˆ˜ํ–‰ํ•˜๋ฉด์„œ ๋ชจ๋ธ์˜ ์„ฑ๋Šฅ์„ ํ‰๊ฐ€ํ•  evaluator๋ฅผ ์ •์˜ํ•ด๋ณด์ž.

 

from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# ํŒŒ๋ผ๋ฏธํ„ฐ ํŠœ๋‹ grid
param_grid = ParamGridBuilder().addGrid(dt_stage.maxDepth, [8, 20]) \
                               .addGrid(dt_stage.minInstancesPerNode, [5, 10]) \
                               .build()
# ๋ฉ”ํŠธ๋ฆญ evaluator
evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                             predictionCol='prediction',
                                             metricName='accuracy')

 

๋‹ค์Œ์œผ๋กœ๋Š” CrossValidator ํด๋ž˜์Šค๋ฅผ ํ™œ์šฉํ•˜๋Š”๋ฐ, ์ด ๋•Œ ์ธ์ž์— ์œ„์—์„œ ์ •์˜ํ•œ ํŒŒ์ดํ”„๋ผ์ธ ๊ฐ์ฒด๋ฅผ ๋„ฃ์–ด์ฃผ๋„๋ก ํ•˜์ž. ๋‚˜๋จธ์ง€ ํŒŒ๋ผ๋ฏธํ„ฐ ๋ฒ”์œ„, evaluator ๋ฅผ ๋„ฃ์–ด์ฃผ๋Š” ๋ถ€๋ถ„์„ ๋™์ผํ•˜๋‹ค. ๊ทธ๋ฆฌ๊ณ  fit() ๋ฉ”์†Œ๋“œ๋ฅผ ํ™œ์šฉํ•ด ํ•™์Šต ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด ์ˆ˜ํ–‰ํ•˜๊ณ  ๋ฐ˜ํ™˜๋œ ๋ชจ๋ธ ๊ฐ์ฒด์˜ transform() ๋ฉ”์†Œ๋“œ๋กœ ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด ์ตœ์ข… ์˜ˆ์ธก์„ ์ˆ˜ํ–‰ํ•œ๋‹ค. ์ด์— ๋Œ€ํ•œ ์ฝ”๋“œ๋Š” ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

 

from pyspark.ml.tuning import CrossValidator

# Pass the pipeline to CrossValidator as its estimator
cv = CrossValidator(estimator=pipeline_01, estimatorParamMaps=param_grid,
                    evaluator=evaluator, numFolds=3)

# Tune parameters and cross-validate on the training data
cv_model = cv.fit(train)

# Predict on the test data with the model refit on the best parameters
test_pred = cv_model.transform(test)
test_eval = MulticlassClassificationEvaluator(labelCol='label',
                                             predictionCol='prediction',
                                             metricName='accuracy')
test_acc = test_eval.evaluate(test_pred)
print('Test Accuracy:', round(test_acc, 3))

3-2. Defining CrossValidator inside a Pipeline

ํŒŒ์ดํ”„๋ผ์ธ์„ ํ™œ์šฉํ•ด์„œ ํŒŒ๋ผ๋ฏธํ„ฐ ํŠœ๋‹ ๋ฐ ๊ต์ฐจ๊ฒ€์ฆ์„ ์ˆ˜ํ–‰ํ•˜๋Š” ๋˜ ๋‹ค๋ฅธ ๋ฐฉ๋ฒ•์œผ๋กœ๋Š” [3-1.๋ชฉ์ฐจ] ์™€๋Š” ์ˆœ์„œ๋ฅผ ๋ฐ˜๋Œ€๋กœ ํ•œ ๋ฐฉ๋ฒ•์ด๋‹ค. ์ฆ‰, ์ด๋ฒˆ์—๋Š” Pipeline์„ ์„ ์–ธํ•  ๋•Œ ์„ค์ •ํ•˜๋Š” stage ์ค‘ ํ•˜๋‚˜๋กœ CrossValidator๋ฅผ ์ •์˜ํ•˜๋Š” ๊ฒƒ์ด๋‹ค. ์ด ๋ชฉ์ฐจ์— ๋Œ€ํ•œ ์ฝ”๋“œ๋Š” ํ•˜๋‚˜์”ฉ ์„ค๋ช…ํ•˜์ง€๋Š” ์•Š๊ฒ ๋‹ค. ๋‹จ์ˆœํžˆ [3-1. ๋ชฉ์ฐจ]์˜ ML Flow ์ˆœ์„œ๋ฅผ ๋ฐ”๊พธ์–ด ๋†“์€ ๊ฒƒ์ด๊ธฐ ๋•Œ๋ฌธ์— ์•„๋ž˜ ์ฝ”๋“œ์˜ ์ฃผ์„์„ ์ฝ์œผ๋ฉด์„œ ์ดํ•ดํ•ด๋ณด๋ฉด ๊ทธ๋ฆฌ ์–ด๋ ต์ง€ ์•Š์„ ๊ฒƒ์ด๋‹ค.

 

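from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#====================================
# 2. Apply CrossValidator inside the pipeline
#====================================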
train, test = iris_sdf.randomSplit(weights=[0.7, 0.3], seed=43)

# Stage 1: feature vectorization
vec_stage = VectorAssembler(inputCols=train.columns[:-1],
                            outputCol='feature')
# Stage 2: [model - param_grid - CrossValidator object]
dt_clf = DecisionTreeClassifier(featuresCol='feature',
                                labelCol='label')
param_grid = ParamGridBuilder().addGrid(dt_clf.maxDepth, [5, 15]) \
                               .addGrid(dt_clf.minInstancesPerNode, [3, 10]) \
                               .build()
evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                              predictionCol='prediction',
                                              metricName='accuracy')
# numFolds is not set, so the default of 3 folds is used
cv_stage = CrossValidator(estimator=dt_clf,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator)
# Assemble the pipeline
pipeline_02 = Pipeline(stages=[vec_stage, cv_stage])

# ํ•™์Šต ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด ํŒŒ๋ผ๋ฏธํ„ฐ ํŠœ๋‹ ๋ฐ ๊ต์ฐจ๊ฒ€์ฆ
pipeline_model = pipeline_02.fit(train)

avg_metrics = pipeline_model.stages[-1].avgMetrics
print('Train Accuracy :',
      round(sum(avg_metrics)/len(avg_metrics), 3)
     )
print()

# ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด ์˜ˆ์ธก ์ˆ˜ํ–‰
test_pred = pipeline_model.transform(test)
test_eval = MulticlassClassificationEvaluator(labelCol='label',
                                             predictionCol='prediction',
                                             metricName='accuracy')
test_acc = test_eval.evaluate(test_pred)
print('Test Accuracy:', round(test_acc, 3))
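One practical difference between 3-1 and 3-2 is where the preprocessing stages are fitted. In 3-1, CrossValidator refits the whole pipeline on the training part of each fold. In 3-2, the stages before the CrossValidator stage are fitted once on all the training data, so they have already seen every validation fold. For a stateless stage like VectorAssembler, both approaches give the same result. For a stage that learns statistics from the data (a scaler, for example), 3-1 keeps the validation folds unseen. The toy simulation below is pure Python with hypothetical names. It only counts fits and records which rows each fit saw.

```python
import random

class RecordingMeanStage:
    """Toy stateful preprocessing stage that remembers which rows each fit() saw."""
    def __init__(self):
        self.fit_calls = []

    def fit(self, rows):
        self.fit_calls.append(set(rows))
        return sum(rows) / len(rows)  # the learned state (e.g. a mean used for scaling)

def folds_of(rows, k, seed=0):
    shuffled = rows[:]
    random.Random(seed).shuffle(shuffled)
    return [shuffled[i::k] for i in range(k)]

rows = list(range(12))
k, num_param_maps = 3, 4
folds = folds_of(rows, k)

# 3-1: the whole pipeline (preprocessing + model) is refit inside every fold
stage_31 = RecordingMeanStage()
for _ in range(num_param_maps):
    for i in range(k):
        train = [r for j, f in enumerate(folds) if j != i for r in f]
        stage_31.fit(train)
stage_31.fit(rows)  # final refit of the best pipeline on all training rows

# 3-2: preprocessing is fitted once, before CrossValidator ever splits the data
stage_32 = RecordingMeanStage()
stage_32.fit(rows)

print(len(stage_31.fit_calls), len(stage_32.fit_calls))  # 13 1
```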
๋ฐ˜์‘ํ˜•