๐ ํด๋น ํฌ์คํ ์ ์คํํฌ ์๋ฒฝ ๊ฐ์ด๋ ์ฑ ๊ณผ ์ธํ๋ฐ์ ์คํํฌ ๋จธ์ ๋ฌ๋ ์๋ฒฝ ๊ฐ์ด๋ ๊ฐ์๋ก ๊ณต๋ถํ ํ ๋ฐฐ์ด ๋ด์ฉ์ ์ ๋ง์ ๋ฐฉ์์ผ๋ก ์ฌ๊ตฌ์ฑํ ๊ฒ์์ ์๋ฆฝ๋๋ค. ํนํ, ์ฐธ๊ณ ํ ์ธํ๋ฐ ๊ฐ์์ ๊ฐ์ ์๋ฃ๋ฅผ ๊ทธ๋๋ก ์ฌ์ฉํ์ง ์์์์ ํํ ์๋ฆฝ๋๋ค!
์ด๋ฒ ํฌ์คํ
์์๋ ์คํํฌ๋ฅผ ํ์ฉํด์ ๋ชจ๋ธ์ ๊ต์ฐจ๊ฒ์ฆ์ ์ํํ๊ณ ๋ชจ๋ธ์ ํ์ดํผํ๋ผ๋ฏธํฐ๋ฅผ ํ๋์ ํจ๊ป ์ํํ๋ ๋ฐฉ๋ฒ์ ๋ํด ์์๋ณด์. ๊ทธ๋ฆฌ๊ณ ๋ ๋์๊ฐ ๊ต์ฐจ๊ฒ์ฆ๊ณผ ํ๋ผ๋ฏธํฐ ํ๋ ๊ณผ์ ์ ์ ๋ฒ ํฌ์คํ
์์ ๋ฐฐ์ด Pipeline
์ ํ์ฉํด ์ํํ๋ ๋ฐฉ๋ฒ๋ ์์๋ณด๋๋ก ํ์.
1. ๊ต์ฐจ๊ฒ์ฆ๊ณผ ํ๋ผ๋ฏธํฐ ํ๋์ ๋์์, CrossValidator
์คํํฌ์ CrossValidator ํด๋์ค๋ ๊ต์ฐจ๊ฒ์ฆ๊ณผ ํ๋ผ๋ฏธํฐ ํ๋์ ๋์์ ์ํํ ์ ์๋๋ก ํ๋ค. ์ฌ๊ธฐ์ '๊ต์ฐจ๊ฒ์ฆ'์ด๋ K-fold ๊ต์ฐจ๊ฒ์ฆ์ ์๋ฏธํ๋ค. K-fold ๊ต์ฐจ๊ฒ์ฆ์ ๋ํด ๋ชจ๋ฅธ๋ค๋ฉด ์์ ํฌ์คํ ์ ์ฐธ๊ณ ํ๋๋ก ํ์.
๊ทธ๋ฆฌ๊ณ ํ๋ผ๋ฏธํฐ ํ๋์ ํ๋ผ๋ฏธํฐ ์กฐํฉ์ '๋ชจ๋ ๊ฒฝ์ฐ์ ์'๋ฅผ ํ์ํ๋ ๊ทธ๋ฆฌ๋ ์์น(Grid Search) ๋ฐฉ๋ฒ์ ํ์ฉํ๋ค. ์ฐธ๊ณ ๋ก ๊ต์ฐจ๊ฒ์ฆ์ ์ํํ ๋, (๋ถ๋ฅ ๋ฌธ์ ์ ๊ฒฝ์ฐ)ํด๋์ค์ ๋ถ๊ท ํ์ ๊ณ ๋ คํ Stratified K-fold ๊ต์ฐจ๊ฒ์ฆ์ ์ํํ์ง๋ ์๋๋ค. ์ฆ, ์คํํฌ์ CrossValidator
๋ ํด๋์ค ๊ฐ์ ๋ถํฌ๋ฅผ ๊ณ ๋ คํ์ง ์๊ณ ๊ทธ๋ฅ ๋ฌด์์๋ก ํ์ต/๊ฒ์ฆ ๋ฐ์ดํฐ๋ฅผ ๋ถํ ํ๋ค๋ ํ๊ณ์ ์ ์กด์ฌํ๋ค.
CrossValidator
ํด๋์ค๋ ์ธ์๋ก ํฌ๊ฒ ์ฌ์ฉํ ๋ชจ๋ธ ๊ฐ์ฒด(Estimator), ํ๋ํ ํ๋ผ๋ฏธํฐ ๋ฒ์(์ด๋ฅผ Parameter Grid ๋ผ๊ณ ๋ณดํต ์ผ์ปซ๋๋ค), ๊ต์ฐจ๊ฒ์ฆ์ ์ํํ๋ฉด์ ๋ชจ๋ธ์ ์ฑ๋ฅ์ ํ๊ฐํ Evaluator, ๊ต์ฐจ๊ฒ์ฆํ ๋ ์ฌ์ฉํ Fold ๊ฐ์ ์ด 4๊ฐ๋ฅผ ๋ฃ์ด์ค๋ค. ๊ทธ๋์ CrossValidator
๋ฅผ ์ค์ง์ ์ผ๋ก ์ฌ์ฉํ๊ธฐ ์ํด์๋ ๋ฐฉ๊ธ ์ธ๊ธํ 4๊ฐ์ ์ธ์์ ๋ฃ์ด์ค ๊ฐ์ฒด ๋๋ ๊ฐ์ ์ ์ธํด์ฃผ์ด์ผ ํ๋ค. ์ด์ ์ฝ๋๋ฅผ ํตํด ์ฐจ๋ก์ฐจ๋ก ์ดํด๋ณด๋๋ก ํ์.
์ฌ์ฉํ๋ ๋ฐ์ดํฐ๋ ๋ถ๊ฝ(iris) ๋ฐ์ดํฐ์ด๋ค. ๋จผ์ Scikit-learn์ ํ์ฉํด pandas.dataframe
ํํ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ๊ณ ์ด๋ฅผ spark.dataframe
๊ฐ์ฒด๋ก ๋ณํ์ํค์.
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
iris = load_iris()
iris_data = iris.data
iris_label = iris.target
iris_columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
iris_pdf = pd.DataFrame(iris_data, columns=iris_columns)
iris_pdf['label'] = iris_label
iris_sdf = spark.createDataFrame(iris_pdf)
๋ค์์ ๋ก๋ํ ๋ถ๊ฝ ๋ฐ์ดํฐ๋ฅผ Train, Test ์ฉ์ผ๋ก ๋๋๊ณ ๋ณ์๋ค์ Feature Vectorization์ ์ํค์. ๊ทธ๋ฆฌ๊ณ ์ฌ์ฉํ ๋ชจ๋ธ์ธ Decision Tree๋ฅผ ์ ์ํด๋ณด์.
# Feature Vectorization ๋ณํ๊ณผ ๋ชจ๋ธ estimator ๊ฐ์ฒด ์์ฑ
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
feature_cols = iris_sdf.columns[:-1]
# ๋ฐ์ดํฐ ๋ถํ
train, test = iris_sdf.randomSplit(weights=[0.7, 0.3], seed=42)
# Feature Vectorization
vec_assembler = VectorAssembler(inputCols=feature_cols,
outputCol='feature')
train_vec = vec_assembler.transform(train)
test_vec = vec_assembler.transform(test)
# ๋ชจ๋ธ ์ ์
dt_clf = DecisionTreeClassifier(featuresCol='feature', labelCol='label')
์ด์ ๋ค์์ผ๋ก๋ ์ด๋ฒ ํฌ์คํ ์์ ํต์ฌ์ธ ํ๋ํ ํ๋ผ๋ฏธํฐ์ ๋ฒ์๋ฅผ ์ ์ํด์ฃผ๋๋ก ํ์. ์ด ๋, ํ๋ผ๋ฏธํฐ ๋ฒ์๋ฅผ ์ง์ ํ ๋๋ ๋ฐฉ๊ธ ์์์ ์ ์ํ Decision Tree ๋ชจ๋ธ ๊ฐ์ฒด์ ์์ฑ๊ฐ์ ์ด์ด๋ฐ๋๋ก ํ๋ค.
from pyspark.ml.tuning import ParamGridBuilder
# ํ๋ผ๋ฏธํฐ ํ๋ํ ๋ฒ์๋ฅผ ์ง์ ํ๋ param grid
param_grid = ParamGridBuilder().addGrid(dt_clf.maxDepth, [5, 20]) \
.addGrid(dt_clf.minInstancesPerNode, [2, 50])\
.build()
์ ์ฝ๋๋ฅผ ๋ณด๋ฉด ์ฝ๊ฐ ๋ฌธ๋ฒ ํํ๊ฐ ์ด์ํ๋ค๊ณ ๋๋ ์๋ ์๋ค. Functional API ํํ๋ก ์์ฑ๋์๋๋ฐ, Tensorflow 2.x ๋ฒ์ ๋ฌธ๋ฒ์ ์ต์ํ์ ๋ถ๋ค์ ์๋ง ์์ฝ๊ฒ ์ดํดํ ์ ์์ ๊ฒ์ด๋ค. ์์ ๊ฐ์ ๋ฐฉ์์ ํ์ฉํด์ ์ฌ์ฉํ ๋ชจ๋ธ์ ํ๋ผ๋ฏธํฐ ๋ฒ์๋ฅผ ์ง์ ํด์ฃผ์.
๋ค์์ผ๋ก๋ ๊ต์ฐจ๊ฒ์ฆ์ ์ํํ ๋ ๋ชจ๋ธ์ ์ฑ๋ฅ์ ํ๊ฐํ Evaluator๋ฅผ ์ ์ํด์ฃผ๊ณ ์ค์ง์ ์ธ '๊ต์ฐจ๊ฒ์ฆ'์ ์ํํ ํด๋์ค์ธ CrossValidator
ํด๋์ค๋ฅผ ์ ์ํ์.
from pyspark.ml.tuning import CrossValidator
# ํ๋ผ๋ฏธํฐ ํ๋ํ๋ฉด์ ์ธก์ ํ ๋ฉํธ๋ฆญ evaluator
evaluator = MulticlassClassificationEvaluator(labelCol='label',
predictionCol='prediction',
metricName='accuracy')
# Cross Validator ๊ฐ์ฒด ์์ฑ -> [๋ชจ๋ธ, Param grid, evaluator, ํด๋ ์] ์ธ์๋ก ๋ฃ๊ธฐ
cv = CrossValidator(estimator=dt_clf,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=3)
โ
์์์ CrossValidator
ํด๋์ค๋ฅผ ํ์ฉํด ์ ์ํ ๊ฐ์ฒด๋ฅผ ์ฌ์ฉํด์ ํ์ต ๋ฐ์ดํฐ์ ๋ํด ํ์ต, ํ๋ผ๋ฏธํฐ ํ๋, ๊ต์ฐจ๊ฒ์ฆ์ ์ํ์์ผ๋ณด์. fit()
๋ฉ์๋๋ฅผ ํ์ฉํ๋ฉด ๋๋ค.
cv_model = cv.fit(train_vec)
โ
์ฐธ๊ณ ๋ก ์์ fit()
๋ฉ์๋๋ฅผ ํ์ฉํ๊ฒ ๋๋ฉด ํ๋ผ๋ฏธํฐ ํ๋์ ์ํํ ํ, ๊ฐ์ฅ ์ต๊ณ ์ ์ฑ๋ฅ์ ๋๋ฌํ ํ๋ผ๋ฏธํฐ๋ฅผ ๊ธฐ์ค์ผ๋ก ๋ง์ง๋ง์ ์ฌํ์ต(refit)ํ ํ์ ๋ชจ๋ธ ๊ฐ์ฒด๋ฅผ ๋ฐํํ๊ฒ ๋๋ค. ๊ทธ๋์ ์ด ๋ชจ๋ธ ๊ฐ์ฒด(์ ์ฝ๋ ์์ผ๋ก๋ cv_model
์ด๋ผ๋ ๋ณ์)์ transform()
๋ฉ์๋๋ฅผ ํ์ฉํด์ ํ
์คํธ ๋ฐ์ดํฐ์ ๋ํด ์ต์ข
์์ธก์ ์ํํ ์ ์๊ฒ ๋๋ค.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# ํ
์คํธ ๋ฐ์ดํฐ์ ๋ํด์ cv_model ๊ฐ์ฒด์ transform ๋ฉ์๋๋ฅผ ํ์ฉ
prediction = cv_model.transform(test_vec) # spark.dataframe
# ํ
์คํธ์ฉ evaluator
test_evaluator = MulticlassClassificationEvaluator(labelCol='label',
predictionCol='prediction',
metricName='accuracy')
test_acc = test_evaluator.evaluate(prediction)
print('Test_acc:', round(test_acc, 3))
2. ๊ฐ์ฅ ๋ง์ง๋ง์ ๋ฐ์ดํฐ๋ง ๊ฒ์ฆ ๋ฐ์ดํฐ๋ก ์ฌ์ฉํ์, TrainValidationSplit
๋ชจ๋ธ์ ๊ต์ฐจ๊ฒ์ฆ์ ์ํํ๋ ๋ ๋ค๋ฅธ ๋ฐฉ๋ฒ์ผ๋ก TrainValidationSplit
ํด๋์ค๊ฐ ์กด์ฌํ๋ค. TrainValidationSplit
๋ CrossValidator
์๋ ๋ฌ๋ฆฌ ์ ์ฒด ๋ฐ์ดํฐ ์ค ํน์ ๋น์จ์ ๊ฐ์ฅ ๋ง์ง๋ง ์์์ ์๋ ๋ฐ์ดํฐ๋ง ๊ฒ์ฆ ๋ฐ์ดํฐ๋ก ํ์ฉํ๋ ๊ฒ์ด๋ค. ๋ ๊ฐ์ง์ ์ฐจ์ด์ ์ ์์๋ณด๊ธฐ ์ํ ๋์ํ ๊ทธ๋ฆผ์ ์๋์ ๊ฐ๋ค.
์ ๊ทธ๋ฆผ์ ๋ณด๋ฉด ์์ฐ์ค๋ฝ๊ฒ ๊ฒ์ฆ์ ์ํํ๋ ํ์์์๋ ์ฐจ์ด๊ฐ ๋๋ ๊ฒ์ ์ ์๊ฐ ์๋ค. ์ผ์ชฝ์ 5๋ฒ ์ํํ๋ ๋ฐ๋ฉด, ์ค๋ฅธ์ชฝ์ 1๋ฒ ๋ฐ์ ์ํํ์ง ์๋๋ค. ๊ทธ๋ ๋ค๋ฉด ๊ตณ์ด ์ค๋ฅธ์ชฝ์ ๋ฐฉ๋ฒ์ ์ด์ฉํ ํ์๊ฐ ์์๊น? ์ค๋ฅธ์ชฝ ๋ฐฉ๋ฒ์ ์ด์ฉํ๋ ๊ฒฝ์ฐ๋ฅผ ๋ค์๋ฉด, ํ๋ํ ํ๋ผ๋ฏธํฐ ๋ฒ์๊ฐ ๋๋ฌด ๋ง์ ๋ ์ด๋ค. ๋ง์ฝ K-fold ๊ต์ฐจ๊ฒ์ฆ ๋ฐฉ๋ฒ์ ์ด์ฉํ๋ค๋ฉด ๊ทธ๋ฆฌ๋ ์์น๋ฅผ ์ํํ๋ ๋ฐ ๋งค์ฐ ๋ง์ ์๊ฐ์ด ๊ฑธ๋ฆด ๊ฒ์ด๋ค. ๊ทธ๋์ ์ด์ ๊ฐ์ด ํ๋ํ ํ๋ผ๋ฏธํฐ ๋ฒ์๊ฐ ๊ทน๋๋ก ๋ง์ ๋๋ ๋จ์ํ 1๋ฒ๋ง ๊ฒ์ฆ์ ์ํํ๋ ์ค๋ฅธ์ชฝ์ ๋ฐฉ๋ฒ์ ์ด์ฉํ๋ ๊ฒ์ด ์๊ฐํจ์จ์ ์ธ ์ธก๋ฉด์์๋ ์ ๋ฆฌํ ์๋ ์๊ฒ ๋ค. ๋ฌผ๋ก ์ผ์ชฝ ๋ฐฉ๋ฒ์ ์ด์ฉํ์ ๋๋ณด๋ค ๋ชจ๋ธ ์ฑ๋ฅ์ ๊ฐ๊ด์ฑ์ ๋น์ฐํ ๋ฎ์ ์ ๋ฐ์ ์๋ค๋ ๊ฒ์ ๊ฐ์ํด์ผ ํ๋ค.
์ด์ ์ฝ๋๋ก ์์๋ณด์. ๋จ์ง ์ํ ๋ฐฉ๋ฒ ์ฆ, ์ฌ์ฉํ๋ ํด๋์ค ์ด๋ฆ๋ง ๋ค๋ฅผ ๋ฟ, ์คํํฌ์ ๋ฌธ๋ฒ์ ์ฐจ์ด๋ ๊ฑฐ์ ๋์ผํ๋ค. ์๋ ์ฝ๋๋ฅผ ์ฒ์ฒํ ์ดํด๋ณด๋๋ก ํ์.
# TrainValidation Split์ ํ์ฉํด์ ์ํํ๊ธฐ
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
vec_assembler = VectorAssembler(inputCols=train.columns[:-1],
outputCol='feature')
train_vec = vec_assembler.transform(train)
test_vec = vec_assembler.transform(test)
dt_clf = DecisionTreeClassifier(featuresCol='feature', labelCol='label')
param_grid = ParamGridBuilder().addGrid(dt_clf.maxDepth, [5, 10]) \
.addGrid(dt_clf.minInstancesPerNode, [7, 20]) \
.build()
evaluator = MulticlassClassificationEvaluator(labelCol='label',
predictionCol='prediction',
metricName='accuracy')
tvs = TrainValidationSplit(estimator=dt_clf, estimatorParamMaps=param_grid,
evaluator=evaluator, trainRatio=0.75, seed=49)
# ํ์ต ๋ฐ์ดํฐ
tvs_model = tvs.fit(train_vec)
test_pred = tvs_model.transform(test_vec)
test_eval = MulticlassClassificationEvaluator(labelCol='label',
predictionCol='prediction',
metricName='accuracy')
test_acc = test_eval.evaluate(test_pred)
print('Test Accuracy:', round(test_acc, 3))
3. Pipeline์ ํ์ฉํ๊ธฐ
์ง๊ธ๊น์ง ๋ฐฐ์ด ํ๋ผ๋ฏธํฐ ํ๋, ๊ต์ฐจ๊ฒ์ฆ ์ํ ๊ณผ์ ์ ํ๋์ ํ์ดํ๋ผ์ธ์ผ๋ก ํ์ฉํด์ ML flow ๊ณผ์ ์ ๊ฐ๋จํํ์ฌ ์ฝ๋์ ๋ง์ฐ์ฑ์ ์ค์ผ ์๋ ์๋ค. Pipeline
ํด๋์ค๋ฅผ ํ์ฉํด ํ๋ผ๋ฏธํฐ ํ๋, ๊ต์ฐจ๊ฒ์ฆ ์ํํ๋ ๊ณผ์ ์ ํฌ๊ฒ 2๊ฐ์ง๋ก ์ฌ์ฉํด ๋ณผ ์ ์๋ค.
3-1. CrossValidator ์์์ Pipeline์ ํ์ฉํ๊ธฐ
์ ์ํ Pipeline
์ CrossValidator
์ ๋ชจ๋ธ ๊ฐ์ฒด์ ๋ฃ์ด์ฃผ์ด ํ์ฉํ ์๊ฐ ์๋ค. ์ด์ ๋ํด์๋ ์ค์ง์ ์ธ ์ฝ๋๋ฅผ ๋ด์ผ ์ดํด๊ฐ ์์ํ ๊ฒ์ด๋ค. ๋จผ์ ํ์ดํ๋ผ์ธ์ ์ ์ํด์ผ ํ๋๋ฐ, ์ฌ๊ธฐ์๋ Feature Vectorization ํ๋ ๊ณผ์ ๊ณผ ๋ชจ๋ธ์ ์ ์ํ๋ ๋ถ๋ถ์ ํ๋์ ํ์ดํ๋ผ์ธ์ผ๋ก ๋ง๋ค์๋ค.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
#====================================
# 1.CrossValidator ์์์ ํ์ดํ๋ผ์ธ์ ์ ์ฉ
#====================================
train, test = iris_sdf.randomSplit(weights=[0.7, 0.3], seed=43)
vec_stage = VectorAssembler(inputCols=train.columns[:-1],
outputCol='feature')
dt_stage = DecisionTreeClassifier(featuresCol='feature',
labelCol='label')
# [Feature Vectorization -> ๋ชจ๋ธ]์ ํ๋์ ํ์ดํ๋ผ์ธ์ผ๋ก ์ ์
pipeline_01 = Pipeline(stages=[vec_stage, dt_stage])
์ด์ ํ๋ํ ํ๋ผ๋ฏธํฐ ๋ฒ์์ ๊ต์ฐจ๊ฒ์ฆ์ ์ํํ๋ฉด์ ๋ชจ๋ธ์ ์ฑ๋ฅ์ ํ๊ฐํ evaluator๋ฅผ ์ ์ํด๋ณด์.
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# ํ๋ผ๋ฏธํฐ ํ๋ grid
param_grid = ParamGridBuilder().addGrid(dt_stage.maxDepth, [8, 20]) \
.addGrid(dt_stage.minInstancesPerNode, [5, 10]) \
.build()
# ๋ฉํธ๋ฆญ evaluator
evaluator = MulticlassClassificationEvaluator(labelCol='label',
predictionCol='prediction',
metricName='accuracy')
๋ค์์ผ๋ก๋ CrossValidator
ํด๋์ค๋ฅผ ํ์ฉํ๋๋ฐ, ์ด ๋ ์ธ์์ ์์์ ์ ์ํ ํ์ดํ๋ผ์ธ ๊ฐ์ฒด๋ฅผ ๋ฃ์ด์ฃผ๋๋ก ํ์. ๋๋จธ์ง ํ๋ผ๋ฏธํฐ ๋ฒ์, evaluator ๋ฅผ ๋ฃ์ด์ฃผ๋ ๋ถ๋ถ์ ๋์ผํ๋ค. ๊ทธ๋ฆฌ๊ณ fit()
๋ฉ์๋๋ฅผ ํ์ฉํด ํ์ต ๋ฐ์ดํฐ์ ๋ํด ์ํํ๊ณ ๋ฐํ๋ ๋ชจ๋ธ ๊ฐ์ฒด์ transform()
๋ฉ์๋๋ก ํ
์คํธ ๋ฐ์ดํฐ์ ๋ํด ์ต์ข
์์ธก์ ์ํํ๋ค. ์ด์ ๋ํ ์ฝ๋๋ ์๋์ ๊ฐ๋ค.
from pyspark.ml.tuning import CrossValidator
# Cross Validator์์ ํ์ดํ๋ผ์ธ ์ ์
cv = CrossValidator(estimator=pipeline_01, estimatorParamMaps=param_grid,
evaluator=evaluator, numFolds=3)
# ํ์ต ๋ฐ์ดํฐ์ ๋ํด ํ๋ผ๋ฏธํฐ ํ๋ ๋ฐ ๊ต์ฐจ๊ฒ์ฆ
cv_model = cv.fit(train)
# ํ
์คํธ ๋ฐ์ดํฐ์ ๋ํด ์ต์ ์ ํ๋ผ๋ฏธํฐ ๋ชจ๋ธ๋ก ์์ธก ์ํ
test_pred = cv_model.transform(test)
test_eval = MulticlassClassificationEvaluator(labelCol='label',
predictionCol='prediction',
metricName='accuracy')
test_acc = test_eval.evaluate(test_pred)
print('Test Accuracy:', round(test_acc, 3))
3-2. Pipeline ์์์ CrossValidator๋ฅผ ์ ์ํ๊ธฐ
ํ์ดํ๋ผ์ธ์ ํ์ฉํด์ ํ๋ผ๋ฏธํฐ ํ๋ ๋ฐ ๊ต์ฐจ๊ฒ์ฆ์ ์ํํ๋ ๋ ๋ค๋ฅธ ๋ฐฉ๋ฒ์ผ๋ก๋ [3-1.๋ชฉ์ฐจ] ์๋ ์์๋ฅผ ๋ฐ๋๋ก ํ ๋ฐฉ๋ฒ์ด๋ค. ์ฆ, ์ด๋ฒ์๋ Pipeline
์ ์ ์ธํ ๋ ์ค์ ํ๋ stage ์ค ํ๋๋ก CrossValidator
๋ฅผ ์ ์ํ๋ ๊ฒ์ด๋ค. ์ด ๋ชฉ์ฐจ์ ๋ํ ์ฝ๋๋ ํ๋์ฉ ์ค๋ช
ํ์ง๋ ์๊ฒ ๋ค. ๋จ์ํ [3-1. ๋ชฉ์ฐจ]์ ML Flow ์์๋ฅผ ๋ฐ๊พธ์ด ๋์ ๊ฒ์ด๊ธฐ ๋๋ฌธ์ ์๋ ์ฝ๋์ ์ฃผ์์ ์ฝ์ผ๋ฉด์ ์ดํดํด๋ณด๋ฉด ๊ทธ๋ฆฌ ์ด๋ ต์ง ์์ ๊ฒ์ด๋ค.
from pyspark.ml import Pipeline
#====================================
# 2.ํ์ดํ๋ผ์ธ ์์์ CrossValidator๋ฅผ ์ ์ฉ
#====================================
train, test = iris_sdf.randomSplit(weights=[0.7, 0.3], seed=43)
# 1๋ฒ์งธ) stage: Feaure Vectorization
vec_stage = VectorAssembler(inputCols=train.columns[:-1],
outputCol='feature')
# 2๋ฒ์งธ) stage: [๋ชจ๋ธ - param_grid - cv ๊ฐ์ฒด]
dt_clf = DecisionTreeClassifier(featuresCol='feature',
labelCol='label')
param_grid = ParamGridBuilder().addGrid(dt_clf.maxDepth, [5, 15]) \
.addGrid(dt_clf.minInstancesPerNode, [3, 10]) \
.build()
evaluator = MulticlassClassificationEvaluator(labelCol='label',
predictionCol='prediction',
metricName='accuracy')
cv_stage = CrossValidator(estimator=dt_clf,
estimatorParamMaps=param_grid,
evaluator=evaluator)
# ํ์ดํ๋ผ์ธ ์์ฑ
pipeline_02 = Pipeline(stages=[vec_stage, cv_stage])
# ํ์ต ๋ฐ์ดํฐ์ ๋ํด ํ๋ผ๋ฏธํฐ ํ๋ ๋ฐ ๊ต์ฐจ๊ฒ์ฆ
pipeline_model = pipeline_02.fit(train)
avg_metrics = pipeline_model.stages[-1].avgMetrics
print('Train Accuracy :',
round(sum(avg_metrics)/len(avg_metrics), 3)
)
print()
# ํ
์คํธ ๋ฐ์ดํฐ์ ๋ํด ์์ธก ์ํ
test_pred = pipeline_model.transform(test)
test_eval = MulticlassClassificationEvaluator(labelCol='label',
predictionCol='prediction',
metricName='accuracy')
test_acc = test_eval.evaluate(test_pred)
print('Test Accuracy:', round(test_acc, 3))
'Apache Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[PySpark] Spark๋ก Encoding ๊ณผ Scaling ์ ์ํํด๋ณด์! (0) | 2022.03.07 |
---|---|
[PySpark] Spark์ Pipeline์ผ๋ก ๋ถ๋ฅ ๋ชจ๋ธ์ ๋ง๋ค์ด๋ณด์! (0) | 2022.02.20 |
[PySpark] Spark์ Dataframe API๋ฅผ ์์๋ณด์!(2) (0) | 2022.02.05 |
[PySpark] Spark์ Dataframe API๋ฅผ ์์๋ณด์!(1) (4) | 2022.02.03 |
[Infra] ๋ฐ์ดํฐ ์ธํ๋ผ - Ingestion&Transformation(Event Streaming) (0) | 2021.04.25 |