上虞第一城市门户欢迎您!   手机上虞广播网

Hive,Flink,Spark出现数据倾斜了?

来源:上虞门户网  2021-07-22 16:33

数据倾斜最笼统概念就是数据的分布不平衡,有些地方数据多,有些地方数据少。在计算过程中有些地方数据早早地处理完了,有些地方数据迟迟没有处理完成,造成整个处理流程迟迟没有结束,这就是最直接数据倾斜的表现。

Hive,Flink,Spark出现数据倾斜了?别慌,原因和解决方法早知道

Hive数据倾斜

就是单说hive自身的MR引擎:发现所有的map task全部完成,并且99%的reduce task完成,只剩下一个或者少数几个reduce task一直在执行,这种情况下一般都是发生了数据倾斜。说白了就是Hive的数据倾斜本质上是MapReduce的数据倾斜。

Hive数据倾斜的原因

在MapReduce编程模型中十分常见,大量相同的key被分配到一个reduce里,造成一个reduce任务累死,其他reduce任务闲死。查看任务进度,发现长时间停留在99%或100%,查看任务监控界面,只有少量的reduce子任务未完成。

  1. key分布不均衡。

  2. 业务问题或者业务数据本身的问题,某些数据比较集中。

  • join小表:其中一个表是小表,但是key比较集中,导致的就是某些Reduce的值偏高。

  • 空值或无意义值:如果缺失的项很多,在做join时这些空值就会非常集中,拖累进度。

  • group by:维度过小。

  • distinct:导致最终只有一个Reduce任务。

Hive数据倾斜解决

  1. group by代替distinct 要统计某一列的去重数时,如果数据量很大,count(distinct)就会非常慢,原因与order by类似,count(distinct)逻辑导致最终只有一个Reduce任务。

  2. 对1再优化:group by配置调整

  • map端预聚合group by时,combiner在map端做部分预聚合,可以有效减少shuffle数据量。checkinterval:设置map端预聚合的行数阈值,超过该值就会分拆job。

Hive,Flink,Spark出现数据倾斜了?别慌,原因和解决方法早知道

倾斜均衡配置 Hive自带了一个均衡数据倾斜的配置项。其实现方法是在group by时启动两个MR job。第一个job会将map端数据随机输入reducer,每个reducer做部分聚合,相同的key就会分布在不同的reducer中。第二个job再将前面预处理过的数据按key聚合并输出结果,这样就起到了均衡的效果。

Hive,Flink,Spark出现数据倾斜了?别慌,原因和解决方法早知道

3. join基础优化

  • Hive在解析带join的SQL语句时,会默认将最后一个表作为大表,将前面的表作为小表,将它们读进内存。如果表顺序写反,如果大表在前面,引发OOM。不过现在hive自带优化。

  • map join:特别适合大小表join的情况,大小表join在map端直接完成join过程,没有reduce,效率很高。

  • 多表join时key相同:会将多个join合并为一个MR job来处理,两个join的条件不相同,就会拆成多个MR job计算。

4.sort by代替order by将结果按某字段全局排序,这会导致所有map端数据都进入一个reducer中,在数据量大时可能会长时间计算不完。使用sort by,那么还是会视情况启动多个reducer进行排序,并且保证每个reducer内局部有序。为了控制map端数据分配到reducer的key,往往还要配合distribute by一同使用。如果不加distribute by的话,map端数据就会随机分配到reducer。

5.单独处理倾斜key一般来讲倾斜的key都很少,我们可以将它们抽样出来,对应的行单独存入临时表中,然后打上随机数前缀,最后再进行聚合。或者是先对key做一层hash,先将数据随机打散让它的并行度变大,再汇集。其实办法一样。

Flink数据倾斜

Flink 任务出现数据倾斜的直观表现是任务节点频繁出现反压。

部分节点出现 OOM 异常,是因为大量的数据集中在某个节点上,导致该节点内存被爆,任务失败重启。

Flink数据倾斜的原因

  1. 代码KeyBy、GroupBy 等操作,错误的使用了分组 Key,产生数据热点。

  2. 业务上有严重的数据热点。

Flink如何定位数据倾斜

  1. 定位反压

Flink Web UI 自带的反压监控(直接方式)、Flink Task Metrics(间接方式)。通过监控反压的信息,可以获取到数据处理瓶颈的 Subtask。

  1. 确定数据倾斜

Flink Web UI 自带Subtask 接收和发送的数据量。当 Subtasks 之间处理的数据量有较大的差距,则该 Subtask 出现数据倾斜。

Flink数据倾斜的处理

数据源 source 消费不均匀

通过调整Flink并行度,解决数据源消费不均匀或者数据源反压的情况。我们常常例如kafka数据源,调整并行度的原则:Source并行度与 kafka分区数是一样的,或者 kafka 分区数是KafkaSource 并发度的整数倍。建议是并行度等于分区数。

key 分布不均匀

上游数据分布不均匀,使用keyBy来打散数据的时候出现倾斜。通过添加随机前缀,打散 key 的分布,使得数据不会集中在几个 Subtask。

两阶段聚合解决 KeyBy(加盐局部聚合+去盐全局聚合)

  • 预聚合:加盐局部聚合,在原来的 key 上加随机的前缀或者后缀。

  • 聚合:去盐全局聚合,删除预聚合添加的前缀或者后缀,然后进行聚合统计。

Spark数据倾斜

  1. Executor lost,OOM,Shuffle过程出错。

  2. Driver OOM。

  3. 单个Executor执行时间特别久,整体任务卡在某个阶段不能结束。

  4. 正常运行的任务突然失败。

Spark定位数据倾斜

Spark数据倾斜只会发生在shuffle过程中。

这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。

出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。

Spark数据倾斜的解决方案

  • 使用Hive ETL预处理数据

通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。

  • 适合场景

导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。

  • 过滤少数导致倾斜的key

如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。

  • 适合场景

如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。

  • 提高shuffle操作的并行度

增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。

  • 两阶段聚合(加盐局部聚合+去盐全局聚合)

预聚合:加盐局部聚合,在原来的 key 上加随机的前缀或者后缀。

聚合:去盐全局聚合,删除预聚合添加的前缀或者后缀,然后进行聚合统计。

  • 适合场景

对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。

  • 将reduce join转为map join

使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。

  • 适合场景

在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小,比较适用此方案。

  • 采样倾斜key并分拆join操作

对join导致的倾斜是因为某几个key,可将原本RDD中的倾斜key拆分出原RDD得到新RDD,并以加随机前缀的方式打散n份做join,将倾斜key对应的大量数据分摊到更多task上来规避倾斜。

  • 适合场景

两个较大的RDD/Hive表进行join时,且一个RDD/Hive表中少数key数据量过大,另一个RDD/Hive表的key分布较均匀(RDD中两者之一有一个更倾斜)。

  • 用随机前缀和扩容RDD进行join

查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。然后将该RDD的每条数据都打上一个n以内的随机前缀。同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。

  • 适合场景

RDD中有大量key导致倾斜。

总的来说,不管再出现分布式计算框架出现数据倾斜问题解决思路如下:很多数据倾斜的问题,都可以用和平台无关的方式解决,比如更好的数据预处理,异常值的过滤等。因此,解决数据倾斜的重点在于对数据设计和业务的理解,这两个搞清楚了,数据倾斜就解决了大部分了。关注这几个方面:

  1. 业务逻辑方面

  2. 数据预处理。解决热点数据:分而治之(第一次打散计算,第二次再最终聚合计算)。

  3. 程序代码层面

导致最终只有一个Reduce任务的,需要想到用替代的关键字或者算子去提升Reduce任务数。

调参。

4.熟悉自己手中的工具(框架),优秀的框架已经负重前行给你优化了好多。

不仅要学,更学会去用,更要努力去完善拓展框架功能。

文章部分素材来源:大数据左右手

Copyright 2012-2013 上虞第一城市门户网站 版权所有

郑重声明:网站资源摘自互联网,如有侵权,麻烦通知删除,谢谢!