๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

Apache Spark

[PySpark] Spark๋กœ Encoding ๊ณผ Scaling ์„ ์ˆ˜ํ–‰ํ•ด๋ณด์ž!

๋ฐ˜์‘ํ˜•

๐Ÿ”Š ํ•ด๋‹น ํฌ์ŠคํŒ…์€ ์ŠคํŒŒํฌ ์™„๋ฒฝ ๊ฐ€์ด๋“œ ์ฑ…๊ณผ ์ธํ”„๋Ÿฐ์˜ ์ŠคํŒŒํฌ ๋จธ์‹ ๋Ÿฌ๋‹ ์™„๋ฒฝ ๊ฐ€์ด๋“œ ๊ฐ•์˜๋กœ ๊ณต๋ถ€ํ•œ ํ›„ ๋ฐฐ์šด ๋‚ด์šฉ์„ ์ €๋งŒ์˜ ๋ฐฉ์‹์œผ๋กœ ์žฌ๊ตฌ์„ฑํ•œ ๊ฒƒ์ž„์„ ์•Œ๋ฆฝ๋‹ˆ๋‹ค. ํŠนํžˆ, ์ฐธ๊ณ ํ•œ ์ธํ”„๋Ÿฐ ๊ฐ•์˜์˜ ๊ฐ•์˜ ์ž๋ฃŒ๋ฅผ ๊ทธ๋Œ€๋กœ ์‚ฌ์šฉํ•˜์ง€ ์•Š์•˜์Œ์„ ํ•„ํžˆ ์•Œ๋ฆฝ๋‹ˆ๋‹ค!

Apache Spark๋ฅผ Python์œผ๋กœ ์ด์šฉํ•˜๋Š” PySpark์— ๋Œ€ํ•ด ์•Œ์•„๋ณด์ž

 


์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” ๋ฌธ์ž์—ด ๋˜๋Š” Object ํƒ€์ž…์˜ ๊ฐ’์„ ์ˆซ์ž๋กœ '์ธ์ฝ”๋”ฉ' ํ•ด์ฃผ๋Š” ๊ธฐ๋ฒ•๋“ค์ธ ๋ ˆ์ด๋ธ” ์ธ์ฝ”๋”ฉ(Label Encoding)๊ณผ ์›-ํ•ซ ์ธ์ฝ”๋”ฉ(One-Hot Encoding)์„ ์ŠคํŒŒํฌ๋กœ ์ˆ˜ํ–‰ํ•˜๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด์„œ ์•Œ์•„๋ณด๋„๋ก ํ•˜์ž. ๊ทธ๋ฆฌ๊ณ  ์‹ค์ˆ˜ํ˜• ํ˜•ํƒœ์˜ ๊ฐ’๋“ค์„ ์Šค์ผ€์ผ๋ง ํ•ด์ฃผ๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด์„œ๋„ ๋ฐฐ์›Œ๋ณด์ž.

1. ๋ฌธ์ž์—ด์„ ์ธ๋ฑ์Šค(์ˆซ์ž)๋กœ, StringToIndexer

์‚ฌ์ดํ‚ท๋Ÿฐ์— ์ต์ˆ™ํ•œ ๋ถ„๋“ค์€ ๋ ˆ์ด๋ธ” ์ธ์ฝ”๋”ฉ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค๊ณ  ํ•˜๋ฉด LabelEncoder ํด๋ž˜์Šค๋‚˜ OrdinalEncoder ํด๋ž˜์Šค๋ฅผ ๋– ์˜ฌ๋ฆฌ๊ธฐ ์‰ฝ๋‹ค. ๊ทธ๋Ÿฐ๋ฐ ์ŠคํŒŒํฌ์—์„œ๋Š” ํ•ด๋‹น ์ด๋ฆ„๊ณผ ๋™์ผํ•œ ํด๋ž˜์Šค๋Š” ์กด์žฌํ•˜์ง€ ์•Š๊ณ  StringToIndexer ๋ผ๋Š” ํด๋ž˜์Šค๊ฐ€ ์กด์žฌํ•œ๋‹ค. ์ด๋ฆ„ ๊ทธ๋Œ€๋กœ '๋ฌธ์ž์—ด'์ธ String์„ '์ˆซ์ž'์ธ Index๋กœ(to)๋ฅผ ์˜๋ฏธํ•œ๋‹ค. ์‚ฌ์šฉ๋ฒ•์€ ๊ฐ„๋‹จํ•˜๋‹ค. ๋ณ€ํ™˜ ๋Œ€์ƒ์ธ ๋ฌธ์ž์—ด ํƒ€์ž…์˜ ์นผ๋Ÿผ ์ด๋ฆ„๊ณผ ๋ณ€ํ™˜ ํ›„ ๋งŒ๋“ค์–ด๋‚ผ ์นผ๋Ÿผ ์ด๋ฆ„์„ ์ธ์ž๋กœ ์ง€์ •ํ•œ ํ›„, fit(), transform() ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•ด ์•„๋ž˜์ฒ˜๋Ÿผ ๋ณ€ํ™˜ํ•  ์ˆ˜ ์žˆ๋‹ค.

 

from pyspark.ml.feature import StringIndexer

titanic_df = spark.read.csv('/FileStore/tables/train.csv', header=True, inferSchema=True)

# fit ํ•˜๋ฉด Model ๊ฐ์ฒด๊ฐ€ ๋ฐ˜ํ™˜๋จ. ๊ทธ๋ฆฌ๊ณ  transform์„ ์ˆ˜ํ–‰ํ•ด์„œ ์‹ค์งˆ์ ์ธ ๋ณ€ํ™˜ ์ˆ˜ํ–‰
indexer = StringIndexer(inputCol='Sex', outputCol='Sex_enc')
encoding_df = indexer.fit(titanic_df) \
                     .transform(titanic_df)
encoding_df.limit(5).show()

 

๋‹ค์Œ์€ ๋ถ€๊ฐ€์ ์œผ๋กœ ์•Œ์•„๋‘๋ฉด ์ข‹์„ ํด๋ž˜์Šค์ธ๋ฐ, ๋ฐฉ๊ธˆ ๋ ˆ์ด๋ธ” ์ธ์ฝ”๋”ฉ ์‹œํ‚จ ๊ฐ’์„ ๋‹ค์‹œ ์—ญ์œผ๋กœ ๋ฐ”๊พธ์–ด์ค„ ์ˆ˜ ์žˆ๋Š” ํด๋ž˜์Šค์ด๋‹ค. ์ฆ‰, '์ˆซ์ž' ์—์„œ ๋‹ค์‹œ '๋ฌธ์ž์—ด'๋กœ ๋ฐ”๊ฟ€ ์ˆ˜ ์žˆ๋‹ค. IndexToString ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋˜๋Š”๋ฐ, ์‹ ๊ธฐํ•œ ์ ์€ IndexToString ์ด๋ผ๋Š” ํด๋ž˜์Šค๋ฅผ ๋ณ„๋„๋กœ ์ž„ํฌํŠธํ•ด์„œ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ธ๋ฐ, ์ด์ „์— StringToIndexer ํ•  ๋•Œ ์ˆ˜ํ–‰ํ–ˆ๋˜ '๋ฌธ์ž์—ด' - '์ˆซ์ž' ๊ฐ’์„ ์•Œ์•„์„œ ์ฐพ์•„์„œ ๊ฐ’์„ ๋ณต๊ตฌ ์‹œ์ผœ์ค€๋‹ค.

 

from pyspark.ml.feature import IndexToString

stringer = IndexToString(inputCol='Sex_enc', outputCol='Sex_dec')
decoding_df = stringer.transform(encoding_df)
decoding_df.limit(3).show()

 

๋‹ค์Œ์€ ์—ฌ๋Ÿฌ๊ฐœ์˜ ์นผ๋Ÿผ๋“ค์„ ํ•œ ๋ฒˆ์— ๋ ˆ์ด๋ธ” ์ธ์ฝ”๋”ฉ์‹œํ‚ฌ ์ˆ˜๋„ ์žˆ๋‹ค. ์ฐจ์ด์ ์ด๋ผ๊ณ  ํ•œ๋‹ค๋ฉด ์—ฌ๋Ÿฌ๊ฐœ์˜ ์นผ๋Ÿผ์„ ๋„ฃ์–ด์ค„ ๋•Œ๋Š” ๋ฆฌ์ŠคํŠธ([]) ํ˜•ํƒœ๋กœ ์—ฌ๋Ÿฌ๊ฐœ์˜ ์นผ๋Ÿผ์ด๋ฆ„์„ ์•„๋ž˜์ฒ˜๋Ÿผ ๋„ฃ์–ด์ฃผ๋ฉด ๋œ๋‹ค. ์ฐธ๊ณ ๋กœ ์ธ์ž ์ด๋ฆ„(inputCols, outputCols)์„ ๋ณด๋ฉด ๋‹จ์ผ ์นผ๋Ÿผ์ผ ๋•Œ๋Š” ๋์— '-s'๊ฐ€ ๋ถ™์ง€ ์•Š์ง€๋งŒ ๋ณต์ˆ˜ ์นผ๋Ÿผ์ผ ๋•Œ๋Š” ๋์— '-s'๊ฐ€ ๋ถ™์€ ์ƒํƒœ๋กœ ์ง€์ •ํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค.

 

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCols=['Sex', 'Embarked'], outputCols=['Sex_enc', 'Embarked_enc'])
encoding_df2 = indexer.fit(titanic_df) \
                      .transform(titanic_df)
encoding_df2.limit(3).show()

 

ํ•˜์ง€๋งŒ ๋‹ค์‹œ ๊ฐ’์„ ๋ณต๊ตฌ์‹œํ‚ค๋Š” IndexToString๋Š” ๋‹ค๋ฅด๊ฒŒ ํ•œ๋ฒˆ์— ์—ฌ๋Ÿฌ๊ฐœ์˜ ์นผ๋Ÿผ์„ ๋ณต๊ตฌ์‹œํ‚ฌ ์ˆ˜๋Š” ์—†๋‹ค. ์ด ์ ์ด ์•ฝ๊ฐ„ ์•„์‰ฌ์šด๋ฐ ์—ฌ๋Ÿฌ๊ฐœ์˜ ์นผ๋Ÿผ์„ ๋ณต๊ตฌ์‹œํ‚ค๋ ค๋ฉด ์ผ์ผ์ด ํ•˜๋‚˜์”ฉ ์•„๋ž˜์ฒ˜๋Ÿผ ํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค.

 

from pyspark.ml.feature import IndexToString

sex_stringer = IndexToString(inputCol='Sex_enc', outputCol='Sex_dec')
embarked_stringer = IndexToString(inputCol='Embarked_enc', outputCol='Embarked_dec')

decoding_df2 = sex_stringer.transform(encoding_df2)
decoding_df2 = embarked_stringer.transform(decoding_df2)
decoding_df2.limit(5).show()

2. ๋ฌธ์ž์—ด์„ One-Hot ๋ฒกํ„ฐ๋กœ ๋ณ€ํ™˜์‹œํ‚ค์ž, OneHotEncoder

๋ฌธ์ž์—ด์˜ ๊ฐ’์„ ์›-ํ•ซ ๋ฒกํ„ฐ๋กœ ๋ณ€ํ™˜์‹œํ‚ฌ ๋•Œ ์‚ฌ์šฉํ•˜๋Š” ํด๋ž˜์Šค๋Š” ์ŠคํŒŒํฌ์—์„œ๋„ ์‚ฌ์ดํ‚ท๋Ÿฐ๊ณผ ๋˜‘๊ฐ™์ด OneHotEncoder ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค. ํ•˜์ง€๋งŒ ์‚ฌ์ดํ‚ท๋Ÿฐ๊ณผ ๋‹ค๋ฅด๊ฒŒ ์ฐจ์ด์ ์ด ์กด์žฌํ•œ๋‹ค. ์‚ฌ์ดํ‚ท๋Ÿฐ์€ OneHotEncoder๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ ๋ฌธ์ž์—ด ํ˜•ํƒœ์˜ ์นผ๋Ÿผ์„ ๊ทธ๋Œ€๋กœ ๋„ฃ์–ด์ฃผ๋ฉด ์•Œ์•„์„œ ๋‚ด๋ถ€์ ์œผ๋กœ ๋ ˆ์ด๋ธ” ์ธ์ฝ”๋”ฉ์„ ํ˜ธ์ถœํ•ด ์ˆ˜ํ–‰ํ•œ ํ›„, ์›-ํ•ซ ๋ฒกํ„ฐ๋กœ ๋ณ€ํ™˜์‹œ์ผœ์ค€๋‹ค.

 

ํ•˜์ง€๋งŒ ์ŠคํŒŒํฌ์˜ OneHotEncoder๋Š” ๊ทธ๋ ‡๊ฒŒ ๋™์ž‘ํ•˜์ง€ ์•Š๋Š”๋‹ค. ๊ทธ๋ž˜์„œ ๋ฐ˜๋“œ์‹œ ์ง์ ‘ ๋ฌธ์ž์—ด์—์„œ ์ˆซ์ž๋กœ ๋ณ€ํ™˜ํ•ด์ฃผ๋Š” ๋ ˆ์ด๋ธ” ์ธ์ฝ”๋”ฉ ์ž‘์—…์„ ์‚ฌ์ „์— ์ˆ˜ํ–‰ํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค. ์•„๋ž˜ ์ฝ”๋“œ๋ฅผ ๋ณด๋ฉด์„œ ์ดํ•ดํ•ด๋ณด๋„๋ก ํ•˜์ž.

 

# ์›-ํ•ซ ์ธ์ฝ”๋”ฉ ์ˆ˜ํ–‰ํ•˜๋ ค๋จผ ๋จผ์ € ๋ ˆ์ด๋ธ”์ธ์ฝ”๋”ฉ์„ ์ˆ˜ํ–‰ํ•ด์•ผ ํ•จ!
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

indexer = StringIndexer(inputCols=['Sex', 'Embarked'], outputCols=['Sex_label', 'Embarked_label'])
label_df = indexer.fit(titanic_df).transform(titanic_df)

ohe_encoder = OneHotEncoder(inputCols=['Sex_label', 'Embarked_label'], outputCols=['Sex_ohe', 'Embarked_ohe'], dropLast=True)
encoding_df = ohe_encoder.fit(label_df).transform(label_df)

encoding_df.limit(5).show()

 

์œ„์—์„œ ํ—ท๊ฐˆ๋ฆฌ์ง€ ๋ง์•„์•ผ ํ•  ์ ์€ OneHotEncoder ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ dropLast ๋ผ๋Š” ์ธ์ž์ด๋‹ค. dropLast ๋ฅผ True๋กœ ์„ค์ •ํ•˜๋ฉด, ์›-ํ•ซ ๋ฒกํ„ฐ๋กœ ๋ณ€ํ™˜ ์‹œ ๋งˆ์ง€๋ง‰ ๋ฒกํ„ฐ๋ฅผ ๋“œ๋ž์„ ํ•˜๊ฒŒ ๋œ๋‹ค. ์ด๊ฒŒ ๋ฌด์Šจ ๋ง์ผ๊นŒ? ์šฐ์„  ์•„๋ž˜์™€ ๊ฐ™์€ ๋ฒ”์ฃผํ˜• ๋ณ€์ˆ˜๊ฐ€ ์žˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ด๋ณด์ž.

 

3์ข…๋ฅ˜์˜ ๋ฒ”์ฃผํ˜• ๊ฐ’์ด ์žˆ๋‹ค

์œ„ ๊ทธ๋ฆผ์ฒ˜๋Ÿผ $x$ ๋ณ€์ˆ˜๋Š” A, B, C ์ด 3์ข…๋ฅ˜์˜ ๊ฐ’์„ ๊ฐ–๋Š”๋‹ค. ์›๋ž˜๋Œ€๋กœ๋ผ๋ฉด ๊ธธ์ด๊ฐ€ 3์ธ ์›-ํ•ซ ๋ฒกํ„ฐ๋กœ ๋‚˜ํƒ€๋‚ผ ์ˆ˜ ์žˆ๋‹ค(drop_last=False์ธ ๊ฒฝ์šฐ์˜ ๊ทธ๋ฆผ) ํ•˜์ง€๋งŒ ๊ธธ์ด๊ฐ€ 2์ธ ์›-ํ•ซ ๋ฒกํ„ฐ๋กœ 3์ข…๋ฅ˜์˜ ๊ฐ’์„ ๋‚˜ํƒ€๋‚ผ ์ˆ˜๋„ ์žˆ๋‹ค. ์ž˜ ์ƒ๊ฐํ•ด๋ณด๋ฉด ๋ชจ๋“  ์›์†Œ๊ฐ€ 0์ผ ๋•Œ๋„ ํ•˜๋‚˜์˜ ์ข…๋ฅ˜๊ฐ’์„ ๋‚˜ํƒ€๋‚ผ ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค. ๊ทธ๋ž˜์„œ drop_last=True์ธ ๊ฒฝ์šฐ์˜ ๊ทธ๋ฆผ์„ ๋ณด๋ฉด ๊ธธ์ด๊ฐ€ 2์ธ ๋ฒกํ„ฐ๋กœ A, B, C ์ด 3๊ฐœ์˜ ๊ฐ’์„ ๋‚˜ํƒ€๋‚ผ ์ˆ˜ ์žˆ๋Š” ๊ฒƒ์ด๋‹ค.

3. ์ˆ˜์น˜ํ˜• ๋ณ€์ˆ˜๋ฅผ ์Šค์ผ€์ผ๋งํ•˜๊ธฐ!

์ด๋ฒˆ์—” Standard Scaling, Min-Max Scaling ๋“ฑ ๋‹ค์–‘ํ•œ ์Šค์ผ€์ผ๋ง ๊ธฐ๋ฒ•์„ ํ™œ์šฉํ•ด์„œ ์ŠคํŒŒํฌ์—์„œ ์ˆ˜์น˜ํ˜• ๊ฐ’๋“ค์„ ์Šค์ผ€์ผ๋ง ํ•˜๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด์„œ ์•Œ์•„๋ณด๋„๋ก ํ•˜์ž. ์ŠคํŒŒํฌ์—์„œ๋Š” ํŠน์ดํ•˜๊ฒŒ ์Šค์ผ€์ผ๋ง์„ ์ ์šฉํ•˜๋Š” ์นผ๋Ÿผ์€ ๋ฐ˜๋“œ์‹œ Feature Vectorization๋œ ์นผ๋Ÿผ์ด์–ด์•ผ ํ•œ๋‹ค. Feature Vectorization์ด ๋ฌด์—‡์ธ์ง€์— ๋Œ€ํ•ด์„œ๋Š” ์˜ˆ์ „ ์ŠคํŒŒํฌ ํฌ์ŠคํŒ…์—์„œ ๋ฐฐ์šด ์ ์ด ์žˆ๋‹ค.

 

๋”ฐ๋ผ์„œ ์Šค์ผ€์ผ๋ง์„ ์ ์šฉํ•˜๊ธฐ ์ „์— ๋ฐ˜๋“œ์‹œ VectorAssembler ๋ฅผ ํ™œ์šฉํ•ด Feature Vectorization ์‹œํ‚จ ํ›„์— ์ ์šฉํ•ด์•ผ ํ•œ๋‹ค. ๋ถ“๊ฝƒ ๋ฐ์ดํ„ฐ๋ฅผ ํ™œ์šฉํ•ด์„œ ์•Œ์•„๋ณด์ž.

 

from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler

iris = load_iris()
iris_columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
iris_df = pd.DataFrame(iris.data, columns=iris_columns)
iris_df['label'] = iris.target

iris_sdf = spark.createDataFrame(iris_df)

vec_assembler = VectorAssembler(inputCols=iris_sdf.columns[:-1], outputCol='feature')
iris_vec = vec_assembler.transform(iris_sdf)

scaler = StandardScaler(inputCol='feature', outputCol='feature_scaled', withMean=True, withStd=True)  # True๋กœ ํ•˜๋ฉด ํ‰๊ท ์€ 0, std=1๋กœ 'ํ‘œ์ค€์ •๊ทœ๋ถ„ํฌ'
iris_scaled = scaler.fit(iris_vec).transform(iris_vec)
iris_scaled.limit(5).show()

 

์ฐธ๊ณ ๋กœ StandardScaler๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ, withMean ์ด๋ผ๋Š” ์ธ์ž๊ฐ€ ๊ธฐ๋ณธ์ ์œผ๋กœ False๋กœ ์„ค์ •๋œ๋‹ค. withMean, withStd ์ธ์ž๋ฅผ True๋กœ ์„ค์ •ํ•œ๋‹ค๋ฉด, ํ‰๊ท ์ด 0, ํ‘œ์ค€ํŽธ์ฐจ๊ฐ€ 1์ธ ์ฆ‰, ํ‘œ์ค€์ •๊ทœ๋ถ„ํฌ ํ˜•ํƒœ๋กœ ๊ฐ’์„ ์Šค์ผ€์ผ๋งํ•ด์คŒ์„ ์˜๋ฏธํ•œ๋‹ค. ๋งŒ์•ฝ False๋กœ ์„ค์ •ํ•œ๋‹ค๋ฉด ์Šค์ผ€์ผ๋ง์„ ์ ์šฉํ•  ์ˆ˜์น˜ํ˜• ๊ฐ’๋“ค์˜ ์‹ค์ œ ํ‰๊ท ๊ฐ’์œผ๋กœ ์„ค์ •๋˜์–ด ์ •๊ทœํ™”๊ฐ€ ์ˆ˜ํ–‰๋œ๋‹ค๋Š” ์ ๋„ ์•Œ์•„๋‘์ž.

๋ฐ˜์‘ํ˜•