๐ ํด๋น ํฌ์คํ ์ ์คํํฌ ์๋ฒฝ ๊ฐ์ด๋ ์ฑ ๊ณผ ์ธํ๋ฐ์ ์คํํฌ ๋จธ์ ๋ฌ๋ ์๋ฒฝ ๊ฐ์ด๋ ๊ฐ์๋ก ๊ณต๋ถํ ํ ๋ฐฐ์ด ๋ด์ฉ์ ์ ๋ง์ ๋ฐฉ์์ผ๋ก ์ฌ๊ตฌ์ฑํ ๊ฒ์์ ์๋ฆฝ๋๋ค. ํนํ, ์ฐธ๊ณ ํ ์ธํ๋ฐ ๊ฐ์์ ๊ฐ์ ์๋ฃ๋ฅผ ๊ทธ๋๋ก ์ฌ์ฉํ์ง ์์์์ ํํ ์๋ฆฝ๋๋ค!
์ ๋ฒ ํฌ์คํ ์ ์ด์ด PySpark์ dataframe API ์ฌ์ฉ ๋ฐฉ๋ฒ์ ๋ํด ์ฐ์ด์ด ์์๋ณด์.
1. ์กฐ๊ฑด์ ๋ง๋ ๋ฐ์ดํฐ๋ง ์ถ์ถํ๊ธฐ
ํน์ ์กฐ๊ฑด์ ๋ง๋ ๋ฐ์ดํฐ๋ฅผ ์ถ์ถํ๊ธฐ ์ํด ์ฌ์ฉํ๋ ๋ฉ์๋๋ก๋ filter()
๋ฉ์๋๊ฐ ์กด์ฌํ๋ค. ์ด ๋, filter()
๋ฉ์๋์๋ ์คํํฌ ๋ฐ์ดํฐํ๋ ์์ Column Type
๊ฐ์ฒด๋ฅผ ๋ฃ์ด์ฃผ์ด์ผ ํ๋ค. ์ฌ๊ธฐ์ Column Type
๊ฐ์ฒด๋๊ฒ ์ ์๋ฟ์ง ์์ ํ
๋ฐ, ์ฐ๋ฆฌ๊ฐ ์ ๋ฒ ์๊ฐ์ select()
๋ฉ์๋๋ฅผ ํ์ฉํด์ ํน์ ์นผ๋ผ์ ์ถ์ถํ ๋, ์๋์ ๊ฐ์ 5๊ฐ์ง ๋ฐฉ๋ฒ์ ์ฌ์ฉํ๋ค๊ณ ํ๋ค.
spark_df.select('Age').limit(3).show()
spark_df.select(['Age']).limit(3).show()
spark_df.select(spark_df['Age']).limit(3).show()
spark_df.select(spark_df.Age).limit(3).show()
spark_df.select(col('Age')).limit(3).show()
์ ๋ฐฉ๋ฒ๋ค ์ค 1,2๋ฒ์งธ ๋ฐฉ๋ฒ์ Column Type
๊ฐ์ฒด๊ฐ ์๋๊ณ 3,4,5 ๋ฒ์งธ ๋ฐฉ๋ฒ์ Column Type
๊ฐ์ฒด๋ฅผ ์๋ฏธํ๋ค. ํน์ ๋ชจ๋ฅด๋ 3,4,5 ๋ฒ์งธ ๋ฐฉ๋ฒ์ ๋ฐ๋ก ์ฝ๋๋ก type์ ํ์ธํด๋ณด๋ฉด ์๋์ ๊ฐ๋ค.
๊ทธ๋ฆผ์ ๋ณด๋ฉด pyspark.sql.column.Column
์ด๋ผ๋ ํ์
์ด๋ผ๊ณ ๋์จ๋ค. ๋ค์ filter()
๋ฉ์๋๋ก ๋์์์, filter()
๋ฉ์๋ ์์๋ ์์ฒ๋ผ Column ํ์
์ ํํ๋ก ์กฐ๊ฑด์ ๋ช
์ํด์ฃผ์ด์ผ ํ๋ค. ์๋์ฒ๋ผ ๋ง์ด๋ค.
path = '/Users/younghun/Desktop/gitrepo/data/titanic/train.csv'
spark_df = spark.read.csv(path, header=True, inferSchema=True)
# filter ๋ฉ์๋์์๋ Column type์ ๋ฃ์ด์ฃผ์ด์ผ ํจ!
spark_df.filter(col('Age') > 25).limit(5).show()
spark_df.filter(spark_df['Age'] > 25).limit(5).show()
spark_df.filter(spark_df.Age > 25).limit(5).show()
# error ๋ฐ์!
spark_df.filter('Age' > 25).limit(5).show()
๊ทธ๋ฆฌ๊ณ ์ filter()
๋ฉ์๋๋ where()
๋ฉ์๋๋ก ๋๊ฐ์ด ๋์ฒดํ ์ ์๋ค. ์ฌ๊ธฐ์ where()
์ ์ฝ๊ฒ ๋งํ๋ฉด SQL์์ ์์ฃผ ์ฌ์ฉํ๋ WHERE ์กฐ๊ฑด๋ฌธ ๋์ ํค์๋ where()
์ด๋ผ๊ณ ์๊ฐํ๋ฉด ๋๋ค.
์ฐธ๊ณ ๋ก filter()
, where()
๋ฉ์๋ ๋ ๋ค ์์ ๊ฐ์ด Column Type
๊ฐ์ฒด์ฒ๋ผ ๋ช
์ํ์ง ์๊ณ ์๋ฐ์ดํ๋ก ๊ฐ์ธ์ SQL ์ฟผ๋ฆฌ ํํ๋ก ๋ช
์ํด์ฃผ์ด๋ ์ ์ ๋์ํ๋ค. ์๋์ฒ๋ผ ๋ง์ด๋ค.
spark_df.filter("Age > 25").limit(5).show()
spark_df.where("Age > 25").limit(5).show()
๊ทธ๋ฆฌ๊ณ ์กฐ๊ฑด์ ๋ ๊ฐ์ง ์ด์ ๋ช
์ํ๋ฉด์ AND
, OR
๊ณผ ๊ฐ์ ๋
ผ๋ฆฌ์ฐ์ฐ์๋ ํ๋ค์ค์ฒ๋ผ ๊ฐ๊ฐ &
, |
๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค. ์ฌ์ฉ๋ฒ์ ์๋์ ๊ฐ๋ค.
from pyspark.sql.functions import col
cond1 = col('Age') > 25
cond2 = col('Embarked') != 'S'
spark_df.filter(cond1 & cond2).limit(5).show()
spark_df.where(cond1 | cond2).limit(5).show()
์ถ๊ฐ์ ์ผ๋ก Column Type
๊ฐ์ฒด๋ฅผ ๋ช
์ํ๋ฉด์ SQL์ LIKE ๊ตฌ๋ฌธ ๊ฐ์ ๊ฒ๋ค์ ์ถ๊ฐ ๋ฉ์๋๋ก ํ์ฉํ ์ ์๋ค.
spark_df.filter(col('Name').like('%Mr.%')).limit(5).show()
spark_df.where("Name LIKE '%Mr.%'").limit(5).show() # SQL ์ฟผ๋ฆฌ ํํ๋ก ๋ช
์๋ ๊ฐ๋ฅ!
2. ํน์ ์นผ๋ผ ๊ฐ์ ๊ธฐ์ค์ผ๋ก ๋ฐ์ดํฐ ์ ๋ ฌํ๊ธฐ
์ด๋ฒ์ ์คํํฌ์ ๋ฐ์ดํฐํ๋ ์์์ ํน์ ์นผ๋ผ ๊ฐ์ ๋ฐ๋ผ ๋ฐ์ดํฐ Row๋ฅผ ์ค๋ฆ(๋๋ ๋ด๋ฆผ)์ฐจ์ ์ ๋ ฌํ๋ ๋ฐฉ๋ฒ์ ์์๋ณด์. ํฌ๊ฒ orderBy()
๋ฉ์๋, sort()
๋ฉ์๋๊ฐ ์๋๋ฐ, ์ฌ์ฉ ๋ฐฉ๋ฒ์ ๋์ผํ๋ค. ๋จผ์ orderBy()
๋ฉ์๋ ์ฌ์ฉ ์์์ด๋ค. sort()
๋ฉ์๋๋ ์๋์ orderBy()
๋ฅผ ๋์ฒดํ๊ธฐ๋ง ํ๋ฉด ๋๋ค.
# orderBy -> ๋จ์ผ ์นผ๋ผ ๊ธฐ์ค์ผ๋ก ์ ๋ ฌ
spark_df.orderBy("PassengerId", ascending=False).limit(5).show()
spark_df.orderBy(col('PassengerId'), ascending=False).limit(5).show()
spark_df.orderBy(col('PassengerId').desc()).limit(5).show()
# orderBy -> ๋ณต์ ์นผ๋ผ ๊ธฐ์ค์ผ๋ก ์ ๋ ฌ
spark_df.orderBy("PassengerId", "Age", ascending=[True, False]).limit(5).show()
spark_df.orderBy(col("PassengerId"), col("Age"), ascending=[True, False]).limit(5).show()
spark_df.orderBy(col("PassengerId").asc(), col("Age").desc()).limit(5).show()
์ฐธ๊ณ ๋ก ํ๋ค์ค์์๋ sort_values()
๋ฉ์๋์์ ์์ ๊ฐ์ด ๋ณต์ ์นผ๋ผ ๊ธฐ์ค์ผ ๋, ๊ฐ ์นผ๋ผ๋ง๋ค ์ค๋ฆ์ฐจ์, ๋ด๋ฆผ์ฐจ์ ์ ๋ ฌํ ์ ์๋์ง๋ ์ฒ์์์๋ค. ์๋์ฒ๋ผ ์ฌ์ฉํ๋ฉด ๋๋ค.
pandas_df.sort_values(by=['Pclass', 'Name'], ascending=[True, False]).head()
๊ทธ๋ฌ๋ฉด ์ง๊ธ๊น์ง ๋ฐฐ์๋ณธ ๋ด์ฉ๋ค์ ์ด์ง ์์ฉํด์ ์๋์ ๊ฐ์ ์ฝ๋๋ฅผ ์ง๋ณผ ์ ์๋ค.
select_cols = ["PassengerId", "Survived", "Name", "Age", "Sex"]
filter_cond = col("Age") > 35
spark_df.select(*select_cols) \
.filter(filter_cond) \
.orderBy("Age", "PassengerId", ascending=[False, True]) \
.show()
3. ํน์ ์นผ๋ผ์ผ๋ก ๋ฐ์ดํฐ ๊ทธ๋ฃนํํ๊ธฐ
์ด๋ฒ์ groupBy()
๋ผ๋ ๋ฉ์๋๋ฅผ ํ์ฉํด์ ํน์ ์นผ๋ผ์ ๊ธฐ์ค์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๊ทธ๋ฃนํ์์ผ๋ณด์. ํ๋ค์ค์ groupby()
๋ฉ์๋์ ๋ง์ฐฌ๊ฐ์ง๋ก ๊ทธ๋ฃนํํ ์นผ๋ผ์ ์ง์ ํด์ฃผ๊ณ ๊ทธ๋ฆฌ๊ณ ์ง๊ณํ ํจ์์ ์ง๊ณํ ์นผ๋ผ์ ๋ช
์ํด์ฃผ๋ฉด ๋๋ค. ๋จ, ์ง๊ณํ ์นผ๋ผ์ ๋ช
์ํด์ค ๋๋ ๋ฌธ์์ด ํํ๋ก ์นผ๋ผ ์ด๋ฆ์ ๋ฃ์ด์ฃผ๋๋ก ํ์. ๊ทธ๋ ์ง ์์ผ๋ฉด ์๋ฌ๊ฐ ๋ฐ์ํ๋ค. ๊ทธ๋ฆฌ๊ณ ๋ง์ฝ ์ง๊ณํ ์นผ๋ผ ์ด๋ฆ์ ๋ช
์ํด์ฃผ์ง ์์ผ๋ฉด ๋ชจ๋ ์นผ๋ผ์ ๋ํด ์ง๊ณ๋ฅผ ํด๋ฒ๋ฆฌ๊ฒ ๋๋ค.(๋จ, count()
๋ฉ์๋๋ ์นผ๋ผ ์ด๋ฆ์ ๋ช
์ํด์ฃผ์ง ์์๋ ํ์ ๊ฐ์๋ง ๋ฑ ๋ฐํํ๋๋ก ๋์ด ์๋ค)
spark_df.groupBy("Survived").max("Age").show()
spark_df.groupBy(col("Survived")).max("Age").show()
spark_df.groupBy(spark_df.Survived).max().show() # ๋ชจ๋ ์นผ๋ผ์ ๋ํด max ์ง๊ณ ๋จ
์ด๋ฒ์ ์ฌ๋ฌ ์นผ๋ผ์ ๊ธฐ์ค์ผ๋ก ๊ทธ๋ฃนํํด๋ณด์.
# ์ฌ๋ฌ ์นผ๋ผ์ ๊ธฐ์ค์ผ๋ก ๊ทธ๋ฃนํ
spark_df.groupBy("Sex", "Survived").max("Age").show()
spark_df.groupBy(["Sex", "Survived"]).max("Age").show() # ๋ฆฌ์คํธ ํํ๋ก ๋ฃ์ด๋ ์ ์๋์ํจ
๊ทธ๋ ๋ค๋ฉด ์ง๊ณํ ์นผ๋ผ์ด ์ฌ๋ฌ๊ฐ์ง์ธ๋ฐ, ๊ฐ ์นผ๋ผ๋ง๋ค ์๋ก ๋ค๋ฅธ ์ง๊ณ ํจ์๋ฅผ ์ฌ์ฉํ๊ณ ์ถ์ ๋ ์ด๋ป๊ฒ ํ ๊น? ๊ทธ ๋๋ agg()
๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค.
spark_df.groupBy(col("Sex"), col("Survived")) \
.agg(max("Age").alias("Max_Age"),
min("Age").alias("Min_Age"),
avg("Fare").alias("Avg_Fare")) \
.show()
4. ํน์ ์นผ๋ผ์ ๋ด ๋ง๋๋ก ๊ฐ์ง๊ณ ๋๊ธฐ
์ด๋ฒ์๋ ๋ฐ์ดํฐํ๋ ์์ด ๊ธฐ์กด์ ๊ฐ๊ณ ์๋ ์นผ๋ผ(๋ค)์ ํ์ฉํ๋ ๋ฐฉ๋ฒ์ ๋ํด ์์๋ณด์. ๋ํ์ ์ผ๋ก๋ withColumn()
์ด๋ผ๋ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ ํ
๋ฐ, ์ด ๋ฉ์๋๋ ํฌ๊ฒ 3๊ฐ์ง ์ญํ ์ ํ๋ค. ์ฒซ ๋ฒ์งธ๋ ๊ธฐ์กด ์นผ๋ผ์ผ๋ก๋ถํฐ ์ด๋ ํ '๋ณํ'์ ์ทจํด์ ์๋ก์ด ์นผ๋ผ์ ์์ฑํ๋ ๊ฒ, ๋ ๋ฒ์งธ๋ ๊ธฐ์กด ์นผ๋ผ๊ฐ์ ๋ณ๊ฒฝํ๋ ๊ฒ, ์ธ ๋ฒ์งธ๋ ๊ธฐ์กด ์นผ๋ผ ๊ฐ ํ์
์ ๋ฐ๊พธ๋ ๊ฒ์ด๋ค.
๊ทธ๋ฐ๋ฐ withColumn()
๋ฉ์๋๋ฅผ ์ฌ์ฉํ ๋๋ ์๋กญ๊ฒ ๋ง๋ค ์นผ๋ผ ์ด๋ฆ ์ฆ, ์ฒซ ๋ฒ์งธ ์ธ์์๋ ๋ฌด์กฐ๊ฑด ๋ฌธ์์ด ํํ๋ก ์
๋ ฅํด์ฃผ์ด์ผ ํ๊ณ , ๊ธฐ์กด ์นผ๋ผ์ ๋ฃ๋ ๋ ๋ฒ์งธ ์ธ์์๋ ๋ฐ๋์ col('column')
๋๋ df['column']
, df.column
๊ณผ ๊ฐ์ด Column Type
ํํ๋ก ๋ฃ์ด์ฃผ์ด์ผ ํจ์ ์์ง ๋ง์.
๊ฐ์ฅ ๋จผ์ ์๋ก์ด ์นผ๋ผ์ ์์ฑํด๋ณด๋๋ก ํ์.
from pyspark.sql.functions import avg
# ์คํํฌ์์ ๋ฐ์ดํฐํ๋ ์ ๋ณต์ฌ๋ณธ์ ๋ง๋ค๊ธฐ ์ํด์๋ ์๋์ ๊ฐ์ด ์ํ
copy_df = spark_df.select("*")
# Age์๋ค๊ฐ ํ๊ท ๋์ด๋ฅผ ๋ํด๋ณด์
avg_age = copy_df.select(avg(col("Age"))).first()[0]
new_age_df = copy_df.withColumn("New_Age", col("Age") + avg_age)
new_age_df.limit(5).show()
๋ ๋ฒ์งธ๋ ๊ธฐ์กด ์นผ๋ผ๊ฐ์ ๋ณ๊ฒฝํ๋ ๋ฐฉ๋ฒ์ธ๋ฐ, ์ฒ์์ ์๋์ ๊ฐ์ด ๋์ํ ๊ฑฐ๋ผ๊ณ ์๊ฐํ ์ ์๋ค. ํ์ง๋ง ์๋ ์ฝ๋๋ ์๋ฌ๊ฐ ๋ฐ์ํ๋ค.
copy_df = spark_df.withColumn("Age", 0)
copy_df.show()
์ด ๋, ์๋ฌ ๋ฉ์ธ์ง๊ฐ col should be Column
์ด๋ผ๊ณ ๋์ค๊ฒ ๋๋๋ฐ, ์ด๊ฒ์ ์์์ 0
๊ฐ์ Column Type
์ผ๋ก ๋ง๋ค์ด์ฃผ์ด์ผ ํ๋ค๋ ์๋ฏธ์ด๋ค. ์ด๋ ๊ฒ ์์๊ฐ ๋๋ ๋ฌธ์์ด์ Column Type
์ผ๋ก ๋ง๋ค์ด์ฃผ๊ธฐ ์ํด์๋ pyspark.sql.functions
์ lit
์ด๋ผ๋ ๋ฉ์๋๋ฅผ ์ํฌํธํด์ ์ฌ์ฉํด์ผ ํ๋ค. ์ฌ๊ธฐ์ lit
์ literal ์ด๋ผ๋ ๋จ์ด์ ์ค์๋ง์ด๋ค.
from pyspark.sql.functions import lit
copy_df = spark_df.withColumn("Age", lit(0))
copy_df.show()
๊ทธ ์ด์ธ์ ์นผ๋ผ ๊ฐ์ด ๋ฌธ์์ด์ผ ๋, ํด๋น ๋ฌธ์์ด ์ผ๋ถ๋ฅผ ๊ฐ์ ธ์ค๋ substring ๋ฉ์๋๋ ํน์ ๊ตฌ๋ถ์๋ฅผ ๊ธฐ์ค์ผ๋ก ์ชผ๊ฐ์ ๊ฐ์ ธ์ค๋ split ๋ฉ์๋๋ ์กด์ฌํ๋ค.
๋ค์์ ํน์ ์นผ๋ผ์ ๋ฐ์ดํฐ ํ์ ์ ๋ณ๊ฒฝํ๋ ๋ฐฉ๋ฒ์ด๋ค.
copy_df = spark_df.withColumn("Age", col("Age").cast("integer"))
copy_df.show()
5. ํน์ ์นผ๋ผ ๋๋ Row๋ฅผ ์ญ์ ํ๊ธฐ
ํน์ ์นผ๋ผ์ ์ญ์ ํ๋ ๋ฐฉ๋ฒ์ drop()
๋ฉ์๋๋ฅผ ํ์ฉํ๋ฉด ๋๋ฏ๋ก ๊ฐ๋จํ๋ค. ์นผ๋ผ๋ช
์ ๋ฌธ์์ด ํํ๋ก ๋ฃ์ด์ฃผ๊ฑฐ๋ Column Type
ํํ๋ก ๋ช
์ํด์ฃผ์ด๋ ๋๋ค. ํ์ง๋ง ํ ๋ฒ์ ์ฌ๋ฌ๊ฐ์ ์นผ๋ผ์ ์ญ์ ํ ๋๋ ์ฌ๋ฌ ์นผ๋ผ์ ๋ฌธ์์ด ํํ๋ก ์ด๊ฑฐํ๋ ์์ผ๋ก ํด์ฃผ์ด์ผ๋ง ํ๋ค. ๋ฆฌ์คํธ์ ๋ฃ์ด์ฃผ๊ฑฐ๋ ๋๋ Column Type
ํํ๋ก ๋ช
์ํด์ฃผ๋ฉด ์๋ฌ๊ฐ ๋ฐ์ํ๋ค.
# ํน์ ์นผ๋ผ์ ์ญ์ ํ๊ธฐ
spark_df.drop("Pclass").show()
spark_df.drop(col("Pclass")).show()
# ํ ๋ฒ์ ์ฌ๋ฌ๊ฐ ์นผ๋ผ ์ญ์ ํ๊ธฐ
spark_df.drop("Pclass", "Name").show()
# ์๋์ ๋ ๊ฒฝ์ฐ๋ error ๋ฐ์
spark_df.drop(["Pclass", "Name"]).show()
spark_df.drop(col("Pclass"), col("Name")).show()
๋ค์์ ๊ฒฐ์ธก์น๊ฐ ์๋ Row๋ฅผ ์ญ์ ํ๋ ๋ฐฉ๋ฒ์ ๋ํด ์์๋ณด์. ๋ํ์ ์ผ๋ก dropna()
๋ฉ์๋์ na.drop()
๋ฉ์๋๊ฐ ์กด์ฌํ๋ค. ๋ ๋ฉ์๋ ๋ชจ๋ subset
์ด๋ผ๋ ์ธ์์ ๋ฌธ์์ด(ํ๋์ ์นผ๋ผ์ผ ๊ฒฝ์ฐ), ํํ ๋๋ ๋ฆฌ์คํธ ํํ๋ก ๋ฃ์ด์ค ์ ์๋๋ฐ, ์ด subset
์ Row๋ฅผ ์ญ์ ํ ๋, ์ด๋ ์นผ๋ผ์ ๊ฒฐ์ธก์น๊ฐ ์๋ Row๋ฅผ ์ญ์ ํ ์ง๋ฅผ ๊ฒฐ์ ํ๋ ๊ธฐ์ค์ด ๋๋ค.
๋ง์ผ ์๋์ ๊ฐ์ ์ฝ๋๊ฐ ์๋ค๋ฉด, Age
, Embarked
์นผ๋ผ ๊ฐ์ด ๋ชจ๋ ๊ฒฐ์ธก์น์ธ Row๋ค๋ง ์ ๊ฑฐํ๋ผ๋ ์๋ฏธ์ด๋ค.
spark_df.dropna(subset=["Age", "Embarked"]).show()
6. ๊ฒฐ์ธก์น๊ฐ ์๋ Row๋ค ์ฒดํฌํ๊ธฐ
๋ค์์ ๊ฒฐ์ธก์น๊ฐ ์๋ Row๋ค์ ์ฒดํฌํ๋ ๋ฐฉ๋ฒ์ด๋ค. ํ๋ค์ค์ ๊ฒฝ์ฐ isnull().sum()
๊ณผ ๊ฐ์ ๋งค์ฐ ํธํ ๋ฉ์๋๊ฐ ์์ง๋ง, ์คํํฌ์์๋ ์ด๋ ๊ฒ ํ ์ ์๋ค. ํน์ ์นผ๋ผ๋ง๋ค ๊ฒฐ์ธก์น๊ฐ ์๋์ง ์๋์ง ์ฒดํฌํ๋ ๋ฉ์๋์ธ isNull()
๋ฉ์๋ ๋๋ isnan()
๋ฉ์๋๋ฅผ ํ์ฉํด์ผ ํ๋ค. ๋จผ์ isNull()
๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ ์๋์ ๊ฐ๋ค.
spark_df.filter(col("Age").isNull()).show()
๋ฐ๋ฉด์, isnan()
๋ฉ์๋๋ pyspark.sql.functions
ํด๋์ค์์ ์ํฌํธํด์ ์๋์ฒ๋ผ ์ฌ์ฉํด์ผ ํ๋ค.
from pyspark.sql.functions import isnan
spark_df.where(isnan(col("Age"))).show()
๊ทธ๋ฐ๋ฐ, ์ฌ๊ธฐ์ ์คํํฌ์์ Null
๊ณผ NaN
๊ฐ ๋ ๋ค ๊ฒฐ์ธก์น์ธ์ง ์๋๋ฉด Null
๋ง ๊ฒฐ์ธก์น์ธ์ง ํท๊ฐ๋ฆด ์ ์๋ค. ๊ฒฐ๋ก ์ ์ต์ ์คํํฌ ๋ฒ์ ์์๋ Null
๊ฐ๋ง์ ๊ฒฐ์ธก์น๋ก ๊ฐ์ฃผํ๋ฏ๋ก NaN
๊ฐ์ ๋ง์ฃผํ๊ฒ ๋๋ฉด ์์ฐ์ค๋ Null
๊ฐ์ผ๋ก ๋ฐ๊พธ์ด๋ฒ๋ฆฐ๋ค.
NaN
์ ๊ตฌ์ฒด์ ์ผ๋ก Not a Number์ ์ค์๋ง๋ก, ํ์ด์ฌ์ None
ํ์
์ ๋ํ์ด ํํ๋ก ๋ง๋ค์ด ๋ฒ๋ฆฌ๋ฉด NaN
๊ฐ์ด ๋๋ฒ๋ฆฐ๋ค.
import numpy as np
import pandas as pd
data = {"name": ["jo", "lee"],
"age": [28, None]}
pd.DataFrame(data)
์ ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด None
๊ฐ์ด NaN
๊ฐ์ผ๋ก ํ๋ค์ค ๋ฐ์ดํฐํ๋ ์์ ๋ค์ด๊ฐ ์๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
๋ค์์ ๊ฐ ์นผ๋ผ๋ณ๋ก ๊ฒฐ์ธก์น ๊ฐ์๊ฐ ๋ช ๊ฐ๊ฐ ์กด์ฌํ๋์ง ํ์ธํ๋ ๋ฐฉ๋ฒ์ด๋ค. ์ด ๋, ํ์ฉํ๋ ๋ฉ์๋๋ ํ ๊ฐ์๋ฅผ ์ธ๋ count()
์ SQL์์์ CASE WHEN ์ญํ ์ ํ๋ when()
๋ฉ์๋์ด๋ค.
from pyspark.sql.functions import count, when
cond = [count(when(col(c).isNull(), c)).alias(f"{c}_null_cnt") for c in spark_df.columns]
spark_df.select(cond).show()
์๋ count()
๋ฉ์๋๋ Null
์ธ Row๋ฅผ ์นด์ดํธํ์ง ์๋ ๊ฒ์ด ์ ์์ด๋ค. ๊ทธ๋ฐ๋ฐ ์์์ ๊ฒฐ์ธก์น ๊ฐ์๋ฅผ ์ด๋ป๊ฒ ์
์ ์์๊น ํ๋ ์๋ฌธ์ด ๋ค ์ ์๋ค.(๊ทธ๋์ ํ์๊ฐ ์ง๋ฌธ์ ํ๊ธฐ๋ ํ๋ค..) ์ด์ ๋ when()
๋ฉ์๋์ ํน์ฑ ๋๋ฌธ์ธ๋ฐ, when()
๋ฉ์๋ ์์์ c
๋ผ๋ ์นผ๋ผ์ด ๊ฒฐ์ธก์น ์ผ ๋, c
๋ผ๋ ๊ฐ์ผ๋ก ๋์ฒดํ๋ผ๋ ์๋ฏธ์ธ๋ฐ, ์ด ๋์ฒดํ๋ c
๋ผ๋ ๊ฐ์ด ์นผ๋ผ c
์ ๋ค์ด์๋ ๊ฐ(value)์ ์๋ฏธํ๋ ๊ฒ์ด ์๋ ์นผ๋ผ ์ด๋ฆ c
๋ฌธ์์ด ์์ฒด ๊ฐ์ ์๋ฏธํ๋ค. ๋ฐ๋ผ์ when()
๋ฉ์๋๋ ์ค์ง์ ์ผ๋ก c
๋ผ๋ ์นผ๋ผ์ ๊ฒฐ์ธก์น๊ฐ ์๋ ๊ฐ๋ค์ 'c'
๋ฌธ์์ด๋ก ๊ฐ์ฃผํ ํ, ๋ฌธ์์ด์ด c
์ธ Row ๊ฐ์๋ฅผ ์
์ผ๋ก์จ ๊ฒฐ์ธก์น ๊ฐ์๋ฅผ ์
์ ์๊ฒ ๋๋ ๊ฒ์ด๋ค.
7. ๊ฒฐ์ธก์น ๋์ฒดํ๊ธฐ
๋ค์์ ๊ฒฐ์ธก์น๋ฅผ ์ด๋ค ๊ฐ์ผ๋ก ๋์ฒดํ๋ ๋ฐฉ๋ฒ์ด๋ค. ํฌ๊ฒ fillna()
๋ฉ์๋์ na.fill()
๋ฉ์๋๋ฅผ ์ฌ์ฉํ ์ ์๋ค. ๋ ๋ฉ์๋ ๋ชจ๋ value
์ธ์์ ๋์ฒดํ ๊ฐ์ ๋ฃ์ด์ค ์ ์๊ณ subset
์ธ์๋ก ๊ฒฐ์ธก์น๋ฅผ ๋์ฒดํ ์นผ๋ผ์ด ๋ฌด์์ธ์ง ๋ฃ์ด์ค ์ ์๋ค. ์ธ์๋ก ์๋ฌด๊ฒ๋ ๋ฃ์ง ์์ผ๋ฉด ๋ชจ๋ ์นผ๋ผ์ ๊ฒฐ์ธก์น์ ๋์ฒดํ ๊ฐ์ ๋ฃ์ด์ค๋ค.
์ฐธ๊ณ ๋ก value
์ธ์์ ์ ์ํ์ผ๋ก ๋ฃ์ด์ฃผ๋ฉด ์ ์ํ ํ์
์ ๊ฐ๊ณ ์๋ ์นผ๋ผ๋ค์ ๊ฒฐ์ธก์น๋ฅผ ์์์ ์ฐพ์์ ๋์ฒดํด์ฃผ๊ณ value
์ธ์์ ๋ฌธ์์ด์ ๋ฃ์ด์ฃผ๋ฉด ๋ฌธ์์ด ํ์
์ ๊ฐ๊ณ ์๋ ์นผ๋ผ๋ค์ ๊ฒฐ์ธก์น๋ฅผ ์์์ ์ฐพ์ ๋์ฒดํด์ค๋ค.
from pyspark.sql.functions import avg
avg_age = spark_df.select(avg("Age")).first()[0]
spark_df.fillna(value=avg_age, subset="Age").show()
spark_df.na.fill(value=avg_age, subset=["Age"]).show()
๋ ๊ฒฐ์ธก์น๊ฐ ์กด์ฌํ๋ ์นผ๋ผ์ ๋ง๊ฒ ์๋ก ๋ค๋ฅธ ๋์ฒด๊ฐ์ ๋ถ์ฌํ๋ ค๋ฉด ๋์ ๋๋ฆฌ ํํ๋ก ์ ์ํด์ ๋ถ์ฌํด์ค๋ ๋๋ค.
from pyspark.sql.functions import avg
avg_age = spark_df.select(avg("Age")).first()[0]
embarked = "ZZZ"
dictionary = {"Age": avg_age, "Embarked": embarked}
spark_df.fillna(value=dictionary).show()
8. ์ผ๋ฐ UDF๋ฅผ ์คํํฌ UDF๋ก ๋ณํ ํ ์ ์ฉํ๊ธฐ
์ผ๋ฐ UDF๋, ์ด๋ ค์ด ๊ฒ์ด ์๋๋ผ ์ฐ๋ฆฌ๊ฐ ์ผ๋ฐ์ ์ผ๋ก ํ์ด์ฌ์ผ๋ก ์ ์ํ๋ '์ผ๋ฐ' ํจ์๋ฅผ ์๋ฏธํ๋ค. ํ๋ค์ค์์๋ ์ด๋ ๊ฒ ์ผ๋ฐ UDF๋ฅผ ํ์ฉํด์ ํ๋ค์ค์ ๋ฐ์ดํฐํ๋ ์ ๋๋ Series์ apply
,lambda
ํจ์๋ฅผ ํ์ฉํด์ ์ ์ฉํด ๋ณธ ์ ์ด ์๋ค. ์ด์ ๋น์ทํ๊ฒ ์คํํฌ์์๋ UDF๋ฅผ ํ์ฉํ ์ ์๋๋ฐ, ์ฌ์ฉ ๋ฐฉ๋ฒ์ ์๋์ ๊ฐ๋ค.
์ฐ์ , ์ผ๋ฐ UDF ํจ์๋ฅผ ์ ์ํ์. ํด๋น ์์์์๋ ์๋์ UDF๋ฅผ ์ฌ์ฉํ๋ ค ํ๋ค.
def change_gender(gender):
if gender is None:
return None
if gender == 'male':
return 'super_male'
else:
return 'super_female'
๋ค์์ udf ๋ฉ์๋๋ฅผ ๋ฐ๋ก ์ํฌํธํด์ ์ผ๋ฐ UDF๋ฅผ ์คํํฌ์ฉ UDF๋ก ๋ณํํด์ฃผ์ด์ผ ํ๋ค. udf()
๋ฉ์๋๋ฅผ ํ์ฉํ๋ ๋ฐฉ๋ฒ์ ๋ค์๊ณผ ๊ฐ๋ค. udf(lambda x: ์ผ๋ฐ_udf, return_type)
์ธ๋ฐ, ๊ฐ์ธ์ ์ผ๋ก ํน์ดํ๊ฒ ๋์ ์ธ์ด์ธ ํ์ด์ฌ์์ ๋ฏธ๋ฆฌ ํจ์ ๋ฐํ ํ์
์ ์ ์ํด์ฃผ๋ ์ ์ด ํน์ดํ๋ค๊ณ ๋๊ผ๋ค.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# spark UDF๋ก ๋ณํ
spark_udf = udf(lambda x: change_gender(x), StringType())
# spark UDF๋ฅผ spark dataframe์ ์ ์ฉ
spark_df.withColumn("New_Gender", spark_udf(col("Sex"))).show()