๐ ํด๋น ํฌ์คํ ์ ์คํํฌ ์๋ฒฝ ๊ฐ์ด๋ ์ฑ ๊ณผ ์ธํ๋ฐ์ ์คํํฌ ๋จธ์ ๋ฌ๋ ์๋ฒฝ ๊ฐ์ด๋ ๊ฐ์๋ก ๊ณต๋ถํ ํ ๋ฐฐ์ด ๋ด์ฉ์ ์ ๋ง์ ๋ฐฉ์์ผ๋ก ์ฌ๊ตฌ์ฑํ ๊ฒ์์ ์๋ฆฝ๋๋ค. ํนํ, ์ฐธ๊ณ ํ ์ธํ๋ฐ ๊ฐ์์ ๊ฐ์ ์๋ฃ๋ฅผ ๊ทธ๋๋ก ์ฌ์ฉํ์ง ์์์์ ํํ ์๋ฆฝ๋๋ค!
์ด๋ฒ ํฌ์คํ ์์๋ ๋ฌธ์์ด ๋๋ Object ํ์ ์ ๊ฐ์ ์ซ์๋ก '์ธ์ฝ๋ฉ' ํด์ฃผ๋ ๊ธฐ๋ฒ๋ค์ธ ๋ ์ด๋ธ ์ธ์ฝ๋ฉ(Label Encoding)๊ณผ ์-ํซ ์ธ์ฝ๋ฉ(One-Hot Encoding)์ ์คํํฌ๋ก ์ํํ๋ ๋ฐฉ๋ฒ์ ๋ํด์ ์์๋ณด๋๋ก ํ์. ๊ทธ๋ฆฌ๊ณ ์ค์ํ ํํ์ ๊ฐ๋ค์ ์ค์ผ์ผ๋ง ํด์ฃผ๋ ๋ฐฉ๋ฒ์ ๋ํด์๋ ๋ฐฐ์๋ณด์.
1. ๋ฌธ์์ด์ ์ธ๋ฑ์ค(์ซ์)๋ก, StringToIndexer
์ฌ์ดํท๋ฐ์ ์ต์ํ ๋ถ๋ค์ ๋ ์ด๋ธ ์ธ์ฝ๋ฉ์ ์ํํ๋ค๊ณ ํ๋ฉด LabelEncoder
ํด๋์ค๋ OrdinalEncoder
ํด๋์ค๋ฅผ ๋ ์ฌ๋ฆฌ๊ธฐ ์ฝ๋ค. ๊ทธ๋ฐ๋ฐ ์คํํฌ์์๋ ํด๋น ์ด๋ฆ๊ณผ ๋์ผํ ํด๋์ค๋ ์กด์ฌํ์ง ์๊ณ StringToIndexer
๋ผ๋ ํด๋์ค๊ฐ ์กด์ฌํ๋ค. ์ด๋ฆ ๊ทธ๋๋ก '๋ฌธ์์ด'์ธ String์ '์ซ์'์ธ Index๋ก(to)๋ฅผ ์๋ฏธํ๋ค. ์ฌ์ฉ๋ฒ์ ๊ฐ๋จํ๋ค. ๋ณํ ๋์์ธ ๋ฌธ์์ด ํ์
์ ์นผ๋ผ ์ด๋ฆ๊ณผ ๋ณํ ํ ๋ง๋ค์ด๋ผ ์นผ๋ผ ์ด๋ฆ์ ์ธ์๋ก ์ง์ ํ ํ, fit()
, transform()
๋ฉ์๋๋ฅผ ์ฌ์ฉํด ์๋์ฒ๋ผ ๋ณํํ ์ ์๋ค.
from pyspark.ml.feature import StringIndexer
titanic_df = spark.read.csv('/FileStore/tables/train.csv', header=True, inferSchema=True)
# fit ํ๋ฉด Model ๊ฐ์ฒด๊ฐ ๋ฐํ๋จ. ๊ทธ๋ฆฌ๊ณ transform์ ์ํํด์ ์ค์ง์ ์ธ ๋ณํ ์ํ
indexer = StringIndexer(inputCol='Sex', outputCol='Sex_enc')
encoding_df = indexer.fit(titanic_df) \
.transform(titanic_df)
encoding_df.limit(5).show()
๋ค์์ ๋ถ๊ฐ์ ์ผ๋ก ์์๋๋ฉด ์ข์ ํด๋์ค์ธ๋ฐ, ๋ฐฉ๊ธ ๋ ์ด๋ธ ์ธ์ฝ๋ฉ ์ํจ ๊ฐ์ ๋ค์ ์ญ์ผ๋ก ๋ฐ๊พธ์ด์ค ์ ์๋ ํด๋์ค์ด๋ค. ์ฆ, '์ซ์' ์์ ๋ค์ '๋ฌธ์์ด'๋ก ๋ฐ๊ฟ ์ ์๋ค. IndexToString
ํด๋์ค๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋๋ฐ, ์ ๊ธฐํ ์ ์ IndexToString
์ด๋ผ๋ ํด๋์ค๋ฅผ ๋ณ๋๋ก ์ํฌํธํด์ ์ฌ์ฉํ๋ ๊ฒ์ธ๋ฐ, ์ด์ ์ StringToIndexer
ํ ๋ ์ํํ๋ '๋ฌธ์์ด' - '์ซ์' ๊ฐ์ ์์์ ์ฐพ์์ ๊ฐ์ ๋ณต๊ตฌ ์์ผ์ค๋ค.
from pyspark.ml.feature import IndexToString
stringer = IndexToString(inputCol='Sex_enc', outputCol='Sex_dec')
decoding_df = stringer.transform(encoding_df)
decoding_df.limit(3).show()
๋ค์์ ์ฌ๋ฌ๊ฐ์ ์นผ๋ผ๋ค์ ํ ๋ฒ์ ๋ ์ด๋ธ ์ธ์ฝ๋ฉ์ํฌ ์๋ ์๋ค. ์ฐจ์ด์ ์ด๋ผ๊ณ ํ๋ค๋ฉด ์ฌ๋ฌ๊ฐ์ ์นผ๋ผ์ ๋ฃ์ด์ค ๋๋ ๋ฆฌ์คํธ([]) ํํ๋ก ์ฌ๋ฌ๊ฐ์ ์นผ๋ผ์ด๋ฆ์ ์๋์ฒ๋ผ ๋ฃ์ด์ฃผ๋ฉด ๋๋ค. ์ฐธ๊ณ ๋ก ์ธ์ ์ด๋ฆ(inputCols
, outputCols
)์ ๋ณด๋ฉด ๋จ์ผ ์นผ๋ผ์ผ ๋๋ ๋์ '-s'๊ฐ ๋ถ์ง ์์ง๋ง ๋ณต์ ์นผ๋ผ์ผ ๋๋ ๋์ '-s'๊ฐ ๋ถ์ ์ํ๋ก ์ง์ ํด์ฃผ์ด์ผ ํ๋ค.
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCols=['Sex', 'Embarked'], outputCols=['Sex_enc', 'Embarked_enc'])
encoding_df2 = indexer.fit(titanic_df) \
.transform(titanic_df)
encoding_df2.limit(3).show()
ํ์ง๋ง ๋ค์ ๊ฐ์ ๋ณต๊ตฌ์ํค๋ IndexToString
๋ ๋ค๋ฅด๊ฒ ํ๋ฒ์ ์ฌ๋ฌ๊ฐ์ ์นผ๋ผ์ ๋ณต๊ตฌ์ํฌ ์๋ ์๋ค. ์ด ์ ์ด ์ฝ๊ฐ ์์ฌ์ด๋ฐ ์ฌ๋ฌ๊ฐ์ ์นผ๋ผ์ ๋ณต๊ตฌ์ํค๋ ค๋ฉด ์ผ์ผ์ด ํ๋์ฉ ์๋์ฒ๋ผ ํด์ฃผ์ด์ผ ํ๋ค.
from pyspark.ml.feature import IndexToString
sex_stringer = IndexToString(inputCol='Sex_enc', outputCol='Sex_dec')
embarked_stringer = IndexToString(inputCol='Embarked_enc', outputCol='Embarked_dec')
decoding_df2 = sex_stringer.transform(encoding_df2)
decoding_df2 = embarked_stringer.transform(decoding_df2)
decoding_df2.limit(5).show()
2. ๋ฌธ์์ด์ One-Hot ๋ฒกํฐ๋ก ๋ณํ์ํค์, OneHotEncoder
๋ฌธ์์ด์ ๊ฐ์ ์-ํซ ๋ฒกํฐ๋ก ๋ณํ์ํฌ ๋ ์ฌ์ฉํ๋ ํด๋์ค๋ ์คํํฌ์์๋ ์ฌ์ดํท๋ฐ๊ณผ ๋๊ฐ์ด OneHotEncoder
ํด๋์ค๋ฅผ ์ฌ์ฉํ๋ค. ํ์ง๋ง ์ฌ์ดํท๋ฐ๊ณผ ๋ค๋ฅด๊ฒ ์ฐจ์ด์ ์ด ์กด์ฌํ๋ค. ์ฌ์ดํท๋ฐ์ OneHotEncoder
๋ฅผ ์ฌ์ฉํ ๋ ๋ฌธ์์ด ํํ์ ์นผ๋ผ์ ๊ทธ๋๋ก ๋ฃ์ด์ฃผ๋ฉด ์์์ ๋ด๋ถ์ ์ผ๋ก ๋ ์ด๋ธ ์ธ์ฝ๋ฉ์ ํธ์ถํด ์ํํ ํ, ์-ํซ ๋ฒกํฐ๋ก ๋ณํ์์ผ์ค๋ค.
ํ์ง๋ง ์คํํฌ์ OneHotEncoder
๋ ๊ทธ๋ ๊ฒ ๋์ํ์ง ์๋๋ค. ๊ทธ๋์ ๋ฐ๋์ ์ง์ ๋ฌธ์์ด์์ ์ซ์๋ก ๋ณํํด์ฃผ๋ ๋ ์ด๋ธ ์ธ์ฝ๋ฉ ์์
์ ์ฌ์ ์ ์ํํด์ฃผ์ด์ผ ํ๋ค. ์๋ ์ฝ๋๋ฅผ ๋ณด๋ฉด์ ์ดํดํด๋ณด๋๋ก ํ์.
# ์-ํซ ์ธ์ฝ๋ฉ ์ํํ๋ ค๋จผ ๋จผ์ ๋ ์ด๋ธ์ธ์ฝ๋ฉ์ ์ํํด์ผ ํจ!
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
indexer = StringIndexer(inputCols=['Sex', 'Embarked'], outputCols=['Sex_label', 'Embarked_label'])
label_df = indexer.fit(titanic_df).transform(titanic_df)
ohe_encoder = OneHotEncoder(inputCols=['Sex_label', 'Embarked_label'], outputCols=['Sex_ohe', 'Embarked_ohe'], dropLast=True)
encoding_df = ohe_encoder.fit(label_df).transform(label_df)
encoding_df.limit(5).show()
์์์ ํท๊ฐ๋ฆฌ์ง ๋ง์์ผ ํ ์ ์ OneHotEncoder
ํด๋์ค๋ฅผ ์ฌ์ฉํ ๋ dropLast
๋ผ๋ ์ธ์์ด๋ค. dropLast
๋ฅผ True
๋ก ์ค์ ํ๋ฉด, ์-ํซ ๋ฒกํฐ๋ก ๋ณํ ์ ๋ง์ง๋ง ๋ฒกํฐ๋ฅผ ๋๋์ ํ๊ฒ ๋๋ค. ์ด๊ฒ ๋ฌด์จ ๋ง์ผ๊น? ์ฐ์ ์๋์ ๊ฐ์ ๋ฒ์ฃผํ ๋ณ์๊ฐ ์๋ค๊ณ ๊ฐ์ ํด๋ณด์.
์ ๊ทธ๋ฆผ์ฒ๋ผ $x$ ๋ณ์๋ A, B, C ์ด 3์ข
๋ฅ์ ๊ฐ์ ๊ฐ๋๋ค. ์๋๋๋ก๋ผ๋ฉด ๊ธธ์ด๊ฐ 3์ธ ์-ํซ ๋ฒกํฐ๋ก ๋ํ๋ผ ์ ์๋ค(drop_last=False
์ธ ๊ฒฝ์ฐ์ ๊ทธ๋ฆผ) ํ์ง๋ง ๊ธธ์ด๊ฐ 2์ธ ์-ํซ ๋ฒกํฐ๋ก 3์ข
๋ฅ์ ๊ฐ์ ๋ํ๋ผ ์๋ ์๋ค. ์ ์๊ฐํด๋ณด๋ฉด ๋ชจ๋ ์์๊ฐ 0์ผ ๋๋ ํ๋์ ์ข
๋ฅ๊ฐ์ ๋ํ๋ผ ์ ์๊ธฐ ๋๋ฌธ์ด๋ค. ๊ทธ๋์ drop_last=True
์ธ ๊ฒฝ์ฐ์ ๊ทธ๋ฆผ์ ๋ณด๋ฉด ๊ธธ์ด๊ฐ 2์ธ ๋ฒกํฐ๋ก A, B, C ์ด 3๊ฐ์ ๊ฐ์ ๋ํ๋ผ ์ ์๋ ๊ฒ์ด๋ค.
3. ์์นํ ๋ณ์๋ฅผ ์ค์ผ์ผ๋งํ๊ธฐ!
์ด๋ฒ์ Standard Scaling, Min-Max Scaling ๋ฑ ๋ค์ํ ์ค์ผ์ผ๋ง ๊ธฐ๋ฒ์ ํ์ฉํด์ ์คํํฌ์์ ์์นํ ๊ฐ๋ค์ ์ค์ผ์ผ๋ง ํ๋ ๋ฐฉ๋ฒ์ ๋ํด์ ์์๋ณด๋๋ก ํ์. ์คํํฌ์์๋ ํน์ดํ๊ฒ ์ค์ผ์ผ๋ง์ ์ ์ฉํ๋ ์นผ๋ผ์ ๋ฐ๋์ Feature Vectorization๋ ์นผ๋ผ์ด์ด์ผ ํ๋ค. Feature Vectorization์ด ๋ฌด์์ธ์ง์ ๋ํด์๋ ์์ ์คํํฌ ํฌ์คํ ์์ ๋ฐฐ์ด ์ ์ด ์๋ค.
๋ฐ๋ผ์ ์ค์ผ์ผ๋ง์ ์ ์ฉํ๊ธฐ ์ ์ ๋ฐ๋์ VectorAssembler
๋ฅผ ํ์ฉํด Feature Vectorization ์ํจ ํ์ ์ ์ฉํด์ผ ํ๋ค. ๋ถ๊ฝ ๋ฐ์ดํฐ๋ฅผ ํ์ฉํด์ ์์๋ณด์.
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
iris = load_iris()
iris_columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
iris_df = pd.DataFrame(iris.data, columns=iris_columns)
iris_df['label'] = iris.target
iris_sdf = spark.createDataFrame(iris_df)
vec_assembler = VectorAssembler(inputCols=iris_sdf.columns[:-1], outputCol='feature')
iris_vec = vec_assembler.transform(iris_sdf)
scaler = StandardScaler(inputCol='feature', outputCol='feature_scaled', withMean=True, withStd=True) # True๋ก ํ๋ฉด ํ๊ท ์ 0, std=1๋ก 'ํ์ค์ ๊ท๋ถํฌ'
iris_scaled = scaler.fit(iris_vec).transform(iris_vec)
iris_scaled.limit(5).show()
์ฐธ๊ณ ๋ก StandardScaler๋ฅผ ์ฌ์ฉํ ๋, withMean ์ด๋ผ๋ ์ธ์๊ฐ ๊ธฐ๋ณธ์ ์ผ๋ก False๋ก ์ค์ ๋๋ค. withMean, withStd ์ธ์๋ฅผ True๋ก ์ค์ ํ๋ค๋ฉด, ํ๊ท ์ด 0, ํ์คํธ์ฐจ๊ฐ 1์ธ ์ฆ, ํ์ค์ ๊ท๋ถํฌ ํํ๋ก ๊ฐ์ ์ค์ผ์ผ๋งํด์ค์ ์๋ฏธํ๋ค. ๋ง์ฝ False๋ก ์ค์ ํ๋ค๋ฉด ์ค์ผ์ผ๋ง์ ์ ์ฉํ ์์นํ ๊ฐ๋ค์ ์ค์ ํ๊ท ๊ฐ์ผ๋ก ์ค์ ๋์ด ์ ๊ทํ๊ฐ ์ํ๋๋ค๋ ์ ๋ ์์๋์.
'Apache Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[PySpark] Spark๋ก ๊ต์ฐจ๊ฒ์ฆ, ํ๋ผ๋ฏธํฐ ํ๋์ ํด๋ณด์! (0) | 2022.03.06 |
---|---|
[PySpark] Spark์ Pipeline์ผ๋ก ๋ถ๋ฅ ๋ชจ๋ธ์ ๋ง๋ค์ด๋ณด์! (0) | 2022.02.20 |
[PySpark] Spark์ Dataframe API๋ฅผ ์์๋ณด์!(2) (0) | 2022.02.05 |
[PySpark] Spark์ Dataframe API๋ฅผ ์์๋ณด์!(1) (4) | 2022.02.03 |
[Infra] ๋ฐ์ดํฐ ์ธํ๋ผ - Ingestion&Transformation(Event Streaming) (0) | 2021.04.25 |