

[PySpark] Building a Regression Model with PySpark

๋ฐ˜์‘ํ˜•

๐Ÿ”Š ๋ณธ ํฌ์ŠคํŒ…์€ PySpark๋ฅผ ํ™œ์šฉํ•œ Kaggle Notebook์„ ํ•„์‚ฌํ•˜๋ฉด์„œ ๋ฐฐ์šฐ๊ฒŒ ๋œ ์ฝ”๋“œ ๋‚ด์šฉ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํฌ์ŠคํŒ…ํ–ˆ์Œ์„ ์•Œ๋ ค๋“œ๋ฆฝ๋‹ˆ๋‹ค. ๋˜ํ•œ ์•ž์œผ๋กœ ์†Œ๊ฐœ๋  PySpark์˜ ๋ฌธ๋ฒ•์— ๋Œ€ํ•ด์„œ ์ƒ์†Œํ•˜์‹œ๋‹ค๋ฉด ์—ฌ๊ธฐ๋ฅผ ์ฐธ๊ณ ํ•ด ๊ฐ„๋‹จํ•œ ์˜ˆ์‹œ๋ฅผ ํ†ตํ•ด ์ดํ•ด๋ฅผ ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” PySpark ๊ทธ ์ค‘์—์„œ๋„ Spark SQL ๊ณผ Spark MLlib์„ ํ™œ์šฉํ•œ ๋จธ์‹ ๋Ÿฌ๋‹ ํšŒ๊ท€ ๋ชจ๋ธ์„ ๋งŒ๋“œ๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด ์†Œ๊ฐœํ•˜๋ ค๊ณ  ํ•œ๋‹ค. ํ™œ์šฉํ•œ ๋ฐ์ดํ„ฐ๋Š” California Housing ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ–ˆ๋‹ค. ๋ฐ์ดํ„ฐ๋Š” ์—ฌ๊ธฐ๋ฅผ ๋ˆŒ๋Ÿฌ ํ•˜๋‹จ์˜ Input ๋ชฉ์ฐจ๋ฅผ ๋ณด๋ฉด ๋‹ค์šด๋กœ๋“œ ๋ฐ›์„ ์ˆ˜ ์žˆ๋‹ค.

 

Let's build a regression model that predicts house prices with PySpark!

 

์ด์ „ ํฌ์ŠคํŒ…์—์„œ ์‹ค์Šตํ•ด๋ดค๋˜ ๋ถ„๋ฅ˜ ๋ชจ๋ธ๊ณผ ํฐ ํ”„๋ ˆ์ž„์ด ๋‹ค๋ฅด์ง„ ์•Š๋‹ค. ๋”ฐ๋ผ์„œ ์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” ์ˆ˜์น˜ํ˜• ๋ณ€์ˆ˜์— ์ฃผ๋กœ ์ ์šฉํ•˜๋Š” ๊ธฐ์ˆ ํ†ต๊ณ„๋Ÿ‰ ๊ฐ’์„ ๋ณด๋Š” ๋ฐฉ๋ฒ•์ด๋‚˜ Scaling ๋ฐฉ๋ฒ•์„ PySpark๋กœ ์–ด๋–ป๊ฒŒ ํ•˜๋Š”์ง€์— ๋Œ€ํ•ด ์ดˆ์ ์„ ๋งž์ถ”์–ด ์‹ค์Šตํ•ด ๋‚˜๊ฐ€๋ฉด ์ข‹์„ ๊ฒƒ ๊ฐ™๋‹ค.

1. Loading the required libraries and data

ํ•ญ์ƒ ๊ทธ๋ž˜์™”๋“ฏ์ด ๋ถ„์„ ๋ฐ ๋ชจ๋ธ๋ง์„ ํ•˜๊ธฐ ์œ„ํ•ด ํ•„์š”ํ•œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋“ค๊ณผ ๋ฐ์ดํ„ฐ๋ฅผ ๋กœ๋“œํ•ด๋ณด์ž. ๋จผ์ € ํ•„์š”ํ•œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋“ค์ด๋‹ค.

 

import os
import pandas as pd
import numpy as np

# PySpark objects and SQL
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

# pyspark for ML
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler

import seaborn as sns
import matplotlib.pyplot as plt

# Setting for visualization
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)

from matplotlib import rcParams
sns.set(context='notebook', style='whitegrid',
       rc={'figure.figsize': (18, 4)})
rcParams['figure.figsize'] = 18, 4

# Fix the random seed so that re-running the notebook stays reproducible
rnd_seed = 23
np.random.seed(rnd_seed)

 

๊ทธ๋Ÿฐ๋ฐ ์ด๋ฒˆ ๋ฐ์ดํ„ฐ๋Š” ํŠน์ดํ•˜๊ฒŒ ๋ฐ์ดํ„ฐ๊ฐ€ data ํ™•์žฅ๋ช…์œผ๋กœ ๋œ ํŒŒ์ผ์ด๋ฉฐ ๋ฐ์ดํ„ฐ์˜ header ์ฆ‰, ์นผ๋Ÿผ ๋„ค์ž„๋“ค์ด ์—†๊ธฐ ๋•Œ๋ฌธ์— ์‚ฌ์šฉ์ž๊ฐ€ ์ง์ ‘ ์Šคํ‚ค๋งˆ๋ฅผ ์ •์˜ํ•ด์ฃผ๊ณ  ํ”„๋กœ๊ทธ๋ž˜๋ฐ์œผ๋กœ ๋ช…์‹œํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค. ์ด์— ๋Œ€ํ•œ ๋ถ€๋ถ„์€ ์ด์ „ ํฌ์ŠคํŒ… 4๋ฒˆ ๋ชฉ์ฐจ์—์„œ ๋‹ค๋ฃจ์—ˆ์œผ๋‹ˆ ์ฐธ๊ณ  ๋ฐ”๋ž€๋‹ค.

 

## ๋ฐ์ดํ„ฐ ์Šค๋ฏธ์นด ์ง์ ‘ ๋ช…์‹œํ•ด์ฃผ๊ธฐ ##
data_path = '/Users/younghun/Desktop/gitrepo/data/cal_housing.data'
# data์˜ ์นผ๋Ÿผ๋ช…์„ ์Šคํ‚ค๋งˆ๋กœ ์ •์˜ํ•ด์ฃผ๊ธฐ
schema_string = 'long lat medage totrooms totbdrms pop houshlds medinc medhv'

fields = [StructField(field, FloatType(), True) for field in schema_string.split()]
schema = StructType(fields)

## ๋ฐ์ดํ„ฐ ๋กœ๋“œ๋ฅผ ์œ„ํ•œ SparkSession ๋งŒ๋“ค์–ด์ฃผ๊ธฐ ##
spark = SparkSession.builder.master('local[2]')\
        .appName('Linear-Regression-California-Housing')\
        .getOrCreate()

# ๋ฐ์ดํ„ฐํŒŒ์ผ ๋กœ๋“œ
# cache ๋ฉ”์†Œ๋“œ๋ฅผ ์ด์šฉํ•ด์„œ ๋ฉ”๋ชจ๋ฆฌ์— keepํ•ด๋†“๊ธฐ
housing_df = spark.read.csv(path=data_path, schema=schema).cache()

# ์ƒ์œ„ 5๊ฐœ ํ–‰ ๋ฏธ๋ฆฌ ๋ณด๊ธฐ -> ํ•˜๋‚˜์˜ Row๊ฐ€ namedTuple ํ˜•ํƒœ๋กœ ๋˜์–ด ์žˆ์Œ..!
housing_df.take(5)

 

์œ„ ์ฝ”๋“œ์—์„œ ํŠน์ดํ•œ ๋ถ€๋ถ„์€ take()๋ผ๋Š” ๋ถ€๋ถ„์„ ์‚ฌ์šฉํ•œ ๊ฒƒ์ธ๋ฐ, ์ด์ „์— ์‚ฌ์šฉํ–ˆ๋˜ limit๊ณผ show์™€๋Š” ๋‹ฌ๋ฆฌ ๋ฐ์ดํ„ฐ ๋ฏธ๋ฆฌ๋ณด๊ธฐ๋ฅผ Python named tuple ํ˜•ํƒœ์ธ DataFrame์˜ Row ํ˜•์‹์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๋ณด์—ฌ์ค€๋‹ค.

 

take() previews the data as Row objects.

 

๋ฐ˜๋ฉด์— show ๋ฉ”์†Œ๋“œ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์ด ๋ณด์—ฌ์ค€๋‹ค.

 

show() previews the data in tabular DataFrame form.
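Since the output screenshots don't carry over here, below is a minimal sketch of the difference, reusing the housing_df loaded above:

# take(n) returns a plain Python list of Row objects, e.g.
# [Row(long=..., lat=..., medage=..., ...)]; values can be accessed by column name
rows = housing_df.take(5)
print(rows[0].medage, rows[0]['medinc'])

# show(n) prints the first n rows as an ASCII table and returns None
housing_df.show(5)

# limit(n) returns a new (lazy) DataFrame containing only the first n rows
housing_df.limit(5).show()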

2. ๋ฐ์ดํ„ฐ ํƒ์ƒ‰

์ด์ „ ์‹ค์Šต์—์„œ ์ด๋ฏธ ์‚ฌ์šฉํ•ด๋ณด์•˜๋˜ ๋ฉ”์†Œ๋“œ๋“ค์€ ์„ค๋ช…์„ ์ƒ๋žตํ•˜๊ณ  ์ƒˆ๋กญ๊ฒŒ ๋“ฑ์žฅํ•˜๋Š” ๋ฉ”์†Œ๋“œ์— ๋Œ€ํ•ด์„œ๋งŒ ์„ค๋ช…ํ•˜๋ ค๊ณ  ํ•œ๋‹ค.

 

  • df.sort('columnA', ascending=True) : sorts the rows by the values of columnA in ascending order. The default is True; if set to False, it sorts in descending order. This is similar to df.sort_values(by='columnA', ascending=True) in the Pandas API.
  • df.describe() : shows descriptive statistics of the numeric variables and behaves much like its Pandas counterpart. Note, however, that in PySpark you need to append a preview method such as show() if you actually want to see the statistics, e.g. df.describe().show(10).

 

# Group by and aggregate
result_df = housing_df.groupBy('medage').count()
# Set the column to sort by and sort in descending order
result_df.sort('medage', ascending=False).show(5)

# Look at descriptive statistics of the numeric variables
housing_df.describe().show(10)

 

๋‹ค์Œ์€ ์ˆ˜์น˜ํ˜• ๋ณ€์ˆ˜ ๊ฐ’๋“ค์˜ ์ž๋ฆฟ์ˆ˜๋ฅผ ์ค„์—ฌ์ฃผ๋Š” ๊ฒƒ๊ณผ ๊ฐ™์ด SQL์—์„œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” ์—ฌ๋Ÿฌ๊ฐ€์ง€ ํ•จ์ˆ˜๊ฐ€ ๋‹ด๊ฒจ์žˆ๋Š” ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•ด๋ณด์ž. ํ•˜๋‹จ์˜ ์ฝ”๋“œ์—์„œ F๋Š” pyspark.sql.functions๋ฅผ F๋กœ alias ์‹œํ‚จ ๊ฒƒ์ด๋‹ค.

 

# ๋ฐ˜ํ™˜๋˜๋Š” ํ†ต๊ณ„๋Ÿ‰ ๊ฐ’๋“ค์„ ์†Œ์ˆ˜์  ์ œ๊ฑฐํ•˜๊ฑฐ๋‚˜ ํ•˜๋Š” ๋“ฑ ์ปค์Šคํ„ฐ๋งˆ์ด์ง•ํ•ด์„œ ์ถœ๋ ฅ
(housing_df.describe()).select('summary',
                              F.round('medage', 4).alias('medage'),
                              F.round('totrooms', 4).alias('totrooms'),
                              F.round('totbdrms', 4).alias('totbdrms'),
                              F.round('pop', 4).alias('pop'),
                              F.round('houshlds', 4).alias('houshlds'),
                              F.round('medinc', 4).alias('medinc'),
                              F.round('medhv', 4).alias('medhv')).show(10)

 

round ํ•จ์ˆ˜๋ฅผ ํ™œ์šฉํ•ด ๊น”๋”ํ•˜๊ฒŒ ์ˆ˜์น˜๊ฐ’์„ ์ •๋ฆฌํ•ด์ฃผ์—ˆ๋‹ค.

3. Feature Engineering

๋‹ค์Œ์€ ๊ธฐ์กด ๋ณ€์ˆ˜์—์„œ ํŒŒ์ƒ๋ณ€์ˆ˜๋ฅผ ์ƒ์„ฑํ•˜๋Š” ์ž‘์—…์ธ๋ฐ, ์ด์ „์— ํ–ˆ๋˜ ๊ฒƒ์ฒ˜๋Ÿผ withColumn ๊ตฌ๋ฌธ์„ ์‚ฌ์šฉํ•˜์—ฌ ์ƒˆ๋กœ์šด ํŒŒ์ƒ๋ณ€์ˆ˜๋ฅผ ์ƒ์„ฑํ•ด์ฃผ๋ฉด ๋œ๋‹ค.

 

# ํŒŒ์ƒ๋ณ€์ˆ˜ ์ƒ์„ฑํ•˜๊ธฐ
housing_df = (housing_df.withColumn('rmsperhh',
                                   F.round(col('totrooms')/col('houshlds'), 2))\
                        .withColumn('popperhh',
                                   F.round(col('pop')/col('houshlds'), 2))\
                        .withColumn('bdrmsperrm',
                                   F.round(col('totbdrms')/col('totrooms'), 2)))
housing_df.show(5)

 

์ƒˆ๋กœ์šด ํŒŒ์ƒ๋ณ€์ˆ˜ ์ƒ์„ฑํ•˜๊ธฐ

 

Now let's select only the variables we need into a new DataFrame and then apply scaling to the numeric variables. The unusual point is that before applying scaling, the required features must first be assembled into a single vector column.

 

# Keep only the variables we need, dropping the ones we won't use
housing_df = housing_df.select('medhv',
                              'totbdrms',
                              'pop',
                              'houshlds',
                              'medinc',
                              'rmsperhh',
                              'popperhh',
                              'bdrmsperrm')

featureCols = ['totbdrms', 'pop', 'houshlds', 'medinc',
               'rmsperhh', 'popperhh', 'bdrmsperrm']

# Assemble the features into a single feature vector with VectorAssembler
assembler = VectorAssembler(inputCols=featureCols, outputCol='features')
assembled_df = assembler.transform(housing_df)
assembled_df.show(10, truncate=True)

 

The numeric features were converted into a vector with VectorAssembler.

 

Now let's scale the numeric features shown in the red box of the screenshot above.

 

# ์œ„์—์„œ ๋งŒ๋“  Feature vector์ธ 'features' ๋„ฃ๊ธฐ
standardScaler = StandardScaler(inputCol='features',
                               outputCol='features_scaled')
# fit, transform
scaled_df = standardScaler.fit(assembled_df).transform(assembled_df)

# scaling๋œ ํ”ผ์ฒ˜๋“ค๋งŒ ์ถ”์ถœํ•ด๋ณด๊ธฐ
scaled_df.select('features', 'features_scaled').show(10,
                                                    truncate=True)

 

PySpark also provides APIs for scaling. There are several scaling methods available, but here let's use StandardScaler, i.e. standardization. It is used as follows: define the scaler with StandardScaler(inputCol='Raw Features', outputCol='Scaled Features'), then perform the scaling with fit(dataframe) and transform(dataframe).

 

The original features and the scaled features
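For reference, the other scalers in pyspark.ml.feature follow the same define -> fit -> transform pattern. Below is a minimal sketch that swaps in MinMaxScaler, reusing the assembled_df from above; the output column name features_minmax is just an arbitrary choice for illustration.

from pyspark.ml.feature import MinMaxScaler

# MinMaxScaler rescales each feature of the vector column to the [0, 1] range
minmax_scaler = MinMaxScaler(inputCol='features', outputCol='features_minmax')
minmax_df = minmax_scaler.fit(assembled_df).transform(assembled_df)
minmax_df.select('features', 'features_minmax').show(5, truncate=True)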

4. ๋ฐ์ดํ„ฐ ๋ถ„ํ• ๊ณผ ํšŒ๊ท€ ๋ชจ๋ธ๋ง

๋ฐ์ดํ„ฐ๋ฅผ ํ•™์Šต 8, ํ…Œ์ŠคํŠธ 2์˜ ๋น„์œจ๋กœ ๋‚˜๋ˆ„์–ด์ฃผ๊ณ  ํšŒ๊ท€ ๋ชจ๋ธ์„ ๋งŒ๋“ค์–ด ํ•™์Šต์‹œ์ผœ๋ณด์ž. ํšŒ๊ท€ ๋ชจ๋ธ์€ ์—ฌ๋Ÿฌ๊ฐ€์ง€๊ฐ€ ์žˆ์ง€๋งŒ ์—ฌ๊ธฐ์„œ๋Š” ์˜ค๋ฒ„ํ”ผํŒ…์„ ์˜ˆ๋ฐฉํ•˜๊ธฐ ์œ„ํ•œ ์ •๊ทœํ™” ํ•ญ์ด ํฌํ•จ๋˜์–ด ์žˆ๋Š” Elastic Net ํšŒ๊ท€ ๋ชจ๋ธ์„ ์‚ฌ์šฉํ•ด๋ณด์ž.

 

# ๋ฐ์ดํ„ฐ ๋ถ„ํ• 
train_data, test_data = scaled_df.randomSplit([0.8, 0.2], seed=rnd_seed)

# Define the Elastic Net model
lr = LinearRegression(featuresCol='features_scaled',
                     labelCol='medhv',
                     predictionCol='predmedhv',
                     maxIter=10,
                     regParam=0.3,
                     elasticNetParam=0.8,
                     standardization=False)
# ๋ชจ๋ธ ํ•™์Šต
linearModel = lr.fit(train_data)
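
Once training is done, it can be worth peeking at what was actually learned. A minimal sketch (coefficients and intercept are standard attributes of the fitted LinearRegressionModel):

# One learned weight per entry of the 'features_scaled' vector, plus the intercept
print(linearModel.coefficients)
print(linearModel.intercept)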

5. Predicting on the test data

์ด์ œ ํ•™์Šต์„ ๋ชจ๋‘ ๋งˆ์ณค์œผ๋‹ˆ ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ๋กœ ์˜ˆ์ธก์„ ํ•˜๊ณ  RMSE, MAE, R2 Score ๋ฉ”ํŠธ๋ฆญ๋“ค๋กœ ์„ฑ๋Šฅ ํ‰๊ฐ€๋ฅผ ํ•ด๋ณด์ž. PySpark์—์„œ ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•œ ํ‰๊ฐ€๋Š” transform(test_dataframe)์œผ๋กœ ์ˆ˜ํ–‰ํ•ด์ค€๋‹ค.

 

# transform() returns a new DataFrame containing the "prediction column" we named when defining the model
predictions = linearModel.transform(test_data)

print(type(predictions))  # The prediction results come back as a PySpark DataFrame!

 

The variable predictions is returned as a DataFrame; from it, let's extract only the prediction column predmedhv and the ground-truth column medhv. Looking at the output, the machine learning model's prediction performance doesn't look all that good...

 

# From the predictions DataFrame, extract only the label and predicted values to compare
pred_labels = predictions.select('predmedhv', 'medhv')
pred_labels.show()

 

Comparing the predicted and actual values

 

Now let's compute RMSE and MAE to see how far the predictions are from the actual values, and look at the R2 score (coefficient of determination), which indicates how well the fitted regression equation explains the given data.

 

print(f"RMSE:{linearModel.summary.rootMeanSquaredError}")
print(f"MAE: {linearModel.summary.meanAbsoluteError}")
print(f"R2 score: {linearModel.summary.r2}")

 

์˜ˆ์ธก ์„ฑ๋Šฅ์ด ๊ทธ๋ฆฌ ๋†’์ง€ ์•Š๋‹ค.

 

์ถ”๊ฐ€์ ์œผ๋กœ ๋ฉ”ํŠธ๋ฆญ์„ ์–ป์„ ์ˆ˜ ์žˆ๋Š” ๋ฐฉ๋ฒ•์€ ์œ„ ์ฝ”๋“œ ๋ง๊ณ ๋„ RegressionEvaluator๋กœ๋„ ๊ณ„์‚ฐํ•  ์ˆ˜ ์žˆ๋‹ค. ํ•ด๋‹น ๋ฉ”์†Œ๋“œ์˜ ์‚ฌ์šฉ๋ฐฉ๋ฒ•์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

 

  • RegressionEvaluator(predictionCol, labelCol, metricName)
    • predictionCol : the prediction column
    • labelCol : the actual (label) column
    • metricName : the name of the metric to compute (e.g. 'rmse', 'mae', 'r2', etc.)

 

# RMSE
rmse_evaluator = RegressionEvaluator(predictionCol='predmedhv',
                                     labelCol='medhv',
                                     metricName='rmse')
# MAE
mae_evaluator = RegressionEvaluator(predictionCol='predmedhv',
                                    labelCol='medhv',
                                    metricName='mae')
# R2 Score
r2_evaluator = RegressionEvaluator(predictionCol='predmedhv',
                                   labelCol='medhv',
                                   metricName='r2')

print(f"RMSE: {rmse_evaluator.evaluate(pred_labels)}")
print(f"MAE: {mae_evaluator.evaluate(pred_labels)}")
print(f"R2 score: {r2_evaluator.evaluate(pred_labels)}")

 

Yet another option is RegressionMetrics; unusually, it requires the predictions to be converted to an RDD before being passed in.

 

# ์˜ˆ์ธก๊ฐ’์ธ pred_labels๋ฅผ RDD ์ž๋ฃŒ๊ตฌ์กฐ๋กœ ๋ณ€ํ™˜ํ•˜๊ณ  ๋„ฃ์–ด์•ผ ํ•จ
metrics = RegressionMetrics(pred_labels.rdd)

print("RMSE:", metrics.rootMeanSquaredError)
print("MAE:", metrics.meanAbsoluteError)
print("R2 score:", metrics.r2)
๋ฐ˜์‘ํ˜•