
[PySpark] Building a Classification Model with the Titanic Data

๋ฐ˜์‘ํ˜•

🔊 This post is based on code I learned while transcribing a Kaggle notebook that uses PySpark. If the PySpark syntax introduced below is unfamiliar, refer here for simple examples that will help you follow along.

 

์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” PySpark ๊ทธ ์ค‘์—์„œ๋„ Spark SQL ๊ณผ Spark MLlib์„ ํ™œ์šฉํ•œ ๋จธ์‹ ๋Ÿฌ๋‹ ๋ถ„๋ฅ˜ ๋ชจ๋ธ์„ ๋งŒ๋“œ๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด ์†Œ๊ฐœํ•˜๋ ค๊ณ  ํ•œ๋‹ค. ํ™œ์šฉํ•œ ๋ฐ์ดํ„ฐ๋Š” ๋จธ์‹ ๋Ÿฌ๋‹์— ์ž…๋ฌธํ•  ๋•Œ ๊ฐ€์žฅ ๋งŽ์ด ์‚ฌ์šฉ๋˜๋Š” ํƒ€์ดํƒ€๋‹‰์˜ Train ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ–ˆ๋‹ค.

 

The sinking of the Titanic is a tragic true story that claimed a great number of lives.

1. Loading the Required Libraries and Data

The first thing to do is import the data and the various Spark SQL and Spark MLlib methods that will be used for model building. Note that with PySpark you cannot change the working directory with os.chdir() and then load the data by a relative path. (I tried googling the reason with keywords like "spark os.chdir" but couldn't find an answer; my best guess is that the path given to spark.read.csv is resolved by the JVM and Hadoop filesystem layer, whose working directory is fixed when the session starts, so calling os.chdir() in the Python process has no effect on it. If anyone knows for certain, I'd be very grateful for a comment (__) )

 

import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import seaborn as sns
plt.rc('font', family='AppleGothic')  # Korean font on macOS
plt.rcParams['axes.unicode_minus'] = False
# PySpark - SQL
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit

# PySpark - ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

# Everything in Spark starts with a SparkSession, so create one first!
spark = SparkSession.builder\
        .appName('Play with pyspark ML on Titanic Data')\
        .getOrCreate()
# Load the data
df = spark.read.csv('/Users/younghun/Desktop/gitrepo/data/titanic/train.csv', header=True, inferSchema=True)
# Use toPandas() to display it as a pandas DataFrame
df.limit(3).toPandas()

 

3๊ฐœ์˜ ๋ฐ์ดํ„ฐ ๋ฏธ๋ฆฌ๋ณด๊ธฐ

  • df.limit(num=3) : previews the DataFrame, comparable to head() in Pandas. One difference: Pandas' head() shows 5 rows by default when called with no argument, while limit() raises an error if no argument is passed.
  • df.toPandas() : converts the Spark DataFrame into a pandas DataFrame, so the result can be assigned to a new variable.
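
If you only need a quick peek and don't care about pandas formatting, Spark's own show() prints rows directly; a small aside:

# Spark-native preview: prints rows without converting to pandas
df.show(3)

# limit() is a transformation returning a new, smaller DataFrame,
# so it can also be chained, e.g. df.limit(3).show()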

2. Simple Visualization with Matplotlib and Seaborn

To use Matplotlib or Seaborn, you first have to convert the Spark DataFrame into a pandas DataFrame and then build the plot. So start by assigning a new pandas DataFrame with the toPandas() method.

 

# Pandas ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ํ˜•ํƒœ๋กœ ์šฐ์„  ๋ณ€ํ™˜!
pandas_df = df.toPandas()
print("pandas_df ํƒ€์ž…:", type(pandas_df))

# Seaborn ์‚ฌ์šฉํ•ด๋ณด๊ธฐ
plt.figure(figsize=(10, 5))
plt.title("ํƒ€์ดํƒ€๋‹‰ ํƒ‘์Šน๊ฐ์˜ Age KDE ๋ถ„ํฌ")
sns.distplot(pandas_df['Age'])
plt.show()

 

Age ์นผ๋Ÿผ์˜ Distplot ๊ทธ๋ ค๋ณด๊ธฐ

 

์œ„ ๊ฒฐ๊ณผํ™”๋ฉด์—์„œ ๋ณผ ์ˆ˜ ์žˆ๋“ฏ์ด toPandas() ๋ฉ”์†Œ๋“œ๋กœ ์ƒˆ๋กœ ํ• ๋‹นํ•œ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ type์€ pandas.dataframe ์ž„์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

3. Checking Missing Values with PySpark and Creating Derived Columns from Existing Ones

๊ฒฐ์ธก์น˜๋ฅผ ์ฒดํฌํ•˜๊ธฐ ์œ„ํ•ด ๋ฌผ๋ก  Spark์˜ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ํ˜•ํƒœ๋ฅผ Pandas ํ˜•ํƒœ์˜ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ํ˜•ํƒœ๋กœ ๋ฐ”๊พผ ํ›„ ํ‰์†Œ์— ์ž์ฃผ ์‚ฌ์šฉํ–ˆ๋˜ Pandas API์—์„œ ์ œ๊ณตํ•˜๋Š” ๊ฒฐ์ธก์น˜ ํ™•์ธ ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•ด๋„ ๋œ๋‹ค. ํ•˜์ง€๋งŒ ํ•„์ž๊ฐ€ ์ด๋Ÿฐ ํฌ์ŠคํŒ…์„ ์ž‘์„ฑํ•˜๋Š” ๋ชฉ์ ์€ PySpark์— ๋Œ€ํ•ด ์ต์ˆ™ํ•ด์ง์ด ๋ชฉ์ ์ด๊ธฐ ๋•Œ๋ฌธ์— Spark์˜ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ํ˜•ํƒœ์—์„œ ๊ฒฐ์ธก์น˜๋ฅผ ์ฒดํฌํ•˜๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด ์•Œ์•„๋ณด์ž. ๊ฒฐ์ธก์น˜๋ฅผ ์ฒดํฌํ•˜๋Š” ๋ฐฉ๋ฒ•์€ ํฌ๊ฒŒ 2๊ฐ€์ง€๊ฐ€ ์žˆ๋‹ค.

 

  1. isnan() : a function in the pyspark.sql.functions module. Usage: isnan('column name to check')
  2. isNull() : a method on pyspark.sql.Column. Usage: col('column name to check').isNull()
# Let's use both methods at once!
# Check which columns have missing values and how many
from pyspark.sql.functions import isnan, count
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])\
  .show()

 

์œ„ ์ฝ”๋“œ์—์„œ when() ๋ฉ”์†Œ๋“œ๊ฐ€ ๋“ฑ์žฅํ•˜๋Š”๋ฐ when() ๋ฉ”์†Œ๋“œ๋Š” filter() ๋ฉ”์†Œ๋“œ์™€ ๋น„์Šทํ•œ ๊ธฐ๋Šฅ์„ ํ•œ๋‹ค๊ณ  ๋ณผ ์ˆ˜ ์žˆ๋‹ค. when(์กฐ๊ฑดA, ์กฐ๊ฑดA๊ฐ€ True์ผ ์‹œ value).otherwise(์กฐ๊ฑดA๊ฐ€ False์ผ ์‹œ value)๋กœ ์‚ฌ์šฉํ•œ๋‹ค. ์ฐธ๊ณ ๋กœ ์กฐ๊ฑด์„ ์—ฌ๋Ÿฌ๊ฐœ ๋„ฃ๊ณ  ์‹ถ๋‹ค๋ฉด ๋…ผ๋ฆฌ์—ฐ์‚ฐ์ž | , & ์‚ฌ์šฉ์ด ๊ฐ€๋Šฅํ•˜๋‹ค. ์œ„ ์ฝ”๋“œ์— ๋Œ€ํ•œ ๊ฒฐ๊ณผํ™”๋ฉด์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

 

๊ฒฐ์ธก์น˜๋ฅผ ์ฒดํฌํ•œ ๊ฒฐ๊ณผ ํ™”๋ฉด

 

์ถ”๊ฐ€์ ์œผ๋กœ ๋ถˆํ•„์š”ํ•œ ์นผ๋Ÿผ์ธ Cabin ์นผ๋Ÿผ์„ ์—†์• ์ฃผ๊ธฐ ์œ„ํ•ด drop() ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค. ์ด๋Š” Pandas์™€ ๋ฉ”์†Œ๋“œ ์ด๋ฆ„์ด ๋™์ผํ•˜์ง€๋งŒ inplace = False ์ด๊ธฐ ๋•Œ๋ฌธ์— ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์žฌํ• ๋‹น์„ ํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค.

 

# The Cabin column is more than 50% missing, so drop it
df = df.drop('Cabin')

 

ํƒ‘์Šน๊ฐ์˜ ์ด๋ฆ„์„ ๋‹ด๊ณ  ์žˆ๋Š” Initial ์นผ๋Ÿผ์„ ์ด์šฉํ•ด ํŒŒ์ƒ๋ณ€์ˆ˜๋ฅผ ๋งŒ๋“ค์–ด๋ณด์ž. withColumn() ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋˜๋Š”๋ฐ, ์—ฌ๊ธฐ์„œ๋Š” PySpark์—์„œ ์ œ๊ณตํ•˜๋Š” ์ •๊ทœํ‘œํ˜„์‹ API๋ฅผ ํ•จ๊ป˜ ์ด์šฉํ•ด๋ณด์ž. ์šฐ์„  ๊ฐ ๋ฉ”์†Œ๋“œ์— ๋Œ€ํ•œ ์‚ฌ์šฉ๋ฐฉ๋ฒ•์„ ์„ค๋ช…ํ•˜๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

 

  • df.withColumn('New column', df['column 1'] + df['column 2'] ) : creates a new derived column named New column by adding the values of column 1 and column 2.
  • regexp_extract(col('column 1'), 'regex pattern', group index) : from the values of column 1, extract the part matching the given group of the regular-expression pattern. (Here col('column 1') and df['column 1'] are equivalent expressions.)

 

์—ฌ๊ธฐ์„œ๋Š” ํƒ‘์Šน๊ฐ๋“ค์˜ ์ด๋ฆ„ ์ค‘ Mr, Mrs์™€ ๊ฐ™์€ ์„ฑ๋ณ„๊ณผ ์—ฐ๋ น ์˜๋ฏธ๋ฅผ ๋‚ดํฌํ•˜๊ณ  ์žˆ๋Š” salutation๋งŒ์„ ์ถ”์ถœํ•˜์ž.(salutation์€ ์šฐ๋ฆฌ๋ง๋กœ '์ธ์‚ฌ๋ง'์ด๋ฉฐ ์—ฌ๊ธฐ์„œ๋Š” ์‚ฌ๋žŒ ์ด๋ฆ„์˜ ๊ฐ€์žฅ ๋งจ ์•ž์— ๋‚˜์˜ค๋Š” Mr, Mrs ๊ฐ™์€ ๊ฒƒ๋“ค์„ ์˜๋ฏธํ•œ๋‹ค.)

 

# ์ƒˆ๋กœ์šด ํŒŒ์ƒ๋ณ€์ˆ˜๋ฅผ ์ƒ์„ฑํ•œ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์ƒˆ๋กœ ํ• ๋‹น!
df = df.withColumn("Initial", regexp_extract(col("Name"),
                                            "([A-Za-z]+)\.", # ()์ด๊ฒŒ ํ•˜๋‚˜์˜ ๊ทธ๋ฃน์ž„!
                                            1)) # ๊ทธ๋ฃน ์ธ๋ฑ์Šค๋Š” 1๋ถ€ํ„ฐ!
df.limit(3).show()

 

Initial ํŒŒ์ƒ๋ณ€์ˆ˜ ๋งŒ๋“  ๊ฒฐ๊ณผ

 

์ถ”๊ฐ€์ ์œผ๋กœ ํƒ‘์Šน๊ฐ๋“ค์˜ salutation ๊ฐ’์— ์˜คํƒˆ์ž๊ฐ€ ์žˆ์–ด replace([์˜คํƒˆ์ž], [์ˆ˜์ •๋œ ๊ธ€์ž]) ๋ฉ”์†Œ๋“œ๋ฅผ ํ†ตํ•ด ์ˆ˜์ •ํ•ด์ค€๋‹ค. ์ˆ˜์ •ํ•  ๋ฌธ์ž์—ด๊ณผ ๋ฐ”๋กœ์žก์€ ๋ฌธ์ž์—ด์„ ๋ฆฌ์ŠคํŠธ ์ž๋ฃŒ๊ตฌ์กฐ๋กœ ์—ฌ๋Ÿฌ๊ฐœ ๋ถ€์—ฌํ•  ์ˆ˜ ์žˆ๋‹ค.

 

df = df.replace(['Mlle','Mme','Ms','Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
                ['Miss','Miss','Miss','Mr','Mr','Mrs','Mrs','Other','Other','Other','Mr','Mr','Mr'])

# Group by the Initial values and compute the mean Age
df.groupby('Initial').avg('Age').collect()

 

collect() ์˜ ๊ฒฐ๊ณผ ํ™”๋ฉด

 

์œ„ ์ฝ”๋“œ์—์„œ collect() ๊ฐ€ ๋“ฑ์žฅํ•œ๋‹ค. collect() ๋„ select() ์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ ์ผ๋ถ€์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”์ถœํ•˜๋Š” ์—ญํ• ์„ ํ•œ๋‹ค. ํ•˜์ง€๋งŒ ์‚ฌ์šฉ๋ชฉ์ ์— ๋”ฐ๋ผ ์•ฝ๊ฐ„์˜ ์ฐจ์ด์ ์ด ์กด์žฌํ•œ๋‹ค. collect() ๋ฉ”์†Œ๋“œ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๊ฒฝ์šฐ์— ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ๊ถŒ์žฅ๋œ๋‹ค.

 

  • ์ ์€ ์–‘์˜ ๋ฐ์ดํ„ฐ์…‹์„ ๋ฐ˜ํ™˜ํ•  ๋•Œ ์ž์ฃผ ์‚ฌ์šฉ. ํฐ ๋ฐ์ดํ„ฐ์…‹์„ ๋กœ๋“œํ•  ๋•Œ๋Š” ๋ฉ”๋ชจ๋ฆฌ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•  ๊ฐ€๋Šฅ์„ฑ์ด ๋†’๋‹ค.
  • ๋ณดํ†ต filter(), group(), count() ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•œ ํ›„ ๊ฐ™์ด ์ž์ฃผ ์‚ฌ์šฉ๋œ๋‹ค.
  • ์ธ์ž์— ์•„๋ฌด๊ฒƒ๋„ ๋„ฃ์ง€ ์•Š์œผ๋ฉด ์ฆ‰, collect() ์ž์ฒด๋กœ๋งŒ ์‚ฌ์šฉํ•œ๋‹ค๋ฉด ํ•ด๋‹น ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ๋ชจ๋“  row๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
  • collect() ๊ฐ€ ๋ฐ˜ํ™˜ํ•˜๋Š” ํƒ€์ž…์€ Spark์˜ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ํ˜•ํƒœ๊ฐ€ ์•„๋‹Œ PySpark์˜ Row ํƒ€์ž…์˜ ๋ฐ์ดํ„ฐ๊ฐ€ ๋‹ด๊ธด list๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค. list ์ž๋ฃŒ๊ตฌ์กฐ์ด๊ธฐ ๋•Œ๋ฌธ์— loop ๋ฌธ์œผ๋กœ ํ™œ์šฉ์ด ๊ฐ€๋Šฅํ•˜๋‹ค. ๋˜ํ•œ PySpark์˜ Row ํƒ€์ž…์€ Python์˜ named Tuple ํ˜•ํƒœ์ด๋‹ค.
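
For example, the grouped averages above can be walked row by row; a minimal sketch using the Initial/Age grouping just computed:

# collect() returns a list of Row objects, so a plain loop works
rows = df.groupby('Initial').avg('Age').collect()
for row in rows:
    # Row fields behave like named-tuple attributes, so they can be read by name
    print(row['Initial'], row['avg(Age)'])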

์œ„ ์ฝ”๋“œ์—์„œ ํƒ‘์Šน๊ฐ ์ด๋ฆ„์˜ salutation ๋ณ„๋กœ Age ํ‰๊ท ๊ฐ’์„ ๊ณ„์‚ฐํ–ˆ๊ณ  ์ด๋ฅผ ์ด์šฉํ•ด ๊ฒฐ์ธก์น˜๋ฅผ ๋Œ€์ฒดํ•ด๋ณด์ž.

 

df = df.withColumn('Age',
                  when((df['Initial'] == 'Miss') & (df['Age'].isNull()),
                      22).otherwise(df['Age']))
df = df.withColumn('Age',
                  when((df['Initial'] == 'Other') & (df['Age'].isNull()),
                      46).otherwise(df['Age']))
df = df.withColumn('Age',
                  when((df['Initial'] == 'Master') & (df['Age'].isNull()),
                      5).otherwise(df['Age']))
df = df.withColumn('Age',
                  when((df['Initial'] == 'Mr') & (df['Age'].isNull()),
                      33).otherwise(df['Age']))
df = df.withColumn('Age',
                  when((df['Initial'] == 'Mrs') & (df['Age'].isNull()),
                      36).otherwise(df['Age']))
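
The five withColumn() calls above can also be collapsed into a loop over a mapping; a minimal sketch with the same (rounded) group means:

# Same imputation written as a loop over salutation -> mean Age
age_by_initial = {'Miss': 22, 'Other': 46, 'Master': 5, 'Mr': 33, 'Mrs': 36}
for initial, mean_age in age_by_initial.items():
    df = df.withColumn('Age',
                       when((df['Initial'] == initial) & (df['Age'].isNull()),
                            mean_age).otherwise(df['Age']))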

 

Age isn't the only column with missing values; Embarked has some too, so let's take a look at them.

 

# Embarked ๋ณ€์ˆ˜์—๋„ ๊ฒฐ์ธก์น˜๊ฐ€ 2๊ฐœ ์žˆ์—ˆ๋Š”๋ฐ ๋ฌด์—‡์ธ์ง€ ํ™•์ธํ•ด๋ณด๊ธฐ
df.groupBy('Embarked').count().show()

 

Embarked์˜ ๊ฒฐ์ธก์น˜๋Š” 2๊ฐœ์ด๋‹ค.

 

Since there are only 2 missing values, fill them with 'S', the most frequent Embarked value. Another method for filling missing values looks like this: df.na.fill({'column' : 'value'})

 

# Embarked์˜ ๊ฒฐ์ธก์น˜๋Š” ์ตœ๋นˆ๊ฐ’์ธ 'S'๋กœ ๋Œ€์ฒดํ•ด์ฃผ๊ธฐ
df = df.na.fill({"Embarked": "S"})
# ๊ฒฐ์ธก์น˜๊ฐ€ ์ฑ„์›Œ์กŒ๋Š”์ง€ ๋‹ค์‹œ ํ™•์ธ
df.groupBy('Embarked').count().show()

 

The missing values were filled successfully!

4. Feature Engineering

์œ„ 3๋ฒˆ์ด๋ž‘ ์ผ๋ถ€ ์ค‘๋ณต๋˜๋Š” ๋‚ด์šฉ์ผ ์ˆ˜๋„ ์žˆ๋‹ค. 3๋ฒˆ ๋ชฉ์ฐจ์—์„œ Name ์นผ๋Ÿผ์„ ์ด์šฉํ•ด Initial ์ด๋ผ๋Š” ์ƒˆ๋กœ์šด ํŒŒ์ƒ๋ณ€์ˆ˜๋ฅผ ์ƒ์„ฑํ•œ ๊ฒƒ์ฒ˜๋Ÿผ withcolumn, when, otherwise ๋ฅผ ์‚ฌ์šฉํ•ด Feature Engineering ์„ ์ˆ˜ํ–‰ํ•ด์ค„ ์ˆ˜ ์žˆ๋‹ค.

 

# Family size๋ผ๋Š” ํŒŒ์ƒ๋ณ€์ˆ˜ ์ƒ์„ฑ
df = df.withColumn("Family_Size",
                  col('SibSp')+col('Parch')) # df['SibSp']๋„ ๊ฐ€๋Šฅ!

# Alone์ด๋ผ๋Š” Binary ํŒŒ์ƒ๋ณ€์ˆ˜ ์ƒ์„ฑํ•˜๋Š”๋ฐ, ์šฐ์„  0์œผ๋กœ ๋‹ค ํ•ด๋†“๊ธฐ
df = df.withColumn('Alone', lit(0))
# ์กฐ๊ฑด์— ๋งž๊ฒŒ Alone ๋ณ€์ˆ˜๊ฐ’ ๋ณ€๊ฒฝ
df = df.withColumn('Alone',
                  when(col('Family_Size') == 0, 1)\
                  .otherwise(col('Alone')))

 

๋‹ค์Œ์€ ๋ฌธ์ž์—ด๋กœ ๋˜์–ด์žˆ๋Š” ๋ณ€์ˆ˜์ธ Sex, Embarked, Initial ๊ฐ’์„ ์ˆซ์ž๋กœ ๋ณ€ํ™˜ํ•ด์ฃผ๋Š” ์ฆ‰, Label Encoding์„ ํ•ด์ฃผ๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด ์•Œ์•„๋ณด์ž. StringIndexer() ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค. ์ด๋ฆ„ ๊ทธ๋Œ€๋กœ String์„ Index(์ˆซ์ž)๋กœ ๋ณ€ํ™˜ํ•ด์ฃผ๋Š” ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•œ๋‹ค. ์ž์„ธํ•œ ์‚ฌ์šฉ๋ฒ•์€ ํ•˜๋‹จ์˜ ์ฝ”๋“œ๋ฅผ ์ฐธ๊ณ ํ•˜์ž.

 

convert_cols = ['Sex', 'Embarked', 'Initial']

# Keep the fitted indexer objects so IndexToString can invert them later!
# (the loop variable is named c so it doesn't shadow the imported col function)
indexer = [StringIndexer(inputCol=c,
                         outputCol=c+'_index').fit(df) for c in convert_cols]
for i in indexer:
    print(i)
    print('-'*80)

print(type(indexer))

 

A StringIndexer is created per column.

 

Looking at the uid values in the output above, all three are different; that is, a distinct StringIndexer was fitted per column. Now let's carry out the label encoding with fit() and transform().

 

# Pipeline์„ ์ด์šฉํ•ด stage์—๋‹ค๊ฐ€ ์‹คํ–‰ ๊ณผ์ • ๋‹ด์•„ ๋„˜๊ธฐ๊ธฐ
pipeline = Pipeline(stages=indexer)
df = pipeline.fit(df).transform(df)
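
As the comment above hinted, the fitted indexers can later be inverted with IndexToString when the original string labels are needed back; a minimal sketch (indexer[0] is the Sex indexer, given the order of convert_cols):

from pyspark.ml.feature import IndexToString

# Map Sex_index back to the original strings using the labels the indexer learned
converter = IndexToString(inputCol='Sex_index',
                          outputCol='Sex_original',
                          labels=indexer[0].labels)
converter.transform(df).select('Sex_index', 'Sex_original').show(3)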

5. Dropping Unneeded Columns and Assembling the Final Features into a Vector

PySpark์—์„œ ์—ฌ๋Ÿฌ๊ฐœ์˜ ์นผ๋Ÿผ์„ ์‚ญ์ œํ•˜๋ ค๋ฉด Pandas์™€๋Š” ์•ฝ๊ฐ„ ๋‹ค๋ฅธ ๋ฐฉ๋ฒ•์„ ์‚ฌ์šฉํ•ด์•ผ ํŽธ๋ฆฌํ•˜๋‹ค. ์šฐ์„  2๊ฐ€์ง€ ๋ฐฉ๋ฒ•์˜ ์ฐจ์ด์ ์„ ์•Œ์•„๋ณด์ž.

 

  • Pandas : df.drop(['columnA', 'columnB', 'columnC'], axis=1)
  • PySpark : df.drop('columnA', 'columnB', 'columnC')

 

In other words, the difference is whether a list ([]) goes inside drop(). In PySpark you must not wrap the columns in a list even when dropping several at once. Instead, you can use native Python unpacking with *.

 

unpacking์„ ์‚ฌ์šฉํ•˜๋Š” ์ด์œ ๋Š” ๋ฌผ๋ก  ๋ถˆํ•„์š”ํ•œ ์นผ๋Ÿผ์ด ํ•œ, ๋‘๊ฐœ ์ •๋„ ์ผ ๋•Œ๋Š” ์ง์ ‘ ์ •์˜ํ•ด์ฃผ์–ด๋„ ๋˜์ง€๋งŒ ๋งŒ์•ฝ 100๊ฐœ, 200๊ฐœ๋กœ ๋Š˜์–ด๋‚œ๋‹ค๋ฉด ๋ถˆํ•„์š”ํ•œ ์นผ๋Ÿผ๋“ค๋งŒ ๋ฆฌ์ŠคํŠธ๋กœ ์ถ”์ถœํ•œ ํ›„ unpacking์„ ์ˆ˜ํ–‰ํ•˜๋ฉด ์ผ์ผ์ด ์ž…๋ ฅํ•˜๋Š” ์‹œ๊ฐ„์„ ์ค„์ผ ์ˆ˜ ์žˆ์„ ๊ฒƒ์ด๋‹ค.

 

un_cols = ["PassengerId","Name","Ticket","Cabin","Embarked","Sex","Initial"]

df = df.drop(*un_cols)
print("์‚ญ์ œ ํ›„ ๋‚จ์€ ์นผ๋Ÿผ๋“ค:", df.columns)

 

Now that the unneeded columns are gone, let's assemble the remaining features into a vector, ready to feed into a machine-learning model. The VectorAssembler does this. Its parameters resemble StringIndexer's, but there are two specific differences:

 

  1. ํŒŒ๋ผ๋ฏธํ„ฐ ์ธ์ž ์ค‘ inputCol ๋์— s๊ฐ€ ๋ถ™์€ inputCols ์ด๋‹ค.
  2. VectorAssembler๋ฅผ ์ •์˜ํ•ด์ฃผ๊ณ  ์ˆ˜ํ–‰ํ•ด ์ค„ ๋•Œ fit์„ ํ•˜์ง€ ์•Š๊ณ  ๋ฐ”๋กœ transform์„ ์ˆ˜ํ–‰ํ•ด์ค€๋‹ค.
feature = VectorAssembler(inputCols = df.columns[1:],
                         outputCol='features')
feature_vector = feature.transform(df) # returns a DataFrame
print('feature type:', type(feature))
print('feature_vector type:', type(feature_vector))

feature_vector.limit(3).toPandas()

 

Feature๋“ค์„ Vector๋กœ ๋ณ€ํ™˜ํ•˜๊ธฐ

 

์œ„ ๊ฒฐ๊ณผ์˜ ๋นจ๊ฐ„์ƒ‰ ๋„ค๋ชจ์นธ์„ ๋ณด๊ฒŒ ๋˜๋ฉด ๋ชจ๋“  Feature๋“ค์— ๋Œ€ํ•ด ํ•˜๋‚˜์˜ ๋ฒกํ„ฐ๋กœ ๋งŒ๋“  ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค. ๊ทธ๋Ÿฐ๋ฐ ๊ฒฐ๊ณผ๊ฐ’์„ ํ™•์ธํ•˜๋‹ค๊ฐ€ ํŠน์ดํ•œ ์ ์„ ๋ฐœ๊ฒฌํ–ˆ๋‹ค. features ๊ฐ’๋“ค ์ค‘ ์–ด๋–ค ๊ฒƒ๋“ค์€ Tuple๋กœ ๊ฐ์‹ธ์ง„ ๋ฒกํ„ฐ๊ฐ€, ์–ด๋–ค ๊ฒƒ๋“ค์€ List๋กœ ๊ฐ์‹ธ์ง„ ๋ฒกํ„ฐ๊ฐ€ ๋“ค์–ด์žˆ์—ˆ๋‹ค. ๊ตฌ๊ธ€๋ง์„ ํ•ด๋ด๋„ ๋ชจ๋ฅด๊ธฐ์— StackOverflow์— ์งˆ๋ฌธ์„ ์˜ฌ๋ ค๋†“๊ณ  ๋‹ต๋ณ€์„ ํ˜„์žฌ ๊ธฐ๋‹ค๋ฆฌ๊ณ  ์žˆ๋‹ค..(ํ˜น์‹œ ์•„์‹œ๋Š” ๋ถ„ ์žˆ๋‹ค๋ฉด ๋Œ“๊ธ€๋กœ ๋‹ต๋ณ€ํ•ด์ฃผ์‹œ๋ฉด ๋„ˆ๋ฌด๋‚˜๋„ ๊ฐ์‚ฌํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค!)

6. Splitting into Train and Test Data

์ด์ œ Feature๋“ค์„ ๋ฒกํ„ฐํ™” ์‹œ์ผฐ๊ณ  ๋ฐ์ดํ„ฐ๋ฅผ ํ•™์Šต, ํ…Œ์ŠคํŠธ์šฉ์œผ๋กœ ๋ถ„ํ• ํ•ด๋ณด์ž. randomSplit([train_ratio, test_ratio]) ๋ฉ”์†Œ๋“œ๋ฅผ ํ™œ์šฉํ•˜๋ฉด ๋œ๋‹ค.

 

titanic_df = feature_vector.select(['features', 'Survived'])

# split train, test
(train_df, test_df) = titanic_df.randomSplit([0.8, 0.2], seed=42)

7. Building the Machine-Learning Classification Model

์ด์ œ ์ด์ง„ ๋ถ„๋ฅ˜ ๋ชจ๋ธ์„ ๋งŒ๋“ค์–ด๋ณด์ž. PySpark์—์„œ๋Š” ParamGridBuilder() ๋ฅผ ํ†ตํ•ด ํ•˜์ดํผํŒŒ๋ผ๋ฏธํ„ฐ ํŠœ๋‹์„ ์ˆ˜ํ–‰ํ•˜๋ฉด์„œ TrainValidationSplit() ๋ฉ”์†Œ๋“œ๋กœ ๊ต์ฐจ๊ฒ€์ฆ์„ ๋™์‹œ์— ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

 

# ๋ถ„๋ฅ˜ ๋ชจ๋ธ 
from pyspark.ml.classification import LogisticRegression
# ํŒŒ๋ผ๋ฏธํ„ฐ ํŠœ๋‹ & ๊ต์ฐจ ๊ฒ€์ฆ
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.tuning import CrossValidator
# ํŒŒ์ดํ”„๋ผ์ธ
from pyspark.ml import Pipeline
# ๋ฉ”ํŠธ๋ฆญ ์–ป๋Š” ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# ROC AUC(Sklearn)
from sklearn.metrics import roc_curve, auc


# ๋ชจ๋ธ ์ •์˜
lr = LogisticRegression(labelCol='Survived')

# ํŠœ๋‹ํ•  ํŒŒ๋ผ๋ฏธํ„ฐ grid ์ •์˜
                                    # model.parameter ์‹์œผ๋กœ ์ •์˜
paramGrid = ParamGridBuilder().addGrid(lr.regParam,
                                      (0.01, 0.1))\
                              .addGrid(lr.maxIter,
                                      (5, 10))\
                              .addGrid(lr.tol,
                                      (1e-4, 1e-5))\
                              .addGrid(lr.elasticNetParam,
                                      (0.25, 0.75))\
                              .build()

# ๊ต์ฐจ๊ฒ€์ฆ ์ •์˜ - Pipeline์‹์œผ๋กœ ์ •์˜
tvs = TrainValidationSplit(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol='Survived'),
                          trainRatio=0.8)
# ํ•™์Šต์€ fit์œผ๋กœ!
model = tvs.fit(train_df)
# ํ‰๊ฐ€๋Š” transform์œผ๋กœ!
model_prediction = model.transform(test_df)

# ๋ฉ”ํŠธ๋ฆญ ํ‰๊ฐ€
print('Accuracy:',
     MulticlassClassificationEvaluator(labelCol='Survived',
                                      metricName='accuracy').evaluate(model_prediction))
print('Precision:',
     MulticlassClassificationEvaluator(labelCol='Survived',
                                      metricName='weightedPrecision').evaluate(model_prediction))

 

Logistic Regression์˜ ๋ถ„๋ฅ˜ ์„ฑ๋Šฅ

 

ํ•„์ž๋Š” ๋˜ ๋‹ค๋ฅธ ๋ถ„๋ฅ˜ ๋ฉ”ํŠธ๋ฆญ์œผ๋กœ AUC Score๋ฅผ ๊ณ„์‚ฐํ•˜๊ธฐ ์œ„ํ•ด ๋ฐ์ดํ„ฐ๊ฐ€ Positive(1)๋กœ ์˜ˆ์ธก๋  ํ™•๋ฅ ๊ฐ’์ด ํ•„์š”ํ–ˆ๋‹ค. ๊ทธ๋ž˜์„œ ์œ„ ์ฝ”๋“œ ์ค‘ model_prediction ๊ฐ์ฒด์— ํ• ๋‹น๋œ ๊ฐ’๋“ค์„ ๋กœ๋“œํ•ด ๋ณด์•˜๋‹ค.

 

model_prediction.show(10)

 

Values returned in the model_prediction variable

 

์œ„ ๊ฒฐ๊ณผ๊ฐ’ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์—์„œ ๋ณ€์ˆ˜๊ฐ€ ์˜๋ฏธํ•˜๋Š” ๋ฐ”๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

 

  • features : (10, [0,1,2,4,5], ... ) : a SparseVector with 10 entries, whose non-zero values sit at the indices 0, 1, 2, 4, 5 and are listed after the indices (the same representation discussed in section 5)
  • rawPrediction : the raw score produced when the features are run through the regression model
  • probability : the value obtained by applying the logistic function to rawPrediction, i.e. mapped to a probability between 0 and 1
  • prediction : the class (label), 1 or 0, assigned by thresholding probability

8. Plotting the ROC Curve and Computing the AUC Score

์ด์ œ ์œ„ ํ…Œ์ด๋ธ”์—์„œ probability ์™€ Survived ์นผ๋Ÿผ๋“ค๋งŒ ๋ฝ‘์•„์„œ AUC ๊ณ„์‚ฐ์— ํ•„์š”ํ•œ label ๊ฐ’๊ณผ Positive(1)๋กœ ์˜ˆ์ธก๋  ํ™•๋ฅ ๊ฐ’๋งŒ์„ ๋ฝ‘์•„๋ณด์ž.

 

# Evaluate ROC metric
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
from pyspark import SparkContext

# Get a SparkContext
sc = SparkContext.getOrCreate()

# To compute AUC, keep only the probabilities from the logistic model and the labels
results = model_prediction.select(['probability', 'Survived'])

# Prepare the probability-label set
# collect() retrieves every row - returned as a list
results_collect = results.collect()

# Rows behave like named tuples, so values can be fetched by field name!
print(results_collect[0])
print()
print('probability:', results_collect[0].probability)
print('Survived:', results_collect[0].Survived)

 

Output

 

์œ„ ๊ฒฐ๊ณผํ™”๋ฉด์—์„œ ํŒŒ๋ž€์ƒ‰ ๋„ค๋ชจ์นธ์€ Negative(0)์œผ๋กœ ์˜ˆ์ธก ๋  ํ™•๋ฅ , ๋นจ๊ฐ„์ƒ‰ ๋„ค๋ชจ์นธ์€ Positive(1)๋กœ ์˜ˆ์ธก๋  ํ™•๋ฅ ์„ ์˜๋ฏธํ•œ๋‹ค. ๋”ฐ๋ผ์„œ AUC ๊ณ„์‚ฐ์„ ์œ„ํ•ด ํ•„์š”ํ•œ ๋ถ€๋ถ„์€ ๋นจ๊ฐ„์ƒ‰ ๋„ค๋ชจ์นธ๊ณผ Survived ๊ฐ’, ์ด 2๊ฐ€์ง€๋‹ค. ์ด์ œ ํ•„์š”ํ•œ ๊ฐ’์ด ์–ด๋””์— ์œ„์น˜ํ•œ์ง€ ์•Œ๊ฒŒ ๋˜์—ˆ์œผ๋‹ˆ ์ด๋ฅผ ์ด์šฉํ•ด ROC Curve ์™€ AUC ๊ณ„์‚ฐ์„ ํ•ด๋ณด์ž.

 

# (The imports, SparkContext, results, and results_collect prepared above are reused here)
results_list = [(float(i.probability[1]),
                 float(i.Survived)) for i in results_collect]
# Convert the list of tuples into an RDD
scoreAndLabels = sc.parallelize(results_list)
# Compute the ROC metric
metrics = metric(scoreAndLabels)
spark_auc = metrics.areaUnderROC  # renamed so the sklearn auc import below doesn't overwrite it

# Visualize ROC Curve
from sklearn.metrics import roc_curve, auc

# roc_curve takes the true labels and the predicted probabilities of class 1,
# and returns the FPR, TPR, and thresholds
y_test = [i[1] for i in results_list]
y_proba = [i[0] for i in results_list]

fpr, tpr, _ = roc_curve(y_test, y_proba)
roc_auc = auc(fpr, tpr)

plt.figure()
# Fall-out (FPR) on the x-axis, Recall (TPR) on the y-axis
plt.plot(fpr, tpr, label='ROC Curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title("Area under the ROC Curve")
plt.legend(loc='lower right')
plt.show()

 

ROC Curve and AUC Score
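
Incidentally, the same AUC can be computed without collecting anything to the driver, using the DataFrame-based BinaryClassificationEvaluator imported earlier; a minimal sketch:

# DataFrame-native AUC: no collect(), no RDD round-trip
evaluator = BinaryClassificationEvaluator(labelCol='Survived',
                                          rawPredictionCol='rawPrediction',
                                          metricName='areaUnderROC')
print('AUC:', evaluator.evaluate(model_prediction))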

 

And that's how to build and evaluate a machine-learning classification model on the Titanic data with PySpark. Beyond the Logistic Regression used in this post, the Random Forest and XGBoost models I additionally tried can be found in the original Kaggle notebook or in my GitHub code. The basic model-building and evaluation framework is the same, so only the model and its parameters need changing.

๋ฐ˜์‘ํ˜•