๐ ๋ณธ ํฌ์คํ ์ PySpark๋ฅผ ํ์ฉํ Kaggle Notebook์ ํ์ฌํ๋ฉด์ ๋ฐฐ์ฐ๊ฒ ๋ ์ฝ๋ ๋ด์ฉ์ ๊ธฐ๋ฐ์ผ๋ก ํฌ์คํ ํ์์ ์๋ ค๋๋ฆฝ๋๋ค. ๋ํ ์์ผ๋ก ์๊ฐ๋ PySpark์ ๋ฌธ๋ฒ์ ๋ํด์ ์์ํ์๋ค๋ฉด ์ฌ๊ธฐ๋ฅผ ์ฐธ๊ณ ํด ๊ฐ๋จํ ์์๋ฅผ ํตํด ์ดํด๋ฅผ ํ ์ ์์ต๋๋ค.
์ด๋ฒ ํฌ์คํ ์์๋ PySpark ๊ทธ ์ค์์๋ Spark SQL ๊ณผ Spark MLlib์ ํ์ฉํ ๋จธ์ ๋ฌ๋ ๋ถ๋ฅ ๋ชจ๋ธ์ ๋ง๋๋ ๋ฐฉ๋ฒ์ ๋ํด ์๊ฐํ๋ ค๊ณ ํ๋ค. ํ์ฉํ ๋ฐ์ดํฐ๋ ๋จธ์ ๋ฌ๋์ ์ ๋ฌธํ ๋ ๊ฐ์ฅ ๋ง์ด ์ฌ์ฉ๋๋ ํ์ดํ๋์ Train ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํ๋ค.
1. ํ์ํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ๋ฐ์ดํฐ ๋ก๋
๊ฐ์ฅ ๋จผ์ ํ ์ผ์ ๋ฐ์ดํฐ ๋ก๋์ ๋ชจ๋ธ ๋น๋ฉ ์ ์ฌ์ฉ๋ Spark SQL, Spark MLlib์ ์ฌ๋ฌ๊ฐ์ง ๋ฉ์๋๋ค์ ์ํฌํธํ์. ์ฐธ๊ณ ๋ก PySpark๋ฅผ ์ฌ์ฉํด ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ ๋๋ os.chdir()
๋ก ๋๋ ํ ๋ฆฌ๋ฅผ ๋ณ๊ฒฝํ๊ณ ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ ์๊ฐ ์๋ค๋ ์ ์ ์์๋์.(์ด์ ๋ฅผ ๊ตฌ๊ธ๋งํด์ ์ฐพ์๋ณด๋ ค ํ๋๋ฐ.. spark os.chdir ๋ฑ๊ณผ ๊ฐ์ ํค์๋๋ก.. ๊ทธ๋ฐ๋ฐ ์ด์ ์ ๋ํด์๋ ์ฐพ์ง ๋ชปํ์ต๋๋ค.. ์ด์ ๋ํด ์๋ ๋ถ์ด ์๋ค๋ฉด ๋๊ธ ๋ฌ์์ฃผ์๋ฉด ๋งค์ฐ ๊ฐ์ฌํ๊ฒ ์ต๋๋ค (__) )
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import seaborn as sns
plt.rc('font', family='AppleGothic')
plt.rcParams['axes.unicode_minus'] = False
# Pyspark - SQL
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
# Pyspark - ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer
# Spark์ ์ฒซ ์์์ธ SparkSession์ ๋ง๋ค์ด์ฃผ์!
spark = SparkSession.builder\
.appName('Play with pypsark ML on Titanic Data')\
.getOrCreate()
# ๋ฐ์ดํฐ ๋ก๋
df = spark.read.csv('/Users/younghun/Desktop/gitrepo/data/titanic/train.csv', header=True, inferSchema=True)
# toPandas()๋ฅผ ์ด์ฉํด ํ๋ค์ค์์ ์ ๊ณตํ๋ ๋ฐ์ดํฐํ๋ ์ ํํ๋ก ์ถ๋ ฅ
df.limit(3).toPandas()
df.limit(num=3)
: ํด๋น ๋ฐ์ดํฐํ๋ ์์ ๋ฏธ๋ฆฌ๋ณด๊ธฐ ํ๋ ๋ฐฉ๋ฒ์ด๋ค. Pandas์์๋head()
์ ๊ฐ๋ค๊ณ ์๊ฐํ๋ฉด ๋๋ค. ์ฐจ์ด์ ์ด๋ผ๊ณ ํ๋ค๋ฉด Pandas์์head()
๋ ์๋ฌด๋ฐ ์ธ์๋ฅผ ๋ฃ์ง ์์๋ ๋ํดํธ๊ฐ 5๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๋ณด์ฌ์ฃผ์ง๋งlimit()
์ ์ด๋ค ์ธ์๋ ๋ฃ์ด์ฃผ์ง ์๋๋ค๋ฉด ์๋ฌ๊ฐ ๋ฐ์ํ๋ค.df.toPandas()
: ๋ฐ์ดํฐํ๋ ์์ Pandas์ ๋ฐ์ดํฐํ๋ ์ ํํ๋ก ๋ณํํด์ค๋ค. ๋ฐ๋ผ์ ์๋ก์ด ๋ณ์๋ก ํ ๋น ์ํค๋ ๊ฒ์ด ๊ฐ๋ฅํ๋ค.
2. Matplotlib, Seaborn์ ํ์ฉํ ๊ฐ๋จํ ์๊ฐํ
Matplotlib, Seaborn ์ ํ์ฉํ๋ ค๋ฉด ์ฐ์ Spark์ ๋ฐ์ดํฐํ๋ ์์์ Pandas์ ๋ฐ์ดํฐํ๋ ์์ผ๋ก ๋ณํํด์ฃผ๊ณ ์๊ฐํ๋ฅผ ๊ตฌํํด์ผ ํ๋ค. ๋ฐ๋ผ์ ๋จผ์ toPandas()
๋ฉ์๋๋ก Pandas ํํ์ ๋ฐ์ดํฐํ๋ ์์ ์๋ก ํ ๋นํด์ฃผ์.
# Pandas ๋ฐ์ดํฐํ๋ ์ ํํ๋ก ์ฐ์ ๋ณํ!
pandas_df = df.toPandas()
print("pandas_df ํ์
:", type(pandas_df))
# Seaborn ์ฌ์ฉํด๋ณด๊ธฐ
plt.figure(figsize=(10, 5))
plt.title("ํ์ดํ๋ ํ์น๊ฐ์ Age KDE ๋ถํฌ")
sns.distplot(pandas_df['Age'])
plt.show()
์ ๊ฒฐ๊ณผํ๋ฉด์์ ๋ณผ ์ ์๋ฏ์ด toPandas()
๋ฉ์๋๋ก ์๋ก ํ ๋นํ ๋ฐ์ดํฐํ๋ ์์ type
์ pandas.dataframe
์์ ํ์ธํ ์ ์๋ค.
3. PySpark๋ก ๊ฒฐ์ธก์น ์ฒดํฌํ๊ณ ๊ธฐ์กด๋ณ์๋ก๋ถํฐ ํ์๋ณ์ ์์ฑํ๊ธฐ
๊ฒฐ์ธก์น๋ฅผ ์ฒดํฌํ๊ธฐ ์ํด ๋ฌผ๋ก Spark์ ๋ฐ์ดํฐํ๋ ์ ํํ๋ฅผ Pandas ํํ์ ๋ฐ์ดํฐํ๋ ์ ํํ๋ก ๋ฐ๊พผ ํ ํ์์ ์์ฃผ ์ฌ์ฉํ๋ Pandas API์์ ์ ๊ณตํ๋ ๊ฒฐ์ธก์น ํ์ธ ๋ฉ์๋๋ฅผ ์ฌ์ฉํด๋ ๋๋ค. ํ์ง๋ง ํ์๊ฐ ์ด๋ฐ ํฌ์คํ ์ ์์ฑํ๋ ๋ชฉ์ ์ PySpark์ ๋ํด ์ต์ํด์ง์ด ๋ชฉ์ ์ด๊ธฐ ๋๋ฌธ์ Spark์ ๋ฐ์ดํฐํ๋ ์ ํํ์์ ๊ฒฐ์ธก์น๋ฅผ ์ฒดํฌํ๋ ๋ฐฉ๋ฒ์ ๋ํด ์์๋ณด์. ๊ฒฐ์ธก์น๋ฅผ ์ฒดํฌํ๋ ๋ฐฉ๋ฒ์ ํฌ๊ฒ 2๊ฐ์ง๊ฐ ์๋ค.
isnan()
: pyspark.sql.functions ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ์ํ๋ค. ์ฌ์ฉ๋ฐฉ๋ฒ์isnan('์ฒดํฌํ column ์ด๋ฆ')
isNull()
: pyspark.sql.Column ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ์ํ๋ค. ์ฌ์ฉ๋ฐฉ๋ฒ์col('์ฒดํฌํ column ์ด๋ฆ').isNull()
# ์ 2๊ฐ์ง ๋ฐฉ๋ฒ์ ๋์์ ์ฌ์ฉํด๋ณด์!
# ๊ฒฐ์ธก์น๊ฐ ์๋ ๋ณ์๋ฅผ ์ฒดํฌํ๊ณ ๊ฒฐ์ธก์น๊ฐ ๋ช ๊ฐ ์๋์ง ์ดํด๋ณด๊ธฐ
from pyspark.sql.functions import isnan, count
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])\
.show()
์ ์ฝ๋์์ when()
๋ฉ์๋๊ฐ ๋ฑ์ฅํ๋๋ฐ when()
๋ฉ์๋๋ filter()
๋ฉ์๋์ ๋น์ทํ ๊ธฐ๋ฅ์ ํ๋ค๊ณ ๋ณผ ์ ์๋ค. when(์กฐ๊ฑดA, ์กฐ๊ฑดA๊ฐ True์ผ ์ value).otherwise(์กฐ๊ฑดA๊ฐ False์ผ ์ value)
๋ก ์ฌ์ฉํ๋ค. ์ฐธ๊ณ ๋ก ์กฐ๊ฑด์ ์ฌ๋ฌ๊ฐ ๋ฃ๊ณ ์ถ๋ค๋ฉด ๋
ผ๋ฆฌ์ฐ์ฐ์ |
, &
์ฌ์ฉ์ด ๊ฐ๋ฅํ๋ค. ์ ์ฝ๋์ ๋ํ ๊ฒฐ๊ณผํ๋ฉด์ ๋ค์๊ณผ ๊ฐ๋ค.
์ถ๊ฐ์ ์ผ๋ก ๋ถํ์ํ ์นผ๋ผ์ธ Cabin
์นผ๋ผ์ ์์ ์ฃผ๊ธฐ ์ํด drop()
๋ฉ์๋๋ฅผ ์ฌ์ฉํ ์ ์๋ค. ์ด๋ Pandas์ ๋ฉ์๋ ์ด๋ฆ์ด ๋์ผํ์ง๋ง inplace = False
์ด๊ธฐ ๋๋ฌธ์ ๋ค์๊ณผ ๊ฐ์ด ์ฌํ ๋น์ ํด์ฃผ์ด์ผ ํ๋ค.
# Cabin ๋ณ์๋ ๊ฒฐ์ธก์น๊ฐ 50%๊ฐ ๋๊ธฐ ๋๋ฌธ์ ํด๋น ๋ณ์๋ฅผ ์ญ์ ํ์
df = df.drop('Cabin')
ํ์น๊ฐ์ ์ด๋ฆ์ ๋ด๊ณ ์๋ Initial
์นผ๋ผ์ ์ด์ฉํด ํ์๋ณ์๋ฅผ ๋ง๋ค์ด๋ณด์. withColumn()
๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋๋ฐ, ์ฌ๊ธฐ์๋ PySpark์์ ์ ๊ณตํ๋ ์ ๊ทํํ์ API๋ฅผ ํจ๊ป ์ด์ฉํด๋ณด์. ์ฐ์ ๊ฐ ๋ฉ์๋์ ๋ํ ์ฌ์ฉ๋ฐฉ๋ฒ์ ์ค๋ช
ํ๋ฉด ๋ค์๊ณผ ๊ฐ๋ค.
df.withColumn('New column', df['column 1'] + df['column 2'] )
: column 1 ๊ฐ๊ณผ column 2 ๊ฐ์ ๊ฐ ๋ํด New column ์ด๋ผ๋ ์๋ก์ด ํ์๋ณ์ ์์ฑregexp_extract(col('column 1'), '์ ๊ทํํ์ ํจํด')
: column 1์ด๋ผ๋ ์นผ๋ผ ๊ฐ์์ ์ง์ ํด์ค ์ ๊ทํํ์ ํจํด์ ํด๋นํ๋ ๊ฐ๋ค๋ง ์ถ์ถ(extract)ํด๋ผ.(์ฌ๊ธฐ์col('column 1')
์df['column 1']
๋ก ํํํด๋ ๋์ผํ ํํ์ด๋ค.)
์ฌ๊ธฐ์๋ ํ์น๊ฐ๋ค์ ์ด๋ฆ ์ค Mr, Mrs์ ๊ฐ์ ์ฑ๋ณ๊ณผ ์ฐ๋ น ์๋ฏธ๋ฅผ ๋ดํฌํ๊ณ ์๋ salutation๋ง์ ์ถ์ถํ์.(salutation์ ์ฐ๋ฆฌ๋ง๋ก '์ธ์ฌ๋ง'์ด๋ฉฐ ์ฌ๊ธฐ์๋ ์ฌ๋ ์ด๋ฆ์ ๊ฐ์ฅ ๋งจ ์์ ๋์ค๋ Mr, Mrs ๊ฐ์ ๊ฒ๋ค์ ์๋ฏธํ๋ค.)
# ์๋ก์ด ํ์๋ณ์๋ฅผ ์์ฑํ ๋ฐ์ดํฐํ๋ ์์ ์๋ก ํ ๋น!
df = df.withColumn("Initial", regexp_extract(col("Name"),
"([A-Za-z]+)\.", # ()์ด๊ฒ ํ๋์ ๊ทธ๋ฃน์!
1)) # ๊ทธ๋ฃน ์ธ๋ฑ์ค๋ 1๋ถํฐ!
df.limit(3).show()
์ถ๊ฐ์ ์ผ๋ก ํ์น๊ฐ๋ค์ salutation ๊ฐ์ ์คํ์๊ฐ ์์ด replace([์คํ์], [์์ ๋ ๊ธ์])
๋ฉ์๋๋ฅผ ํตํด ์์ ํด์ค๋ค. ์์ ํ ๋ฌธ์์ด๊ณผ ๋ฐ๋ก์ก์ ๋ฌธ์์ด์ ๋ฆฌ์คํธ ์๋ฃ๊ตฌ์กฐ๋ก ์ฌ๋ฌ๊ฐ ๋ถ์ฌํ ์ ์๋ค.
df = df.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
['Miss','Miss','Miss','Mr','Mr', 'Mrs', 'Mrs', 'Other', 'Other','Other','Mr','Mr','Mr'])
# Initial ๋ณ์ ๊ฐ๋ค๋ก ๊ทธ๋ฃนํํ ํ ํ๊ท Age ๊ตฌํ๊ธฐ
df.groupby('Initial').avg('Age').collect()
์ ์ฝ๋์์ collect()
๊ฐ ๋ฑ์ฅํ๋ค. collect()
๋ select()
์ ๋ง์ฐฌ๊ฐ์ง๋ก ์ผ๋ถ์ ๋ฐ์ดํฐ๋ฅผ ์ถ์ถํ๋ ์ญํ ์ ํ๋ค. ํ์ง๋ง ์ฌ์ฉ๋ชฉ์ ์ ๋ฐ๋ผ ์ฝ๊ฐ์ ์ฐจ์ด์ ์ด ์กด์ฌํ๋ค. collect()
๋ฉ์๋๋ ๋ค์๊ณผ ๊ฐ์ ๊ฒฝ์ฐ์ ์ฌ์ฉํ๋ ๊ฒ์ด ๊ถ์ฅ๋๋ค.
- ์ ์ ์์ ๋ฐ์ดํฐ์ ์ ๋ฐํํ ๋ ์์ฃผ ์ฌ์ฉ. ํฐ ๋ฐ์ดํฐ์ ์ ๋ก๋ํ ๋๋ ๋ฉ๋ชจ๋ฆฌ ์๋ฌ๊ฐ ๋ฐ์ํ ๊ฐ๋ฅ์ฑ์ด ๋๋ค.
- ๋ณดํต
filter()
,group()
,count()
๋ฉ์๋๋ฅผ ์ฌ์ฉํ ํ ๊ฐ์ด ์์ฃผ ์ฌ์ฉ๋๋ค. - ์ธ์์ ์๋ฌด๊ฒ๋ ๋ฃ์ง ์์ผ๋ฉด ์ฆ,
collect()
์์ฒด๋ก๋ง ์ฌ์ฉํ๋ค๋ฉด ํด๋น ๋ฐ์ดํฐํ๋ ์์ ๋ชจ๋ row๋ฅผ ๋ฐํํ๋ค. collect()
๊ฐ ๋ฐํํ๋ ํ์ ์ Spark์ ๋ฐ์ดํฐํ๋ ์ ํํ๊ฐ ์๋ PySpark์ Row ํ์ ์ ๋ฐ์ดํฐ๊ฐ ๋ด๊ธดlist
๋ฅผ ๋ฐํํ๋ค.list
์๋ฃ๊ตฌ์กฐ์ด๊ธฐ ๋๋ฌธ์ loop ๋ฌธ์ผ๋ก ํ์ฉ์ด ๊ฐ๋ฅํ๋ค. ๋ํ PySpark์ Row ํ์ ์ Python์ named Tuple ํํ์ด๋ค.
์ ์ฝ๋์์ ํ์น๊ฐ ์ด๋ฆ์ salutation ๋ณ๋ก Age ํ๊ท ๊ฐ์ ๊ณ์ฐํ๊ณ ์ด๋ฅผ ์ด์ฉํด ๊ฒฐ์ธก์น๋ฅผ ๋์ฒดํด๋ณด์.
df = df.withColumn('Age',
when((df['Initial'] == 'Miss') & (df['Age'].isNull()),
22).otherwise(df['Age']))
df = df.withColumn('Age',
when((df['Initial'] == 'Other') & (df['Age'].isNull()),
46).otherwise(df['Age']))
df = df.withColumn('Age',
when((df['Initial'] == 'Master') & (df['Age'].isNull()),
5).otherwise(df['Age']))
df = df.withColumn('Age',
when((df['Initial'] == 'Mr') & (df['Age'].isNull()),
33).otherwise(df['Age']))
df = df.withColumn('Age',
when((df['Initial'] == 'Mrs') & (df['Age'].isNull()),
36).otherwise(df['Age']))
๊ฒฐ์ธก์น๊ฐ ์กด์ฌํ๋ ์นผ๋ผ์ด Age
์ด์ธ์ Embarked
์๋ ์์๋๋ฐ Embarked์ ๊ฒฐ์ธก์น๋ฅผ ํ ๋ฒ ์ดํด๋ณด์.
# Embarked ๋ณ์์๋ ๊ฒฐ์ธก์น๊ฐ 2๊ฐ ์์๋๋ฐ ๋ฌด์์ธ์ง ํ์ธํด๋ณด๊ธฐ
df.groupBy('Embarked').count().show()
๊ฒฐ์ธก์น๊ฐ 2๊ฐ๋ฐ์ ๋์ง ์๊ธฐ ๋๋ฌธ์ ๊ฒฐ์ธก์น๋ Embarked์ ์ต๋น๊ฐ์ธ 'S' ๊ฐ์ผ๋ก ๋์ฒดํด์ฃผ์. ๊ฒฐ์ธก์น๋ฅผ ๋์ฒดํ๋ ๋ ๋ค๋ฅธ ๋ฉ์๋๋ ๋ค์๊ณผ ๊ฐ๋ค. df.na.fill({'column' : 'value'})
# Embarked์ ๊ฒฐ์ธก์น๋ ์ต๋น๊ฐ์ธ 'S'๋ก ๋์ฒดํด์ฃผ๊ธฐ
df = df.na.fill({"Embarked": "S"})
# ๊ฒฐ์ธก์น๊ฐ ์ฑ์์ก๋์ง ๋ค์ ํ์ธ
df.groupBy('Embarked').count().show()
4. Feature Engineering ํ๊ธฐ
์ 3๋ฒ์ด๋ ์ผ๋ถ ์ค๋ณต๋๋ ๋ด์ฉ์ผ ์๋ ์๋ค. 3๋ฒ ๋ชฉ์ฐจ์์ Name
์นผ๋ผ์ ์ด์ฉํด Initial
์ด๋ผ๋ ์๋ก์ด ํ์๋ณ์๋ฅผ ์์ฑํ ๊ฒ์ฒ๋ผ withcolumn
, when
, otherwise
๋ฅผ ์ฌ์ฉํด Feature Engineering ์ ์ํํด์ค ์ ์๋ค.
# Family size๋ผ๋ ํ์๋ณ์ ์์ฑ
df = df.withColumn("Family_Size",
col('SibSp')+col('Parch')) # df['SibSp']๋ ๊ฐ๋ฅ!
# Alone์ด๋ผ๋ Binary ํ์๋ณ์ ์์ฑํ๋๋ฐ, ์ฐ์ 0์ผ๋ก ๋ค ํด๋๊ธฐ
df = df.withColumn('Alone', lit(0))
# ์กฐ๊ฑด์ ๋ง๊ฒ Alone ๋ณ์๊ฐ ๋ณ๊ฒฝ
df = df.withColumn('Alone',
when(col('Family_Size') == 0, 1)\
.otherwise(col('Alone')))
๋ค์์ ๋ฌธ์์ด๋ก ๋์ด์๋ ๋ณ์์ธ Sex
, Embarked
, Initial
๊ฐ์ ์ซ์๋ก ๋ณํํด์ฃผ๋ ์ฆ, Label Encoding์ ํด์ฃผ๋ ๋ฐฉ๋ฒ์ ๋ํด ์์๋ณด์. StringIndexer()
๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค. ์ด๋ฆ ๊ทธ๋๋ก String์ Index(์ซ์)๋ก ๋ณํํด์ฃผ๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค. ์์ธํ ์ฌ์ฉ๋ฒ์ ํ๋จ์ ์ฝ๋๋ฅผ ์ฐธ๊ณ ํ์.
convert_cols = ['Sex', 'Embarked', 'Initial']
# ์ถํ์ IndexToStringํ ๋ ค๋ฉด indexer ๊ฐ์ฒด๋ฅผ ์ฌ์ฉํ๋ฉด ๋จ!
indexer = [StringIndexer(inputCol=col,
outputCol=col+'_index').fit(df) for col in convert_cols]
for i in indexer:
print(i)
print('-'*80)
print(type(indexer))
์ ๊ฒฐ๊ณผ๊ฐ ํ๋ฉด์์ uid
๋ฅผ ๋ณด๋ฉด ์ธ ๊ฐ๊ฐ ๋ชจ๋ ๊ฐ๊ธฐ ๋ค๋ฅธ ๊ฐ์ ๊ฐ๋ ๊ฒ์ ์ ์ ์๋ค. ์ฆ, ์นผ๋ผ ๋ณ๋ก ๊ณ ์ ํ StringIndexer๊ฐ ๋ง๋ค์ด์ก์์ ์ ์ ์๋ค. ์ด์ fit()
๊ณผ transform()
๋ฉ์๋๋ฅผ ์ด์ฉํด Label Encoding ์ ์ํํด๋ณด์.
# Pipeline์ ์ด์ฉํด stage์๋ค๊ฐ ์คํ ๊ณผ์ ๋ด์ ๋๊ธฐ๊ธฐ
pipeline = Pipeline(stages=indexer)
df = pipeline.fit(df).transform(df)
5. ๋ถํ์ํ ์นผ๋ผ๋ค ์ญ์ ํ ์ต์ข Feature๋ค์ Vector๋ก ๋ณํํ๊ธฐ
PySpark์์ ์ฌ๋ฌ๊ฐ์ ์นผ๋ผ์ ์ญ์ ํ๋ ค๋ฉด Pandas์๋ ์ฝ๊ฐ ๋ค๋ฅธ ๋ฐฉ๋ฒ์ ์ฌ์ฉํด์ผ ํธ๋ฆฌํ๋ค. ์ฐ์ 2๊ฐ์ง ๋ฐฉ๋ฒ์ ์ฐจ์ด์ ์ ์์๋ณด์.
- Pandas :
df.drop(['columnA', 'columnB', 'columnC'], axis=1)
- PySpark :
df.drop('columnA', 'columnB', 'columnC')
์ฆ, drop()
๋ฉ์๋ ์์ [](๋ฆฌ์คํธ)
๊ฐ ๋ค์ด๊ฐ๋์ง ์ฌ๋ถ์ ์ฐจ์ด๋ค. PySpark๋ ์ฌ๋ฌ๊ฐ์ ์นผ๋ผ์ ์ญ์ ํ ๋๋ [](๋ฆฌ์คํธ)
๋ก ๊ฐ์ธ์ฃผ์ด์ ์๋๋ค. ๋ฐ๋ผ์ Native Python์ ๊ธฐ๋ฅ ์ค ํ๋์ธ *(unpacking)
์ ์ฌ์ฉํ๋ฉด ๋๋ค.
unpacking์ ์ฌ์ฉํ๋ ์ด์ ๋ ๋ฌผ๋ก ๋ถํ์ํ ์นผ๋ผ์ด ํ, ๋๊ฐ ์ ๋ ์ผ ๋๋ ์ง์ ์ ์ํด์ฃผ์ด๋ ๋์ง๋ง ๋ง์ฝ 100๊ฐ, 200๊ฐ๋ก ๋์ด๋๋ค๋ฉด ๋ถํ์ํ ์นผ๋ผ๋ค๋ง ๋ฆฌ์คํธ๋ก ์ถ์ถํ ํ unpacking์ ์ํํ๋ฉด ์ผ์ผ์ด ์ ๋ ฅํ๋ ์๊ฐ์ ์ค์ผ ์ ์์ ๊ฒ์ด๋ค.
un_cols = ["PassengerId","Name","Ticket","Cabin","Embarked","Sex","Initial"]
df = df.drop(*un_cols)
print("์ญ์ ํ ๋จ์ ์นผ๋ผ๋ค:", df.columns)
์ด์ ๋ถํ์ํ ๋ณ์๋ค๋ ์ญ์ ํ๊ณ ์ต์ข
๋จ์ Feature๋ค์ Vector๋ก ๋ณํ์์ผ ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ์ ์
๋ ฅ์ํฌ ์ค๋น๋ฅผ ํด๋ณด์. VectorAssembler()
๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋๋ฐ, ํ๋ผ๋ฏธํฐ ์ธ์๊ฐ StringIndexer()
๋ ๋น์ทํ์ง๋ง ์ธ๋ถ์ ์ผ๋ก ๋ค์๊ณผ ๊ฐ์ 2๊ฐ์ง ์ฐจ์ด์ ์ด ์กด์ฌํ๋ค.
- ํ๋ผ๋ฏธํฐ ์ธ์ ์ค
inputCol
๋์ s๊ฐ ๋ถ์inputCols
์ด๋ค. VectorAssembler
๋ฅผ ์ ์ํด์ฃผ๊ณ ์ํํด ์ค ๋fit
์ ํ์ง ์๊ณ ๋ฐ๋กtransform
์ ์ํํด์ค๋ค.
feature = VectorAssembler(inputCols = df.columns[1:],
outputCol='features')
feature_vector = feature.transform(df) # ๋ฐ์ดํฐํ๋ ์ ํํ๋ก ๋ฐํ
print('feature type:', type(feature))
print('feature_vector type', type(feature_vector))
feature_vector.limit(3).toPandas()
์ ๊ฒฐ๊ณผ์ ๋นจ๊ฐ์ ๋ค๋ชจ์นธ์ ๋ณด๊ฒ ๋๋ฉด ๋ชจ๋ Feature๋ค์ ๋ํด ํ๋์ ๋ฒกํฐ๋ก ๋ง๋ ๊ฒ์ ๋ณผ ์ ์๋ค. ๊ทธ๋ฐ๋ฐ ๊ฒฐ๊ณผ๊ฐ์ ํ์ธํ๋ค๊ฐ ํน์ดํ ์ ์ ๋ฐ๊ฒฌํ๋ค. features ๊ฐ๋ค ์ค ์ด๋ค ๊ฒ๋ค์ Tuple๋ก ๊ฐ์ธ์ง ๋ฒกํฐ๊ฐ, ์ด๋ค ๊ฒ๋ค์ List๋ก ๊ฐ์ธ์ง ๋ฒกํฐ๊ฐ ๋ค์ด์์๋ค. ๊ตฌ๊ธ๋ง์ ํด๋ด๋ ๋ชจ๋ฅด๊ธฐ์ StackOverflow์ ์ง๋ฌธ์ ์ฌ๋ ค๋๊ณ ๋ต๋ณ์ ํ์ฌ ๊ธฐ๋ค๋ฆฌ๊ณ ์๋ค..(ํน์ ์์๋ ๋ถ ์๋ค๋ฉด ๋๊ธ๋ก ๋ต๋ณํด์ฃผ์๋ฉด ๋๋ฌด๋๋ ๊ฐ์ฌํ๊ฒ ์ต๋๋ค!)
6. Train, Test ๋ฐ์ดํฐ ๋ถํ
์ด์ Feature๋ค์ ๋ฒกํฐํ ์์ผฐ๊ณ ๋ฐ์ดํฐ๋ฅผ ํ์ต, ํ
์คํธ์ฉ์ผ๋ก ๋ถํ ํด๋ณด์. randomSplit([train_ratio, test_ratio])
๋ฉ์๋๋ฅผ ํ์ฉํ๋ฉด ๋๋ค.
titanic_df = feature_vector.select(['features', 'Survived'])
# split train, test
(train_df, test_df) = titanic_df.randomSplit([0.8, 0.2], seed=42)
7. ๋จธ์ ๋ฌ๋ ๋ถ๋ฅ ๋ชจ๋ธ ๋ง๋ค๊ธฐ
์ด์ ์ด์ง ๋ถ๋ฅ ๋ชจ๋ธ์ ๋ง๋ค์ด๋ณด์. PySpark์์๋ ParamGridBuilder()
๋ฅผ ํตํด ํ์ดํผํ๋ผ๋ฏธํฐ ํ๋์ ์ํํ๋ฉด์ TrainValidationSplit()
๋ฉ์๋๋ก ๊ต์ฐจ๊ฒ์ฆ์ ๋์์ ์ํํ ์ ์๋ค.
# ๋ถ๋ฅ ๋ชจ๋ธ
from pyspark.ml.classification import LogisticRegression
# ํ๋ผ๋ฏธํฐ ํ๋ & ๊ต์ฐจ ๊ฒ์ฆ
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.tuning import CrossValidator
# ํ์ดํ๋ผ์ธ
from pyspark.ml import Pipeline
# ๋ฉํธ๋ฆญ ์ป๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# ROC AUC(Sklearn)
from sklearn.metrics import roc_curve, auc
# ๋ชจ๋ธ ์ ์
lr = LogisticRegression(labelCol='Survived')
# ํ๋ํ ํ๋ผ๋ฏธํฐ grid ์ ์
# model.parameter ์์ผ๋ก ์ ์
paramGrid = ParamGridBuilder().addGrid(lr.regParam,
(0.01, 0.1))\
.addGrid(lr.maxIter,
(5, 10))\
.addGrid(lr.tol,
(1e-4, 1e-5))\
.addGrid(lr.elasticNetParam,
(0.25, 0.75))\
.build()
# ๊ต์ฐจ๊ฒ์ฆ ์ ์ - Pipeline์์ผ๋ก ์ ์
tvs = TrainValidationSplit(estimator=lr,
estimatorParamMaps=paramGrid,
evaluator=MulticlassClassificationEvaluator(labelCol='Survived'),
trainRatio=0.8)
# ํ์ต์ fit์ผ๋ก!
model = tvs.fit(train_df)
# ํ๊ฐ๋ transform์ผ๋ก!
model_prediction = model.transform(test_df)
# ๋ฉํธ๋ฆญ ํ๊ฐ
print('Accuracy:',
MulticlassClassificationEvaluator(labelCol='Survived',
metricName='accuracy').evaluate(model_prediction))
print('Precision:',
MulticlassClassificationEvaluator(labelCol='Survived',
metricName='weightedPrecision').evaluate(model_prediction))
ํ์๋ ๋ ๋ค๋ฅธ ๋ถ๋ฅ ๋ฉํธ๋ฆญ์ผ๋ก AUC Score๋ฅผ ๊ณ์ฐํ๊ธฐ ์ํด ๋ฐ์ดํฐ๊ฐ Positive(1)๋ก ์์ธก๋ ํ๋ฅ ๊ฐ์ด ํ์ํ๋ค. ๊ทธ๋์ ์ ์ฝ๋ ์ค model_prediction
๊ฐ์ฒด์ ํ ๋น๋ ๊ฐ๋ค์ ๋ก๋ํด ๋ณด์๋ค.
model_prediction.show(10)
์ ๊ฒฐ๊ณผ๊ฐ ๋ฐ์ดํฐํ๋ ์์์ ๋ณ์๊ฐ ์๋ฏธํ๋ ๋ฐ๋ ๋ค์๊ณผ ๊ฐ๋ค.
- featrues : (10, [0,1,2,4,5], ... ) : 10๊ฐ์ feature๊ฐ ์กด์ฌํ๊ณ ๊ฐ feature์ ๊ฐ๋ค์ 0,1,2,4,5 ์ด๋ค. ์ด์ ๋ํ ์ค๋ช ์ StackOverflow๋ฅผ ์ฐธ์กฐํด๋ณด์.
rawPrediction
: ํด๋น feature๋ฅผ ํ๊ท ๋ชจ๋ธ์ ๋ฃ์์ ๋ ๊ณ์ฐ๋์ด ๋์ค๋ Rawํ ๊ฒฐ๊ณผ๊ฐprobability
:rawPrediction
๊ฐ์ ๋ก์ง์คํฑ ํจ์๋ฅผ ์ ์ฉํ ํ ๋ณํ๋ ๊ฐ. ์ฆ 0๊ณผ 1์ฌ์ด์ ํ๋ฅ ๊ฐ์ผ๋ก ๋งคํ๋ ๊ฐprediction
:probability
๊ฐ ํน์ ์๊ณ๊ฐ ๊ธฐ์ค์ ์ํด 1 ๋๋ 0์ผ๋ก ๋ถ๋ฅ๋ ํด๋์ค(label)
8. ROC Curve ์๊ฐํํ๊ณ AUC Score ๊ณ์ฐํ๊ธฐ
์ด์ ์ ํ
์ด๋ธ์์ probability
์ Survived
์นผ๋ผ๋ค๋ง ๋ฝ์์ AUC ๊ณ์ฐ์ ํ์ํ label ๊ฐ๊ณผ Positive(1)๋ก ์์ธก๋ ํ๋ฅ ๊ฐ๋ง์ ๋ฝ์๋ณด์.
# Evaluate ROC metric
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
from pyspark import SparkContext
# SparkContext๋ฅผ ๋ง๋ค๊ธฐ
sc = SparkContext.getOrCreate()
# ROC ์ ์์ธ AUC๋ฅผ ๊ณ์ฐ ์ํด Logistic๋ฅผ ์ ์ฉํด ๋์จ ํ๋ฅ ๊ฐ๊ณผ ๋ ์ด๋ธ๋ง ๊ฐ์ ธ์ค๊ธฐ
results = model_prediction.select(['probability', 'Survived'])
# ํ๋ฅ ๊ฐ - ๋ ์ด๋ธ set ์ค๋น์ํค๊ธฐ
# collect()๋ก ๋ชจ๋ ๋ฐ์ดํฐ row retrieve(๋ฐํ) - ๋ฆฌ์คํธ ํํ๋ก ๋ฐํ
results_collect = results.collect()
# named tuple ํ์์ด๊ธฐ ๋๋ฌธ์ key ๊ฐ์ผ๋ก ์ํ๋ ๊ฐ์ ๋ถ๋ฌ์ฌ ์ ์๋ค!
print(results_collect[0])
print()
print('probability:', results_collect[0].probability)
print('Survived:', results_collect[0].Survived)
์ ๊ฒฐ๊ณผํ๋ฉด์์ ํ๋์ ๋ค๋ชจ์นธ์ Negative(0)์ผ๋ก ์์ธก ๋ ํ๋ฅ , ๋นจ๊ฐ์ ๋ค๋ชจ์นธ์ Positive(1)๋ก ์์ธก๋ ํ๋ฅ ์ ์๋ฏธํ๋ค. ๋ฐ๋ผ์ AUC ๊ณ์ฐ์ ์ํด ํ์ํ ๋ถ๋ถ์ ๋นจ๊ฐ์ ๋ค๋ชจ์นธ๊ณผ Survived ๊ฐ, ์ด 2๊ฐ์ง๋ค. ์ด์ ํ์ํ ๊ฐ์ด ์ด๋์ ์์นํ์ง ์๊ฒ ๋์์ผ๋ ์ด๋ฅผ ์ด์ฉํด ROC Curve ์ AUC ๊ณ์ฐ์ ํด๋ณด์.
# Evaluate ROC metric
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
from pyspark import SparkContext
# SparkContext๋ฅผ ๋ง๋ค๊ธฐ
sc = SparkContext.getOrCreate()
# ROC ์ ์์ธ AUC๋ฅผ ๊ณ์ฐ ์ํด Logistic๋ฅผ ์ ์ฉํด ๋์จ ํ๋ฅ ๊ฐ๊ณผ ๋ ์ด๋ธ๋ง ๊ฐ์ ธ์ค๊ธฐ
results = model_prediction.select(['probability', 'Survived'])
# ํ๋ฅ ๊ฐ - ๋ ์ด๋ธ set ์ค๋น์ํค๊ธฐ
# collect()๋ก ๋ชจ๋ ๋ฐ์ดํฐ row retrieve(๋ฐํ) - ๋ฆฌ์คํธ ํํ๋ก ๋ฐํ
results_collect = results.collect()
results_list = [(float(i.probability[1]),
float(i.Survived)) for i in results_collect]
# ์ฌ๋ฌ๊ฐ์ ํํ์ด ๋ด๊ธด list๋ฅผ RDD ์๋ฃ๊ตฌ์กฐ๋ก ๋ณ๊ฒฝ
scoreAndLabels = sc.parallelize(results_list)
# ROC metric ๊ณ์ฐํ๊ธฐ
metrics = metric(scoreAndLabels)
auc = metrics.areaUnderROC
# Visualize ROC Curve
from sklearn.metrics import roc_curve, auc
# roc_curve ๋ ์ค์ ๊ฐ, 1๋ก์ ์์ธกํ๋ฅ ๊ฐ์ ์ธ์๋ก ๋ฃ์ด์ฃผ๋ฉด FPR, TPR, ์๊ณ๊ฐ์ ๋ฐํํด์ค
fpr = []
tpr = []
roc_auc = []
y_test = [i[1] for i in results_list]
y_proba = [i[0] for i in results_list]
fpr, tpr, _ = roc_curve(y_test, y_proba)
roc_auc = auc(fpr, tpr)
plt.figure()
# x์ถ์ Fall-out(FPR), y์ถ์ Recall(TPR)
plt.plot(fpr, tpr, label='ROC Curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title("Area under the ROC Curve")
plt.legend(loc='lower right')
plt.show()
์ด๋ ๊ฒ Titanic ๋ฐ์ดํฐ๋ฅผ ํ์ฉํ ๋จธ์ ๋ฌ๋ ๋ถ๋ฅ ๋ชจ๋ธ์ PySpark๋ฅผ ํ์ฉํด ๋ง๋ค๊ณ ์ฑ๋ฅํ๊ฐ๊น์ง ์งํํด๋ณด์๋ค. ํด๋น ํฌ์คํ ์ ์ฌ์ฉ๋ Logistic Regression ์ด์ธ์ ์ถ๊ฐ์ ์ผ๋ก ์ฌ์ฉํด๋ณธ Random Forest, XGBoost ๋ชจ๋ธ์ Kaggle ๋ ธํธ๋ถ ์๋ณธ์ด๋ ํ์์ Github ์ฝ๋๋ฅผ ์ดํด๋ณด๋ฉด ๋์์ด ๋ ๊ฒ์ด๋ค. ๊ธฐ๋ณธ์ ์ธ ๋ชจ๋ธ ๋น๋ฉ, ํ๊ฐ ํ๋ ์์ ๋์ผํ๊ธฐ ๋๋ฌธ์ ๋ชจ๋ธ ํ๋ผ๋ฏธํฐ๋ง ๋ณ๊ฒฝํ๋ฉด ๋๋ค.
'Apache Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Infra] ๋ฐ์ดํฐ ์ธํ๋ผ ๊ตฌ์กฐ์ Sources (0) | 2021.04.23 |
---|---|
[PySpark] ์ปจํ ์ธ ๊ธฐ๋ฐ ์ํ ์ถ์ฒ ์์คํ ๋ง๋ค์ด๋ณด๊ธฐ (18) | 2021.02.15 |
[PySpark] PySpark๋ก Regression ๋ชจ๋ธ ๋ง๋ค๊ธฐ (0) | 2021.02.04 |
[PySpark] Spark SQL ํํ ๋ฆฌ์ผ (0) | 2021.02.01 |
[PySpark] Apache Spark ์ RDD ์๋ฃ๊ตฌ์กฐ (0) | 2021.01.30 |