๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

Apache Spark

[PySpark] Spark์˜ Dataframe API๋ฅผ ์•Œ์•„๋ณด์ž!(2)

๋ฐ˜์‘ํ˜•

๐Ÿ”Š ํ•ด๋‹น ํฌ์ŠคํŒ…์€ ์ŠคํŒŒํฌ ์™„๋ฒฝ ๊ฐ€์ด๋“œ ์ฑ…๊ณผ ์ธํ”„๋Ÿฐ์˜ ์ŠคํŒŒํฌ ๋จธ์‹ ๋Ÿฌ๋‹ ์™„๋ฒฝ ๊ฐ€์ด๋“œ ๊ฐ•์˜๋กœ ๊ณต๋ถ€ํ•œ ํ›„ ๋ฐฐ์šด ๋‚ด์šฉ์„ ์ €๋งŒ์˜ ๋ฐฉ์‹์œผ๋กœ ์žฌ๊ตฌ์„ฑํ•œ ๊ฒƒ์ž„์„ ์•Œ๋ฆฝ๋‹ˆ๋‹ค. ํŠนํžˆ, ์ฐธ๊ณ ํ•œ ์ธํ”„๋Ÿฐ ๊ฐ•์˜์˜ ๊ฐ•์˜ ์ž๋ฃŒ๋ฅผ ๊ทธ๋Œ€๋กœ ์‚ฌ์šฉํ•˜์ง€ ์•Š์•˜์Œ์„ ํ•„ํžˆ ์•Œ๋ฆฝ๋‹ˆ๋‹ค!

 

Apache Spark๋ฅผ Python์œผ๋กœ ์ด์šฉํ•˜๋Š” PySpark์— ๋Œ€ํ•ด ์•Œ์•„๋ณด์ž

 


์ €๋ฒˆ ํฌ์ŠคํŒ…์— ์ด์–ด PySpark์˜ dataframe API ์‚ฌ์šฉ ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด ์—ฐ์ด์–ด ์•Œ์•„๋ณด์ž.

1. ์กฐ๊ฑด์— ๋งž๋Š” ๋ฐ์ดํ„ฐ๋งŒ ์ถ”์ถœํ•˜๊ธฐ

ํŠน์ • ์กฐ๊ฑด์— ๋งž๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”์ถœํ•˜๊ธฐ ์œ„ํ•ด ์‚ฌ์šฉํ•˜๋Š” ๋ฉ”์†Œ๋“œ๋กœ๋Š” filter() ๋ฉ”์†Œ๋“œ๊ฐ€ ์กด์žฌํ•œ๋‹ค. ์ด ๋•Œ, filter() ๋ฉ”์†Œ๋“œ์—๋Š” ์ŠคํŒŒํฌ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ Column Type ๊ฐ์ฒด๋ฅผ ๋„ฃ์–ด์ฃผ์–ด์•ผ ํ•œ๋‹ค. ์—ฌ๊ธฐ์„œ Column Type ๊ฐ์ฒด๋ž€๊ฒŒ ์ž˜ ์™€๋‹ฟ์ง€ ์•Š์„ ํ…๋ฐ, ์šฐ๋ฆฌ๊ฐ€ ์ €๋ฒˆ ์‹œ๊ฐ„์— select() ๋ฉ”์†Œ๋“œ๋ฅผ ํ™œ์šฉํ•ด์„œ ํŠน์ • ์นผ๋Ÿผ์„ ์ถ”์ถœํ•  ๋•Œ, ์•„๋ž˜์™€ ๊ฐ™์€ 5๊ฐ€์ง€ ๋ฐฉ๋ฒ•์„ ์‚ฌ์šฉํ•œ๋‹ค๊ณ  ํ–ˆ๋‹ค.

 

spark_df.select('Age').limit(3).show()
spark_df.select(['Age']).limit(3).show()
spark_df.select(spark_df['Age']).limit(3).show()
spark_df.select(spark_df.Age).limit(3).show()
spark_df.select(col('Age')).limit(3).show()

 

์œ„ ๋ฐฉ๋ฒ•๋“ค ์ค‘ 1,2๋ฒˆ์งธ ๋ฐฉ๋ฒ•์€ Column Type ๊ฐ์ฒด๊ฐ€ ์•„๋‹ˆ๊ณ  3,4,5 ๋ฒˆ์งธ ๋ฐฉ๋ฒ•์€ Column Type ๊ฐ์ฒด๋ฅผ ์˜๋ฏธํ•œ๋‹ค. ํ˜น์‹œ ๋ชจ๋ฅด๋‹ˆ 3,4,5 ๋ฒˆ์งธ ๋ฐฉ๋ฒ•์„ ๋”ฐ๋กœ ์ฝ”๋“œ๋กœ type์„ ํ™•์ธํ•ด๋ณด๋ฉด ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

 

 

๊ทธ๋ฆผ์„ ๋ณด๋ฉด pyspark.sql.column.Column ์ด๋ผ๋Š” ํƒ€์ž…์ด๋ผ๊ณ  ๋‚˜์˜จ๋‹ค. ๋‹ค์‹œ filter() ๋ฉ”์†Œ๋“œ๋กœ ๋Œ์•„์™€์„œ, filter() ๋ฉ”์†Œ๋“œ ์•ˆ์—๋Š” ์œ„์ฒ˜๋Ÿผ Column ํƒ€์ž…์˜ ํ˜•ํƒœ๋กœ ์กฐ๊ฑด์„ ๋ช…์‹œํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค. ์•„๋ž˜์ฒ˜๋Ÿผ ๋ง์ด๋‹ค.

 

path = '/Users/younghun/Desktop/gitrepo/data/titanic/train.csv'

spark_df = spark.read.csv(path, header=True, inferSchema=True)

# filter ๋ฉ”์†Œ๋“œ์•ˆ์—๋Š” Column type์„ ๋„ฃ์–ด์ฃผ์–ด์•ผ ํ•จ!
spark_df.filter(col('Age') > 25).limit(5).show()
spark_df.filter(spark_df['Age'] > 25).limit(5).show()
spark_df.filter(spark_df.Age > 25).limit(5).show()

# error ๋ฐœ์ƒ!
spark_df.filter('Age' > 25).limit(5).show()

 

๊ทธ๋ฆฌ๊ณ  ์œ„ filter() ๋ฉ”์†Œ๋“œ๋Š” where() ๋ฉ”์†Œ๋“œ๋กœ ๋˜‘๊ฐ™์ด ๋Œ€์ฒดํ•  ์ˆ˜ ์žˆ๋‹ค. ์—ฌ๊ธฐ์„œ where() ์€ ์‰ฝ๊ฒŒ ๋งํ•˜๋ฉด SQL์—์„œ ์ž์ฃผ ์‚ฌ์šฉํ•˜๋Š” WHERE ์กฐ๊ฑด๋ฌธ ๋•Œ์˜ ํ‚ค์›Œ๋“œ where() ์ด๋ผ๊ณ  ์ƒ๊ฐํ•˜๋ฉด ๋œ๋‹ค.

์ฐธ๊ณ ๋กœ filter(), where() ๋ฉ”์†Œ๋“œ ๋‘˜ ๋‹ค ์œ„์™€ ๊ฐ™์ด Column Type ๊ฐ์ฒด์ฒ˜๋Ÿผ ๋ช…์‹œํ•˜์ง€ ์•Š๊ณ  ์Œ๋”ฐ์˜ดํ‘œ๋กœ ๊ฐ์‹ธ์„œ SQL ์ฟผ๋ฆฌ ํ˜•ํƒœ๋กœ ๋ช…์‹œํ•ด์ฃผ์–ด๋„ ์ •์ƒ ๋™์ž‘ํ•œ๋‹ค. ์•„๋ž˜์ฒ˜๋Ÿผ ๋ง์ด๋‹ค.

 

spark_df.filter("Age > 25").limit(5).show()
spark_df.where("Age > 25").limit(5).show()

 

๊ทธ๋ฆฌ๊ณ  ์กฐ๊ฑด์„ ๋‘ ๊ฐ€์ง€ ์ด์ƒ ๋ช…์‹œํ•˜๋ฉด์„œ AND, OR ๊ณผ ๊ฐ™์€ ๋…ผ๋ฆฌ์—ฐ์‚ฐ์ž๋Š” ํŒ๋‹ค์Šค์ฒ˜๋Ÿผ ๊ฐ๊ฐ &, | ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค. ์‚ฌ์šฉ๋ฒ•์€ ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

 

from pyspark.sql.functions import col

cond1 = col('Age') > 25
cond2 = col('Embarked') != 'S'

spark_df.filter(cond1 & cond2).limit(5).show()
spark_df.where(cond1 | cond2).limit(5).show()

 

์ถ”๊ฐ€์ ์œผ๋กœ Column Type ๊ฐ์ฒด๋ฅผ ๋ช…์‹œํ•˜๋ฉด์„œ SQL์˜ LIKE ๊ตฌ๋ฌธ ๊ฐ™์€ ๊ฒƒ๋“ค์„ ์ถ”๊ฐ€ ๋ฉ”์†Œ๋“œ๋กœ ํ™œ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

 

spark_df.filter(col('Name').like('%Mr.%')).limit(5).show()
spark_df.where("Name LIKE '%Mr.%'").limit(5).show()  # SQL ์ฟผ๋ฆฌ ํ˜•ํƒœ๋กœ ๋ช…์‹œ๋„ ๊ฐ€๋Šฅ!

2. ํŠน์ • ์นผ๋Ÿผ ๊ฐ’์„ ๊ธฐ์ค€์œผ๋กœ ๋ฐ์ดํ„ฐ ์ •๋ ฌํ•˜๊ธฐ

์ด๋ฒˆ์—” ์ŠคํŒŒํฌ์˜ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์—์„œ ํŠน์ • ์นผ๋Ÿผ ๊ฐ’์— ๋”ฐ๋ผ ๋ฐ์ดํ„ฐ Row๋ฅผ ์˜ค๋ฆ„(๋˜๋Š” ๋‚ด๋ฆผ)์ฐจ์ˆœ ์ •๋ ฌํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์•Œ์•„๋ณด์ž. ํฌ๊ฒŒ orderBy() ๋ฉ”์†Œ๋“œ, sort() ๋ฉ”์†Œ๋“œ๊ฐ€ ์žˆ๋Š”๋ฐ, ์‚ฌ์šฉ ๋ฐฉ๋ฒ•์€ ๋™์ผํ•˜๋‹ค. ๋จผ์ € orderBy() ๋ฉ”์†Œ๋“œ ์‚ฌ์šฉ ์˜ˆ์‹œ์ด๋‹ค. sort() ๋ฉ”์†Œ๋“œ๋Š” ์•„๋ž˜์˜ orderBy() ๋ฅผ ๋Œ€์ฒดํ•˜๊ธฐ๋งŒ ํ•˜๋ฉด ๋œ๋‹ค.

 

# orderBy -> ๋‹จ์ผ ์นผ๋Ÿผ ๊ธฐ์ค€์œผ๋กœ ์ •๋ ฌ
spark_df.orderBy("PassengerId", ascending=False).limit(5).show()
spark_df.orderBy(col('PassengerId'), ascending=False).limit(5).show()
spark_df.orderBy(col('PassengerId').desc()).limit(5).show()

# orderBy -> ๋ณต์ˆ˜ ์นผ๋Ÿผ ๊ธฐ์ค€์œผ๋กœ ์ •๋ ฌ
spark_df.orderBy("PassengerId", "Age", ascending=[True, False]).limit(5).show()
spark_df.orderBy(col("PassengerId"), col("Age"), ascending=[True, False]).limit(5).show()
spark_df.orderBy(col("PassengerId").asc(), col("Age").desc()).limit(5).show()

 

์ฐธ๊ณ ๋กœ ํŒ๋‹ค์Šค์—์„œ๋„ sort_values() ๋ฉ”์†Œ๋“œ์—์„œ ์œ„์™€ ๊ฐ™์ด ๋ณต์ˆ˜ ์นผ๋Ÿผ ๊ธฐ์ค€์ผ ๋•Œ, ๊ฐ ์นผ๋Ÿผ๋งˆ๋‹ค ์˜ค๋ฆ„์ฐจ์ˆœ, ๋‚ด๋ฆผ์ฐจ์ˆœ ์ •๋ ฌํ•  ์ˆ˜ ์žˆ๋Š”์ง€๋Š” ์ฒ˜์Œ์•Œ์•˜๋‹ค. ์•„๋ž˜์ฒ˜๋Ÿผ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค.

 

pandas_df.sort_values(by=['Pclass', 'Name'], ascending=[True, False]).head()

 

๊ทธ๋Ÿฌ๋ฉด ์ง€๊ธˆ๊นŒ์ง€ ๋ฐฐ์›Œ๋ณธ ๋‚ด์šฉ๋“ค์„ ์‚ด์ง ์‘์šฉํ•ด์„œ ์•„๋ž˜์™€ ๊ฐ™์€ ์ฝ”๋“œ๋ฅผ ์งœ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

 

select_cols = ["PassengerId", "Survived", "Name", "Age", "Sex"]
filter_cond = col("Age") > 35

spark_df.select(*select_cols) \
        .filter(filter_cond) \
        .orderBy("Age", "PassengerId", ascending=[False, True]) \
        .show()

3. ํŠน์ • ์นผ๋Ÿผ์œผ๋กœ ๋ฐ์ดํ„ฐ ๊ทธ๋ฃนํ•‘ํ•˜๊ธฐ

์ด๋ฒˆ์—” groupBy() ๋ผ๋Š” ๋ฉ”์†Œ๋“œ๋ฅผ ํ™œ์šฉํ•ด์„œ ํŠน์ • ์นผ๋Ÿผ์„ ๊ธฐ์ค€์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๊ทธ๋ฃนํ•‘์‹œ์ผœ๋ณด์ž. ํŒ๋‹ค์Šค์˜ groupby() ๋ฉ”์†Œ๋“œ์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ ๊ทธ๋ฃนํ•‘ํ•  ์นผ๋Ÿผ์„ ์ง€์ •ํ•ด์ฃผ๊ณ  ๊ทธ๋ฆฌ๊ณ  ์ง‘๊ณ„ํ•  ํ•จ์ˆ˜์— ์ง‘๊ณ„ํ•  ์นผ๋Ÿผ์„ ๋ช…์‹œํ•ด์ฃผ๋ฉด ๋œ๋‹ค. ๋‹จ, ์ง‘๊ณ„ํ•  ์นผ๋Ÿผ์„ ๋ช…์‹œํ•ด์ค„ ๋•Œ๋Š” ๋ฌธ์ž์—ด ํ˜•ํƒœ๋กœ ์นผ๋Ÿผ ์ด๋ฆ„์„ ๋„ฃ์–ด์ฃผ๋„๋ก ํ•˜์ž. ๊ทธ๋ ‡์ง€ ์•Š์œผ๋ฉด ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค. ๊ทธ๋ฆฌ๊ณ  ๋งŒ์•ฝ ์ง‘๊ณ„ํ•  ์นผ๋Ÿผ ์ด๋ฆ„์„ ๋ช…์‹œํ•ด์ฃผ์ง€ ์•Š์œผ๋ฉด ๋ชจ๋“  ์นผ๋Ÿผ์— ๋Œ€ํ•ด ์ง‘๊ณ„๋ฅผ ํ•ด๋ฒ„๋ฆฌ๊ฒŒ ๋œ๋‹ค.(๋‹จ, count() ๋ฉ”์†Œ๋“œ๋Š” ์นผ๋Ÿผ ์ด๋ฆ„์„ ๋ช…์‹œํ•ด์ฃผ์ง€ ์•Š์•„๋„ ํ–‰์˜ ๊ฐœ์ˆ˜๋งŒ ๋”ฑ ๋ฐ˜ํ™˜ํ•˜๋„๋ก ๋˜์–ด ์žˆ๋‹ค)

 

spark_df.groupBy("Survived").max("Age").show()
spark_df.groupBy(col("Survived")).max("Age").show()
spark_df.groupBy(spark_df.Survived).max().show()  # ๋ชจ๋“  ์นผ๋Ÿผ์— ๋Œ€ํ•ด max ์ง‘๊ณ„ ๋จ

 

์ด๋ฒˆ์—” ์—ฌ๋Ÿฌ ์นผ๋Ÿผ์„ ๊ธฐ์ค€์œผ๋กœ ๊ทธ๋ฃนํ•‘ํ•ด๋ณด์ž.

 

# ์—ฌ๋Ÿฌ ์นผ๋Ÿผ์„ ๊ธฐ์ค€์œผ๋กœ ๊ทธ๋ฃนํ•‘
spark_df.groupBy("Sex", "Survived").max("Age").show()
spark_df.groupBy(["Sex", "Survived"]).max("Age").show()  # ๋ฆฌ์ŠคํŠธ ํ˜•ํƒœ๋กœ ๋„ฃ์–ด๋„ ์ •์ƒ๋™์ž‘ํ•จ

 

๊ทธ๋ ‡๋‹ค๋ฉด ์ง‘๊ณ„ํ•  ์นผ๋Ÿผ์ด ์—ฌ๋Ÿฌ๊ฐ€์ง€์ธ๋ฐ, ๊ฐ ์นผ๋Ÿผ๋งˆ๋‹ค ์„œ๋กœ ๋‹ค๋ฅธ ์ง‘๊ณ„ ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ์‹ถ์„ ๋• ์–ด๋–ป๊ฒŒ ํ• ๊นŒ? ๊ทธ ๋•Œ๋Š” agg() ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค.

 

spark_df.groupBy(col("Sex"), col("Survived")) \
        .agg(max("Age").alias("Max_Age"),
            min("Age").alias("Min_Age"),
            avg("Fare").alias("Avg_Fare")) \
        .show()

4. ํŠน์ • ์นผ๋Ÿผ์„ ๋‚ด ๋ง˜๋Œ€๋กœ ๊ฐ€์ง€๊ณ  ๋†€๊ธฐ

์ด๋ฒˆ์—๋Š” ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์ด ๊ธฐ์กด์— ๊ฐ–๊ณ  ์žˆ๋˜ ์นผ๋Ÿผ(๋“ค)์„ ํ™œ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด ์•Œ์•„๋ณด์ž. ๋Œ€ํ‘œ์ ์œผ๋กœ๋Š” withColumn() ์ด๋ผ๋Š” ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ• ํ…๋ฐ, ์ด ๋ฉ”์†Œ๋“œ๋Š” ํฌ๊ฒŒ 3๊ฐ€์ง€ ์—ญํ• ์„ ํ•œ๋‹ค. ์ฒซ ๋ฒˆ์งธ๋Š” ๊ธฐ์กด ์นผ๋Ÿผ์œผ๋กœ๋ถ€ํ„ฐ ์–ด๋– ํ•œ '๋ณ€ํ™˜'์„ ์ทจํ•ด์„œ ์ƒˆ๋กœ์šด ์นผ๋Ÿผ์„ ์ƒ์„ฑํ•˜๋Š” ๊ฒƒ, ๋‘ ๋ฒˆ์งธ๋Š” ๊ธฐ์กด ์นผ๋Ÿผ๊ฐ’์„ ๋ณ€๊ฒฝํ•˜๋Š” ๊ฒƒ, ์„ธ ๋ฒˆ์งธ๋Š” ๊ธฐ์กด ์นผ๋Ÿผ ๊ฐ’ ํƒ€์ž…์„ ๋ฐ”๊พธ๋Š” ๊ฒƒ์ด๋‹ค.

 

๊ทธ๋Ÿฐ๋ฐ withColumn() ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ๋Š” ์ƒˆ๋กญ๊ฒŒ ๋งŒ๋“ค ์นผ๋Ÿผ ์ด๋ฆ„ ์ฆ‰, ์ฒซ ๋ฒˆ์งธ ์ธ์ž์—๋Š” ๋ฌด์กฐ๊ฑด ๋ฌธ์ž์—ด ํ˜•ํƒœ๋กœ ์ž…๋ ฅํ•ด์ฃผ์–ด์•ผ ํ•˜๊ณ , ๊ธฐ์กด ์นผ๋Ÿผ์„ ๋„ฃ๋Š” ๋‘ ๋ฒˆ์งธ ์ธ์ž์—๋Š” ๋ฐ˜๋“œ์‹œ col('column') ๋˜๋Š” df['column'] , df.column ๊ณผ ๊ฐ™์ด Column Type ํ˜•ํƒœ๋กœ ๋„ฃ์–ด์ฃผ์–ด์•ผ ํ•จ์„ ์žŠ์ง€ ๋ง์ž.

 

๊ฐ€์žฅ ๋จผ์ € ์ƒˆ๋กœ์šด ์นผ๋Ÿผ์„ ์ƒ์„ฑํ•ด๋ณด๋„๋ก ํ•˜์ž.

 

from pyspark.sql.functions import avg

# ์ŠคํŒŒํฌ์—์„œ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ๋ณต์‚ฌ๋ณธ์„ ๋งŒ๋“ค๊ธฐ ์œ„ํ•ด์„œ๋Š” ์•„๋ž˜์™€ ๊ฐ™์ด ์ˆ˜ํ–‰
copy_df = spark_df.select("*")

# Age์—๋‹ค๊ฐ€ ํ‰๊ท ๋‚˜์ด๋ฅผ ๋”ํ•ด๋ณด์ž
avg_age = copy_df.select(avg(col("Age"))).first()[0]
new_age_df = copy_df.withColumn("New_Age", col("Age") + avg_age)
new_age_df.limit(5).show()

 

๋‘ ๋ฒˆ์งธ๋Š” ๊ธฐ์กด ์นผ๋Ÿผ๊ฐ’์„ ๋ณ€๊ฒฝํ•˜๋Š” ๋ฐฉ๋ฒ•์ธ๋ฐ, ์ฒ˜์Œ์—” ์•„๋ž˜์™€ ๊ฐ™์ด ๋™์ž‘ํ•  ๊ฑฐ๋ผ๊ณ  ์ƒ๊ฐํ•  ์ˆ˜ ์žˆ๋‹ค. ํ•˜์ง€๋งŒ ์•„๋ž˜ ์ฝ”๋“œ๋Š” ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.

 

copy_df = spark_df.withColumn("Age", 0)
copy_df.show()

 

์ด ๋•Œ, ์—๋Ÿฌ ๋ฉ”์„ธ์ง€๊ฐ€ col should be Column ์ด๋ผ๊ณ  ๋‚˜์˜ค๊ฒŒ ๋˜๋Š”๋ฐ, ์ด๊ฒƒ์€ ์œ„์—์„œ 0 ๊ฐ’์„ Column Type ์œผ๋กœ ๋งŒ๋“ค์–ด์ฃผ์–ด์•ผ ํ•œ๋‹ค๋Š” ์˜๋ฏธ์ด๋‹ค. ์ด๋ ‡๊ฒŒ ์ƒ์ˆ˜๊ฐ’ ๋˜๋Š” ๋ฌธ์ž์—ด์„ Column Type ์œผ๋กœ ๋งŒ๋“ค์–ด์ฃผ๊ธฐ ์œ„ํ•ด์„œ๋Š” pyspark.sql.functions ์˜ lit ์ด๋ผ๋Š” ๋ฉ”์†Œ๋“œ๋ฅผ ์ž„ํฌํŠธํ•ด์„œ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค. ์—ฌ๊ธฐ์„œ lit ์€ literal ์ด๋ผ๋Š” ๋‹จ์–ด์˜ ์ค„์ž„๋ง์ด๋‹ค.

 

from pyspark.sql.functions import lit

copy_df = spark_df.withColumn("Age", lit(0))
copy_df.show()

 

๊ทธ ์ด์™ธ์— ์นผ๋Ÿผ ๊ฐ’์ด ๋ฌธ์ž์—ด์ผ ๋•Œ, ํ•ด๋‹น ๋ฌธ์ž์—ด ์ผ๋ถ€๋ฅผ ๊ฐ€์ ธ์˜ค๋Š” substring ๋ฉ”์†Œ๋“œ๋‚˜ ํŠน์ • ๊ตฌ๋ถ„์ž๋ฅผ ๊ธฐ์ค€์œผ๋กœ ์ชผ๊ฐœ์„œ ๊ฐ€์ ธ์˜ค๋Š” split ๋ฉ”์†Œ๋“œ๋„ ์กด์žฌํ•œ๋‹ค.

 

๋‹ค์Œ์€ ํŠน์ • ์นผ๋Ÿผ์˜ ๋ฐ์ดํ„ฐ ํƒ€์ž…์„ ๋ณ€๊ฒฝํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค.

 

copy_df = spark_df.withColumn("Age", col("Age").cast("integer"))
copy_df.show()

5. ํŠน์ • ์นผ๋Ÿผ ๋˜๋Š” Row๋ฅผ ์‚ญ์ œํ•˜๊ธฐ

ํŠน์ • ์นผ๋Ÿผ์„ ์‚ญ์ œํ•˜๋Š” ๋ฐฉ๋ฒ•์€ drop() ๋ฉ”์†Œ๋“œ๋ฅผ ํ™œ์šฉํ•˜๋ฉด ๋˜๋ฏ€๋กœ ๊ฐ„๋‹จํ•˜๋‹ค. ์นผ๋Ÿผ๋ช…์€ ๋ฌธ์ž์—ด ํ˜•ํƒœ๋กœ ๋„ฃ์–ด์ฃผ๊ฑฐ๋‚˜ Column Type ํ˜•ํƒœ๋กœ ๋ช…์‹œํ•ด์ฃผ์–ด๋„ ๋œ๋‹ค. ํ•˜์ง€๋งŒ ํ•œ ๋ฒˆ์— ์—ฌ๋Ÿฌ๊ฐœ์˜ ์นผ๋Ÿผ์„ ์‚ญ์ œํ•  ๋•Œ๋Š” ์—ฌ๋Ÿฌ ์นผ๋Ÿผ์„ ๋ฌธ์ž์—ด ํ˜•ํƒœ๋กœ ์—ด๊ฑฐํ•˜๋Š” ์‹์œผ๋กœ ํ•ด์ฃผ์–ด์•ผ๋งŒ ํ•œ๋‹ค. ๋ฆฌ์ŠคํŠธ์— ๋„ฃ์–ด์ฃผ๊ฑฐ๋‚˜ ๋˜๋Š” Column Type ํ˜•ํƒœ๋กœ ๋ช…์‹œํ•ด์ฃผ๋ฉด ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.

 

# ํŠน์ • ์นผ๋Ÿผ์„ ์‚ญ์ œํ•˜๊ธฐ
spark_df.drop("Pclass").show()
spark_df.drop(col("Pclass")).show()

# ํ•œ ๋ฒˆ์— ์—ฌ๋Ÿฌ๊ฐœ ์นผ๋Ÿผ ์‚ญ์ œํ•˜๊ธฐ
spark_df.drop("Pclass", "Name").show()

# ์•„๋ž˜์˜ ๋‘ ๊ฒฝ์šฐ๋Š” error ๋ฐœ์ƒ
spark_df.drop(["Pclass", "Name"]).show()  
spark_df.drop(col("Pclass"), col("Name")).show()

 

๋‹ค์Œ์€ ๊ฒฐ์ธก์น˜๊ฐ€ ์žˆ๋Š” Row๋ฅผ ์‚ญ์ œํ•˜๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด ์•Œ์•„๋ณด์ž. ๋Œ€ํ‘œ์ ์œผ๋กœ dropna() ๋ฉ”์†Œ๋“œ์™€ na.drop() ๋ฉ”์†Œ๋“œ๊ฐ€ ์กด์žฌํ•œ๋‹ค. ๋‘ ๋ฉ”์†Œ๋“œ ๋ชจ๋‘ subset ์ด๋ผ๋Š” ์ธ์ž์— ๋ฌธ์ž์—ด(ํ•˜๋‚˜์˜ ์นผ๋Ÿผ์ผ ๊ฒฝ์šฐ), ํŠœํ”Œ ๋˜๋Š” ๋ฆฌ์ŠคํŠธ ํ˜•ํƒœ๋กœ ๋„ฃ์–ด์ค„ ์ˆ˜ ์žˆ๋Š”๋ฐ, ์ด subset ์€ Row๋ฅผ ์‚ญ์ œํ•  ๋•Œ, ์–ด๋Š ์นผ๋Ÿผ์— ๊ฒฐ์ธก์น˜๊ฐ€ ์žˆ๋Š” Row๋ฅผ ์‚ญ์ œํ• ์ง€๋ฅผ ๊ฒฐ์ •ํ•˜๋Š” ๊ธฐ์ค€์ด ๋œ๋‹ค.

 

๋งŒ์ผ ์•„๋ž˜์™€ ๊ฐ™์€ ์ฝ”๋“œ๊ฐ€ ์žˆ๋‹ค๋ฉด, Age, Embarked ์นผ๋Ÿผ ๊ฐ’์ด ๋ชจ๋‘ ๊ฒฐ์ธก์น˜์ธ Row๋“ค๋งŒ ์ œ๊ฑฐํ•˜๋ผ๋Š” ์˜๋ฏธ์ด๋‹ค.

 

spark_df.dropna(subset=["Age", "Embarked"]).show()

6. ๊ฒฐ์ธก์น˜๊ฐ€ ์žˆ๋Š” Row๋“ค ์ฒดํฌํ•˜๊ธฐ

๋‹ค์Œ์€ ๊ฒฐ์ธก์น˜๊ฐ€ ์žˆ๋Š” Row๋“ค์„ ์ฒดํฌํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค. ํŒ๋‹ค์Šค์˜ ๊ฒฝ์šฐ isnull().sum() ๊ณผ ๊ฐ™์€ ๋งค์šฐ ํŽธํ•œ ๋ฉ”์†Œ๋“œ๊ฐ€ ์žˆ์ง€๋งŒ, ์ŠคํŒŒํฌ์—์„œ๋Š” ์ด๋ ‡๊ฒŒ ํ•  ์ˆ˜ ์—†๋‹ค. ํŠน์ • ์นผ๋Ÿผ๋งˆ๋‹ค ๊ฒฐ์ธก์น˜๊ฐ€ ์žˆ๋Š”์ง€ ์—†๋Š”์ง€ ์ฒดํฌํ•˜๋Š” ๋ฉ”์†Œ๋“œ์ธ isNull() ๋ฉ”์†Œ๋“œ ๋˜๋Š” isnan() ๋ฉ”์†Œ๋“œ๋ฅผ ํ™œ์šฉํ•ด์•ผ ํ•œ๋‹ค. ๋จผ์ € isNull() ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•์€ ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

 

spark_df.filter(col("Age").isNull()).show()

 

๋ฐ˜๋ฉด์—, isnan() ๋ฉ”์†Œ๋“œ๋Š” pyspark.sql.functions ํด๋ž˜์Šค์—์„œ ์ž„ํฌํŠธํ•ด์„œ ์•„๋ž˜์ฒ˜๋Ÿผ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค.

 

from pyspark.sql.functions import isnan

spark_df.where(isnan(col("Age"))).show()

 

๊ทธ๋Ÿฐ๋ฐ, ์—ฌ๊ธฐ์„œ ์ŠคํŒŒํฌ์—์„œ Null ๊ณผ NaN ๊ฐ’ ๋‘˜ ๋‹ค ๊ฒฐ์ธก์น˜์ธ์ง€ ์•„๋‹ˆ๋ฉด Null๋งŒ ๊ฒฐ์ธก์น˜์ธ์ง€ ํ—ท๊ฐˆ๋ฆด ์ˆ˜ ์žˆ๋‹ค. ๊ฒฐ๋ก ์€ ์ตœ์‹  ์ŠคํŒŒํฌ ๋ฒ„์ „์—์„œ๋Š” Null ๊ฐ’๋งŒ์„ ๊ฒฐ์ธก์น˜๋กœ ๊ฐ„์ฃผํ•˜๋ฏ€๋กœ NaN ๊ฐ’์„ ๋งˆ์ฃผํ•˜๊ฒŒ ๋˜๋ฉด ์ž์—ฐ์Šค๋ ˆ Null ๊ฐ’์œผ๋กœ ๋ฐ”๊พธ์–ด๋ฒ„๋ฆฐ๋‹ค.

 

NaN์€ ๊ตฌ์ฒด์ ์œผ๋กœ Not a Number์˜ ์ค„์ž„๋ง๋กœ, ํŒŒ์ด์ฌ์˜ None ํƒ€์ž…์„ ๋„˜ํŒŒ์ด ํ˜•ํƒœ๋กœ ๋งŒ๋“ค์–ด ๋ฒ„๋ฆฌ๋ฉด NaN ๊ฐ’์ด ๋˜๋ฒ„๋ฆฐ๋‹ค.

 

import numpy as np
import pandas as pd

data = {"name": ["jo", "lee"],
       "age": [28, None]}
pd.DataFrame(data)

 

์œ„ ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด None ๊ฐ’์ด NaN ๊ฐ’์œผ๋กœ ํŒ๋‹ค์Šค ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์— ๋“ค์–ด๊ฐ€ ์žˆ๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

 

๋‹ค์Œ์€ ๊ฐ ์นผ๋Ÿผ๋ณ„๋กœ ๊ฒฐ์ธก์น˜ ๊ฐœ์ˆ˜๊ฐ€ ๋ช‡ ๊ฐœ๊ฐ€ ์กด์žฌํ•˜๋Š”์ง€ ํ™•์ธํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค. ์ด ๋•Œ, ํ™œ์šฉํ•˜๋Š” ๋ฉ”์†Œ๋“œ๋Š” ํ–‰ ๊ฐœ์ˆ˜๋ฅผ ์„ธ๋Š” count() ์™€ SQL์—์„œ์˜ CASE WHEN ์—ญํ• ์„ ํ•˜๋Š” when() ๋ฉ”์†Œ๋“œ์ด๋‹ค.

 

from pyspark.sql.functions import count, when

cond = [count(when(col(c).isNull(), c)).alias(f"{c}_null_cnt") for c in spark_df.columns]
spark_df.select(cond).show()

 

์›๋ž˜ count() ๋ฉ”์†Œ๋“œ๋Š” Null์ธ Row๋ฅผ ์นด์šดํŠธํ•˜์ง€ ์•Š๋Š” ๊ฒƒ์ด ์ •์„์ด๋‹ค. ๊ทธ๋Ÿฐ๋ฐ ์œ„์—์„œ ๊ฒฐ์ธก์น˜ ๊ฐœ์ˆ˜๋ฅผ ์–ด๋–ป๊ฒŒ ์…€ ์ˆ˜ ์žˆ์„๊นŒ ํ•˜๋Š” ์˜๋ฌธ์ด ๋“ค ์ˆ˜ ์žˆ๋‹ค.(๊ทธ๋ž˜์„œ ํ•„์ž๊ฐ€ ์งˆ๋ฌธ์„ ํ•˜๊ธฐ๋„ ํ–ˆ๋‹ค..) ์ด์œ ๋Š” when() ๋ฉ”์†Œ๋“œ์˜ ํŠน์„ฑ ๋•Œ๋ฌธ์ธ๋ฐ, when() ๋ฉ”์†Œ๋“œ ์•ˆ์—์„œ c ๋ผ๋Š” ์นผ๋Ÿผ์ด ๊ฒฐ์ธก์น˜ ์ผ ๋•Œ, c ๋ผ๋Š” ๊ฐ’์œผ๋กœ ๋Œ€์ฒดํ•˜๋ผ๋Š” ์˜๋ฏธ์ธ๋ฐ, ์ด ๋Œ€์ฒดํ•˜๋Š” c ๋ผ๋Š” ๊ฐ’์ด ์นผ๋Ÿผ c ์— ๋“ค์–ด์žˆ๋Š” ๊ฐ’(value)์„ ์˜๋ฏธํ•˜๋Š” ๊ฒƒ์ด ์•„๋‹Œ ์นผ๋Ÿผ ์ด๋ฆ„ c ๋ฌธ์ž์—ด ์ž์ฒด ๊ฐ’์„ ์˜๋ฏธํ•œ๋‹ค. ๋”ฐ๋ผ์„œ when() ๋ฉ”์†Œ๋“œ๋Š” ์‹ค์งˆ์ ์œผ๋กœ c ๋ผ๋Š” ์นผ๋Ÿผ์— ๊ฒฐ์ธก์น˜๊ฐ€ ์žˆ๋Š” ๊ฐ’๋“ค์„ 'c' ๋ฌธ์ž์—ด๋กœ ๊ฐ„์ฃผํ•œ ํ›„, ๋ฌธ์ž์—ด์ด c ์ธ Row ๊ฐœ์ˆ˜๋ฅผ ์…ˆ์œผ๋กœ์จ ๊ฒฐ์ธก์น˜ ๊ฐœ์ˆ˜๋ฅผ ์…€ ์ˆ˜ ์žˆ๊ฒŒ ๋˜๋Š” ๊ฒƒ์ด๋‹ค.

7. ๊ฒฐ์ธก์น˜ ๋Œ€์ฒดํ•˜๊ธฐ

๋‹ค์Œ์€ ๊ฒฐ์ธก์น˜๋ฅผ ์–ด๋–ค ๊ฐ’์œผ๋กœ ๋Œ€์ฒดํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค. ํฌ๊ฒŒ fillna() ๋ฉ”์†Œ๋“œ์™€ na.fill() ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค. ๋‘ ๋ฉ”์†Œ๋“œ ๋ชจ๋‘ value ์ธ์ž์— ๋Œ€์ฒดํ•  ๊ฐ’์„ ๋„ฃ์–ด์ค„ ์ˆ˜ ์žˆ๊ณ  subset ์ธ์ž๋กœ ๊ฒฐ์ธก์น˜๋ฅผ ๋Œ€์ฒดํ•  ์นผ๋Ÿผ์ด ๋ฌด์—‡์ธ์ง€ ๋„ฃ์–ด์ค„ ์ˆ˜ ์žˆ๋‹ค. ์ธ์ž๋กœ ์•„๋ฌด๊ฒƒ๋„ ๋„ฃ์ง€ ์•Š์œผ๋ฉด ๋ชจ๋“  ์นผ๋Ÿผ์˜ ๊ฒฐ์ธก์น˜์— ๋Œ€์ฒดํ•  ๊ฐ’์„ ๋„ฃ์–ด์ค€๋‹ค.

 

์ฐธ๊ณ ๋กœ value ์ธ์ž์— ์ •์ˆ˜ํ˜•์œผ๋กœ ๋„ฃ์–ด์ฃผ๋ฉด ์ •์ˆ˜ํ˜• ํƒ€์ž…์„ ๊ฐ–๊ณ  ์žˆ๋Š” ์นผ๋Ÿผ๋“ค์˜ ๊ฒฐ์ธก์น˜๋ฅผ ์•Œ์•„์„œ ์ฐพ์•„์„œ ๋Œ€์ฒดํ•ด์ฃผ๊ณ  value ์ธ์ž์— ๋ฌธ์ž์—ด์„ ๋„ฃ์–ด์ฃผ๋ฉด ๋ฌธ์ž์—ด ํƒ€์ž…์„ ๊ฐ–๊ณ  ์žˆ๋Š” ์นผ๋Ÿผ๋“ค์˜ ๊ฒฐ์ธก์น˜๋ฅผ ์•Œ์•„์„œ ์ฐพ์•„ ๋Œ€์ฒดํ•ด์ค€๋‹ค.

 

from pyspark.sql.functions import avg

avg_age = spark_df.select(avg("Age")).first()[0]

spark_df.fillna(value=avg_age, subset="Age").show()
spark_df.na.fill(value=avg_age, subset=["Age"]).show()

 

๋˜ ๊ฒฐ์ธก์น˜๊ฐ€ ์กด์žฌํ•˜๋Š” ์นผ๋Ÿผ์— ๋งž๊ฒŒ ์„œ๋กœ ๋‹ค๋ฅธ ๋Œ€์ฒด๊ฐ’์„ ๋ถ€์—ฌํ•˜๋ ค๋ฉด ๋”•์…”๋„ˆ๋ฆฌ ํ˜•ํƒœ๋กœ ์ •์˜ํ•ด์„œ ๋ถ€์—ฌํ•ด์ค˜๋„ ๋œ๋‹ค.

 

from pyspark.sql.functions import avg

avg_age = spark_df.select(avg("Age")).first()[0]
embarked = "ZZZ"

dictionary = {"Age": avg_age, "Embarked": embarked}

spark_df.fillna(value=dictionary).show()

8. ์ผ๋ฐ˜ UDF๋ฅผ ์ŠคํŒŒํฌ UDF๋กœ ๋ณ€ํ™˜ ํ›„ ์ ์šฉํ•˜๊ธฐ

์ผ๋ฐ˜ UDF๋ž€, ์–ด๋ ค์šด ๊ฒƒ์ด ์•„๋‹ˆ๋ผ ์šฐ๋ฆฌ๊ฐ€ ์ผ๋ฐ˜์ ์œผ๋กœ ํŒŒ์ด์ฌ์œผ๋กœ ์ •์˜ํ•˜๋Š” '์ผ๋ฐ˜' ํ•จ์ˆ˜๋ฅผ ์˜๋ฏธํ•œ๋‹ค. ํŒ๋‹ค์Šค์—์„œ๋Š” ์ด๋ ‡๊ฒŒ ์ผ๋ฐ˜ UDF๋ฅผ ํ™œ์šฉํ•ด์„œ ํŒ๋‹ค์Šค์˜ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ๋˜๋Š” Series์— apply,lambda ํ•จ์ˆ˜๋ฅผ ํ™œ์šฉํ•ด์„œ ์ ์šฉํ•ด ๋ณธ ์ ์ด ์žˆ๋‹ค. ์ด์™€ ๋น„์Šทํ•˜๊ฒŒ ์ŠคํŒŒํฌ์—์„œ๋„ UDF๋ฅผ ํ™œ์šฉํ•  ์ˆ˜ ์žˆ๋Š”๋ฐ, ์‚ฌ์šฉ ๋ฐฉ๋ฒ•์€ ์•„๋ž˜์™€ ๊ฐ™๋‹ค.

 

์šฐ์„ , ์ผ๋ฐ˜ UDF ํ•จ์ˆ˜๋ฅผ ์ •์˜ํ•˜์ž. ํ•ด๋‹น ์˜ˆ์‹œ์—์„œ๋Š” ์•„๋ž˜์˜ UDF๋ฅผ ์‚ฌ์šฉํ•˜๋ ค ํ•œ๋‹ค.

 

def change_gender(gender):
    if gender is None:
        return None
    if gender == 'male':
        return 'super_male'
    else:
        return 'super_female'

 

๋‹ค์Œ์€ udf ๋ฉ”์†Œ๋“œ๋ฅผ ๋”ฐ๋กœ ์ž„ํฌํŠธํ•ด์„œ ์ผ๋ฐ˜ UDF๋ฅผ ์ŠคํŒŒํฌ์šฉ UDF๋กœ ๋ณ€ํ™˜ํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค. udf() ๋ฉ”์†Œ๋“œ๋ฅผ ํ™œ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค. udf(lambda x: ์ผ๋ฐ˜_udf, return_type) ์ธ๋ฐ, ๊ฐœ์ธ์ ์œผ๋กœ ํŠน์ดํ•˜๊ฒŒ ๋™์  ์–ธ์–ด์ธ ํŒŒ์ด์ฌ์—์„œ ๋ฏธ๋ฆฌ ํ•จ์ˆ˜ ๋ฐ˜ํ™˜ ํƒ€์ž…์„ ์ •์˜ํ•ด์ฃผ๋Š” ์ ์ด ํŠน์ดํ•˜๋‹ค๊ณ  ๋Š๊ผˆ๋‹ค.

 

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# spark UDF๋กœ ๋ณ€ํ™˜
spark_udf = udf(lambda x: change_gender(x), StringType())
# spark UDF๋ฅผ spark dataframe์— ์ ์šฉ
spark_df.withColumn("New_Gender", spark_udf(col("Sex"))).show()

 

๋ฐ˜์‘ํ˜•