๐ ํด๋น ํฌ์คํ ์ ์คํํฌ ์๋ฒฝ ๊ฐ์ด๋ ์ฑ ๊ณผ ์ธํ๋ฐ์ ์คํํฌ ๋จธ์ ๋ฌ๋ ์๋ฒฝ ๊ฐ์ด๋ ๊ฐ์๋ก ๊ณต๋ถํ ํ ๋ฐฐ์ด ๋ด์ฉ์ ์ ๋ง์ ๋ฐฉ์์ผ๋ก ์ฌ๊ตฌ์ฑํ ๊ฒ์์ ์๋ฆฝ๋๋ค. ํนํ, ์ฐธ๊ณ ํ ์ธํ๋ฐ ๊ฐ์์ ๊ฐ์ ์๋ฃ๋ฅผ ๊ทธ๋๋ก ์ฌ์ฉํ์ง ์์์์ ํํ ์๋ฆฝ๋๋ค!
์์ ์ ์ํ์น ์คํํฌ์ ๋ํ ๊ฐ๋ ๊ธ๊ณผ PySpark๋ฅผ ํ์ฉํด์ ๋ฌธ์ ์ ํ์ ๋ง๊ฒ ๊ฐ๋จํ ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ์ ํํ ๋ฆฌ์ผ ํ์์ผ๋ก ๋ฐฐ์ ์๋ค. ๊ทธ ์ดํ ์ด๋ ์ ๋ ์๊ฐ์ด ์ง๋ ํ, ์ํ์น ์คํํฌ์ ๋ฒ์ ๋ ๋ง์ด ์ ๋ฐ์ดํธ ๋์๊ณ ๊ฐ์ธ์ ์ผ๋ก ๋ถ์ฐ ์์คํ ์ ๋ํ ์ญ๋์ ํค์๋ณด๊ณ ์ ์ฑ ๊ณผ ๊ฐ์๋ฅผ ํตํด PySpark๋ฅผ ๋ค์ ๋ฐฐ์ฐ๊ณ ์๋ค.
ํ์๋ PySpark๋ฅผ ํ์ฉํ๋ ๋ชฉ์ ์ด ๋๋์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฒ๋ฆฌํ๊ณ ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ์ ๊ฐ๋ฐํ๋ ๋ฐฉ๋ฒ์ ๋ฐฐ์๋ณด๋ ๊ฒ์์ ์๋ฆฐ๋ค. ๊ทธ๋์ ์ด๋ฒ ์ฒซ ํฌ์คํ ์์ ์์๋ณผ PySpark API๋ ๋ฐ๋ก DataFrame API์ด๋ค. Spark์ DataFrame API๋ ํ๋ค์ค DataFrame๊ณผ ๋น์ทํ๊ธฐ๋ ํ์ง๋ง ํ์ฐํ ์ฐจ์ด์ ์ ์กด์ฌํ๋ API์ด๋ค. ๋ง์น "SQL ํ์ ์ด DataFrame" ์ด๋ผ๊ณ ๋ ํ ์ ์๋ค.
๊ทธ๋ฌ๋ฉด ์ด์ ๋ํ์ ์ธ ๋ฒค์น๋งํฌ ๋ฐ์ดํฐ์ธ ํ์ดํ๋ ๋ฐ์ดํฐ๋ฅผ ํ์ฉํด์ ์คํํฌ์ ๋ฐ์ดํฐํ๋ ์ API๋ฅผ ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ ๋ํด ์์๋ณด๋๋ก ํ์. ์ฐธ๊ณ ๋ก ์์ผ๋ก ์ฌ์ฉํ PySpark ๋ฒ์ ์ 3.2.0
์ด๋ฉฐ Jupyter Notebook์์ PySpark๋ฅผ ์ค์ตํ์๋ค.
1. ๋ฐ์ดํฐ ๋ก๋ ๋ฐ ๋ฏธ๋ฆฌ๋ณด๊ธฐ
๋ฐ์ดํฐ๊ฐ ํ์ฌ ๋ก์ปฌ์์ csv
๋ฐ์ดํฐ ํํ๋ก ์ ์ฅ๋์ด ์๊ธฐ ๋๋ฌธ์ Spark์ read.csv
๋ฉ์๋๋ฅผ ํ์ฉํด์ ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ ์ ์๋ค.
path = '/Users/younghun/Desktop/gitrepo/data/titanic/train.csv'
spark_df = spark.read.csv(path, header=True, inferSchema=True)
spark.show()
์ ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด show()
๋ฉ์๋๋ ๋ก๋ํ ์คํํฌ ๋ฐ์ดํฐํ๋ ์์ ์์ 20๊ฐ Row๋ฅผ ๋ฏธ๋ฆฌ๋ณด๊ธฐ ํ ์ ์๋ค. ๋ฌผ๋ก show()
๋ฉ์๋ ์ธ์์ $n$(์ซ์)๋ฅผ ๋ฃ์ด์ฃผ์ด ์ํ๋ ๊ฐ์๋งํผ ์์ $n$๊ฐ์ Row๋ฅผ ๋ฏธ๋ฆฌ๋ณด๊ธฐ ํ ์ ์๋ค.
๊ทธ๋ฆฌ๊ณ show()
๋ฉ์๋ ์์๋ค๊ฐ limit()
์ด๋ผ๋ ๋ฉ์๋๋ก ๋ฐ์ดํฐํ๋ ์์ ์์ Row ๊ฐ์๋ฅผ ๋ฏธ๋ฆฌ ์ถ์ถํ ์๋ ์๋ค. limit()
๋ฉ์๋์ ์ธ์๋ num
์ผ๋ก ์๋์ฒ๋ผ ์ค ์ ์๋ค.
path = '/Users/younghun/Desktop/gitrepo/data/titanic/train.csv'
spark_df = spark.read.csv(path, header=True, inferSchema=True)
spark_df.limit(num=5).show()
๋ ๋ค๋ฅธ ๋ฐฉ๋ฒ์ผ๋ก ํ๋ค์ค์ ๋ง์ฐฌ๊ฐ์ง๋ก head()
๋ฉ์๋๊ฐ ์กด์ฌํ๋ค. ํ์ง๋ง head()
๋ฉ์๋๋ ํ๋ค์ค ๋์ ๋ฌ๋ฆฌ ์คํํฌ์์๋ Row ๊ฐ์ฒด๋ฅผ ๋ฐํํ๋ค. ๋ง์ฝ head()
๋ฉ์๋์ ์๋ฌด๋ฐ ์ธ์๋ ์ฃผ์ง ์์ผ๋ฉด ์๋์ ๊ฐ์ด ๊ฐ์ฅ ์์์ Row ๊ฐ์ฒด 1๊ฐ๋ง์ ๋ฐํํ๋ค.
path = '/Users/younghun/Desktop/gitrepo/data/titanic/train.csv'
spark_df = spark.read.csv(path, header=True, inferSchema=True)
spark_df.head()
ํน์ดํ ์ ์ ์คํค๋ง(์นผ๋ผ ์ด๋ฆ)์ ๊ฐ ๋ชจ๋ ๋ฐํ๋๋ค. ๊ทธ๋์ ์ Row ๊ฐ์ฒด๋ฅผ ํ์ด์ฌ์ ๋ค์๋ ํํ์ฒ๋ผ Key ๊ฐ์ ์ฐธ์กฐํ์ฌ ๊ทธ์ ํด๋นํ๋ Value ๊ฐ์ ์ป์ด๋ผ ์ ์๋ค. ์๋์ฒ๋ผ ๋ง์ด๋ค.
path = '/Users/younghun/Desktop/gitrepo/data/titanic/train.csv'
spark_df = spark.read.csv(path, header=True, inferSchema=True)
row = spark_df.head()
print(row)
print()
print('row์ PassengerId:', row['PassengerId'], row.PassengerId)
๊ทธ๋ฆฌ๊ณ ๋ ํน์ดํ ์ ์ head()
๋ฉ์๋ ์ธ์์ 1์ ํฌํจํ ์ฆ, ์ซ์๋ฅผ ์ธ์๋ก ๋ฃ์ด์ฃผ๊ฒ ๋๋ฉด ๋ฐ๋์ ๋ฐํ์ Row ๊ฐ์ฒด(๋ค)๊ฐ ๋ด๊ธด ๋ฆฌ์คํธ๋ฅผ ๋ฐํํ๊ฒ ๋๋ค. ์๋ ๊ทธ๋ฆผ์ฒ๋ผ ๋ง์ด๋ค.
path = '/Users/younghun/Desktop/gitrepo/data/titanic/train.csv'
spark_df = spark.read.csv(path, header=True, inferSchema=True)
# 1. head()์ ์ธ์๋ก ์๋ฌด๊ฒ๋ ์ ์ค ๋
row = spark_df.head()
print('row:', row)
print()
# 2. head()์ ์ธ์๋ก 1์ ์ค ๋
row2 = spark_df.head(1)
print('row2:', row2)
print()
# 3. head()์ ์ธ์๋ก 3์ ์ค ๋
row3 = spark_df.head(3)
print('row3:', row3)
2. ๋ฐ์ดํฐ ๋ช ์ธ์ ์ดํด๋ณด๊ธฐ
๋ค์์ PySpark๋ก ๋ก๋ํ ๋ฐ์ดํฐ์ ๋ช
์ธ์๋ฅผ ๋ณด๊ธฐ ์ํ ๋ฉ์๋๋ค์ด๋ค. ๋ช
์ธ์๋, ๋ฐ์ดํฐ์ ์ด๋ค ์นผ๋ผ์ด ์๊ณ ๊ทธ ์นผ๋ผ ๊ฐ์ ํ์
์ ๋ฌด์์ธ์ง์ ๋ํด ์ ๋ณด๋ฅผ ์ ๊ณตํ๋ ๊ฒ์ด๋ค. ํ๋ค์ค๋ก ์น๋ฉด dataframe.info()
๋๋ dataframe.describe()
์ ๋๊ฐ ๋๊ฒ ๋ค.
๊ทธ๋ฐ๋ฐ, PySpark๋ ํ๋ค์ค์ ๋ฌ๋ฆฌ ๋ฌธ์์ด ํํ์ ์นผ๋ผ์๋ ํ๊ท ๊ฐ, ํ์คํธ์ฐจ ๋ฑ๊ณผ ๊ฐ์ ์์ฝํต๊ณ๋๊ฐ์ ๋ถํ์ํ๊ฒ ์ ๊ณตํ๋ค. ๋ฌผ๋ก Null
๊ฐ์ผ๋ก ํํ๋๋ค. ๊ทธ๋์ PySpark์์๋ ๋ฐ์ดํฐํ๋ ์์์ ์์นํ ํ์
๋ง ๊ณ ๋ฅธ ํ describe()
๋ฉ์๋๋ฅผ ํ์ฉํด ์๋์ ๊ฐ์ด ๋ฐ์ดํฐ ๋ช
์ธ์ ์ ๋ณด๋ฅผ ์ ๊ณต๋ฐ์ ์ ์๋ค. ์ฐธ๊ณ ๋ก PySpark์์์ describe()
๋ฉ์๋๋ ์คํํฌ์ ๋ฐ์ดํฐํ๋ ์ ๊ฐ์ฒด๋ฅผ ๋ฐํํ๋ฏ๋ก ๋ฐ๋์ ๋ด์ฉ๋ฌผ์ ๋ณด๊ธฐ ์ํด์๋ show()
์ ๊ฐ์ด ๋ฏธ๋ฆฌ๋ณด๊ธฐ ๋ฉ์๋๋ฅผ ์ฌ์ฉํด์ผ ํ๋ค๋ ๊ฒ๋ ์์ง๋ง์.
num_cols = [name for name, dtype in spark_df.dtypes if dtype != 'string']
spark_df.select(*num_cols).describe().show() # ๊ทธ๋ฅ ๋ฆฌ์คํธ ํํ๋ก num_cols๋ง ๋ฃ์ด๋ ๋์ํ๊ธด ํจ
3. ๋ฐ์ดํฐํ๋ ์์ shape ์ป๊ธฐ
๋ค์์ PySpark์ ๋ฐ์ดํฐํ๋ ์์ ํ, ์ด ๊ฐ์๋ฅผ ๊ตฌํ๋ ์ฆ, shape๋ฅผ ๊ตฌํ๋ ๋ฐฉ๋ฒ์ด๋ค. ํ๋ค์ค๋ dataframe.shape
์ฝ๋๋ง ์น๋ฉด ํํ ํํ๋ก (ํ์ ๊ฐ์, ์ด์ ๊ฐ์)
๋์์ง๋ง, ์คํํฌ์์๋ ๊ทธ๋ ์ง ๋ชปํ๋ค. ์ด์ ๋ํ ์ด์ ๋ฅผ ์์๋ณด๋ฉด ๋ค์๊ณผ ๊ฐ๋ค. ์ฐ์ ํ๋ค์ค๋ ํ๋์ ์๋ฒ ๋ฉ๋ชจ๋ฆฌ์ ๋ก๋ํ ๋ฐ์ดํฐํ๋ ์์ ์ฌ๋ฆฌ๊ฒ ๋๋ค. ๋ฉ๋ชจ๋ฆฌ์ ๋ฐ์ดํฐํ๋ ์ ๋ชจ๋๊ฐ ์ ์ฅ๋์ด ์๊ธฐ ๋๋ฌธ์ shape ๋ผ๋ ์์ฑ ๊ฐ์ ์ด์ฉํด์ ํ๊ณผ ์ด์ ๊ฐ์๋ฅผ ๊ฐ๋จํ๊ฒ ๊ตฌํ ์ ์๋ค.
๋ฐ๋ฉด์, ์คํํฌ๋ ์๋ค์ํผ ์๋ฒ์ ์ฌ๋ฌ๊ฐ์ ํํฐ์ ์ผ๋ก ๋ถ์ฐํ์ฌ ๋ฐ์ดํฐํ๋ ์์ ๋๋์ด ์ ์ฅํ๋ ํํ์ด๋ค. ๋ฐ๋ผ์, ํ๋ค์ค์๋ ๋ฌ๋ฆฌ ํ๋์ ์๋ฒ ๋ฉ๋ชจ๋ฆฌ์ ๋ฐ์ดํฐํ๋ ์ ์ ์ฒด๋ฅผ ์ ์ฅํ๋ ๋ฐฉ์์ด ์๋๊ธฐ ๋๋ฌธ์ ์คํํฌ์์๋ shape๋ผ๋ ์์ฑ ๊ฐ์ ์ด์ฉํด ๊ฐ๋จํ๊ฒ ํ๊ณผ ์ด์ ๊ฐ์๋ฅผ ๊ตฌํ ์ ์๋ค.
๊ทธ๋์ ์คํํฌ๋ ์ฝ๊ฐ ๊ท์ฐฎ์์ง๋ผ๋ ์๋์ ๊ฐ์ ๋ฐฉ์์ ํ์ฉํด์ ๋ฐ์ดํฐํ๋ ์์ ํ, ์ด ๊ฐ์๋ฅผ ๊ฐ๊ฐ ๊ตฌํด์ค๋ค.
# ํ ๊ฐ์ ๊ตฌํ๊ธฐ
print('ํ ๊ฐ์:', spark_df.count())
# ์ด ๊ฐ์ ๊ตฌํ๊ธฐ
print('์ด ๊ฐ์:', len(spark_df.columns))
4. ๋ฐ์ดํฐํ๋ ์์์ ํน์ ์นผ๋ผ๋ค ์ถ์ถํ๊ธฐ
์ด์ PySpark์์ ๋ฐ์ดํฐํ๋ ์์ ์ํ๋ ์นผ๋ผ์ ์ถ์ถํ๋ ๋ฐฉ๋ฒ์ ๋ํด ์์๋ณด๋๋ก ํ์. ์ด๋ฒ ๋ชฉ์ฐจ์์ ์์๋ณผ ์นผ๋ผ ์ถ์ถ ๋ฐฉ๋ฒ์ select()
๋ฉ์๋๋ฅผ ํ์ฉํ๋ ๋ฐฉ๋ฒ์ด๋ค. ์ฐธ๊ณ ๋ก ๋๋ถ๋ถ์ ๋ค๋ฅธ ๋ฉ์๋๋ค๋ ๋ง์ฐฌ๊ฐ์ง์ง๋ง select()
๋ฉ์๋๋ dataframe
๊ฐ์ฒด ์์ฒด๋ฅผ ๋ฐํํ๊ธฐ ๋๋ฌธ์ ๋ด์ฉ๋ฌผ์ ๋ณด๊ธฐ ์ํด์๋ ๋ฐ๋์ show()
๋ฉ์๋์ ๊ฐ์ ๊ฒ๋ค์ ์ถ๊ฐ๋ก ์์ฑํด์ฃผ์ด์ผ ํ๋ค.
์๋ ์ฝ๋๋ฅผ ๋ณด๋ฉด ์๊ฒ ์ง๋ง, ํน์ ์นผ๋ผ์ ์ถ์ถํ ๋, ๋ค์ํ ๋ฐฉ๋ฒ์ผ๋ก ์์ฑํ ์ ์๋ค. ์๋ ์ฝ๋๋ฅผ ๋ฉด๋ฐํ ์ดํด๋ณด๋ฉด์ 5๊ฐ์ง ๋ฐฉ๋ฒ์ด ๋ชจ๋ ์ ์ฉ๋ ์ ์๋ค๋ ๊ฒ๋ ์์๋์. ์์ ์คํํฌ ๋ฒ์ ผ์์๋ ์๋ ์นผ๋ผ ์ด๋ฆ์ด ๋ด๊ธด ๋ฆฌ์คํธ๋ฅผ unpacking ํด์ ๋ฃ์ด์ฃผ์ด์ผ ํ์ง๋ง ์ต๊ทผ ๋ฒ์ ์ ๋ฐ์ดํธ๊ฐ ๋๋ฉด์ ๋ฆฌ์คํธ๋ฅผ ๊ทธ๋ฅ ๋ฃ์ด์ฃผ์ด๋ ์ ์์ ์ผ๋ก ๋์ํ๋ค.
from pyspark.sql.functions import col
# ๋ชจ๋ ์นผ๋ผ ์ถ์ถํ๊ธฐ
spark_df.select('*').limit(3).show()
# Age๋ผ๋ ์นผ๋ผ ํ๋๋ง ์ถ์ถํ๊ธฐ
spark_df.select('Age').limit(3).show()
spark_df.select(['Age']).limit(3).show()
spark_df.select(spark_df['Age']).limit(3).show()
spark_df.select(spark_df.Age).limit(3).show()
spark_df.select(col('Age')).limit(3).show() # ์ํฌํธํ col ๋ฉ์๋ ์ฌ์ฉ
# ์ฌ๋ฌ๊ฐ์ ์นผ๋ผ์ ๋์์ ์ถ์ถํ๊ธฐ
spark_df.select('Name', 'Age').limit(3).show()
spark_df.select(['Name', 'Age']).limit(3).show()
๋ค์์ผ๋ก๋ pyspark.sql.functions
์์ ์ ๊ณตํ๋ ๋ช ๊ฐ์ง ํจ์๋ค์ ์ํฌํธํด์ ํน์ ์นผ๋ผ์ ๋ณ๊ฒฝํ ์๋ ์๋ค.
from pyspark.sql.functions import max, min, avg
from pyspark.sql.functions import upper, lower
spark_df.select(max('Age').alias('Max_Age')).show()
spark_df.select(min('Pclass').alias('Min_Pclass')).show()
spark_df.select(avg('Age').alias('Avg_Age')).show()
spark_df.select(upper('Name').alias('Upper_Name')).limit(3).show()
# ๋ชจ๋ ์นผ๋ผ๊ณผ ๋ณํ์ ์ค ์นผ๋ผ ๋์์ ์ถ์ถ
spark_df.select('*', lower(col('Name')).alias('Lower_Name')).limit(3).show()