Spark 性能调优

Transformation & Action

说到 Transformation 和 Action,就必须得提一下 RDD,RDD 的中文名字叫做弹性分布式数据集,我刚开始接触 Spark 的时候也非常纠结 RDD 到底是个什么东西,后来发现,RDD 其实就是一个 Spark 的很抽象的数据结构。

Spark 中 RDD 提供了两种类型的操作:Transformation 和 Action,所有 Transformation 都是采用懒策略,所谓懒策略就是指当 RDD 之间出现 Transformation 操作的时候是不会真正进行计算的,Spark 只记录了 RDD 的父 RDD,以及数据集进行的逻辑操作,当有 Action 操作提交时才会被触发。

  • Transformation 操作:得到一个新的 RDD,比如从数据源生成一个新的 RDD,或者从 RDD 生成一个新的 RDD,以下是属于 Transformation 的操作,
    • map
    • filter
    • flatmap
    • sample
    • union
    • groupByKey
    • reduceByKey
    • join
    • groupWith
    • Cartesian
  • Action 操作:Action 操作一般是得到一个结果集或者一个值,Spark 会将 RDD 结果集缓存到内存中,以下是属于 Action 的操作,
    • reduce
    • collect
    • count
    • take
    • first
    • saveAsTextFile
    • saveAsSequenceFile
    • foreach

Shuffle 操作

说到 Shuffle 操作就必须提一下 RDD 的宽窄依赖,具有窄依赖关系的 RDD 可以在同一个 stage 中进行计算,宽窄依赖关系是划分 stage 的重要依据。

窄依赖

父 RDD 的 partition 至多被一个子 RDD partition 依赖,表现为一个父 RDD 的分区对应于一个子 RDD 的分区,和两个父 RDD 的分区对应于一个子 RDD 的分区。图中,map/filter 和 union 属于第一类,对输入进行协同划分(co-partitioned)的 join 属于第二类。

宽依赖

父 RDD 的 partition 被多个子 RDD partitions 依赖,子 RDD 的分区依赖于 父 RDD 的所有分区,这是因为 shuffle 类操作,如图中的 groupByKey 和未经协同划分的 join。

WechatIMG97

以下是属于 shuffle 的操作,

  • 去重,distinct
  • 聚合
    • reduceByKey
    • groupBykEY
    • AggregateByKey
    • combineByKey
  • 排序,sortByKey
  • 重分区
    • coalesce
    • repartition
  • 集合或表操作
    • intersection,返回两个集合的交集,去重
    • subtract,相当于对主 RDD 做了一个条件为从 RDD 的 filter 操作
    • subtractByKey,操作同 subtract,只不过是针对 Key 进行操作
    • join,leftOuterJoin,fullOuterJoin,rightOuterJoin..等一切 join 操作

部分操作分析

reduceByKey & groupByKey

  • reduceByKey,可以实现自定义操作,具体的操作为先在单个 Executor 上进行相同 Key 的聚合操作,merge 操作的方式可以自定义,然后将结果集返回至 driver,然后再将相同 Key 载入到一个 Executor 进行聚合操作。
  • groupByKey,不能实现自定义操作,如果聚合后还需要进行后续操作需要对 groupByKey 后的数据集进行 map 操作,具体操作为先把所有相同 Key 的数据载入到一个 Executor 中,然后进行聚合操作。
  • 根据上述描述可以知道,如果数据量比较小的情况下,并且只需要进行聚合操作,那么用 groupByKey 即可,如果数据集较大,用 groupByKey 在最初的分 partitions 操作的过程中可能会比较耗费性能,因为数据传输涉及到网络传输以及 IO 操作,这时候用 reduceByKey 会比较好。

coalesce & repartition

  • coalesce,一般用于减少分区操作,比如当前有 100 个 partitions,需要减少至 10 个,那么用 coalesce 会避免 shuffle 操作,但是同时相应的也会减少并行度,如果增加分区数量,也会触发 shuffle 操作,传入的第二个参数可以避免减少分区时减少并行度,如果减少分区过程中第二个参数传入的是 true,那么会在重分区的过程中增加一步 shuffle 操作,增加并行度。
  • repartition,一定会触发 shuffle 操作,相当于将整个数据集重新分到指定个数分区中,这个操作一般用于数据集较大,但是单个 key 之间数据量相差不大的情况,这样重分区会增加并行度。不适用的情况就是,少数 key 的数据量远大于其他 key 的数据量,这样重分区后,数据量较大的 key 还是会划分到同一个 partition 中,这个 stage 的执行时间基本就是这个 job 的性能瓶颈了。
  • 综上,如果需要减少分区的时候,比如数据量较小,但是存储时候希望存储到同一个 partition 中,可以用 coalesce 减少分区数量,会避免 shuffle 操作,但是如果数据量较大但数据相对均匀的情况下,可以使用 repartition 增加并行度。

countByKey

  • countByKey,先在单个 Executor 上进行 count,然后聚合到 driver 上进行 count

配置优化

shuffle 配置

  • spark.shuffle.io.maxRetries,最大重试次数,当做聚合操作时,shuffle read 和 shuffle write 操作所在节点会拉取属于自己的数据,但有时候因为网络问题会导致拉取失败,适当增加重试次数的时候可以增加稳定性,避免由于 GC 或者网络等原因导致任务失败。
  • spark.shuffle.io.retryWait,重试拉取数据的等待时间。
  • spark.shuffle.memoryFraction,Executor 内存中,分配给 shuffle read task 进行聚合操作的内存比例,默认为 20%,当内存较为充足的时候,而且很少使用持久化操作时,建议提高比例,避免由于内存不足频繁读写磁盘。

driver 配置

  • spark.driver.maxResultSize,最大结果集大小,默认 1G,可以根据资源合理配置这个参数,避免由于结果集超过内存限制而导致任务失败。
  • spark.executor.memory,Executor 内存大小,默认 1G。

SQL 配置

  • spark.sql.shuffle.partitions,执行 SQL 中的 shuffle 操作中默认分区数量,比如 group by、join 等,默认为 200,事实上 200 个分区数量在大部分场景下都不够,适当调整这个参数对于整个 job 优化非常有效。

其他配置

  • spark.default.parallelism,每个 stage 默认 task 数量,Spark 中 Task 被执行的并发度 = Executor数目 * 每个Executor核数,如果 Executor 的数量较多,但是单个 stage 的 task 数量不够,就可能导致只有 1~2 个 Executor 在执行,其他 Executor 在闲置,这样就会造成资源浪费。

实际问题的优化方案

整体数据集非常大,但是数据较为均匀

这种情况,上面介绍的时候其实已经提到了。我遇到过一个 Case,第一次数据量约为 30 亿,第二次数据量约为 50 亿,第三次数据量约为 87 亿,30 亿数据量的时候,只需要增加内存大小就可以解决,50 亿数据量的时候用了 repartition 做重分区也可以解决,但是到了 87 亿数据量的时候,仅仅是重分区的过程就花费了将近 2 个小时,由于我们整个数据集较大,但是数据较为均匀,所以修改了 SQL 默认的分区数量,将整个作业时间从近 3 个小时优化至 30 分钟。

大部分 stage 执行时间较短,单个 stage 执行时间很长

这种情况一般是因为数据倾斜,建议是首先观察一下耗时最久的 stage 是第几个 stage,然后观察自己的代码不管是 SQL 还是 RDD 操作,根据上面提到的 shuffle 操作进行一下划分,确定一下耗时最久的 stage 出现的位置,然后将数据集 countByKey 输出一下,查看一下数据集分布情况,然后有几种解决方案:

  1. 根据多个 key 组合后再重新分区,这样会有效减少单个 partition 中的数据量
  2. 用key+随机数作为新的key,这样会避免单个key数据太大的情况,但是如果单个key数据很大在后续shuffle操作中依然会很消耗性能,如果后续只有map操作是可以这样来避免倾斜的
  3. 根据倾斜情况,进行数据预处理

由于 join 导致 Executor OOM

join 操作会导致 shuffle 操作,会将两个数据集聚合在同一个 Executor 上然后进行 join 操作,如果两个数据集比较大可能就会导致 Executor OOM。报错如下,org.apache.spark.SparkException: Job aborted due to stage failure: Task 1678 in stage 9.0 failed 4 times, most recent failure,虽然表面看上去报错是因为某个 Executor 超时并且重试失败,但是在作业进行的过程中观察 stage 失败原因会发现是由于 Executor OOM 导致的。这时候可以用 broadcast 代替 join 操作。具体步骤如下:

  1. 选择较小的数据集进行广播
  2. 遍历较大的数据集进行 map 或者 mapToPair 操作,两个数据集的 key 相同则进行 join 操作

这两种操作的区别非常明显,就是 join 操作需要聚合到一个 Executor 上,很容易导致 OOM,而广播只需要将较小的数据集广播到各个 Executor,然后较大的数据集依然是根据 key 的分布在不同的 Executor 上,相当于是在各个 Executor 上分别进行 join 操作。我遇到的实际场景,用广播代替了 join 之后,时间缩短了 81.25%。