问题描述
我们有一个基于Spark的项目,接收用户的输入条件,生成spark job。其中有一段逻辑是df.groupby(cols).agg(…),平时cols都是非空的,还可以正常执行,但有一天,用户选择了一个空的cols,job就hang在那儿了~
假设stage 1对应shuffle stage,stage 2对应后面的result stage,那么stage 1可以顺利跑完,且生成了默认的200个output splits。而stage 2里,199个都可以正常完成,只有一个task,每跑必挂。fail的output:
FetchFailed(BlockManagerId(42, host1, 15656), shuffleId=0, mapId=15, reduceId=169, message=org.apache.spark.shuffle.FetchFailedException: Failed to connect to host1
问题分析
从上面的日志,看起来像是stage 2的executor请求shuffle output时挂掉,但这时shuffle server在做什么呢?从spark UI里可以看到该fail的shuffle server在stage 2里也有执行task,并且也fail了!
登录到executor的container里,可以看到其gc日志,young和old都基本使用了100%,达到15GB左右,说明数据量非常大,且都是active的数据,无法被flush掉。但stage 1总共的shuffle output也不过8.9GB。
再细看该executor的stderr log,可以看到大量生成shuffle output请求的日志,累计也差不多8.9GB了!
15/12/15 13:17:30 DEBUG storage.ShuffleBlockFetcherIterator: Creating fetch request of 97451434 at BlockManagerId
为什么会有如此大的数据倾斜发生呢?
原因定位
看一下spark里agg的实现,最终形成物理执行计划时,对应execution/Aggregate,它对shuffle的影响如下:
Scala | | copy code | | ? |
01 | override def requiredChildDistribution: List[Distribution] = { |
02 | if (partial) { |
03 | UnspecifiedDistribution :: Nil |
04 | } else { |
05 | if (groupingExpressions == Nil) { |
06 | AllTuples :: Nil |
07 | } else { |
08 | ClusteredDistribution(groupingExpressions) :: Nil |
09 | } |
10 | } |
11 | } |
原因不言而明,这时groupingExp为空,于是需要AllTuples的分布,即所有数据在一个分片上!
经验教训
在数据量超过一个节点承受范围的时候,不要直接使用DataFrame.agg,好歹使用DataFrame.groupBy().agg(),且要注意groupBy的cols一定非空!否则只能开多多的资源,让其他executor站旁边看热闹了,造成极大的资源浪费,还会使任务有高概率挂掉。