๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

Apache Spark

[PySpark] Spark์˜ Pipeline์œผ๋กœ ๋ถ„๋ฅ˜ ๋ชจ๋ธ์„ ๋งŒ๋“ค์–ด๋ณด์ž!

๋ฐ˜์‘ํ˜•

๐Ÿ”Š ํ•ด๋‹น ํฌ์ŠคํŒ…์€ ์ŠคํŒŒํฌ ์™„๋ฒฝ ๊ฐ€์ด๋“œ ์ฑ…๊ณผ ์ธํ”„๋Ÿฐ์˜ ์ŠคํŒŒํฌ ๋จธ์‹ ๋Ÿฌ๋‹ ์™„๋ฒฝ ๊ฐ€์ด๋“œ ๊ฐ•์˜๋กœ ๊ณต๋ถ€ํ•œ ํ›„ ๋ฐฐ์šด ๋‚ด์šฉ์„ ์ €๋งŒ์˜ ๋ฐฉ์‹์œผ๋กœ ์žฌ๊ตฌ์„ฑํ•œ ๊ฒƒ์ž„์„ ์•Œ๋ฆฝ๋‹ˆ๋‹ค. ํŠนํžˆ, ์ฐธ๊ณ ํ•œ ์ธํ”„๋Ÿฐ ๊ฐ•์˜์˜ ๊ฐ•์˜ ์ž๋ฃŒ๋ฅผ ๊ทธ๋Œ€๋กœ ์‚ฌ์šฉํ•˜์ง€ ์•Š์•˜์Œ์„ ํ•„ํžˆ ์•Œ๋ฆฝ๋‹ˆ๋‹ค!

Apache Spark๋ฅผ Python์œผ๋กœ ์ด์šฉํ•˜๋Š” PySpark์— ๋Œ€ํ•ด ์•Œ์•„๋ณด์ž


์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” PySpark๋ฅผ ํ™œ์šฉํ•ด์„œ ๊ฐ„๋‹จํ•œ ๋จธ์‹ ๋Ÿฌ๋‹ ๋ถ„๋ฅ˜ ๋ชจ๋ธ์„ ๋งŒ๋“ค์–ด๋ณด๋„๋ก ํ•˜์ž. ๊ทธ๋ฆฌ๊ณ  ๋ถ„๋ฅ˜ ๋ชจ๋ธ์„ ๋งŒ๋“œ๋Š” ๋ฐ ๊ฑฐ์น˜๋Š” ์—ฌ๋Ÿฌ๊ฐ€์ง€ ๊ณผ์ •์„ ํ•˜๋‚˜์˜ ํŒŒ์ดํ”„๋ผ์ธ์œผ๋กœ ๋งŒ๋“ค ์ˆ˜ ์žˆ๋Š” PySpark์˜ Pipeline์„ ํ™œ์šฉํ•ด๋ณด๋Š” ๋ฐฉ๋ฒ•๋„ ์•Œ์•„๋ณด๋„๋ก ํ•˜์ž.

1. PySpark๋กœ ๊ฐ„๋‹จํ•œ Multi-Class ๋ถ„๋ฅ˜ ๋ชจ๋ธ ๋งŒ๋“ค๊ธฐ

ํ•ด๋‹น ํฌ์ŠคํŒ…์—์„œ๋Š” ๋Œ€ํ‘œ์ ์ธ ๋ฒค์น˜๋งˆํฌ ๋ฐ์ดํ„ฐ์ธ IRIS(๋ถ“๊ฝƒ) ๋ฐ์ดํ„ฐ๋ฅผ ํ™œ์šฉํ•œ ๋ฉ€ํ‹ฐ-ํด๋ž˜์Šค ๋ถ„๋ฅ˜ ๋ชจ๋ธ์„ ๋งŒ๋“ค์–ด๋ณด์ž. ์šฐ์„  ์•„๋ž˜์˜ Scikit-learn ๋ชจ๋“ˆ์„ ํ™œ์šฉํ•ด์„œ ๋ถ“๊ฝƒ ๋ฐ์ดํ„ฐ๋ฅผ ๋กœ๋“œํ•˜๊ณ  ์ŠคํŒŒํฌ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์œผ๋กœ ๋ณ€ํ™˜์‹œํ‚ค์ž.

 

from sklearn.datasets import load_iris
import pandas as pd
import numpy as np

iris = load_iris()
data = iris.data
label = iris.target

iris_df = pd.DataFrame(data, columns=iris.feature_names)
iris_df['target'] = label

spark_df = spark.createDataFrame(iris_df)

1-1. ๋…๋ฆฝ๋ณ€์ˆ˜๋“ค์„ ํ•˜๋‚˜์˜ ๋ฒกํ„ฐ๋กœ, Feature Vectorization!

Feature Vectorization์€ ์ŠคํŒŒํฌ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์— ์žˆ๋Š” ํ•˜๋‚˜ ๋˜๋Š” ๋‘˜ ์ด์ƒ์˜ ๋…๋ฆฝ๋ณ€์ˆ˜(Feature)๋“ค์„ ํ•˜๋‚˜์˜ ๋ฒกํ„ฐ๋กœ ๋ฌถ๋Š” ๊ฒƒ์„ ์˜๋ฏธํ•œ๋‹ค. ๊ทธ๋ฆผ์œผ๋กœ ํ‘œ์‹œํ•˜๋ฉด ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

 

Feature Vector๋ฅผ ๋งŒ๋“œ๋Š” ๊ฒƒ์ด Feature Vectorization ์ด๋‹ค!

 

์œ„ ๊ทธ๋ฆผ์„ ๋ณด๋ฉด Feature Vectorization ๊ณผ์ •์ด ์–ด๋–ค ๊ฒƒ์„ ์˜๋ฏธํ•˜๋Š”์ง€ ์ดํ•ดํ•  ์ˆ˜ ์žˆ์„ ๊ฒƒ์ด๋‹ค. ์œ„ ๊ณผ์ •์„ ์ŠคํŒŒํฌ๋กœ ๊ตฌํ˜„ํ•˜๋ ค๋ฉด ์ŠคํŒŒํฌ์˜ ML ๋ชจ๋“ˆ์—์„œ ์ œ๊ณตํ•˜๋Š” VectorAssembler ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค.

 

from pyspark.ml.feature import VectorAssembler

# 1. Train/Test ๋ฐ์ดํ„ฐ๋กœ ๋ถ„ํ• 
train_df, test_df = spark_df.randomSplit(weights=[0.8, 0.2], seed=42)

# 2. Feature Vector๋กœ ๋งŒ๋“ค๊ธฐ
ftr_columns = train_df.columns[:-1]
vec_assembler = VectorAssembler(inputCols=ftr_columns, outputCol='features')

train_ftr_vec = vec_assembler.transform(train_df)
test_ftr_vec = vec_assembler.transform(test_df)  # ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด๋„ ๋™์ผํ•œ ๊ฐ์ฒด ์‚ฌ์šฉ

print('# ํ•™์Šต ์›๋ณธ ๋ฐ์ดํ„ฐ:')
train_df.limit(5).show()

print('# Feature Vectorization ํ›„ ํ•™์Šต ๋ฐ์ดํ„ฐ:')
train_ftr_vec.limit(5).show()

 

VectorAssembler ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ๋Š” inputCols ์ธ์ž์— ๋…๋ฆฝ๋ณ€์ˆ˜๋“ค ์นผ๋Ÿผ ์ด๋ฆ„๋“ค์„ ๋ฆฌ์ŠคํŠธ ํ˜•ํƒœ๋กœ ๋„ฃ์–ด์ฃผ๋ฉด ๋œ๋‹ค. outputCol ์—๋Š” ๋…๋ฆฝ๋ณ€์ˆ˜๋“ค์„ ํ•˜๋‚˜์˜ ๋ฒกํ„ฐํ™” ์ฆ‰, Feature Vectorization ์‹œ์ผฐ์„ ๋•Œ ๋งŒ๋“ค์–ด์งˆ ํ•˜๋‚˜์˜ ์นผ๋Ÿผ์— ๋Œ€ํ•ด ๋ช…์‹œํ•ด์ค„ ์นผ๋Ÿผ ์ด๋ฆ„์ด๋‹ค.

 

์œ„ ์ฝ”๋“œ์—์„œ๋Š” 'features' ๋ผ๋Š” ๋ฌธ์ž์—ด๋กœ ๋ช…์‹œํ•ด์ฃผ์—ˆ๋Š”๋ฐ, ์•„๋ž˜ ์ฝ”๋“œ ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด features ๋ผ๋Š” ์ƒˆ๋กœ์šด ์นผ๋Ÿผ์ด ์ƒ๊ธด ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค. ์ด๋ ‡๊ฒŒ ํ•˜๋‚˜์˜ VectorAssembler ๊ฐ์ฒด๋ฅผ ์ƒ์„ฑํ•ด์ค€ ํ›„, ํ•™์Šต ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด transform ๋ฉ”์†Œ๋“œ๋กœ ์‹ค์งˆ์ ์ธ Feature Vectorization์„ ์ˆ˜ํ–‰ํ•ด์ฃผ๋ฉด ๋œ๋‹ค.

 

๋‹จ, ๊ฒ€์ฆ(๋˜๋Š” ํ…Œ์ŠคํŠธ) ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด์„œ๋„ Feature Vectorization์„ ์ ์šฉํ•ด์ค„ ๋•Œ ํ•™์Šต ๋ฐ์ดํ„ฐ์— ์ ์šฉํ•  ๋•Œ ๋งŒ๋“  ๋™์ผํ•œ VectorAssembler ๊ฐ์ฒด๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์ข‹๋‹ค.

 

์œ„ ์ฝ”๋“œ ๊ฒฐ๊ณผ์ธ ๋‘ VectorAssembler ์ ์šฉ ์ „/ํ›„ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ๊ฒฐ๊ณผ ์ฐจ์ด๋Š” ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

 

VectorAssembler ์ ์šฉ ์ „/ํ›„์˜ ์ฐจ์ด

1-2. ๋ฉ€ํ‹ฐ-ํด๋ž˜์Šค ๋ถ„๋ฅ˜ ๋ชจ๋ธ๋กœ ํ•™์Šต, ์˜ˆ์ธกํ•˜๊ธฐ

ํ•ด๋‹น ์˜ˆ์‹œ์—์„œ๋Š” ์‚ฌ์šฉํ•  ๋ถ„๋ฅ˜ ๋ชจ๋ธ๋กœ์„œ ์˜์‚ฌ๊ฒฐ์ • ๋‚˜๋ฌด(Decision Tree)๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ๋กœ ํ•˜์ž. ์‚ฌ์ดํ‚ท๋Ÿฐ ๋•Œ์™€ ๋น„์Šทํ•œ API ์ด์ง€๋งŒ ์ฐจ์ด์ ์ด ์–ด๋Š์ •๋„ ์žˆ๋‹ค. ๋จผ์ € ์•„๋ž˜์˜ ์ฝ”๋“œ๋ฅผ ์ˆ˜ํ–‰ํ•ด์„œ ๋ชจ๋ธ์„ ์ •์˜ํ•˜๊ณ  ํ•™์Šต ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด ๋ชจ๋ธ์„ ํ•™์Šต์‹œํ‚ค๊ณ  ์˜ˆ์ธกํ•˜๋Š” ์ฝ”๋“œ๋ฅผ ์‚ดํŽด๋ณด์ž.

 

from pyspark.ml.classification import DecisionTreeClassifier

# ๋ชจ๋ธ ์ •์˜
dt_clf = DecisionTreeClassifier(featuresCol='features', labelCol='target', maxDepth=10)

# ํ•™์Šต
dt_model = dt_clf.fit(train_ftr_vec)

# ํ•™์Šต, ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด ์˜ˆ์ธก
train_pred = dt_model.transform(train_ftr_vec)
test_pred = dt_model.transform(test_ftr_vec)

 

๋จผ์ € ๋ชจ๋ธ์„ ์ •์˜ํ•  ๋•Œ, ์‚ฌ์šฉํ•  ๋ชจ๋ธ์˜ ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•ด์„œ featuresCol ์—๋Š” [1-1. ๋ชฉ์ฐจ]์—์„œ ๋งŒ๋“  Feature Vector์˜ ์นผ๋Ÿผ ์ด๋ฆ„์„ ๋ช…์‹œํ•ด์ค€๋‹ค. ๊ทธ๋ฆฌ๊ณ  labelCols ์ธ์ž์—๋Š” ์šฐ๋ฆฌ๊ฐ€ ์˜ˆ์ธกํ•˜๋ ค๋Š” '์ •๋‹ต' ์ฆ‰, ์ข…์†๋ณ€์ˆ˜ ์นผ๋Ÿผ์˜ ์ด๋ฆ„์„ ๋ช…์‹œํ•ด์ค€๋‹ค. ๊ทธ๋ฆฌ๊ณ  ์ถ”๊ฐ€์ ์œผ๋กœ ์‚ฌ์šฉํ•˜๋ ค๋Š” ๋จธ์‹ ๋Ÿฌ๋‹ ๋ชจ๋ธ์˜ ํ•˜์ดํผํŒŒ๋ผ๋ฏธํ„ฐ ์ธ์ž๋ฅผ ์ถ”๊ฐ€ํ•ด์ค€๋‹ค. ์—ฌ๊ธฐ์„œ๋Š” maxDepth ๋ผ๋Š” Decision Tree ๋ชจ๋ธ์˜ ํ•˜์ดํผํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ๋ช…์‹œํ–ˆ๋‹ค.

 

๊ทธ๋ฆฌ๊ณ  ํ•™์Šต ์‹œ, Feature Vectorization ์‹œ์ผœ์ค€ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ fit() ๋ฉ”์†Œ๋“œ์— ๋„ฃ์–ด์ค€๋‹ค. ์ด ๋•Œ, fit() ๋ฉ”์†Œ๋“œ๋ฅผ ํ˜ธ์ถœํ•  ์‹œ, Model ํด๋ž˜์Šค ๊ฐ์ฒด๊ฐ€ ๋ฐ˜ํ™˜๋œ๋‹ค. ์ด๊ฒƒ์ด ์•„๋งˆ ์‚ฌ์ดํ‚ท๋Ÿฐ๊ณผ์˜ ๊ฐ€์žฅ API ์  ์ฐจ์ด์ ์ด๋ผ๊ณ  ํ•  ์ˆ˜ ์žˆ๊ฒ ๋‹ค.

 

๊ทธ๋ฆฌ๊ณ  ํ•™์Šต, ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด ๋ชจ๋ธ์˜ ์˜ˆ์ธก๊ฐ’์„ ๋‚ด๋ฑ‰๊ธฐ ์œ„ํ•ด์„œ๋Š” fit() ๋ฉ”์†Œ๋“œ๋กœ ๋ฐ˜ํ™˜๋œ Model ํด๋ž˜์Šค ๊ฐ์ฒด์—์„œ transform() ์ด๋ผ๋Š” ๋ฉ”์†Œ๋“œ๋ฅผ ํ™œ์šฉํ•œ๋‹ค. transform() ๋ฉ”์†Œ๋“œ ์ธ์ž์—๋Š” ์˜ˆ์ธกํ•˜๋ ค๋Š” Feature Vectorization์ด ์ ์šฉ๋œ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ๋„ฃ์–ด์ฃผ๋ฉด ๋œ๋‹ค. ์ด๋ ‡๊ฒŒ ์˜ˆ์ธกํ•จ์œผ๋กœ์จ ๋ฐ˜ํ™˜๋œ ๊ฐ์ฒด๋“ค(์œ„ ์ฝ”๋“œ ์ƒ์—์„œ๋Š” train_pred, test_pred ๋ณ€์ˆ˜)์€ ์ŠคํŒŒํฌ์˜ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ๊ฐ์ฒด๋กœ ์ด๊ฒƒ๋“ค์˜ ๊ฒฐ๊ณผ๋ฅผ ์ถœ๋ ฅํ•ด๋ณด๋ฉด ์•„๋ž˜ ์‚ฌ์ง„์ฒ˜๋Ÿผ ์—ฌ๋Ÿฌ๊ฐ€์ง€ ์นผ๋Ÿผ๋“ค์ด ์žˆ๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค.

 

ํ•™์Šต ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•œ ์˜ˆ์ธก๊ฐ’ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„

 

์œ„ ์‚ฌ์ง„์„ ๋ณด๋ฉด ์•Œ๋‹ค์‹œํ”ผ ์›๋ณธ Feature, Feature Vectorization๋œ ๊ฒƒ, ๋ชจ๋ธ์ด ์˜ˆ์ธกํ•œ Rawํ•œ ๊ฒฐ๊ณผ๊ฐ’, ๊ทธ๋ฆฌ๊ณ  Rawํ•œ ๊ฒฐ๊ณผ๊ฐ’์— ํ™œ์„ฑํ•จ์ˆ˜๋ฅผ ์ ์šฉํ•ด ๋ฐ˜ํ™˜๋œ ํ™•๋ฅ  ๊ฐ’, ์ตœ์ข… ์˜ˆ์ธก๊ฐ’๊นŒ์ง€ ๋‹ด๊ฒจ์žˆ๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค. ์šฐ๋ฆฌ๋Š” ๋ฐ‘์—์„œ ๋ชจ๋ธ ์„ฑ๋Šฅ์„ ํ‰๊ฐ€ํ•˜๊ธฐ ์œ„ํ•ด์„œ prediction ์ด๋ผ๋Š” ์นผ๋Ÿผ์— ์žˆ๋Š” ๊ฐ’๋“ค์„ ํ™œ์šฉํ•˜๋ฉด ๋œ๋‹ค.

1-3. ๋ชจ๋ธ์˜ ์„ฑ๋Šฅ ํ‰๊ฐ€ํ•˜๊ธฐ

ํ•ด๋‹น ์˜ˆ์‹œ์—์„œ ํ•ด๊ฒฐํ•˜๋ ค๋Š” ๋ฌธ์ œ๋Š” ๋ฉ€ํ‹ฐ-ํด๋ž˜์Šค ๋ถ„๋ฅ˜ ๋ฌธ์ œ์ด๊ธฐ ๋•Œ๋ฌธ์— ์ด์— ๋งž๋Š” Evaluator ํด๋ž˜์Šค๋ฅผ ํ™œ์šฉํ•ด์•ผ ํ•œ๋‹ค.

 

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='target', metricName='accuracy')

train_acc = evaluator.evaluate(train_pred)
test_acc = evaluator.evaluate(test_pred)

print('Train Accuracy:', train_acc)
print('Test Accuracy:', test_acc)

 

์ฐธ๊ณ ๋กœ ์ŠคํŒŒํฌ์—๋Š” ๋ฉ€ํ‹ฐ-ํด๋ž˜์Šค๊ฐ€ ์•„๋‹Œ ๋ฉ€ํ‹ฐ-๋ ˆ์ด๋ธ” ๋ฌธ์ œ์— ๋Œ€ํ•ด ๋‹ค๋ฃจ๋Š” MultilabelClassificationEvaluator ๋„ ์กด์žฌํ•œ๋‹ค. ๋ฉ€ํ‹ฐ-ํด๋ž˜์Šค, ๋ฉ€ํ‹ฐ-๋ ˆ์ด๋ธ”์˜ ์ฐจ์ด์ ์€ ๋ฌด์—‡์ธ์ง€ ์˜ˆ์ „ ํฌ์ŠคํŒ…์—์„œ๋„ ์–ธ๊ธ‰ํ–ˆ์—ˆ๋‹ค. ํด๋ž˜์Šค๊ฐ€ ์—ฌ๋Ÿฌ๊ฐœ์ธ ๊ฒƒ ์ค‘ ๋ฌด์กฐ๊ฑด ํ•˜๋‚˜๋งŒ์„ ๊ฐ€์ ธ์•ผ ํ•œ๋‹ค๋ฉด ๋ฉ€ํ‹ฐ-ํด๋ž˜์Šค, ๋™์‹œ์— ์—ฌ๋Ÿฌ๊ฐœ์˜ ํด๋ž˜์Šค๋ฅผ ๊ฐ€์ ธ๋„ ๋œ๋‹ค๋ฉด ๋ฉ€ํ‹ฐ-๋ ˆ์ด๋ธ”์ด๋‹ค.

 

2. Pipeline์„ ํ™œ์šฉํ•ด ์ „์ฒ˜๋ฆฌ, ๋ชจ๋ธ๋ง ๊ณผ์ •์„ ๊ฐ„์†Œํ™”์‹œํ‚ค์ž!

๋‹ค์Œ์€ ์ŠคํŒŒํฌ์—์„œ ์ œ๊ณตํ•˜๋Š” Pipeline ์ด๋ผ๋Š” ํด๋ž˜์Šค๋ฅผ ํ™œ์šฉํ•ด์„œ ๋ฐ์ดํ„ฐ ์ „์ฒ˜๋ฆฌ, Feature Vectorization, ๋ชจ๋ธ๋ง ๊ณผ์ •์„ ๊ฐ„์†Œํ™”์‹œํ‚ฌ ์ˆ˜ ์žˆ๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด ๋ฐฐ์›Œ๋ณด์ž. ์‚ฌ์ดํ‚ท๋Ÿฐ๋„ Pipeline์ด๋ผ๋Š” ํด๋ž˜์Šค๋ฅผ ํ™œ์šฉํ•  ์ˆ˜ ์žˆ๋Š”๋ฐ, ์˜ˆ์ „์— ์‚ฌ์šฉํ•˜๋ฉด์„œ ํŽธ๋ฆฌํ–ˆ๋˜ ๊ฒฝํ—˜์ด ์žˆ๋‹ค. ์ด๋Ÿฌํ•œ ๊ฒฝํ—˜์„ ์ŠคํŒŒํฌ์—์„œ๋„ ํ•  ์ˆ˜ ์žˆ๋‹ค!

 

๋จผ์ € ์ŠคํŒŒํฌ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ Train, Test ์šฉ ๋ฐ์ดํ„ฐ๋กœ ๋ถ„ํ• ํ•˜๊ณ  Feature Vectorization์„ ์ˆ˜ํ–‰ํ•˜๋Š” VectorAssembler ๋ฅผ ํ•˜๋‚˜์˜ ๋‹จ๊ณ„(stage)๋กœ ์ •์˜ํ•˜๊ณ , ๋˜ ํ•˜๋‚˜๋Š” ๋ชจ๋ธ์„ ์ •์˜ํ•จ์œผ๋กœ์จ ๋˜๋‹ค๋ฅธ ๋‹จ๊ณ„(stage)๋ฅผ ์ •์˜ํ•ด๋ณด์ž.

 

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

# 1. ๋ฐ์ดํ„ฐ ๋ถ„ํ• 
train_df, test_df = spark_df.randomSplit(weights=[0.8, 0.2], seed=42)

# 2. Feature Vector ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์œผ๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ์Šคํ…Œ์ด์ง€ ์ƒ์„ฑ
ftr_columns = train_df.columns[:-1]
stage_1 = VectorAssembler(inputCols=ftr_columns, outputCol='features')

# 3. ๋ชจ๋ธ ์ •์˜๋ฅผ ํ•˜๋‚˜์˜ ์Šคํ…Œ์ด์ง€๋กœ ์ƒ์„ฑ
stage_2 = DecisionTreeClassifier(featuresCol='features', 
                                 labelCol='target')

 

์ด์ œ ์ŠคํŒŒํฌ์—์„œ ์ œ๊ณตํ•˜๋Š” Pipeline ํด๋ž˜์Šค๋ฅผ ํ™œ์šฉํ•ด์„œ ์œ„์—์„œ ๋งŒ๋“  stage๋“ค์„ ํ•˜๋‚˜์˜ ํŒŒ์ดํ”„๋ผ์ธ์œผ๋กœ ์ •์˜ํ•ด ํ•œ๊บผ๋ฒˆ์— ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋„๋ก ๋งŒ๋“ค์–ด๋ณด์ž.

 

from pyspark.ml import Pipeline

# 4. ๋‘ ์Šคํ…Œ์ด์ง€๋ฅผ ํ•˜๋‚˜์˜ ํŒŒ์ดํ”„๋ผ์ธ์œผ๋กœ ๋งŒ๋“ค๊ธฐ
pipeline = Pipeline(stages=[stage_1, stage_2])

# 5. ํŒŒ์ดํ”„๋ผ์ธ์„ fit ํ•˜๋ฉด ๊ฐ ์Šคํ…Œ์ด์ง€์—์„œ transform ๋˜๋Š” fit ๋ฉ”์†Œ๋“œ๋ฅผ ํ•™์Šต ๋ฐ์ดํ„ฐ์— ์ˆ˜ํ–‰
pipeline_model = pipeline.fit(train_df)

# 6. 5๋ฒˆ ๋‹จ๊ณ„๋กœ ๋งŒ๋“  ํŒŒ์ดํ”„๋ผ์ธ ๋ชจ๋ธ์„ transform์œผ๋กœ ํ•™์Šต ๋ฐ ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ์— ์ˆ˜ํ–‰
train_pred = pipeline_model.transform(train_df)
test_pred = pipeline_model.transform(test_df)

 

์ •์˜๋œ ํŒŒ์ดํ”„๋ผ์ธ ๊ฐ์ฒด์—์„œ fit() ๋ฉ”์†Œ๋“œ๋ฅผ ์ˆ˜ํ–‰ํ•˜๊ฒŒ ๋˜๋ฉด ์ •์˜ํ•œ ๊ฐ stage๋“ค, ์—ฌ๊ธฐ์—์„œ๋Š” VectorAssembler ๊ฐ์ฒด์˜ transform() ๋ฉ”์†Œ๋“œ์™€ ๋ชจ๋ธ ๊ฐ์ฒด์—์„œ์˜ fit() ๋ฉ”์†Œ๋“œ๋ฅผ ์ˆœ์ฐจ์ ์œผ๋กœ ์ž๋™์œผ๋กœ ์‹คํ–‰์‹œ์ผœ์ค€๋‹ค. ์ฆ‰, ํ•™์Šต ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด ๋ฐ์ดํ„ฐ ์ „์ฒ˜๋ฆฌ, ๋ชจ๋ธ ํ•™์Šต์ด ์ˆ˜ํ–‰๋˜๋Š” ๊ฒƒ์ด๋‹ค.

 

ํ—ท๊ฐˆ๋ฆฌ์ง€ ๋ง์•„์•ผ ํ•  ์ ์€ ํŒŒ์ดํ”„๋ผ์ธ ๊ฐ์ฒด์—์„œ๋Š” fit() ๋ฉ”์†Œ๋“œ๋ฅผ ์ˆ˜ํ–‰ํ•˜๋‹ˆ๊นŒ 'fit' ์ด๋ผ๋Š” ๋ฉ”์†Œ๋“œ ์ด๋ฆ„ ์ž์ฒด๋งŒ ๋ณด๊ณ  "์™œ VectorAssembler ๊ฐ์ฒด์—์„œ๋Š” transform() ์„ ์ˆ˜ํ–‰ํ•˜์ง€?" ๋ผ๊ณ  ์ƒ๊ฐํ•  ์ˆ˜ ์žˆ๋‹ค. ํ•˜์ง€๋งŒ ์ŠคํŒŒํฌ๋ฅผ ์ œ์ž‘ํ•œ ๊ฐœ๋ฐœ์ž ๋ถ„๋“ค์ด ํŒŒ์ดํ”„๋ผ์ธ ๊ฐ์ฒด์„œ fit() ๋ฉ”์†Œ๋“œ๋ฅผ ์ˆ˜ํ–‰ํ•˜๋ฉด VectorAssembler ๊ฐ์ฒด์—์„œ๋Š” fit() ์ด ์•„๋‹Œ transform() ๋ฉ”์†Œ๋“œ๋ฅผ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋„๋ก ๋กœ์ง์„ ์ž‘์„ฑํ•ด ๋‘์—ˆ๋‹ค.(์ฐธ๊ณ ๋กœ VectorAssembler ์—๋Š” fit() ๋ฉ”์†Œ๋“œ ์ž์ฒด๊ฐ€ ์กด์žฌํ•˜์ง€ ์•Š๋‹ค๋Š” ์ ๋„ ์žˆ๋‹ค)

 

์ด๋ ‡๊ฒŒ ํŒŒ์ดํ”„๋ผ์ธ ๊ฐ์ฒด์—์„œ fit() ๋ฉ”์†Œ๋“œ๋กœ ํŒŒ์ดํ”„๋ผ์ธ ๋ชจ๋ธ ๊ฐ์ฒด๋ฅผ ๋ฐ˜ํ™˜๋ฐ›๋Š”๋‹ค. ๊ทธ๋ฆฌ๊ณ  ๋‚œ ํ›„, ํŒŒ์ดํ”„๋ผ์ธ ๋ชจ๋ธ ๊ฐ์ฒด์—์„œ transform() ๋ฉ”์†Œ๋“œ๋ฅผ ํ™œ์šฉํ•ด ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด ์˜ˆ์ธก์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค. ์ด๋ ‡๊ฒŒ ๋ฐ˜ํ•œ๋œ ํ•™์Šต, ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•œ ์˜ˆ์ธก๊ฐ’ ๊ฒฐ๊ณผ ํ™”๋ฉด์€ ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

 

ํŒŒ์ดํ”„๋ผ์ธ ๋ชจ๋ธ์˜ transform() ๊ฒฐ๊ณผ ํ™”๋ฉด

 

์ด๋ ‡๊ฒŒ ํŒŒ์ดํ”„๋ผ์ธ์„ ํ™œ์šฉํ•ด์„œ ์‰ฝ๊ฒŒ ๋ฐ์ดํ„ฐ ์ „์ฒ˜๋ฆฌ ๋ฐ ๋ชจ๋ธ ํ•™์Šต, ์˜ˆ์ธกํ•˜๋Š” ๊ณผ์ •์„ ํ•˜๋‚˜์˜ ๊ณผ์ •์œผ๋กœ ๊ฐ„๊ฒฐํ•˜๊ฒŒ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค. ๋ฌผ๋ก  ํ•ด๋‹น ์˜ˆ์‹œ์—์„œ๋Š” ์Šค์ผ€์ผ๋ง์ด๋‚˜ ๋ฒ”์ฃผํ˜• ๋ณ€์ˆ˜์— ๋Œ€ํ•œ ๋ ˆ์ด๋ธ” ๋˜๋Š” ์›-ํ•ซ ์ธ์ฝ”๋”ฉ ๊ณผ์ •์ด ์กด์žฌํ•˜์ง€๋Š” ์•Š์ง€๋งŒ, ๋งŒ์•ฝ ์ด๋Ÿฌํ•œ ์ „์ฒ˜๋ฆฌ ๊ณผ์ •์ด ์ถ”๊ฐ€๋กœ ํ•„์š”ํ•˜๋‹ค๋ฉด ์œ„์—์„œ ํ–ˆ๋˜ ๊ฒƒ์ฒ˜๋Ÿผ ์ „์ฒ˜๋ฆฌ ์ˆœ์„œ๋Œ€๋กœ stage๋“ค์„ ์ •์˜ํ•ด์ฃผ๊ณ  ์ˆœ์„œ์— ๋งž๊ฒŒ ๋ฆฌ์ŠคํŠธ ํ˜•ํƒœ๋กœ ์ •์˜ํ•œ ํ›„ ํŒŒ์ดํ”„๋ผ์ธ ๊ฐ์ฒด๋กœ ๋งŒ๋“ค์–ด์ฃผ๋ฉด ๋œ๋‹ค.

 

์ฐธ๊ณ ๋กœ ํŒŒ์ดํ”„๋ผ์ธ ๋ชจ๋ธ ๊ฐ์ฒด๋ฅผ dir() ๋งค์ง ๋ฉ”์†Œ๋“œ๋กœ ์–ด๋–ค ์†์„ฑ๊ฐ’๋“ค์ด ์žˆ๋Š”์ง€ ์‚ดํŽด๋ณด๋ฉด ์—ฌ๋Ÿฌ๊ฐ€์ง€ ์†์„ฑ๊ฐ’๋“ค์ด ์žˆ๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค. ๊ทธ ์ค‘์— stages๋ผ๋Š” ์†์„ฑ ๊ฐ’์ด ์•„๋ž˜์ฒ˜๋Ÿผ ์žˆ๋Š”๋ฐ, ์ด ์†์„ฑ ๊ฐ’์„ ํ†ตํ•ด์„œ ์›ํ•˜๋Š” ํŠน์ • ๋‹จ๊ณ„๋ฅผ ๋”ฐ๋กœ ์ถ”์ถœํ•ด์„œ ํ™œ์šฉํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

 

ํŒŒ์ดํ”„๋ผ์ธ ๋ชจ๋ธ ๊ฐ์ฒด์˜ ์—ฌ๋Ÿฌ๊ฐ€์ง€ ์†์„ฑ๊ฐ’

๋ฐ˜์‘ํ˜•