数据倾斜

1.1 什么是数据倾斜

在map /reduce程序执行时,有时候会出现reduce大部分执行完毕,但是有一个或者几个reduce运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍),这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完。这种现象就是因为key分布不均匀、散度不够导致的,也就是数据倾斜。

1.2 数据倾斜产生的原因

在HIVE上执行JOIN,GROUP BY,COUNT, DISTINCT等操作的时候可能会发现ruduce阶段卡在99.99%,一直卡着不能结束,查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成;进一步查看进程日志或者WEBUI会发现:有一个多几个reduce卡住;CONTAINER报错OOM,读写的数据量极大,超过其它正常的reduce ,伴随着数据倾斜,会出现任务被kill等各种情况。一般情况下Hive的数据倾斜,都发生在SQL中GROUP和ON上,而且和数据逻辑绑定比较深。


数据倾斜的解决办法

2.1 调整配置:

  • 增加JVM 的内存 。这适用于数据的key非常少,往往只能通过硬件的手段来进行调优,增加jvm内存可以显著的提高运行效率。
  • 增加Reduce的个数 。适用于有较多记录值的key都被分到了同一个分区,导致的数据倾斜。如MapReduce,它的分区默认是HashPartitioner,让key的哈希值对设定的Reducer Task个数取余。如果我们增加Reduce的个数(修改numReduceTasks值),就会让一些key被分到不同的分区。虽然工作量仍然会不均衡,但是已不会有这么严重的数据倾斜。
set hive.exec.reducers.bytes.per.reducer=500000000; (500M)
--每个reduce任务处理的数据量,默认为1000^3=1G

set mapred.reduce.tasks = 15;
--每个任务最大的reduce数,默认为999

2.2 聚合类型的数据倾斜解决方法:

聚合类型出现数据倾斜主要是使用group by、distinct造成的。针对聚合类的数据倾斜,有以下方法:

  • 通过加随机前缀重新设计key 。比如,我们可以在map阶段随机加上一个固定长度的随机数,使得分区的时候不会像之前那样分到同一个节点,完成一次局部聚合。在这之后将前缀去除,重新进行一次全局聚合即可。如下例子:
--水果字段名为category
selectcount (substr(x.category,1,2)) 
from
(select concat(category,'_',cast(round(10*rand())+1as string))
from table1
groupby concat(category,'_',cast(round(10*rand())+1as string))
) x --1阶段聚合
groupby substr(x.category,1,2);     --2阶段聚合
  • 使用COMBINER合并 。Map阶段会将环形缓冲区的数据排序并溢写,在溢写之前,使用combiner将相同key数据进行合并(如累加)。这减轻了数据倾斜的现象,减轻了map端向reduce端发送的数据量(减轻了网络带宽),也减轻了map端和reduce端中间的shuffle阶段的数据拉取数量(本地化磁盘IO速率)。

2.3 大表Join小表的数据倾斜解决办法:

set hive.auto.convert.join = true; -- hive是否自动根据文件量大小,选择将common join转成map join 。
set hive.mapjoin.smalltable.filesize =25000000; --大表小表判断的阈值,如果表的大小小于该值25Mb,则会被判定为小表。则会被加载到内存中运行,将commonjoin转化成mapjoin。一般这个值也就最多几百兆的样子。

当一个大表驱动小表的时候,可以将小表直接读到内存中,进行Map Join,省去了Shuffle阶段。

2.4 大表Join大表的数据倾斜解决办法:

  • 对表分桶排序后Join 。当两个大表都做了分桶处理,且分桶数量相同或者成倍数的时候,可以让相同桶的数据进行Map Join。
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
  • 将大表拆分,对倾斜部分单独处理 。我们可以根据业务中几乎一定会出现数据倾斜的数据拆分出来,将倾斜与未倾斜的部分分别做处理,再将结果合并。

例如,当需要将订单信息表和卖家评级表进行Join,用于获取不同评级的订单量。因为两个表都很大,并且肯定会出现二八法则,即少部分卖家会占有大量买家,而大部分卖家只有很少的买家。在这种情况下,我们可以从订单信息表中将大卖家的评级挑出来放到临时表中,再处理非大卖家的评级,最后做一个union all即可。

部分来源:https://blog.csdn.net/weixin_41812379/article/details/121930949