Archive for 十一月, 2015

本文通过一个实例,介绍了混用SparkSQL与SparkCore时的任务解析过程。

实例伪码

hc.read.parquet(paths).filter(…).select(…).groupBy(all fileds).count().sort(…).toJavaRDD
     .mapPartitionsToPair(…).coalesce().reduceByKey(…).collectAsMap()

SparkUI截图

在最初观察以下UI时,有几个疑问:
  1. 为什么貌似“toJavaRDD”时,生成了一个独立的Job6?
  2. 为什么job7里,仅跳过了stage8(应该是复用了stage6),而不跳过stage9呢?
  3. 为什么job7里,stage9与stage7的执行不一致?stage7包含了3个mapPartitionsRDD调用,而stage9仅包含一个

直接解答以上问题,还比较容易。但真正理解spark对dataframe和core的任务解析过程,才能对该问题有一个完整的解答。

9E59B789-DDA2-495F-9998-4B85648C69C2
Job 6:
008DEE8C-F1E6-4546-9640-55F00EE1D77D
Job 7:
A9EE2B86-463F-4F6C-B296-1692C578CF48

解析全览

以下列出了从编写代码到形成RDD的过程,并简单给出了Job形式的时间点。图较大,可以点击看原图。

  1. 白色图标代表coding时的API。
  2. 灰色代表code背后的逻辑概念,sparkSQL范畴里的DataFrame和LogicalPlan,以及SparkCore里的RDD,这些东西在编码时生成。
  3. 蓝色是SparkSQL对logicalPlan进行analyze、optimize、plan后生成的物理执行计划。
  4. 黄色是prepareForExecution阶段,在上一步物理计划基础上,又添加形成的最终物理执行计划。
B2FA598C-82C6-4639-9D0A-8944560CE591

在我们调用spark API时,背后发生了什么呢?

这个问题得分开看。

在SparkCore里,比较简单,可以理解为每个API都在之前RDD的基础上形成新的RDD,如全览图“主Job RDDs”一列下半段所示。

但SparkSQL里,就稍有不同,它的数据抽象是比RDD更高层次的DataFrame,即每个API都在之前DF的基础上生成新的DF。而DF的核心是LogicalPlan,它描述了plan的依赖关系、partition、Distribution等。如全览图“DataFrame”和“LogicalPlan”两列所示。

但不管RDD还是DataFrame,都是lazy的,只有在调用collect、save这样的方法时,才会真正触发执行。

toJavaRDD的效果

调用该方法时,会触发dataframe的解析(全览图标注为第1步):

 Scala |  copy code |? 
1
lazy val rdd: RDD[Row] = {
2
  // use a local variable to make sure the map closure doesn't capture the whole DataFrame
3
  val schema = this.schema
4
  queryExecution.executedPlan.execute().mapPartitions { rows =>
5
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
6
    rows.map(converter(_).asInstanceOf[Row])
7
  }
8
}

上面的queryExecution.executedPlan会触发以下一系列动作(注意,不包含execute()调用),完成语法解析、类型适配、优化等任务,最重要的是,会把逻辑计划真正翻译为物理执行计划!在planner.plan()完成后,会生成全览图里execution.SparkPlan蓝色部分;prepareForExecution结束后,会生成execution.SparkPlan黄色部分(全览图标注为第2、3步)。

20426B48-26EC-47CE-982A-23408377025A

plan.execute()调用的效果

这时会在driver端,递归的触发物理执行计划的doExecute()方法,这些方法一般都是返回对应的RDD。但在这个case里,由于调用了sort方法,生成了RangePartitioning对应的Exchange计划,为了实现排序后数据的均匀分布,spark会生成一个子job,对排序所依赖的RDD进行抽样,也就是说,会额外生成“Sort抽样子Job RDDs”一列,并由以下代码触发job的执行:

 Scala |  copy code |? 
01
/*Partitioner.RangePartitioner */
02
 
03
  def sketch[: ClassTag](
04
      rdd: RDD[K],
05
      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
06
    val shift = rdd.id
07
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
08
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
09
      val seed = byteswap32(idx ^ (shift << 16))
10
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
11
        iter, sampleSizePerPartition, seed)
12
      Iterator((idx, n, sample))
13
    }.collect()
14
    val numItems = sketched.map(_._2.toLong).sum
15
    (numItems, sketched)
16
  }

该job对应UI上的Job6,而且由于该子job是提前执行的,所以能看到它的job id较小。

该步骤触发子job只是附带效果,真正的目的是完成主job物理计划到RDD的转化,全览图中,主子RDDs其实有很大一部分是重用的。原因是,在ExternalSort之前的Exchange里,childRdd = child.execute(),该rdd既被RangePartitioner使用,也被返回的ShuffledRDD使用。所以是一致的。

更详细地看下job6和7的RDD编号:

263C92DB-EC5D-4257-88E1-3C7A97AEA12C

  1. #279(含)之前的RDD都是主子job复用的
  2. 子job的RDD号比主job的小,所以子job确实是先调度的
RDD.id是在初始化时生成的,所以代表着,以上的RDD也按数字顺序生成。
 Scala |  copy code |? 
1
  protected override def doExecute(): RDD[Row] = attachTree(this"sort") {
2
    child.execute().mapPartitions( { iterator =>

由于execute都是递归调用的,所以可以保证子child先执行,其对应的RDD先生成。

rdd.collect()调用的效果

终于轮到正主来了。它的执行就比较简单了,生成ResultStage,并递归触发所依赖的两个ShuffleStage先执行。

问题解答

  1. 为什么貌似“toJavaRDD”时,生成了一个独立的Job6?
    1. 因为sort触发了子job,对数据进行抽样,以实现排序后更均匀的分布
  2. 为什么job7里,仅跳过了stage8(应该是复用了stage6),而不跳过stage9呢?
    1. stage 6和stage 8的执行任务是一致的,但stage 7和stage 9其实是两码事,具体如下:
  • stage 6:hc.read.parquet(paths).filter(…).select(…)  + groupBy(all fileds).count()的前半段
  • stage 7:groupBy(all fileds).count() 后半段,以及抽样过程,阐述RangePartitioner
  • stage 8:被跳过,复用了stage6
  • stage 9:groupBy(all fileds).count() 后半段 + sort的前半段
  • stage 10:sort(…).toJavaRDD.mapPartitionsToPair(…).coalesce() + reduceByKey(…)的前半段
  1. 为什么job7里,stage9与stage7的执行不一致?stage7包含了3个mapPartitionsRDD调用,而stage9仅包含一个
    1. 解答与上面一样

经验与教训

1. 请考虑,如果hc.read.parquet().filter().select().sort().toJavaRDD.mapPartitions会如何呢?

这时同样会生成两个job,且都是从hdfs读取数据了~~ 因为第二个job的sort前面没有shuffle dependency,没有办法复用第一个job的stage了。

2. df.sort与rdd.repartitionAndSort的方法选择上,之前认为sparksql进行了很多数据结构和执行计划方面的优化,sort的性能可能更好。但分析后发现,它还会做一个sample操作,所以哪个性能更高,还真不好说了。至少在我们的场景下,两者性能持平。而鉴于sort上面的小坑,倾向于使用后者。

最近在做一个spark应用程序验证时,发现一个奇怪的事情,抽样处理后再sort的数据居然是无序的!最后由公司内负责spark的同学予以修复,但在追查的过程中我也恰好看了看SparkSQL的处理过程,理解了SparkSQL里DataFrame的解析、partitioner与Distribution之间的关系。

问题描述

仅当如下代码时,才会出现sort无效的情况:

 Python |  copy code |? 
1
df = hc.read.parquet(path).filter("trade1='Automobile'").filter("trade2='Default'")\
2
.filter("pdate='2015−06−03'").select("userid").limit(count).repartition(1024)\
3
.sort("userid")

而如果sort前没有同时出现limit和repartition,sort的结果就是有序的。

排查过程

直觉上如果sort结果完全无序,压根过不了spark社区,不可能发布,所以一定是在某些特定场景下才会出问题,故经过尝试首先确定了以上描述的特定错误场景。

那么它与正确场景有什么区别呢?例如hc.read.parquet().filter().select().sort()?打印它们的执行计划可以看到:

72 Sort [baiduid#87 ASC], true

73  Repartition 1024, true

74   Limit 100

75    Project [baiduid#87]

76     Filter (((trade1#96 = Automobile) && (trade2#97 = Default)) && (pdate#98 = 2015-06-03))

77      PhysicalRDD [baiduid#87,trade1#96,trade2#97,pdate#98], UnionRDD[63] at toString at NativeMethodAccessorImpl.java:-2

 

而正常情况下:

150 Sort [baiduid#267 ASC], true

151  Exchange (RangePartitioning 200)

152   Repartition 1024, true

153    Project [baiduid#267]

154     Filter (((trade1#276 = Automobile) && (trade2#277 = Default)) && (pdate#278 = 2015-06-03))

155      PhysicalRDD [baiduid#267,trade1#276,trade2#277,pdate#278], UnionRDD[203] at toString at NativeMethodAccessorImpl.java:-2

可以看到正常时多了一个Exchange步骤,但我们的代码里肯定没有显式调用它。

修复方式及原因总结

在execution/basicOperators.scala的Repartition添加如下代码:
 Scala |  copy code |? 
1
  override def outputPartitioning = {
2
    if (numPartitions == 1) SinglePartition
3
    else UnknownPartitioning(numPartitions)
4
  }

原因是如果没有override,该值=child.outputPartitioning,在此场景是Limit.outputParititioning = SinglePartition,意味着满足sort分布要求(但实际却是无序的)。

要想修复这个bug,还得对spark的执行过程有所了解,所以我们先来看它。

SparkSQL解析过程分析

我们使用hc.read.parquet()初始化一个dataframe时,此时需关注的是LogicalPlan:

 Scala |  copy code |? 
1
      sqlContext.baseRelationToDataFrame(
2
        new ParquetRelation2(
3
          globbedPaths.map(_.toString), None, None, Map.empty[String, String])(sqlContext))

即此时的logicalPlan = LogicalRelation(ParquetRelation2)

而filter、select、sort等操作,都会将之前的dataframe.logicalPlan作为child,再生成包含新的logicalPlan的新的DataFrame。在我们的问题示例代码里,拥有这些LogicalPlan(按生成的先后顺序):

  1. LogicalRelation(ParquetRelation2) — parquet生成
  2. Filter 3个
  3. Project
  4. Limit
  5. Repartition
  6. Sort  — 最终的LogicalPlan

假定我们此时通过针对Sort的dataframe.collect()触发spark执行,其实是触发了

queryExecution.executedPlan.executeCollect()的执行,这儿是对SQLContext内部类QueryExecution的lazy变量executedPlan的调用,并间接触发了一系列过程(为了描述方便,忽略了细节):

  1. analyzed = analyzer.execute(logical)
  2. optimizedPlan = optimizer.execute(withCachedData)
  3. sparkPlan = planner.plan(optimizedPlan).next()
  4. executedPlan = prepareForExecution.execute(sparkPlan)

(当然,如果调用的不是DataFrame api而是sql的话,还需要parseSQL过程,这里忽略了)

Analyzer过程

它会调用一堆filter chain来处理这些logicalPlan tree,通过以下代码可以看到:

 Bash |  copy code |? 
01
/* Analyzer.scala */
02
  lazy val batches: Seq[Batch] = Seq(
03
    Batch("Substitution", fixedPoint,
04
      CTESubstitution ::
05
      WindowsSubstitution ::
06
      Nil : _*),
07
    Batch("Resolution", fixedPoint,
08
      ResolveRelations ::
09
      ResolveReferences ::
10
      ResolveGroupingAnalytics ::
11
      ResolveSortReferences ::
12
      ResolveGenerate ::
13
      ResolveFunctions ::
14
      ExtractWindowExpressions ::
15
      GlobalAggregates ::
16
      UnresolvedHavingClauseAttributes ::
17
      TrimGroupingAliases ::
18
      typeCoercionRules ++
19
      extendedResolutionRules : _*)
20
  )
21
 
22
/* HiveTypeCoercion */
23
  val typeCoercionRules =
24
    PropagateTypes ::
25
    ConvertNaNs ::
26
    InConversion ::
27
    WidenTypes ::
28
    PromoteStrings ::
29
    DecimalPrecision ::
30
    BooleanComparisons ::
31
    BooleanCasts ::
32
    StringToIntegralCasts ::
33
    FunctionArgumentConversion ::
34
    CaseWhenCoercion ::
35
    Division ::
36
    PropagateTypes ::
37
    ExpectedInputConversion ::
38
    Nil
39
 
40
/* SQLContext.scala */
41
    new Analyzer(catalog, functionRegistry, conf) {
42
      override val extendedResolutionRules =
43
        ExtractPythonUdfs ::
44
        sources.PreInsertCastAndRename ::
45
        Nil
46
 
47
      override val extendedCheckRules = Seq(
48
        sources.PreWriteCheck(catalog)
49
      )
50
    }

 

该过程主要将各个table filed绑定到真正的table结构上去,也会发现field错误、类型错误、类型适配等。但整体而言,它只是完成分析过程,不会改变执行计划。

Optimizer过程

故名思议,这一阶段会进行优化,如果sql或api调用写的太烂,这一阶段完成,可能会跟原始语句有较大变化。但本质上,它只是把本地可以得出结论的语句优化掉(例如针对非nullable的字段判断is null,那肯定是true的),或者把可合并的合并了(例如多个filter、select等),也会稍微调整filter、union等的顺序。

 Scala |  copy code |? 
01
  val batches =
02
    // SubQueries are only needed for analysis and can be removed before execution.
03
    Batch("Remove SubQueries", FixedPoint(100),
04
      EliminateSubQueries) ::
05
    Batch("Operator Reordering", FixedPoint(100),
06
      UnionPushdown,
07
      CombineFilters,
08
      PushPredicateThroughProject,
09
      PushPredicateThroughJoin,
10
      PushPredicateThroughGenerate,
11
      ColumnPruning,
12
      ProjectCollapsing,
13
      CombineLimits) ::
14
    Batch("ConstantFolding", FixedPoint(100),
15
      NullPropagation,
16
      OptimizeIn,
17
      ConstantFolding,
18
      LikeSimplification,
19
      BooleanSimplification,
20
      SimplifyFilters,
21
      SimplifyCasts,
22
      SimplifyCaseConversionExpressions) ::
23
    Batch("Decimal Optimizations", FixedPoint(100),
24
      DecimalAggregates) ::
25
    Batch("LocalRelation", FixedPoint(100),
26
      ConvertToLocalRelation) :: Nil

prepareForExcution阶段

它只做了一件很关键的事情:

 Scala |  copy code |? 
1
    val batches =
2
      Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil

在这个EnsureRequirements类里,会检查children们的partitioner是否匹配要求的Distribution!我们的错误示例中,就是因为Repartition错误的设置了自己的outputPartition才出的问题。继续往下看。

Partitioning与Distribution的关系

EnsureRequirements类的作用:

 * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
 * of input data meets the
 * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
 * each operator by inserting [[Exchange]] Operators where required.  Also ensure that the
 * required input partition ordering requirements are met.
确保paritition的input data满足Distribution的要求,以及排序要求。
先来看下什么是Distribution要求吧:
  • Distribution 基类
 * Specifies how tuples that share common expressions will be distributed when a query is executed
描述了当一个query在一组服务器并行执行后,拥有相同expressions的tuples会如何分布。
 * in parallel on many machines.  Distribution can be used to refer to two distinct physical
 * properties:
可被用于两种物理属性:
 *  – Inter-node partitioning of data: In this case the distribution describes how tuples are
 *    partitioned across physical machines in a cluster.  Knowing this property allows some
 *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
节点内的分片:在这种情况下,描述了tuples在物理集群里是如何分片的。了解这个属性后,一些算子(例如aggregate)就可以在分片里执行操作,而非集群规模。
 *  – Intra-partition ordering of data: In this case the distribution describes guarantees made
 *    about how tuples are distributed within a single partition.
节点间的分片:在这种情况下,描述了tuples在一个分片内,是如何分布的
  • UnspecifiedDistribution 直接继承Distribution,代表没有顺序和分布的保证
  • AllTuples 代表只有一个分片
  • ClusteredDistribution 代表相同“key”会放到同一个分片,或是连续分布在一个仅有的partition
  • OrderedDistribution 代表相同“key”会在同一分片,且是连续分布的
再看下什么是Partition:
  • Partitioning 基类
  • UnknownPartitioning
  • SinglePartition
  • BroadcastPartitioning
  • HashPartitioning
  • RangePartitioning
从partitioning.scala里可以看到不同的partition方式都满足那些Distribution要求:
  • UnknownPartitioning.satisfies(UnspecifiedDistribution) == true
  • SinglePartition.satisfies(ALL) == true
  • BroadcastPartitioning.satisfies(ALL) == true
  • HashPartitioning.satisfies(UnspecifiedDistribution) == true
  • HashPartitioning.satisfies(ClusteredDistribution) 在hash的expression是required的子集时 == true
  • RangePartitioning.satisfies(UnspecifiedDistribution) == true
  • RangePartitioning.satisfies(OrderedDistribution) 在排序fields 前者包含后者,或后者包含前者时,且顺序一致时 == true
  • RangePartitioning.satisfies(ClusteredDistribution) 在Range的expression是required的子集时 == true
  • 其他都是false

针对每个child,会计算是否满足distribution要求:val valid = child.outputPartitioning.satisfies(required)

并且打印如下log:(current是child.outputPartitioning)

15/11/12 15:10:03 DEBUG EnsureRequirements: Valid distribution,required: OrderedDistribution(List(userid#0 ASC)) current: SinglePartition

从上面可以看到,只有SinglePartition和RangePartitioning才满足已排序分布的要求,而我们的错误示例里,居然已经是SinglePartition了!

在org.apache.spark.sql.execution.Limit里,可以看到其override def outputPartitioning: Partitioning = SinglePartition,即只要limit操作,就会变成一个分片,这也make sense。

但org.apache.spark.sql.execution.Repartition里,由于没有override这个method,所以继承的是父类的值:override def outputPartitioning: Partitioning = child.outputPartitioning!而它的child是Limit……,所以repartition完了,该值也是SinglePartition了,满足sort排序的要求……(但实际是不满足的)

将Repartition的outputpart修改为UnknownPartitioning就可以了。这时由于不满足分布要求,EnsureRequirement会插入Exchange执行计划。

Exchange类

* Performs a shuffle that will result in the desired `newPartitioning`.  Optionally sorts each

* resulting partition based on expressions from the partition key.  It is invalid to construct an

* exchange operator with a `newOrdering` that cannot be calculated using the partitioning key.

该类会通过shuffle获得新的分片。可选的按照partitionKey对结果分片进行排序。使用不可作用于partitionKey的newOrdering,来构造一个exchange operator是不合法的。

 

该类的doExecute()方法,分别针对HashPartitioning、RangePartitioning、SinglePartition进行处理。在这些case的必要情况下,会提前把待shuffle的对象作copy,再发送给externsorter。

 

 

* Determines whether records must be defensively copied before being sent to the shuffle.

决定在进行shuffle前是否需要保守地复制。

* Several of Spark’s shuffle components will buffer deserialized Java objects in memory. The

一些spark的shuffle组件会在内存中,缓存一定的反序列化后的java对象。

* shuffle code assumes that objects are immutable and hence does not perform its own defensive

这些shuffle代码假设对象是不可变的,所以不自行进行保守地复制。

* copying. In Spark SQL, however, operators’ iterators return the same mutable `Row` object. In

但是在sparksql里,算子的迭代器返回了可变的Row对象。

* order to properly shuffle the output of these operators, we need to perform our own copying

为了正确排序这些sparksql算子的output,我们需要先把数据copy出来。

* prior to sending records to the shuffle. This copying is expensive, so we try to avoid it

这些copy是高代价的,所以要尽量避免它。

* whenever possible. This method encapsulates the logic for choosing when to copy.

这个方法封装了是否需要copy的逻辑。

*

* In the long run, we might want to push this logic into core’s shuffle APIs so that we don’t

* have to rely on knowledge of core internals here in SQL.

在未来,这部分逻辑会移入核心shuffle API。

*

* See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.