spark优化 数据倾斜解决办法

2020年11月20日 26点热度 0人点赞 0条评论

数据倾斜 为什么会数据倾斜 spark 中的数据倾斜并不是说原始数据存在倾斜,原始数据都是一个一个的 block,大小都一样,不存在数据倾斜; 而是指 shuffle 过程中产生的数据倾斜

最近一直有各种各样的事,所以原定上周四发的kafka源码的文章就搁浅了,眼看今天又是周四了,不能鸽了,毕竟年轻人要讲武德... ...

在2k多篇笔记里面翻吧翻吧找了这篇文章:spark中数据倾斜的优化和解决方案,先发一下凑合看,文章比较长,但也算是有用的点

也在这里提醒下大家,注意锻炼身体,毕竟身体才是革命的本钱

提示一下:

  • stage是由宽依赖来划分的
  • stage之间是要做shuffle操作的

导致分布式计算应用程序出现数据倾斜的原因就是shuffle,数据倾斜的调优,都是围绕:

  • 要么不使用shuffle
  • 要么让shuffle在执行过程中均匀分发数据

什么是数据倾斜?

    所谓数据倾斜指的是,在并行处理的数据集中,某一部分(如Spark或kafka的一个Partition)的数据明显多于其他部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。

原因可能是自己指定的分区规则导致的数据倾斜,也可能是数据源分布本身有问题,有可能出现,但大概率不会。

数据倾斜发生的现象

  • 绝大多数task执行都非常快,但个别task执行极慢,假设100个task并行执行,其中97个task分钟级别执行完成,剩余的两三个task却要小时级别执行完,这种情况比较常见
  • 原本能够正常执行的Spark作业,某天却突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的,此种情况比较少见。

数据倾斜的原理

数据倾斜的危害

  • 整体耗时较长(任务的完成由执行时间最长的那个task决定)
  • 应用程序可能异常退出(某个task执行时出来的数据量远远大于正常节点,则需要的资源容易出现瓶颈,当资源不足,则应用程序退出)
  • 资源闲置(处理等待状态的task资源得不到及时的释放,处于限制浪费状态)

如何消除或缓解数据倾斜

1.避免数据源倾斜-HDFS(对于不可切分文件可能出现数据倾斜,对于可切分文件,一般来说,不存在数据倾斜问题)

  • 可切分:基本不会
  • 不可切分:源文件不均匀,最终导致分布式应用程序计算产生数据倾斜

2.避免数据倾斜-Kafka

  • kafka不是计算引擎,只是一个用来在流式项目架构中起削峰填谷作用的消息中转平台,所以为保证一个topic的分布式平衡,尽量不要使用hash散列或者跟业务有关的自定义分区规则等方式来进行数据分区,否则会造成下游消费者一开始就产生数据倾斜
  • kafka尽量使用随机、轮询等不会造成数据倾斜的数据分区规则

3.定位处理逻辑-Stage 和 Task

  • 归根结底,数据倾斜产生的原因,就是两个stage中的shuffle过程导致的,所以我们只需要研究shuffle算子即可。
  • 我们知道了导致数据倾斜的问题就是shuffle算子,所以先去找到代码中shuffle算子,比如:distinct、groupByKey、reduceByKey、aggergateByKey、join、replication等
  • 如果Spark Application运行过程中,出现数据倾斜,可以通过web界面,查看各stage的运行情况

4.查看导致倾斜key的数据分布情况

  • 知道数据倾斜发生在哪里之后,通常需要分析一下那个执行了shuffle操作并且导致了数据倾斜的RDD/Hive表,查看一下其中key的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同key分布与不同的shuffle算子组合起来的各种情况,可能需要选择不同的技术方案来解决
  • 此时根据执行操作的情况不同,可以有很多种查看key分布的方式:
    • 如果是Spark SQL中group by,join语句导致的数据倾斜,那么就查询一下sql中使用的表的key分布情况
    • 如果对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布式情况。
    • 数据采样

数据倾斜解决方案

方案一:使用ETL预处理数据

  • 适用场景
    • 导致数据倾斜的是hive表。如果该表中的数据本身很不均匀(比如某个key对应了100万数据,其他key只对应了10条数据),而且业务场景需要频繁使用Spark 对 hive 表执行某个分析操作,那么比较适合使用这种方案
  • 实现思路
    • 此时可以评估一下,是否可以通过hive来进行数据预处理(即通过hive etl预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的hive表了,而是预处理后的hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作。
  • 实现原理
    • 这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是,这种方式属于治标不治本,因为毕竟数据本身就存在分布不均匀的问题,所以hive etl中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致hive etl的速度很慢。我们只是把数据倾斜的发生提前到了 Hive etl中,避免Spark程序发生数据倾斜而已。
  • 方案优缺点
    • 优点:实现起来简单便捷,效果突出,完全规避掉了数据倾斜,Spark作业性能会大幅度提升
    • 缺点:治标不治本

方案二:调整Shuffle的并行度

  • 适用场景
    • 大量不同的key被分配到了相同的task,造成该task数据量过大。
    • 如果我们必须对数据倾斜迎难而上,那么建议优先使用此方案,因为这是处理数据倾斜最简单的一种方式,但是也是一种属于碰运气的方案。因为这种方案,并不一定让你解决数据倾斜,甚至有可能加重。当然,总归你会调整到一个合适的并行度能解决,前提是这种方案适用于hash散列的分区方式。凑巧的是各种分布式计算引擎,例如map reduce,spark等默认使用hash散列的方式进行数据分区。
    • spark 在做shuffle时,默认使用hashpartitioner(非hash shuffle)对数据进行分区。如果并行度设置的不合理,可能造成大量不相同的key对应的数据被分配到了同一个task上,造成该task所处理的数据远大于其他task,从而造成数据倾斜。
    • 如果调整shuffle时的并行度,使得原本被分配到同一个task的不同key,分发到不同的task上处理,则可以降低原task所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。
  • 实现思路
    • 在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。
    • 对于Spark Sql中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitons,该参数代表了shuffle readTask的并行度,默认值是200,对于很多场景来说都有点过小。
  • 实现原理
    • 增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。

方案三:将reduce join 转为 map join

  • 适用场景
    • 在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百兆或1-2个G)
    • 必须是大小表join时候才可以使用此方法。
  • 实践思路
    • map join,join逻辑的完成是在mapper阶段,如果是spark任务,表示只用一个stage就执行完了join操作,避免了两阶段之间的shuffle,效率高,没有shuffle也就没有了倾斜,但是使用资源较多,只适合大小表join的场景
    • reduce Join,join逻辑的完成是在reducer阶段完成的,如果是mapreduce任务,则表示mapper阶段执行完之后就把数据shuffle到reducer阶段来执行join逻辑,可能会导致join操作。如果是spark任务,意味着上一个stage的执行结果数据shuffle到下一个stage中来完成join操作,同样也可能阐述数据倾斜。
  • 实现思路
    • 不使用join算子进行连接操作,而使用broadcast变量 与 map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。
    • 将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量,接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小的rdd

方案四:局部聚合 + 全局聚合

  • 适用场景
    • 通用解决方案,只适用于聚合操作类,join类不适合
    • 串行执行
  • 实现思路
    • 多重shuffle操作,一次随机shuffle + 一次hash散列
  • 实现过程
    • hive中可通过参数控制
    • spark中需要写代码控制

方案五:使用随机前缀和扩容 RDD 进行join

  • 适用场景
    • 如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key没有意义。
    • 笛卡尔(Descartes)乘积又叫直积。假设集合A={a,b},集合B={0,1,2},则两个集合的笛卡尔积为{(a,0),(a,1),(a,2),(b,0),(b,1), (b,2)},所以笛卡尔积可以理解为两个集合内所有元素都相互组合在一起,时间复杂度极高,以sql举例子,可以理解为两个表无条件的join操作。
  • 实现思路
    • 首先看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的Rdd/Hive表,比如有多个key都对应超过1w条数据
    • 然后将该RDD的每条数据都打上一个N以内的随机前缀
    • 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0-n的前缀
    • 最后将两个处理后的rdd进行join即可
    • 本质:增加随机字段/链接字段 + 扩容RDD(表A 和 表B),从而增加task个数
  • 实现原理
    • 将原本一样的key,通过附加前缀变成不一样的key,将处理后不一样的key分散到多个task中去处理,而不是让一个task处理大量的相同key
    • 此方法对内存资源要求很高

方案六:自定义分区

  • 适用场景
    • 使用自定义的partitioner实现类代替默认的hash partitioner,尽量将所有不同的key均匀分配到不同的task中。
    • 大量不同key被分配到了相同task,造成该task数据量过大。
  • 实现思路
    • 先通过抽样,了解数据的key的分布规律,然后根据规律,去定制自己的数据分区规则,尽量保证所有的task的数据量相差无几
  • 常见分区解释
    • 随机分区
      • 优点:数据分布均匀
      • 缺点:具有相同特点的数据不会保证被分配到相同分区
    • 轮询分区
      • 优点:确保一定不会出现数据倾斜
      • 缺点:无法根据存储/计算能力分配存储/计算压力
    • hash散列
      • 优点:具有相同特点的数据保证被分配到相同的分区
      • 缺点:极容易产生数据倾斜
    • 范围分区
      • 优点:相邻的数据都在相同分区
      • 缺点:部分分区的数据量会超出其他分区,需要进行切分、裂变以保持所有分区的数据量是均匀的,如果每个分区不排序,那么裂变就会很困难

结束语

    就像标题说的那样,如果你觉得还有一些方法也算,你可以留言说出来,毕竟只有思想上的碰撞,才会有提升

萧伯纳都说了:如果你有一个苹果,我有一个苹果,彼此交换,我们每个人仍然只有一个苹果;如果你有一种思想,我有一种思想,彼此交换,我们每个人就有了两种甚至多于两种的思想

最后,前面说了,身体是革命的本钱,也推荐一种运动方式给大家,提高身体素质,这种运动是集速度、爆发、灵敏、协调于一身的肾上腺素爆表训练运动,关键是花费的时间也不多,动图解释

运动之前做好如下保护措施:关注微信公众号,并点点“在看”和“分享”,造就金刚护体神功,peace~

阿布

源自灵魂深处的自我救赎。

文章评论