๐ ๋ณธ ํฌ์คํ ์ Apache Spark 3.0.1 ๊ณต์ ๋ฌธ์๋ฅผ ์ง์ ํด์ํ์ฌ ํ์๊ฐ ์ดํดํ ๋ด์ฉ์ผ๋ก ์ฌ๊ตฌ์ฑํ์ต๋๋ค. ํน์ฌ๋ ์ปจํ ์ธ ์ค ํ๋ฆฐ ๋ด์ฉ์ด ์๋ค๋ฉด ์ ๊ทน์ ์ธ ํผ๋๋ฐฑ์ ํ์์ ๋๋ค! : )
์ด๋ฒ ํฌ์คํ ์์๋ ๋์ฉ๋ ๋ฐ์ดํฐ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ์ํ ํตํฉ ๋ถ์ ์์ง์ธ Apache Spark์ Apache Spark์ ๊ธฐ๋ณธ ์๋ฃ ๊ตฌ์กฐ์ธ RDD ์๋ฃ๊ตฌ์กฐ์ ๋ํด ์๊ฐํ๋ ค ํ๋ค. Apache Spark๋ ๊ตฌ์กฐํ๋ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํ Spark SQL๊ณผ ๋จธ์ ๋ฌ๋์ ์ํ ML Lib, ๊ทธ๋ํ ์ฒ๋ฆฌ๋ฅผ ์ํ Graph X, ์ค์๊ฐ ์ฒ๋ฆฌ์ ๋ฐฉ๋ํ ์ฐ์ฐ์ ์ํ Structured Streaming ๋๊ตฌ๋ ์ ๊ณตํ๋ค. ๋ํ Apache Spark๋ ๋ณธ๋ Scala ์ธ์ด๋ก ๋ง๋ค์ด์ก์ง๋ง Java, R, Python ๋ฑ๊ณผ ๊ฐ์ ๋ค์ํ ํ๋ก๊ทธ๋๋ฐ ์ธ์ด API๋ฅผ ์ ๊ณตํ๋ค.
1. Apache Spark๋ Cluster mode ์ด๋ค!
Apache Spark๋ก ๊ตฌํํ๋ ์ฑ(Application)์ ํด๋ฌ์คํฐ์์ ๋ ๋ฆฝ์ ์ธ ํ๋ก์ธ์ค๋ก ์ด์ฉ๋๋ค. ๊ทธ๋ ๋ค๋ฉด ๊ตฌ์ฒด์ ์ผ๋ก ์ด๋ป๊ฒ ๋์ํ๋๊ฑธ๊น? ์ฐ์ ๊ณต์ ๋ฌธ์์ ์๊ฐ๋์ด ์๋ ๊ทธ๋ฆผ๋ถํฐ ์ดํด๋ณด์.
์ฐ์ Driver Program์ด๋, ์ํ์น ์คํํฌ๋ฅผ ์ฌ์ฉํด ํด๋ฌ์คํฐ๋ฅผ ์์ฑํ์ฌ ๋ง๋ SparkContext ๋ผ๋ ๊ฐ์ฒด๋ฅผ ์๋ฏธํ๋ค. ์ด SparkContext๋ ๋ช ๊ฐ์ง์ ํด๋ฌ์คํฐ ๋งค๋์ (์ธ๋ถ ์ข ๋ฅ์๋ ์คํํฌ ์์ฒด์ ๋ด์ฅ๋ ๊ฐ์ฅ ๋จ์ํ Standalone, ํ๋ก ๋งต๋ฆฌ๋์ค๋ฅผ ์ด์ํ ์ ์๋ Apache Mesos, ํ๋ก2์ ๋ฆฌ์์ค ๋งค๋์ ์ธ Hadoop YARN, ์ฑ ์คํ, ์ค์ผ์ผ๋ง์ ์๋ํํ๊ณ ์ฑ์ ์ปจํ ์ด๋ํ ํด ๊ด๋ฆฌํ ์ ์๋ Kubernetes๊ฐ ์๋ค.)์ ์ฐ๊ฒฐ์ด ๋๋ค. ์ด ํด๋ฌ์คํฐ ๋งค๋์ ๋ ์ฑ๋ค ๊ฐ์ ๋ฆฌ์์ค๋ฅผ ์ ๋ฌํด์ฃผ๋ ์ญํ ์ ํ๋ค.
ํด๋ฌ์คํฐ ๋งค๋์ ์ SparkContext๊ฐ ์ฐ๊ฒฐ๋๋ค๋ฉด ๊ฐ ํด๋ฌ์คํฐ ๋ด๋ถ์ Worker Node์์ Executor๋ฅผ ์ป๊ฒ ๋๋ค. ์ด Executor์ ์ฌ์ฉ์๊ฐ ๋ง๋ SparkContext(์ผ์ข ์ ์ฑ)๋ฅผ ์ํด ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๊ฑฐ๋ ์ฐ์ฐ์ ์คํํ๋ ํ๋ก์ธ์ค๋ฅผ ์๋ฏธํ๋ค.
์ด์ Spark๋ Python, R, Java์ ๊ฐ์ ํ๋ก๊ทธ๋๋ฐ ์ธ์ด๋ก ์ฝ๋๋ฅผ ์์ฑํ ํ์ผ(์ด ํ์ผ์ SparkContext๋ฅผ ์ด์ฉํด ๋ง๋ ํ์ผ์ด๋ค)์ ๊ฐ ํด๋ฌ์คํฐ ๋ด๋ถ์ ์๋ Executor๋ค์๊ฒ ์ ๋ฌํด์ค๋ค. ๋ง์ง๋ง์ผ๋ก ์ฝ๋๋ด๋ถ์ SparkContext๊ฐ Executor๋ค์๊ฒ ์์ฑ๋ ์ฝ๋์ task๋ฅผ ์ํํ๊ธฐ ์ํด ์ ๋ฌํ๋ค.
2. Pyspark ์์ํ๊ณ SparkContext ์์ฑํ๊ธฐ
Apache Spark์ ๊ธฐ๋ณธ์ ์ธ ์๋ฃ๊ตฌ์กฐ๋ RDD ์๋ฃ๊ตฌ์กฐ์ด๋ค. RDD๋ Resilient Distributed Dataset์ผ๋ก 'Resilient'์ ์ฌ์ ์ ์๋ฏธ๋ 'ํ๋ ฅ ์๋' ์ด๋ผ๋ ์๋ฏธ์ด๋ค. ์ฌ๊ธฐ์๋ RDD ์๋ฃ๊ตฌ์กฐ๊ฐ ์ํ์น ์คํํฌ์ ํด๋ฌ์คํฐ ๋ด๋ถ์ ๋ ธ๋๋ค ๊ฐ์ ๊ณต์ ๋์ด ๋ณ๋ ฌ์ ์ผ๋ก ์ฐ์ฐ๋ ์ ์๋ ์๋ฃ๊ตฌ์กฐ์ด๊ธฐ ๋๋ฌธ์ ๊ทธ๋ ๊ฒ ๋ถ๋ฆฐ๋ค.
RDD ์๋ฃ๊ตฌ์กฐ๋ฅผ ์ค์ตํด๋ณด๊ธฐ ์ ์ ํ์๋ Python ๊ธฐ๋ฐ์ธ Pyspark๋ฅผ ์ฌ์ฉํ์ผ๋ฏ๋ก Pyspark๋ฅผ ์ค์นํ๊ณ ์คํ ์ Jupyter Notebookbook์ผ๋ก ์คํ๋๋๋ก ํ๋ ๋ฐฉ๋ฒ์ ์ฌ๊ธฐ๋ฅผ ์ฐธ๊ณ ํ์.
3. RDD ์๋ฃ๊ตฌ์กฐ ๋ง๋ค๊ธฐ
RDD๋ฅผ ๋ง๋ค๊ธฐ ์ํด์๋ parallelize ๋ฉ์๋๋ฅผ ์ด์ฉํด ์ฑ(Driver program) ๋ด๋ถ์ ์ง์ ๋ง๋ค์ด ์ค๋ค.
from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate()
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
res = distData.reduce(lambda a, b: a + b)
print(res)
์ฌ๊ธฐ์ parallelize() ๋ฉ์๋์์ partitions(=slices) ์ด๋ผ๋ optional ์ธ์๊ฐ ์๋ค. ์คํํฌ์ ํด๋ฌ์คํฐ๋ ๊ฐ ํํฐ์ ์์ ํ๋์ ์์ ์ ์ํํ๋ค. ๋ณดํต์ ํด๋ฌ์คํฐ์์ ๊ฐ CPU ๋น 2~4๊ฐ์ ํํฐ์ ์ ๊ฐ์ง๊ณ ์ํํ๋ค. ์ผ๋ฐ์ ์ผ๋ก ์คํํฌ๋ ์๋์ ์ผ๋ก ํํฐ์ ์ ๊ฐ์๋ฅผ ์ ์ ํ๊ฒ ์ธํ ํด์ฃผ์ง๋ง ์ฌ์ฉ์๊ฐ ์ง์ ์๋์ ์ผ๋ก ํ๋ผ๋ฏธํฐ๋ฅผ ์ง์ด ๋ฃ์ด ํํฐ์ ๊ฐ์๋ฅผ ์ปค์คํฐ๋ง์ด์ง ํด์ค ์ ์๋ค.
from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate()
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data, 10) # 10๊ฐ์ ํํฐ์
์ผ๋ก ์ํ!
res = distData.reduce(lambda a, b: a + b)
print(res)
4. ์ธ๋ถ ๋ฐ์ดํฐ์ ์ ๊ฐ์ ธ์ค๊ธฐ
pyspark๋ ๋ก์ปฌ์ ์๋ ํ์ผ, HDFS, ์นด์ฐ๋๋ผ HBase, ์๋ง์กด S3 ๋ฑ๊ณผ ๊ฐ์ ํ๋ก์ผ๋ก ์ง์๋๋ ์คํ ๋ฆฌ์ง๋ก๋ถํฐ ๋ถ์ฐ ๋ฐ์ดํฐ์ ์ ๋ถ๋ฌ์ฌ ์ ์๋ค. ๋ค์์ txt ํ์ฅ์ ํ์ผ๋ก ๋์ด์๋ ํ์ผ์ ๋ถ๋ฌ์ค๋ ์ฝ๋์ด๋ค.
distFile = sc.textFile('./people.txt')
# map๊ณผ reduce์ด์ฉํด txtํ์ผ ๋ด๋ถ ๊ฐ ๋ผ์ธ๋ง๋ค length ํฉ์ฐ ๊ฐ๋ฅ
distFile.map(lambda l: len(l)).reduce(lambda a, b: a + b)
textfile() ๋ฉ์๋ ์ด์ธ์ wholeTextFiles() ๋ ์๋๋ฐ, ์ด๋ ์ฉ๋์ด ์์ ์ฌ๋ฌ ํ ์คํธ ํ์ผ๋ค์ ํฌํจํ๋ ๋๋ ํ ๋ฆฌ๋ฅผ ์ฝ์ด (filename, content) ์์ผ๋ก ๊ฒฐ๊ณผ๊ฐ์ ๋ฐํํ ์ ์๋ค. ํ์ง๋ง ์ด๋ฌํ ํ์ผ์ ์ฝ๋ ๋ฐฉ๋ฒ์ ์ต๊ทผ์ Spark SQL์ read/write ๋ฉ์๋์ ์ํด ๋์ฒด๋๋ ๊ฒฝํฅ์ด ์๋ค.
5. RDD Operations(RDD ์ฐ์ฐ)
RDD ์ฐ์ฐ์๋ 2๊ฐ์ง ์ข ๋ฅ๊ฐ ์๋ค.
- Transformations : ๊ธฐ์กด์ ๋ฐ์ดํฐ์์ ์๋ก์ด ๋ฐ์ดํฐ๋ฅผ ๋ง๋ค ๋ ์ฌ์ฉํ๋ค. ์๋ก๋ map ํจ์๊ฐ ์๋ค.
- Actions : ๋ฐ์ดํฐ์ ์์ ํน์ ํ ์ฐ์ฐ์ ์ํ ํ Driver program(์ฑ)์ ์ฐ์ฐ ๊ฒฐ๊ณผ๊ฐ์ ์ ๋ฌํ๋ค. ์๋ก๋ reduce ํจ์๊ฐ ์๋ค.
์ด ๋, Transformations์ Spark ๋ด๋ถ์์ ๋๋ฆฌ๋ค. ์๋ํ๋ฉด Transformations ์์ฒด๋ ์ฐ์ฐ์ ๋ฐ๋ก ์ํํ์ง ์๊ธฐ ๋๋ฌธ์ด๋ค. ํ์ง๋ง ์ด๋ค Transformations์ ์ทจํ ์ง ๊ธฐ์ต(cache)ํ๋ค. ๋ฐ๋ฉด์ Actions์ด ์ํ๋์ด์ผ ์ค์ ์ ์ธ ์ฐ์ฐ์ ์์ํ๋ค. ์ด๋ฌํ ํ๋ก์ธ์ค๋ ํจ์จ์ ์ด๋ค. ์๋ํ๋ฉด map์ด๋ผ๋ Transformations์ reduce ๋ผ๋ Actions๋ฅผ ์ํํ๊ธฐ ์์ํ ๋์ผ ์ํ๋๋ฉฐ ๊ฒฐ๊ตญ Actions์ ๊ฒฐ๊ณผ๊ฐ๋ง ์ฑ์ ๋ฐํํ๊ฒ ๋๋ค. ๊ทธ๋์ Transformations์ด ์ฐจ์งํ๋ ํฐ ๋ฉ๋ชจ๋ฆฌ ์ ์ฝ์ด ๊ฐ๋ฅํ๋ค. ์ฆ, Transformations์ด ๋ฐํํ ์๋ก์ด ํฐ ๋ฐ์ดํฐ์ ์ ๋ฉ๋ชจ๋ฆฌ์ ์ฌ๋ฆฌ์ง ์๋๋ค.
๊ธฐ๋ณธ์ ์ผ๋ก Transformations๋ RDD๋ Actions๋ฅผ ์ํํ ๋๋ง๋ค ์ฌ ์ฐ์ฐ์ด ๊ฐ๋ฅํ๋ค. ํ์ง๋ง persist() ๋๋ cache() ๋ฉ์๋๋ฅผ ์ด์ฉํด ๋ฉ๋ชจ๋ฆฌ์ RDD ์๋ฃ๋ฅผ ์ ์ฅ์ํฌ ์ ์๋ค. ์ด๋ฌํ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ ์ด์ ๋ ๋ค์์ ์ฟผ๋ฆฌํ ์ RDD์ ๋ ๋นจ๋ฆฌ ์ ๊ทผํ๊ธฐ ์ํจ์ด๋ค.
# lines๋ ํ์ฌ ๋ฉ๋ชจ๋ฆฌ์ ๋ก๋๋์ง ์๊ณ ํด๋น ํ์ผ์ ๊ฐ๋ฅดํค๋ ํฌ์ธํฐ์
lines = sc.textFile('people.txt')
# map์ด๋ผ๋ ๋ณํ์ ์ทจํ ํ์ ๊ฒฐ๊ณผ๊ฐ(์ฐ์ฐ๋์ง ์์ ์ํ)
lineLengths = lines.map(lambda s: len(s))
# reduce๋ผ๋ ์ก์
์ ์ทจํจ์ผ๋ก์จ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ํ๋ฉด์ ์์
์ฐ์ฐ์ ์ํ. ๊ฒฐ๊ณผ๊ฐ๋ง driver program์๊ฒ ๋ฐํ!
totalLength = lineLengths.reduce(lambda a, b: a + b)
# ๋ง์ฝ ๋ณํ์ ์ทจํ linesLengths๋ฅผ ์ถํ์๋ ์ฌ์ฉํ ๊ฒ์ด๋ผ๋ฉด persist๋ก ์ ์ฅ
lineLengths.persist()
'Apache Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Infra] ๋ฐ์ดํฐ ์ธํ๋ผ ๊ตฌ์กฐ์ Sources (0) | 2021.04.23 |
---|---|
[PySpark] ์ปจํ ์ธ ๊ธฐ๋ฐ ์ํ ์ถ์ฒ ์์คํ ๋ง๋ค์ด๋ณด๊ธฐ (18) | 2021.02.15 |
[PySpark] PySpark๋ก Regression ๋ชจ๋ธ ๋ง๋ค๊ธฐ (0) | 2021.02.04 |
[PySpark] ํ์ดํ๋ ๋ฐ์ดํฐ๋ก ๋ถ๋ฅ ๋ชจ๋ธ ๋ง๋ค๊ธฐ (2) | 2021.02.03 |
[PySpark] Spark SQL ํํ ๋ฆฌ์ผ (0) | 2021.02.01 |