Spark: RDD design

以 2023 年的视角来看,MapReduce 是个过度力大砖飞的软件,它强调强制的 2个 Stage (一些其他第三方之类的方案会有别的 Stage),然后 Stage 中间 Sort + 落 HDFS 文件来做 Shuffle,靠 Reducer Pull 来传输数据。这样的计算流很固定、性能比较低效,但是相对来说确实是个力大砖飞的方案。

Spark 之前,首先是微软在 SCOPE 系统和 DryadLinq 之类的地方做出了贡献,包括 DAG 一样的计算流、(类)SQL 的支持,但是 Spark 确实是开源社区很早掏出来的,并且在今天已经茁壮成长为了 DataBricks 这样的巨大公司和拥有了庞大的周边生态,可见这套东西打法还是很重要。Spark 支持的对象有 DataSet/DataFrame 和 SQL 甚至 Streaming / ML 等多个目标的庞大系统。不过我们会在之后的内容中提到 Spark SQL 甚至 DataFrame,我们今天只涉及 Spark 论文中提出的 RDD。

RDD

Spark 最初版本整个在 Scala 上编写,它提供的抽象是 RDD。在论文写作的时候,它只做一些 Batch Exec 的计算,并且论文中提到的优势场景包括 iterative algorithms (eg: 图计算,ML 的训练) 和 interactive data mining tools(给同一份数据发多次 adhoc query). 同时,RDD 的目标是尽量用上内存(利用上内存并不难,但是 MPI 可能需要知道对应机器的位置,此外还需要能够从故障或者 worker failover 中恢复出来)、利用 Intermediate result。

相对于 MPI, Pregel 等系统,Spark RDD 构建的是一种粗粒度的抽象,它:

  • 类似 Fp,本身是 immutable 的
  • 描述计算过程,同时也有一些相对 internal 的东西, 比如 Partition。也允许有 persist (持久化) 和 cache (缓存/物化结果并尽量在内存)
  • Parallel:因为 RDD 只描述计算过程,同时一些 Map 累的计算流程通常是相同的,所以允许 Parallel 的去执行。当然也可以根据 partition 之类的东西来决定这部分的描述
  • Fault Tolerant: 这个 FT 名字有点抽象,并不是和 TP Workload 一样的 FT,而是和 MapReduce 那种没算完或者挂了,就去重新 Exec 对应的任务

上面的部分其实有点抽象。不过后面都会讲。论文先举了两个 RDD 的例子:

1
2
3
4
5
6
lines = spark.textFile("hdfs://...") 
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()
errors.count()

errors.filter(_.contains("MySQL")).count()

这里的计算流程可以如下所示:可以看到 RDD 要么来自文件输入(lines) 要么来自对之前 RDD 的 transformation。

image-20230619195027384

文章也对比了 Spark 和 Distributed Memory 的优势(不过我个人认为 RDD 和 Distributed Memory 完全不是一套东西,也用在不同的场景,我现在觉得 Distributed Memory 是 “Internal”)下面的表格吹了很多有点,但我觉得主要区别还是:

  • Spark 自己维护了 lineage,表述数据的构造和血缘关系
  • 声明式需要某种计算 vs 具体实现对应的计算
  • RDD 在内存不够的时候也可以 downgrade 成类似 MapReduce 那种模式(利用 Spill 和 Shuffle)

别的都是虚头巴脑的,感觉扯来扯去浪费时间。

0CAF7F90-40E2-474C-A42C-19B76FF70AB1

这里论文还描述了一下不适合 RDD 不适合 Streaming 或者小亮多次更新的应用,看见 DataBricks 的 Live Table,我陷入了沉思。嘛,技术是发展的,Delta-IO 提供的数据湖本身也是一种绕弯弯的方案吧。

Spark Programming Interface

Spark 的 Cluster 模式如链接所述:https://spark.apache.org/docs/latest/cluster-overview.html 。本身把 worker 部署的时候部署在机器或者容器上,有 standalone, Yarn/Mesos 调度, Kubernetes 模式。这里有几个(全宇宙都一样的)模型:

cluster-overview

image-20230620221329414

用户可以用 Scala 编写 RDD,RDD 是带类型的对象 RDD[T], 对 RDD 操作分为 transformationaction,里面内容其实非常好理解(我感觉一些带副作用的在这里反而显得有点怪):

EBF3B6DA-0602-4995-B618-93751F38FD6C

这里会显得有点混乱,但是其实很简单,首先这些功能差不多都是函数式或者 db 类似的原语. 这里有几个值得注意的:

  • join union cogroup crossProduct: 令人熟悉的数据库老朋友
  • sort: 懂得都懂(其实类型上 sort 还没有,我感觉类似会有个 properties 的东西在里面?)
  • partitionBy: 需要注意的是,这里指定的是类似「物理执行」的属性,这点感觉其实对性能之类的相对来说比较重要,举个例子,比如本身可能走 shuffle 的计算,在 Partition 之后就不一定要走 Shuffle 了。

论文这里提供了一个 PageRank 的例子,其实 MR 之类的都会在这介绍 PageRank,什么公共靶场. 这里需要注意的是,它把 linkscache 下来了,然后在每一步都可以用这个基本的 RDD,而不需要重新计算。这里有个 Python 的例子:https://github.com/apache/spark/blob/master/examples/src/main/python/pagerank.py (注意 cache 和迭代)

image-20230620225708643

需要注意的是,作者认为这里 links 很大,所以没有去 broadcast 它,这里也提到,如果负载比较明确,可以去 partition 一把:

1
links = spark.textFile(...).map(...).partitionBy(myPartFunc).persist()

对的,其实 Spark RDD 还带了一组比较物理性质的操作:

0D7ADA36-5AD3-482E-B1BC-21766A1C686B

Spark 的操作可以指定 numPartition 和对应的 partitioner, 甚至去 repartition,不过这里需要注意 partition 本身不生成 RDD ( repartition 肯定会)。它有:

  • List Partition
  • Hash Partition
  • Range Partition

上面几种,具体还是看业务了。cache / presist 本身是个 lazy 的操作,会在执行的时候才处理,Cache / Persist 的对象是 RDD 的结果

RDD 的组成

作者在这里认为,RDD 包含:

  • Partitions
  • Dependencies (对应的数据源)
  • Compute Function
  • Metadata About Schema

我个人觉得其实有点像数据库的 Physical Plan 一类的东西(或许 Spark SQL 的论文会告诉我有什么区别)。

关于 RDD,几个要关注的操作是:

  • dependency and shuffle
  • plan 构建
  • cache / persist

我们先讲 dependency & shuffle. 也是 Spark 的核心之一。

Dependency and Shuffle

14AC10E6-1CB9-40C2-AAAC-2CA613DAF85B

Spark 会根据操作类型来判断是否要 Shuffle,这里的判断方式其实看上面,意外很简单,它把 Spark 区分为 Narrow Dependencies 和 Wide Dependencies:

  • Narrow Dependencies: where each partition of the parent RDD is used by at most one partition of the child RDD
  • Wide Dependencies: wide dependencies, where multiple child partitions may depend on it.

这里我特地贴原文,因为它含义还是比较清晰的,因为这里一对一 和多对一的单位是 RDD中的 Partition

  • OneToOne、ManyToOne 之类的关系显然肯定是 Narrow Dependencies
  • 多对一参照 Partition,co-locate Join 这种可能会是 Narrow Dependencies!

我们下面区分到 Shuffle 的实现,Shuffle 在 MapReduce 中也能见到,它是一个部分整个写过去,另一部分整个读的操作。这个操作还是相当重的,而且可能涉及排序。这里考虑分成 Shuffle Write 和 Shuffle Read 的阶段,写的时候可能走排序或者 Hash + Agg,读的时候可能还需要再走一个类似多路归并的操作。

BC2A9E92-563A-4D1D-8647-7336690F0C24

在 VLDB‘20 上,LinkedIn 的工程师提供了一个 Spark Shuffle 的方案,Spark 3.2 的时候被 Merge 进去了。之后写篇单独文章介绍这个 Shuffle 的流程好了。

Stage

C0F5047D-EBFA-4C60-9265-16E8822B489B

Spark 会按照 Wide Dependencies 拆分成多个 Stage,Stage 内并行执行(有点类似 Push-based Execution Engine?)

63816205-6672-4A0D-8A47-E3AD07D1A28D

总结

本文关注 Spark 计算和 Shuffle,省略了内存管理、代码生成等部分,因为我感觉这些东西要么我们之前讲过更现代的方案,要么 Photon 自己做了,没必要在意这么老论文是怎么 codegen 、内存管理的。

Spark 感觉论文里也提出了力大砖飞的方式。此外,对于 Shuffle 等地方的优化,本文也只是提到,没有深入(入门阶段先 move-fast 先扫一遍,之后懂得多了再补回来吧)。关于各个 transform,其实本来应该大有可以讲的地方,不过目前点到为止算了。

Spark 还有一套比较大的杀手锏是 Adaptive Query Execution,它能根据大小来动态调整执行的资源,来处理大数据场景的 skew,这是个比较重要的优化,但本文也略过了,实在深感惭愧。

References