๐ ๋ณธ ํฌ์คํ ์ Apache Spark 3.0.1 ๊ณต์ ๋ฌธ์๋ฅผ ์ง์ ํด์ํ์ฌ ํ์๊ฐ ์ดํดํ ๋ด์ฉ์ผ๋ก ์ฌ๊ตฌ์ฑํ์ต๋๋ค. ํน์ฌ๋ ์ปจํ ์ธ ์ค ํ๋ฆฐ ๋ด์ฉ์ด ์๋ค๋ฉด ์ ๊ทน์ ์ธ ํผ๋๋ฐฑ์ ํ์์ ๋๋ค! : )
์ด๋ฒ ํฌ์คํ ์์ ๋ค๋ฃจ์ด ๋ณผ ์ปจํ ์ธ ๋ ๋ฐ๋ก Spark๋ก SQL์ ์ด์ฉํ ์ ์๋ Spark SQL์ ๋ํ ๋ด์ฉ์ด๋ค. Spark๋ ์ด์ ํฌ์คํ ์์๋ ์ธ๊ธํ๋ค์ํผ ๊ธฐ๋ณธ์ ์ผ๋ก RDD ์๋ฃ๊ตฌ์กฐ๋ฅผ ๊ฐ๊ณ ์์ง๋ง Python์ Pandas ๋ผ์ด๋ธ๋ฌ๋ฆฌ์์ ์ ๊ณตํ๋ DataFrame๊ณผ ๋น์ทํ ๊ตฌ์กฐ์ DataFrame์ ์ง์ํ๊ณ ์๋ค.(์ด๋ฆ๋ DataFrame ์ผ๋ก ๋์ผํ๋ค.) ์ถํ์ ๋ค๋ฃจ๊ฒ ์ง๋ง Spark๋ Spark์ DataFrame์ Pandas์ DataFrame ํํ๋ก ๋ฐ๊พธ์ด์ฃผ๋ ๊ฒ๋ ์ง์ํ๋ค.
๊ทธ๋ผ ์ด์ Spark SQL์ ํตํด ๊ฐ๋จํ ์ค์ต์ ํด๋ณด์.
1. ๊ฐ์ฅ ๋จผ์ ํ ์ผ! SparkSession !
์ด์ ํฌ์คํ ์์๋ ๋งํ๋ค์ํผ ๊ฐ์ฅ ๋จผ์ ํด์ผํ ๊ฒ์ SparkContext๋ผ๋ ์คํํฌ ๊ฐ์ฒด๋ฅผ ๋ง๋ค์ด ์ฃผ์ด์ผ ํ๋ค. SparkContext๋ฅผ ๋ง๋ค์ด ์ฃผ๊ธฐ ์ํด์ ์ฐ์ SparkSession์ ๋ง๋ค์ด ์ฃผ์. ๊ทธ๋ฆฌ๊ณ json ํ์ผ ํ์์ผ๋ก ๋์ด ์๋ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์ ๋ฐ์ดํฐ์ ์คํค๋ง๋ฅผ ์ถ๋ ฅ์์ผ๋ณด์.
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName('Python Spark SQL basic example')\
.config('spark.some.config.option', 'some-value')\
.getOrCreate()
### Create json file using spark
# sparkContext๋ก ๊ฐ์ฒด ์์ฑ
sc = spark.sparkContext
# json ํ์ผ ์ฝ์ด๋ค์ด๊ธฐ
path = '/Users/younghun/Desktop/gitrepo/TIL/pyspark/people.json'
peopleDF = spark.read.json(path)
# printSchema()๋ก jsonํ์ผ์ ์คํค๋ง ํํ ๋ณผ์ ์์
peopleDF.printSchema()
์ฐธ๊ณ ๋ก printSchema()
๋ ๋ฐ์ดํฐ๋ฅผ ์คํค๋ง ํํ๋ก ๋ณด์ฌ์ฃผ๋ ๊ฒ์ธ๋ฐ ๊ฒฐ๊ณผ ํ๋ฉด์ ๋ค์๊ณผ ๊ฐ๋ค.
2. SQL๋ก DataFrame ํํ๋ก ๋ฐ์ดํฐ ์ถ๋ ฅํด๋ณด๊ธฐ
๋ค์์ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๊ธฐ๋ฅ์ค 'View' ๋ผ๋ ๊ฐ์์ ํ
์ด๋ธ์ ๋ง๋ค์ด ๋ฐ์ดํฐํ๋ ์์ ๋ง๋ค ์ ์๋ค.(๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ํด ๋ฐฐ์ด ์ ์ด ์๋ค๋ฉด View ๊ธฐ๋ฅ์ ๋ํด์ ์ต์ํ ๊ฒ์ด๋ค. ํ์๋ ์ ๊ณต ์์
๋ ๋ฐฐ์ ๋ ๊ฐ๋ฌผ๊ฐ๋ฌผํ ๊ธฐ์ต์ด...)
๊ฐ์์ ํ
์ด๋ธ์ ๋ง๋ค๊ธฐ ์ํด์๋ createOrReplaceTempView("๊ฐ์์ํ
์ด๋ธ ์ด๋ฆ")
์ ์ํํ๋ฉด ๋๋ค.
# ๋ฐ์ดํฐํ๋ ์์ ์ฌ์ฉํ๋ ์์์ view(๊ฐ์์ ํ
์ด๋ธ) ์์ฑ
peopleDF.createOrReplaceTempView("people")
# spark์์ ์ ๊ณตํ๋ sql ๋ฉ์๋๋ฅผ ์ด์ฉํด ์ฟผ๋ฆฌ ๋ ๋ฆฌ๊ธฐ
# ์ฟผ๋ฆฌ๋ฌธ์์ people ํ
์ด๋ธ์ ์์์ ๋ง๋ค์๋ view ํ
์ด๋ธ์!
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
์ฐธ๊ณ ๋ก ๋ง๋ค์ด์ง ๋ฐ์ดํฐํ๋ ์์ ๋ณด๊ธฐ ์ํด์๋ show()
๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค.
View ํ
์ด๋ธ์ ์์ฑํ๋ ๋ฉ์๋ ์ค Global Temporary View๋ผ๋ ๊ฒ๋ ์๋ค. ์์์ ์ฌ์ฉํ๋ ์ผ๋ฐ์ ์ธ Temporary View ๊ฐ์ ํ
์ด๋ธ์ SaprkSession์ด ์ข
๋ฃ๋๋ฉด ์ญ์ ๋๋ค. ํ์ง๋ง ๋ชจ๋ SparkSession๋ค ๊ฐ์ View ๊ฐ์ ํ
์ด๋ธ์ ๊ณต์ ํ๊ฒ ํ๊ณ ์ถ๋ค๋ฉด Global Temporary View๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค. ํ ๊ฐ์ง ์ฃผ์ํด์ผ ํ ์ ์ด ์๋๋ฐ Global Temporary View ํ
์ด๋ธ์ ๋ง๋ค์ด์ฃผ๊ณ SQL๋ก ๋ฐ์ดํฐ๋ฅผ ์ถ์ถ ํ ๋ ํ
์ด๋ธ ์ด๋ฆ ์์ global\_temp
๋ผ๋ ํค์๋๋ฅผ ๋ถ์ฌ์ค์ผ ํจ์ ์์ง๋ง์!
# json ํ์ผ ์ฝ์ด๋ค์ด๊ธฐ
path = '/Users/younghun/Desktop/gitrepo/TIL/pyspark/people.json'
df = spark.read.json(path)
# Global Temporary View ์์ฑ
df.createOrReplaceGlobalTempView('people')
# 'global_temp' ๋ผ๋ ํค์๋ ๊ผญ ๋ถ์ฌ์ฃผ์!
sqlDF = spark.sql('SELECT * FROM global_temp.people')
sqlDF.show()
์ด๊ฑด ๋ถ๊ฐ์ ์ธ ๋ด์ฉ์ด์ง๋ง json ํ์์ ๋ฐ์ดํฐ๋ฅผ RDD๋ก ๋ง๋ค์ด ์ฃผ๊ณ ๋ฐ์ดํฐํ๋ ์์ผ๋ก ์ฝ์ด์ค๋ ๋ฐฉ์๋ ์กด์ฌํ๋ค.
# ๋ํ ๋ฐ์ดํฐํ๋ ์์ RDD[String] ์๋ฃ๊ตฌ์กฐ๋ฅผ ์ด์ฉํด์ json ๋ฐ์ดํฐ์
์ ๋ฐ์ดํฐํ๋ ์์ผ๋ก ๋ง๋ค ์ ์์
jsonStrings = ['{"name": "Yin", "address":{"city":"Columbus", "state":"Ohio"}}']
# json -> RDDํ์์ผ๋ก ๋ง๋ค๊ธฐ
otherPeopleRDD = sc.parallelize(jsonStrings)
# jsonํ์ผ ์ฝ์ด์ค๊ธฐ
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
3. Spark์ DataFrame์ ์นผ๋ผ์ ์ ๊ทผํด๋ณด์!
DataFrame Operations ๋ผ๊ณ ๋ ํ๋ฉด Untyped Dataset Operations ๋ผ๊ณ ๋ ํ๋ค. Spark ๋ด๋ถ์์ Dataset ๊ณผ DataFrame์ ์ฐจ์ด๋ผ๊ณ ํ๋ค๋ฉด Typed/Untyped ์ฐจ์ด๋ผ๊ณ ํ๋ค. Typed dataset์ dataset์ผ๋ก ๋ฐ์์ค๋ ๋ฐ์ดํฐ์ ํํ๋ฅผ ๋ฏธ๋ฆฌ ์ ์ํด ๋์ ๊ฒ์ธ ๋ฐ๋ฉด Untyped dataset์ ํ๋ก๊ทธ๋จ์ด ๋ฐ์ดํฐ์ ํํ๋ฅผ ์ถ์ธกํด์ ๊ฐ์ ธ์ค๋ ๊ฒ์ ์๋ฏธํ๋ค. ์ด ๋ ๊ฐ๋ ์๋ฌ๋ฅผ ์ก์๋ด๋ ์๊ฐ ์ธก๋ฉด์์๋ ์ฐจ์ด์ ์ด ์กด์ฌํ๋๋ฐ ์์ธํ ๋ด์ฉ์ ์ฌ๊ธฐ๋ฅผ ์ฐธ๊ณ ํ์.
ํด๋น ํฌ์คํ
์์๋ Dataset์ด ์๋ DataFrame์ ๋ํ ์ฝ๋์์ ์์๋์. DataFrame์ ์นผ๋ผ์ ์ ๊ทผํ๊ธฐ ์ํ ๋ฐฉ๋ฒ์ ํฌ๊ฒ 2๊ฐ์ง๊ฐ ์กด์ฌํ๋ค. df.column
๊ณผ df\['column'\]
์ด ์๋๋ฐ ๋ ์ค ํ์์ ๋ฐฉ๋ฒ์ ๋ ๋ง์ด ์ฌ์ฉํ๋ค๊ณ ํ๋ค.(์ด๊ฒ๋ Pandas์ ๋น์ทํ ๋ฏ ํ๋ค!)
# json ํ์ผ ์ฝ์ด๋ค์ด๊ธฐ
path = '/Users/younghun/Desktop/gitrepo/TIL/pyspark/people.json'
df = spark.read.json(path)
# name ์นผ๋ผ select ํด์ ์ดํด๋ณด๊ธฐ
df.select('name').show()
์ถ๊ฐ์ ์ผ๋ก 2๊ฐ ์ด์์ ์นผ๋ผ์ ์ถ์ถํ๋ฉด์ ๊ธฐ์กด์ ์นผ๋ผ์ ์ฐ์ฐ์ ๊ฐํด ํ์๋ณ์๋ฅผ ์์ฑํ์ฌ ์ถ์ถํ๋ ๊ฒ๋ ๊ฐ๋ฅํ๋ค.
# json ํ์ผ ์ฝ์ด๋ค์ด๊ธฐ
path = '/Users/younghun/Desktop/gitrepo/TIL/pyspark/people.json'
df = spark.read.json(path)
# name ์นผ๋ผ select ํด์ ์ดํด๋ณด๊ธฐ
df.select(df['name'], df['age']+1).show()
๋ค์์ ํน์ ์กฐ๊ฑด์ ๋ง์กฑํ๋ ๋ฐ์ดํฐ๋ง ์ถ์ถํ๋ filter()
์ ํน์ ์นผ๋ผ์ผ๋ก ๊ทธ๋ฃนํ์ ์ํํด์ฃผ๋ groupBy()
๋ฉ์๋์ ๋ํ ์์ ์ฝ๋์ด๋ค.
# json ํ์ผ ์ฝ์ด๋ค์ด๊ธฐ
path = '/Users/younghun/Desktop/gitrepo/TIL/pyspark/people.json'
df = spark.read.json(path)
# age๊ฐ 20๋ณด๋ค ํฐ ๋ฐ์ดํฐ๋ง ์ถ์ถ
df.filter(df['age'] > 20).show()
# age ์นผ๋ผ์ผ๋ก ๊ทธ๋ฃนํ ํ๊ณ ๋ฐ์ดํฐ์ ๊ฐ์๋ฅผ ์ง๊ณํด์ค
df.groupBy('age').count().show()
4. DataFrame์ Schema๋ฅผ ํ๋ก๊ทธ๋๋ฐ์ค๋ฝ๊ฒ ๋ช ์ํด๋ณด์!
๋ก๋ํ ๋ฐ์ดํฐ csv
, txt
, excel
๋ฑ ์ฌ๋ฌ๊ฐ์ง ์ข
๋ฅ์ ํ์ผ์ ์คํค๋ง ์ฆ, ์นผ๋ผ๋ช
(ํค๋)๊ฐ ๊ธฐ์
๋์ด ์๋ ๊ฒฝ์ฐ๋ ์์ง๋ง ๊ทธ๋ ์ง ์๊ณ ์คํค๋ง๋ง ๋ฐ๋ก ๋ค๋ฅธ ํ์ผ์ ๋ด๊ฒจ์๊ฑฐ๋ ํ๋ก๊ทธ๋๋จธ๊ฐ ์ง์ ๋ช
์ํด์ฃผ์ด์ผ ํ๋ ๊ฒฝ์ฐ๋ ์๋ค. ์ด๋ฌํ ๊ฒฝ์ฐ๋ฅผ ๋๋นํด ๋ฐ์ดํฐ ํ์ผ ์ธ๋ถ์ ๋ช
์๋์ด ์๋ ์คํค๋ง๋ฅผ ๊ฐ์ ธ์ ๋ฐ์ดํฐํ๋ ์์ ํค๋์ ์ฝ์
ํด์ฃผ๋ ์ฝ๋๋ฅผ ์ง๋ณด์. ์ฐ์ ๋ฐ์ดํฐํ์ผ์ ๋ค์๊ณผ ๊ฐ๋ค.
์ด์ txt
ํ์ผ์ ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ๊ณ ๋์์ ์คํค๋ง๋ฅผ ๊ณต๋ฐฑ์ด ํฌํจ๋ ๋ฌธ์์ด๋ก ์ ์ํ ํ ์ด๋ฅผ ๋ฐ์ดํฐํ๋ ์์ ์ฝ์
ํด๋ณด์.
from pyspark.sql.types import *
# SparkContext ๊ฐ์ฒด ์์ฑ
sc = spark.sparkContext
# txt file ์ฝ์ด์ค๊ธฐ
lines = sc.textFile('./people.txt')
parts = lines.map(lambda l: l.split(','))
## Step 1 ## => value๋ค ์ฒ๋ฆฌ
# ๊ฐ ๋ผ์ธ์ tuple( , ) ํํ๋ก convert ํด์ฃผ๊ธฐ
people = parts.map(lambda p: (p[0], p[1].strip())) # name์์ ๊ณต๋ฐฑ strip
## Step 2 ## => Schema๋ค ์ฒ๋ฆฌ
# ๋ฌธ์์ด๋ก ์ธ์ฝ๋ฉ๋ ์คํค๋ง
schemaString = "name age"
# schemaString ์์๋ฅผ loop๋๋ฉด์ StructField๋ก ๋ง๋ค๊ธฐ
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
# StructField ์ฌ๋ฌ๊ฐ๊ฐ ์๋ ๋ฆฌ์คํธ๋ฅผ StrucType์ผ๋ก ๋ง๋ค๊ธฐ!
schema = StructType(fields)
## Step 3 ## => value์ schema ํ์ฉํด DataFrame ์์ฑ
# ์์์ ๋ง๋ schema๋ฅผ RDD์ schema๋ก ์ ์ฉ
schemaPeople = spark.createDataFrame(people, schema)
# View Table ์์ฑํด ์ฟผ๋ฆฌ ๋ ๋ ค์ ๋ฐ์ดํฐ ์ถ์ถํด๋ณด๊ธฐ
schemaPeople.createOrReplaceTempView('people')
results = spark.sql("SELECT * FROM people")
results.show()
'Apache Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Infra] ๋ฐ์ดํฐ ์ธํ๋ผ ๊ตฌ์กฐ์ Sources (0) | 2021.04.23 |
---|---|
[PySpark] ์ปจํ ์ธ ๊ธฐ๋ฐ ์ํ ์ถ์ฒ ์์คํ ๋ง๋ค์ด๋ณด๊ธฐ (18) | 2021.02.15 |
[PySpark] PySpark๋ก Regression ๋ชจ๋ธ ๋ง๋ค๊ธฐ (0) | 2021.02.04 |
[PySpark] ํ์ดํ๋ ๋ฐ์ดํฐ๋ก ๋ถ๋ฅ ๋ชจ๋ธ ๋ง๋ค๊ธฐ (2) | 2021.02.03 |
[PySpark] Apache Spark ์ RDD ์๋ฃ๊ตฌ์กฐ (0) | 2021.01.30 |