๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

Apache Spark

[PySpark] Apache Spark ์™€ RDD ์ž๋ฃŒ๊ตฌ์กฐ

๋ฐ˜์‘ํ˜•

๐Ÿ”Š ๋ณธ ํฌ์ŠคํŒ…์€ Apache Spark 3.0.1 ๊ณต์‹ ๋ฌธ์„œ๋ฅผ ์ง์ ‘ ํ•ด์„ํ•˜์—ฌ ํ•„์ž๊ฐ€ ์ดํ•ดํ•œ ๋‚ด์šฉ์œผ๋กœ ์žฌ๊ตฌ์„ฑํ–ˆ์Šต๋‹ˆ๋‹ค. ํ˜น์—ฌ๋‚˜ ์ปจํ…์ธ  ์ค‘ ํ‹€๋ฆฐ ๋‚ด์šฉ์ด ์žˆ๋‹ค๋ฉด ์ ๊ทน์ ์ธ ํ”ผ๋“œ๋ฐฑ์€ ํ™˜์˜์ž…๋‹ˆ๋‹ค! : )

 

์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” ๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ํ†ตํ•ฉ ๋ถ„์„ ์—”์ง„์ธ Apache Spark์™€ Apache Spark์˜ ๊ธฐ๋ณธ ์ž๋ฃŒ ๊ตฌ์กฐ์ธ RDD ์ž๋ฃŒ๊ตฌ์กฐ์— ๋Œ€ํ•ด ์†Œ๊ฐœํ•˜๋ ค ํ•œ๋‹ค. Apache Spark๋Š” ๊ตฌ์กฐํ™”๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ Spark SQL๊ณผ ๋จธ์‹ ๋Ÿฌ๋‹์„ ์œ„ํ•œ ML Lib, ๊ทธ๋ž˜ํ”„ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ Graph X, ์‹ค์‹œ๊ฐ„ ์ฒ˜๋ฆฌ์™€ ๋ฐฉ๋Œ€ํ•œ ์—ฐ์‚ฐ์„ ์œ„ํ•œ Structured Streaming ๋„๊ตฌ๋„ ์ œ๊ณตํ•œ๋‹ค. ๋˜ํ•œ Apache Spark๋Š” ๋ณธ๋ž˜ Scala ์–ธ์–ด๋กœ ๋งŒ๋“ค์–ด์กŒ์ง€๋งŒ Java, R, Python ๋“ฑ๊ณผ ๊ฐ™์€ ๋‹ค์–‘ํ•œ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ์–ธ์–ด API๋ฅผ ์ œ๊ณตํ•œ๋‹ค.

 

Apache Spark๊ฐ€ ์ œ๊ณตํ•˜๋Š” 4๊ฐ€์ง€ ๊ธฐ๋Šฅ

1. Apache Spark๋Š” Cluster mode ์ด๋‹ค!

Apache Spark๋กœ ๊ตฌํ˜„ํ•˜๋Š”  ์•ฑ(Application)์€ ํด๋Ÿฌ์Šคํ„ฐ์—์„œ ๋…๋ฆฝ์ ์ธ ํ”„๋กœ์„ธ์Šค๋กœ ์šด์šฉ๋œ๋‹ค. ๊ทธ๋ ‡๋‹ค๋ฉด ๊ตฌ์ฒด์ ์œผ๋กœ ์–ด๋–ป๊ฒŒ ๋™์ž‘ํ•˜๋Š”๊ฑธ๊นŒ? ์šฐ์„  ๊ณต์‹ ๋ฌธ์„œ์— ์†Œ๊ฐœ๋˜์–ด ์žˆ๋Š” ๊ทธ๋ฆผ๋ถ€ํ„ฐ ์‚ดํŽด๋ณด์ž.

 

Apache Spark๋Š” ํด๋Ÿฌ์Šคํ„ฐ๋กœ ์šด์šฉ๋œ๋‹ค.

 

์šฐ์„  Driver Program์ด๋ž€, ์•„ํŒŒ์น˜ ์ŠคํŒŒํฌ๋ฅผ ์‚ฌ์šฉํ•ด ํด๋Ÿฌ์Šคํ„ฐ๋ฅผ ์ƒ์„ฑํ•˜์—ฌ ๋งŒ๋“  SparkContext ๋ผ๋Š” ๊ฐ์ฒด๋ฅผ ์˜๋ฏธํ•œ๋‹ค. ์ด SparkContext๋Š” ๋ช‡ ๊ฐ€์ง€์˜ ํด๋Ÿฌ์Šคํ„ฐ ๋งค๋‹ˆ์ €(์„ธ๋ถ€ ์ข…๋ฅ˜์—๋Š” ์ŠคํŒŒํฌ ์ž์ฒด์— ๋‚ด์žฅ๋œ ๊ฐ€์žฅ ๋‹จ์ˆœํ•œ Standalone, ํ•˜๋‘ก ๋งต๋ฆฌ๋“€์Šค๋ฅผ ์šด์˜ํ•  ์ˆ˜ ์žˆ๋Š” Apache Mesos, ํ•˜๋‘ก2์˜ ๋ฆฌ์†Œ์Šค ๋งค๋‹ˆ์ €์ธ Hadoop YARN, ์•ฑ ์‹คํ–‰, ์Šค์ผ€์ผ๋ง์„ ์ž๋™ํ™”ํ•˜๊ณ  ์•ฑ์„ ์ปจํ…Œ์ด๋„ˆํ™” ํ•ด ๊ด€๋ฆฌํ•  ์ˆ˜ ์žˆ๋Š” Kubernetes๊ฐ€ ์žˆ๋‹ค.)์™€ ์—ฐ๊ฒฐ์ด ๋œ๋‹ค. ์ด ํด๋Ÿฌ์Šคํ„ฐ ๋งค๋‹ˆ์ €๋Š” ์•ฑ๋“ค ๊ฐ„์— ๋ฆฌ์†Œ์Šค๋ฅผ ์ „๋‹ฌํ•ด์ฃผ๋Š” ์—ญํ• ์„ ํ•œ๋‹ค. 

 

ํด๋Ÿฌ์Šคํ„ฐ ๋งค๋‹ˆ์ €์™€ SparkContext๊ฐ€ ์—ฐ๊ฒฐ๋œ๋‹ค๋ฉด ๊ฐ ํด๋Ÿฌ์Šคํ„ฐ ๋‚ด๋ถ€์˜ Worker Node์—์„œ Executor๋ฅผ ์–ป๊ฒŒ ๋œ๋‹ค. ์ด Executor์€ ์‚ฌ์šฉ์ž๊ฐ€ ๋งŒ๋“  SparkContext(์ผ์ข…์˜ ์•ฑ)๋ฅผ ์œ„ํ•ด ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•˜๊ฑฐ๋‚˜ ์—ฐ์‚ฐ์„ ์‹คํ–‰ํ•˜๋Š” ํ”„๋กœ์„ธ์Šค๋ฅผ ์˜๋ฏธํ•œ๋‹ค. 

 

์ด์ œ Spark๋Š” Python, R, Java์™€ ๊ฐ™์€ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ์–ธ์–ด๋กœ ์ฝ”๋“œ๋ฅผ ์ž‘์„ฑํ•œ ํŒŒ์ผ(์ด ํŒŒ์ผ์€ SparkContext๋ฅผ ์ด์šฉํ•ด ๋งŒ๋“  ํŒŒ์ผ์ด๋‹ค)์„ ๊ฐ ํด๋Ÿฌ์Šคํ„ฐ ๋‚ด๋ถ€์— ์žˆ๋Š” Executor๋“ค์—๊ฒŒ ์ „๋‹ฌํ•ด์ค€๋‹ค. ๋งˆ์ง€๋ง‰์œผ๋กœ ์ฝ”๋“œ๋‚ด๋ถ€์˜ SparkContext๊ฐ€ Executor๋“ค์—๊ฒŒ ์ž‘์„ฑ๋œ ์ฝ”๋“œ์˜ task๋ฅผ ์ˆ˜ํ–‰ํ•˜๊ธฐ ์œ„ํ•ด ์ „๋‹ฌํ•œ๋‹ค.

2. Pyspark ์‹œ์ž‘ํ•˜๊ณ  SparkContext ์ƒ์„ฑํ•˜๊ธฐ

Apache Spark์˜ ๊ธฐ๋ณธ์ ์ธ ์ž๋ฃŒ๊ตฌ์กฐ๋Š” RDD ์ž๋ฃŒ๊ตฌ์กฐ์ด๋‹ค. RDD๋Š” Resilient Distributed Dataset์œผ๋กœ 'Resilient'์˜ ์‚ฌ์ „์  ์˜๋ฏธ๋Š” 'ํƒ„๋ ฅ ์žˆ๋Š”' ์ด๋ผ๋Š” ์˜๋ฏธ์ด๋‹ค. ์—ฌ๊ธฐ์„œ๋Š” RDD ์ž๋ฃŒ๊ตฌ์กฐ๊ฐ€ ์•„ํŒŒ์น˜ ์ŠคํŒŒํฌ์˜ ํด๋Ÿฌ์Šคํ„ฐ ๋‚ด๋ถ€์˜ ๋…ธ๋“œ๋“ค ๊ฐ„์— ๊ณต์œ ๋˜์–ด ๋ณ‘๋ ฌ์ ์œผ๋กœ ์—ฐ์‚ฐ๋  ์ˆ˜ ์žˆ๋Š” ์ž๋ฃŒ๊ตฌ์กฐ์ด๊ธฐ ๋•Œ๋ฌธ์— ๊ทธ๋ ‡๊ฒŒ ๋ถˆ๋ฆฐ๋‹ค.

 

RDD ์ž๋ฃŒ๊ตฌ์กฐ๋ฅผ ์‹ค์Šตํ•ด๋ณด๊ธฐ ์ „์— ํ•„์ž๋Š” Python ๊ธฐ๋ฐ˜์ธ Pyspark๋ฅผ ์‚ฌ์šฉํ–ˆ์œผ๋ฏ€๋กœ Pyspark๋ฅผ ์„ค์น˜ํ•˜๊ณ  ์‹คํ–‰ ์‹œ Jupyter Notebookbook์œผ๋กœ ์‹คํ–‰๋˜๋„๋ก ํ•˜๋Š” ๋ฐฉ๋ฒ•์€ ์—ฌ๊ธฐ๋ฅผ ์ฐธ๊ณ ํ•˜์ž.

3. RDD ์ž๋ฃŒ๊ตฌ์กฐ ๋งŒ๋“ค๊ธฐ

RDD๋ฅผ ๋งŒ๋“ค๊ธฐ ์œ„ํ•ด์„œ๋Š” parallelize ๋ฉ”์†Œ๋“œ๋ฅผ ์ด์šฉํ•ด ์•ฑ(Driver program) ๋‚ด๋ถ€์— ์ง์ ‘ ๋งŒ๋“ค์–ด ์ค€๋‹ค.

 

from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate()

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
res = distData.reduce(lambda a, b: a + b)
print(res)

 

์—ฌ๊ธฐ์„œ parallelize() ๋ฉ”์†Œ๋“œ์—์„œ partitions(=slices) ์ด๋ผ๋Š” optional ์ธ์ž๊ฐ€ ์žˆ๋‹ค. ์ŠคํŒŒํฌ์˜ ํด๋Ÿฌ์Šคํ„ฐ๋Š” ๊ฐ ํŒŒํ‹ฐ์…˜์—์„œ ํ•˜๋‚˜์˜ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•œ๋‹ค. ๋ณดํ†ต์€ ํด๋Ÿฌ์Šคํ„ฐ์—์„œ ๊ฐ CPU ๋‹น 2~4๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์„ ๊ฐ€์ง€๊ณ  ์ˆ˜ํ–‰ํ•œ๋‹ค. ์ผ๋ฐ˜์ ์œผ๋กœ ์ŠคํŒŒํฌ๋Š” ์ž๋™์ ์œผ๋กœ ํŒŒํ‹ฐ์…˜์˜ ๊ฐœ์ˆ˜๋ฅผ ์ ์ •ํ•˜๊ฒŒ ์„ธํŒ…ํ•ด์ฃผ์ง€๋งŒ ์‚ฌ์šฉ์ž๊ฐ€ ์ง์ ‘ ์ˆ˜๋™์ ์œผ๋กœ ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ์ง‘์–ด ๋„ฃ์–ด ํŒŒํ‹ฐ์…˜ ๊ฐœ์ˆ˜๋ฅผ ์ปค์Šคํ„ฐ๋งˆ์ด์ง• ํ•ด์ค„ ์ˆ˜ ์žˆ๋‹ค.

 

from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate()

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data, 10) # 10๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ ์ˆ˜ํ–‰!
res = distData.reduce(lambda a, b: a + b)
print(res)

4. ์™ธ๋ถ€ ๋ฐ์ดํ„ฐ์…‹์„ ๊ฐ€์ ธ์˜ค๊ธฐ

pyspark๋Š” ๋กœ์ปฌ์— ์žˆ๋Š” ํŒŒ์ผ, HDFS, ์นด์‚ฐ๋“œ๋ผ HBase, ์•„๋งˆ์กด S3 ๋“ฑ๊ณผ ๊ฐ™์€ ํ•˜๋‘ก์œผ๋กœ ์ง€์›๋˜๋Š” ์Šคํ† ๋ฆฌ์ง€๋กœ๋ถ€ํ„ฐ ๋ถ„์‚ฐ ๋ฐ์ดํ„ฐ์…‹์„ ๋ถˆ๋Ÿฌ์˜ฌ ์ˆ˜ ์žˆ๋‹ค. ๋‹ค์Œ์€ txt ํ™•์žฅ์ž ํŒŒ์ผ๋กœ ๋˜์–ด์žˆ๋Š” ํŒŒ์ผ์„ ๋ถˆ๋Ÿฌ์˜ค๋Š” ์ฝ”๋“œ์ด๋‹ค.

 

distFile = sc.textFile('./people.txt')
# map๊ณผ reduce์ด์šฉํ•ด txtํŒŒ์ผ ๋‚ด๋ถ€ ๊ฐ ๋ผ์ธ๋งˆ๋‹ค length ํ•ฉ์‚ฐ ๊ฐ€๋Šฅ
distFile.map(lambda l: len(l)).reduce(lambda a, b: a + b)

 

textfile() ๋ฉ”์†Œ๋“œ ์ด์™ธ์— wholeTextFiles() ๋„ ์žˆ๋Š”๋ฐ, ์ด๋Š” ์šฉ๋Ÿ‰์ด ์ž‘์€ ์—ฌ๋Ÿฌ ํ…์ŠคํŠธ ํŒŒ์ผ๋“ค์„ ํฌํ•จํ•˜๋Š” ๋””๋ ‰ํ† ๋ฆฌ๋ฅผ ์ฝ์–ด (filename, content) ์Œ์œผ๋กœ ๊ฒฐ๊ณผ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•  ์ˆ˜ ์žˆ๋‹ค. ํ•˜์ง€๋งŒ ์ด๋Ÿฌํ•œ ํŒŒ์ผ์„ ์ฝ๋Š” ๋ฐฉ๋ฒ•์€ ์ตœ๊ทผ์— Spark SQL์˜ read/write ๋ฉ”์†Œ๋“œ์— ์˜ํ•ด ๋Œ€์ฒด๋˜๋Š” ๊ฒฝํ–ฅ์ด ์žˆ๋‹ค.

5. RDD Operations(RDD ์—ฐ์‚ฐ)

RDD ์—ฐ์‚ฐ์—๋Š” 2๊ฐ€์ง€ ์ข…๋ฅ˜๊ฐ€ ์žˆ๋‹ค.

 

  • Transformations : ๊ธฐ์กด์˜ ๋ฐ์ดํ„ฐ์—์„œ ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๋ฅผ ๋งŒ๋“ค ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค. ์˜ˆ๋กœ๋Š” map ํ•จ์ˆ˜๊ฐ€ ์žˆ๋‹ค.
  • Actions : ๋ฐ์ดํ„ฐ์…‹์—์„œ ํŠน์ •ํ•œ ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ ํ›„ Driver program(์•ฑ)์— ์—ฐ์‚ฐ ๊ฒฐ๊ณผ๊ฐ’์„ ์ „๋‹ฌํ•œ๋‹ค. ์˜ˆ๋กœ๋Š” reduce ํ•จ์ˆ˜๊ฐ€ ์žˆ๋‹ค.

์ด ๋•Œ, Transformations์€ Spark ๋‚ด๋ถ€์—์„œ ๋Š๋ฆฌ๋‹ค. ์™œ๋ƒํ•˜๋ฉด Transformations ์ž์ฒด๋Š” ์—ฐ์‚ฐ์„ ๋ฐ”๋กœ ์ˆ˜ํ–‰ํ•˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค. ํ•˜์ง€๋งŒ ์–ด๋–ค Transformations์„ ์ทจํ• ์ง€ ๊ธฐ์–ต(cache)ํ•œ๋‹ค. ๋ฐ˜๋ฉด์— Actions์ด ์ˆ˜ํ–‰๋˜์–ด์•ผ ์‹ค์ œ์ ์ธ ์—ฐ์‚ฐ์„ ์‹œ์ž‘ํ•œ๋‹ค. ์ด๋Ÿฌํ•œ ํ”„๋กœ์„ธ์Šค๋Š” ํšจ์œจ์ ์ด๋‹ค. ์™œ๋ƒํ•˜๋ฉด map์ด๋ผ๋Š” Transformations์€ reduce ๋ผ๋Š” Actions๋ฅผ ์ˆ˜ํ–‰ํ•˜๊ธฐ ์‹œ์ž‘ํ•  ๋•Œ์•ผ ์ˆ˜ํ–‰๋˜๋ฉฐ ๊ฒฐ๊ตญ Actions์˜ ๊ฒฐ๊ณผ๊ฐ’๋งŒ ์•ฑ์— ๋ฐ˜ํ™˜ํ•˜๊ฒŒ ๋œ๋‹ค. ๊ทธ๋ž˜์„œ Transformations์ด ์ฐจ์ง€ํ•˜๋Š” ํฐ ๋ฉ”๋ชจ๋ฆฌ ์ ˆ์•ฝ์ด ๊ฐ€๋Šฅํ•˜๋‹ค. ์ฆ‰, Transformations์ด ๋ฐ˜ํ™˜ํ•  ์ƒˆ๋กœ์šด ํฐ ๋ฐ์ดํ„ฐ์…‹์„ ๋ฉ”๋ชจ๋ฆฌ์— ์˜ฌ๋ฆฌ์ง€ ์•Š๋Š”๋‹ค.

 

๊ธฐ๋ณธ์ ์œผ๋กœ Transformations๋œ RDD๋Š” Actions๋ฅผ ์ˆ˜ํ–‰ํ•  ๋•Œ๋งˆ๋‹ค ์žฌ ์—ฐ์‚ฐ์ด ๊ฐ€๋Šฅํ•˜๋‹ค. ํ•˜์ง€๋งŒ persist() ๋˜๋Š” cache() ๋ฉ”์†Œ๋“œ๋ฅผ ์ด์šฉํ•ด ๋ฉ”๋ชจ๋ฆฌ์— RDD ์ž๋ฃŒ๋ฅผ ์ €์žฅ์‹œํ‚ฌ ์ˆ˜ ์žˆ๋‹ค. ์ด๋Ÿฌํ•œ ๋ฉ”์†Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ์ด์œ ๋Š” ๋‹ค์Œ์— ์ฟผ๋ฆฌํ•  ์‹œ RDD์— ๋” ๋นจ๋ฆฌ ์ ‘๊ทผํ•˜๊ธฐ ์œ„ํ•จ์ด๋‹ค.

 

# lines๋Š” ํ˜„์žฌ ๋ฉ”๋ชจ๋ฆฌ์— ๋กœ๋“œ๋˜์ง€ ์•Š๊ณ  ํ•ด๋‹น ํŒŒ์ผ์„ ๊ฐ€๋ฅดํ‚ค๋Š” ํฌ์ธํ„ฐ์ž„
lines = sc.textFile('people.txt')
# map์ด๋ผ๋Š” ๋ณ€ํ™˜์„ ์ทจํ•œ ํ›„์˜ ๊ฒฐ๊ณผ๊ฐ’(์—ฐ์‚ฐ๋˜์ง€ ์•Š์€ ์ƒํƒœ)
lineLengths = lines.map(lambda s: len(s))
# reduce๋ผ๋Š” ์•ก์…˜์„ ์ทจํ•จ์œผ๋กœ์จ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ๋ฅผ ํ•˜๋ฉด์„œ ์ž‘์—… ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰. ๊ฒฐ๊ณผ๊ฐ’๋งŒ driver program์—๊ฒŒ ๋ฐ˜ํ™˜!
totalLength = lineLengths.reduce(lambda a, b: a + b)
# ๋งŒ์•ฝ ๋ณ€ํ™˜์„ ์ทจํ•œ linesLengths๋ฅผ ์ถ”ํ›„์—๋„ ์‚ฌ์šฉํ•  ๊ฒƒ์ด๋ผ๋ฉด persist๋กœ ์ €์žฅ
lineLengths.persist()

 

๋ฐ˜์‘ํ˜•