Archive for 十二月, 2015

问题描述

我们有一个基于Spark的项目,接收用户的输入条件,生成spark job。其中有一段逻辑是df.groupby(cols).agg(…),平时cols都是非空的,还可以正常执行,但有一天,用户选择了一个空的cols,job就hang在那儿了~

假设stage 1对应shuffle stage,stage 2对应后面的result stage,那么stage 1可以顺利跑完,且生成了默认的200个output splits。而stage 2里,199个都可以正常完成,只有一个task,每跑必挂。fail的output:

FetchFailed(BlockManagerId(42, host1, 15656), shuffleId=0, mapId=15, reduceId=169, message=org.apache.spark.shuffle.FetchFailedException: Failed to connect to host1

问题分析

从上面的日志,看起来像是stage 2的executor请求shuffle output时挂掉,但这时shuffle server在做什么呢?从spark UI里可以看到该fail的shuffle server在stage 2里也有执行task,并且也fail了!

登录到executor的container里,可以看到其gc日志,young和old都基本使用了100%,达到15GB左右,说明数据量非常大,且都是active的数据,无法被flush掉。但stage 1总共的shuffle output也不过8.9GB。

再细看该executor的stderr log,可以看到大量生成shuffle output请求的日志,累计也差不多8.9GB了!

15/12/15 13:17:30 DEBUG storage.ShuffleBlockFetcherIterator: Creating fetch request of 97451434 at BlockManagerId 

为什么会有如此大的数据倾斜发生呢?

原因定位

看一下spark里agg的实现,最终形成物理执行计划时,对应execution/Aggregate,它对shuffle的影响如下:

 Scala |  copy code |? 
01
  override def requiredChildDistribution: List[Distribution] = {
02
    if (partial) {
03
      UnspecifiedDistribution :: Nil
04
    } else {
05
      if (groupingExpressions == Nil) {
06
        AllTuples :: Nil
07
      } else {
08
        ClusteredDistribution(groupingExpressions) :: Nil
09
      }
10
    }
11
  }

原因不言而明,这时groupingExp为空,于是需要AllTuples的分布,即所有数据在一个分片上!

经验教训

在数据量超过一个节点承受范围的时候,不要直接使用DataFrame.agg,好歹使用DataFrame.groupBy().agg(),且要注意groupBy的cols一定非空!否则只能开多多的资源,让其他executor站旁边看热闹了,造成极大的资源浪费,还会使任务有高概率挂掉。