

[PySpark] Spark SQL Tutorial

๋ฐ˜์‘ํ˜•

๐Ÿ”Š ๋ณธ ํฌ์ŠคํŒ…์€ Apache Spark 3.0.1 ๊ณต์‹ ๋ฌธ์„œ๋ฅผ ์ง์ ‘ ํ•ด์„ํ•˜์—ฌ ํ•„์ž๊ฐ€ ์ดํ•ดํ•œ ๋‚ด์šฉ์œผ๋กœ ์žฌ๊ตฌ์„ฑํ–ˆ์Šต๋‹ˆ๋‹ค. ํ˜น์—ฌ๋‚˜ ์ปจํ…์ธ  ์ค‘ ํ‹€๋ฆฐ ๋‚ด์šฉ์ด ์žˆ๋‹ค๋ฉด ์ ๊ทน์ ์ธ ํ”ผ๋“œ๋ฐฑ์€ ํ™˜์˜์ž…๋‹ˆ๋‹ค! : )

 

์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ ๋‹ค๋ฃจ์–ด ๋ณผ ์ปจํ…์ธ ๋Š” ๋ฐ”๋กœ Spark๋กœ SQL์„ ์ด์šฉํ•  ์ˆ˜ ์žˆ๋Š” Spark SQL์— ๋Œ€ํ•œ ๋‚ด์šฉ์ด๋‹ค. Spark๋Š” ์ด์ „ ํฌ์ŠคํŒ…์—์„œ๋„ ์–ธ๊ธ‰ํ–ˆ๋‹ค์‹œํ”ผ ๊ธฐ๋ณธ์ ์œผ๋กœ RDD ์ž๋ฃŒ๊ตฌ์กฐ๋ฅผ ๊ฐ–๊ณ  ์žˆ์ง€๋งŒ Python์˜ Pandas ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์—์„œ ์ œ๊ณตํ•˜๋Š” DataFrame๊ณผ ๋น„์Šทํ•œ ๊ตฌ์กฐ์˜ DataFrame์„ ์ง€์›ํ•˜๊ณ  ์žˆ๋‹ค.(์ด๋ฆ„๋„ DataFrame ์œผ๋กœ ๋™์ผํ•˜๋‹ค.) ์ถ”ํ›„์— ๋‹ค๋ฃจ๊ฒ ์ง€๋งŒ Spark๋Š” Spark์˜ DataFrame์„ Pandas์˜ DataFrame ํ˜•ํƒœ๋กœ ๋ฐ”๊พธ์–ด์ฃผ๋Š” ๊ฒƒ๋„ ์ง€์›ํ•œ๋‹ค.

 

https://techvidvan.com/tutorials/apache-spark-sql-dataframe/

 

๊ทธ๋Ÿผ ์ด์ œ Spark SQL์„ ํ†ตํ•ด ๊ฐ„๋‹จํ•œ ์‹ค์Šต์„ ํ•ด๋ณด์ž.

1. First things first: SparkSession!

์ด์ „ ํฌ์ŠคํŒ…์—์„œ๋„ ๋งํ–ˆ๋‹ค์‹œํ”ผ ๊ฐ€์žฅ ๋จผ์ € ํ•ด์•ผํ•  ๊ฒƒ์€ SparkContext๋ผ๋Š” ์ŠคํŒŒํฌ ๊ฐ์ฒด๋ฅผ ๋งŒ๋“ค์–ด ์ฃผ์–ด์•ผ ํ•œ๋‹ค. SparkContext๋ฅผ ๋งŒ๋“ค์–ด ์ฃผ๊ธฐ ์œ„ํ•ด์„œ ์šฐ์„  SparkSession์„ ๋งŒ๋“ค์–ด ์ฃผ์ž. ๊ทธ๋ฆฌ๊ณ  json ํŒŒ์ผ ํ˜•์‹์œผ๋กœ ๋˜์–ด ์žˆ๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์–ด์„œ ๋ฐ์ดํ„ฐ์˜ ์Šคํ‚ค๋งˆ๋ฅผ ์ถœ๋ ฅ์‹œ์ผœ๋ณด์ž.

 

from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName('Python Spark SQL basic example')\
        .config('spark.some.config.option', 'some-value')\
        .getOrCreate()

### Read a json file with Spark
# Get the SparkContext from the session (used later for RDD work)
sc = spark.sparkContext

# Read the json file
path = '/Users/younghun/Desktop/gitrepo/TIL/pyspark/people.json'
peopleDF = spark.read.json(path)

# printSchema() shows the schema inferred from the json file
peopleDF.printSchema()

 

For reference, printSchema() displays the data in schema form; the output looks like this.

 

printSchema() output
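Assuming people.json is the sample file that ships with the Spark examples (three records: Michael with no age, Andy aged 30, Justin aged 19), the printed schema should look roughly like:

```
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
```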

2. Querying data as a DataFrame with SQL

 

๋‹ค์Œ์€ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ ๊ธฐ๋Šฅ์ค‘ 'View' ๋ผ๋Š” ๊ฐ€์ƒ์˜ ํ…Œ์ด๋ธ”์„ ๋งŒ๋“ค์–ด ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ๋งŒ๋“ค ์ˆ˜ ์žˆ๋‹ค.(๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ๋Œ€ํ•ด ๋ฐฐ์šด ์ ์ด ์žˆ๋‹ค๋ฉด View ๊ธฐ๋Šฅ์— ๋Œ€ํ•ด์„œ ์ต์ˆ™ํ•  ๊ฒƒ์ด๋‹ค. ํ•„์ž๋„ ์ „๊ณต ์ˆ˜์—… ๋•Œ ๋ฐฐ์› ๋˜ ๊ฐ€๋ฌผ๊ฐ€๋ฌผํ•œ ๊ธฐ์–ต์ด...)

 

๊ฐ€์ƒ์˜ ํ…Œ์ด๋ธ”์„ ๋งŒ๋“ค๊ธฐ ์œ„ํ•ด์„œ๋Š” createOrReplaceTempView("๊ฐ€์ƒ์˜ํ…Œ์ด๋ธ” ์ด๋ฆ„") ์„ ์ˆ˜ํ–‰ํ•˜๋ฉด ๋œ๋‹ค.

 

# ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์‚ฌ์šฉํ•˜๋Š” ์ž„์‹œ์˜ view(๊ฐ€์ƒ์˜ ํ…Œ์ด๋ธ”) ์ƒ์„ฑ
peopleDF.createOrReplaceTempView("people")

# spark์—์„œ ์ œ๊ณตํ•˜๋Š” sql ๋ฉ”์†Œ๋“œ๋ฅผ ์ด์šฉํ•ด ์ฟผ๋ฆฌ ๋‚ ๋ฆฌ๊ธฐ
# ์ฟผ๋ฆฌ๋ฌธ์—์„œ people ํ…Œ์ด๋ธ”์€ ์œ„์—์„œ ๋งŒ๋“ค์—ˆ๋˜ view ํ…Œ์ด๋ธ”์ž„!
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()

 

For reference, use the show() method to display the resulting DataFrame.

 

Result

 

View ํ…Œ์ด๋ธ”์„ ์ƒ์„ฑํ•˜๋Š” ๋ฉ”์†Œ๋“œ ์ค‘ Global Temporary View๋ผ๋Š” ๊ฒƒ๋„ ์žˆ๋‹ค. ์œ„์—์„œ ์‚ฌ์šฉํ–ˆ๋˜ ์ผ๋ฐ˜์ ์ธ Temporary View ๊ฐ€์ƒ ํ…Œ์ด๋ธ”์€ SaprkSession์ด ์ข…๋ฃŒ๋˜๋ฉด ์‚ญ์ œ๋œ๋‹ค. ํ•˜์ง€๋งŒ ๋ชจ๋“  SparkSession๋“ค ๊ฐ„์— View ๊ฐ€์ƒ ํ…Œ์ด๋ธ”์„ ๊ณต์œ ํ•˜๊ฒŒ ํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด Global Temporary View๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค. ํ•œ ๊ฐ€์ง€ ์ฃผ์˜ํ•ด์•ผ ํ•  ์ ์ด ์žˆ๋Š”๋ฐ Global Temporary View ํ…Œ์ด๋ธ”์„ ๋งŒ๋“ค์–ด์ฃผ๊ณ  SQL๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”์ถœ ํ•  ๋•Œ ํ…Œ์ด๋ธ” ์ด๋ฆ„ ์•ž์— global\_temp ๋ผ๋Š” ํ‚ค์›Œ๋“œ๋ฅผ ๋ถ™์—ฌ์ค˜์•ผ ํ•จ์„ ์žŠ์ง€๋ง์ž!

 

# json ํŒŒ์ผ ์ฝ์–ด๋“ค์ด๊ธฐ
path = '/Users/younghun/Desktop/gitrepo/TIL/pyspark/people.json'
df = spark.read.json(path)

# Global Temporary View ์ƒ์„ฑ
df.createOrReplaceGlobalTempView('people')

# 'global_temp' ๋ผ๋Š” ํ‚ค์›Œ๋“œ ๊ผญ ๋ถ™์—ฌ์ฃผ์ž!
sqlDF = spark.sql('SELECT * FROM global_temp.people')
sqlDF.show()

 

์ด๊ฑด ๋ถ€๊ฐ€์ ์ธ ๋‚ด์šฉ์ด์ง€๋งŒ json ํ˜•์‹์˜ ๋ฐ์ดํ„ฐ๋ฅผ RDD๋กœ ๋งŒ๋“ค์–ด ์ฃผ๊ณ  ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์œผ๋กœ ์ฝ์–ด์˜ค๋Š” ๋ฐฉ์‹๋„ ์กด์žฌํ•œ๋‹ค.

 

# ๋˜ํ•œ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์€ RDD[String] ์ž๋ฃŒ๊ตฌ์กฐ๋ฅผ ์ด์šฉํ•ด์„œ json ๋ฐ์ดํ„ฐ์…‹์„ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์œผ๋กœ ๋งŒ๋“ค ์ˆ˜ ์žˆ์Œ
jsonStrings = ['{"name": "Yin", "address":{"city":"Columbus", "state":"Ohio"}}']
# json -> RDDํ˜•์‹์œผ๋กœ ๋งŒ๋“ค๊ธฐ
otherPeopleRDD = sc.parallelize(jsonStrings)
# jsonํŒŒ์ผ ์ฝ์–ด์˜ค๊ธฐ
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()

Result

3. Spark์˜ DataFrame์˜ ์นผ๋Ÿผ์— ์ ‘๊ทผํ•ด๋ณด์ž!

These are called DataFrame operations, also known as untyped Dataset operations. Within Spark, the difference between a Dataset and a DataFrame comes down to this typed/untyped distinction: a typed Dataset declares the shape of the incoming data up front, whereas with an untyped one the program infers the shape of the data as it reads it. The two also differ in how early errors are caught; see here for details.

 

ํ•ด๋‹น ํฌ์ŠคํŒ…์—์„œ๋Š” Dataset์ด ์•„๋‹Œ DataFrame์— ๋Œ€ํ•œ ์ฝ”๋“œ์ž„์„ ์•Œ์•„๋‘์ž. DataFrame์˜ ์นผ๋Ÿผ์— ์ ‘๊ทผํ•˜๊ธฐ ์œ„ํ•œ ๋ฐฉ๋ฒ•์€ ํฌ๊ฒŒ 2๊ฐ€์ง€๊ฐ€ ์กด์žฌํ•œ๋‹ค. df.column ๊ณผ df\['column'\] ์ด ์žˆ๋Š”๋ฐ ๋‘˜ ์ค‘ ํ›„์ž์˜ ๋ฐฉ๋ฒ•์„ ๋” ๋งŽ์ด ์‚ฌ์šฉํ•œ๋‹ค๊ณ  ํ•œ๋‹ค.(์ด๊ฒƒ๋„ Pandas์™€ ๋น„์Šทํ•œ ๋“ฏ ํ•˜๋‹ค!)

 

# json ํŒŒ์ผ ์ฝ์–ด๋“ค์ด๊ธฐ
path = '/Users/younghun/Desktop/gitrepo/TIL/pyspark/people.json'
df = spark.read.json(path)

# name ์นผ๋Ÿผ select ํ•ด์„œ ์‚ดํŽด๋ณด๊ธฐ
df.select('name').show()

Result

 

์ถ”๊ฐ€์ ์œผ๋กœ 2๊ฐœ ์ด์ƒ์˜ ์นผ๋Ÿผ์„ ์ถ”์ถœํ•˜๋ฉด์„œ ๊ธฐ์กด์˜ ์นผ๋Ÿผ์— ์—ฐ์‚ฐ์„ ๊ฐ€ํ•ด ํŒŒ์ƒ๋ณ€์ˆ˜๋ฅผ ์ƒ์„ฑํ•˜์—ฌ ์ถ”์ถœํ•˜๋Š” ๊ฒƒ๋„ ๊ฐ€๋Šฅํ•˜๋‹ค.

 

# json ํŒŒ์ผ ์ฝ์–ด๋“ค์ด๊ธฐ
path = '/Users/younghun/Desktop/gitrepo/TIL/pyspark/people.json'
df = spark.read.json(path)

# name ์นผ๋Ÿผ select ํ•ด์„œ ์‚ดํŽด๋ณด๊ธฐ
df.select(df['name'], df['age']+1).show()

Result

 

๋‹ค์Œ์€ ํŠน์ • ์กฐ๊ฑด์„ ๋งŒ์กฑํ•˜๋Š” ๋ฐ์ดํ„ฐ๋งŒ ์ถ”์ถœํ•˜๋Š” filter() ์™€ ํŠน์ • ์นผ๋Ÿผ์œผ๋กœ ๊ทธ๋ฃนํ•‘์„ ์ˆ˜ํ–‰ํ•ด์ฃผ๋Š” groupBy() ๋ฉ”์†Œ๋“œ์— ๋Œ€ํ•œ ์˜ˆ์‹œ ์ฝ”๋“œ์ด๋‹ค.

 

# json ํŒŒ์ผ ์ฝ์–ด๋“ค์ด๊ธฐ
path = '/Users/younghun/Desktop/gitrepo/TIL/pyspark/people.json'
df = spark.read.json(path)

# age๊ฐ€ 20๋ณด๋‹ค ํฐ ๋ฐ์ดํ„ฐ๋งŒ ์ถ”์ถœ
df.filter(df['age'] > 20).show()
# age ์นผ๋Ÿผ์œผ๋กœ ๊ทธ๋ฃนํ•‘ ํ•˜๊ณ  ๋ฐ์ดํ„ฐ์˜ ๊ฐœ์ˆ˜๋ฅผ ์ง‘๊ณ„ํ•ด์คŒ
df.groupBy('age').count().show()

4. DataFrame์˜ Schema๋ฅผ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์Šค๋Ÿฝ๊ฒŒ ๋ช…์‹œํ•ด๋ณด์ž!

๋กœ๋“œํ•  ๋ฐ์ดํ„ฐ csv, txt, excel ๋“ฑ ์—ฌ๋Ÿฌ๊ฐ€์ง€ ์ข…๋ฅ˜์˜ ํŒŒ์ผ์— ์Šคํ‚ค๋งˆ ์ฆ‰, ์นผ๋Ÿผ๋ช…(ํ—ค๋”)๊ฐ€ ๊ธฐ์ž…๋˜์–ด ์žˆ๋Š” ๊ฒฝ์šฐ๋„ ์žˆ์ง€๋งŒ ๊ทธ๋ ‡์ง€ ์•Š๊ณ  ์Šคํ‚ค๋งˆ๋งŒ ๋”ฐ๋กœ ๋‹ค๋ฅธ ํŒŒ์ผ์— ๋‹ด๊ฒจ์žˆ๊ฑฐ๋‚˜ ํ”„๋กœ๊ทธ๋ž˜๋จธ๊ฐ€ ์ง์ ‘ ๋ช…์‹œํ•ด์ฃผ์–ด์•ผ ํ•˜๋Š” ๊ฒฝ์šฐ๋„ ์žˆ๋‹ค. ์ด๋Ÿฌํ•œ ๊ฒฝ์šฐ๋ฅผ ๋Œ€๋น„ํ•ด ๋ฐ์ดํ„ฐ ํŒŒ์ผ ์™ธ๋ถ€์— ๋ช…์‹œ๋˜์–ด ์žˆ๋Š” ์Šคํ‚ค๋งˆ๋ฅผ ๊ฐ€์ ธ์™€ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์˜ ํ—ค๋”์— ์‚ฝ์ž…ํ•ด์ฃผ๋Š” ์ฝ”๋“œ๋ฅผ ์งœ๋ณด์ž. ์šฐ์„  ๋ฐ์ดํ„ฐํŒŒ์ผ์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

 

people.txt ํŒŒ์ผ ๋‚ด๋ถ€ ๋ฐ์ดํ„ฐ ๋ชจ์Šต

 

์ด์ œ txt ํŒŒ์ผ์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋กœ๋“œํ•˜๊ณ  ๋™์‹œ์— ์Šคํ‚ค๋งˆ๋ฅผ ๊ณต๋ฐฑ์ด ํฌํ•จ๋œ ๋ฌธ์ž์—ด๋กœ ์ •์˜ํ•œ ํ›„ ์ด๋ฅผ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์— ์‚ฝ์ž…ํ•ด๋ณด์ž.

 

from pyspark.sql.types import *

# Get the SparkContext
sc = spark.sparkContext

# Read the txt file
lines = sc.textFile('./people.txt')
parts = lines.map(lambda l: l.split(','))

## Step 1 ## => process the values
# Convert each line into a tuple
people = parts.map(lambda p: (p[0], p[1].strip()))  # strip the space in front of the age field

## Step 2 ## => build the schema
# The schema, encoded as a string
schemaString = "name age"
# Loop over the schema string and make a StructField for each name
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
# Wrap the list of StructFields in a StructType!
schema = StructType(fields)

## Step 3 ## => create a DataFrame from the values and the schema
# Apply the schema built above to the RDD
schemaPeople = spark.createDataFrame(people, schema)

# Create a view and run a query to pull the data out
schemaPeople.createOrReplaceTempView('people')
results = spark.sql("SELECT * FROM people")
results.show()

Result

 

๋ฐ˜์‘ํ˜•