最近在做一个spark应用程序验证时,发现一个奇怪的事情,抽样处理后再sort的数据居然是无序的!最后由公司内负责spark的同学予以修复,但在追查的过程中我也恰好看了看SparkSQL的处理过程,理解了SparkSQL里DataFrame的解析、partitioner与Distribution之间的关系。

问题描述

仅当如下代码时,才会出现sort无效的情况:

 Python |  copy code |? 
1
df = hc.read.parquet(path).filter("trade1='Automobile'").filter("trade2='Default'")\
2
.filter("pdate='2015−06−03'").select("userid").limit(count).repartition(1024)\
3
.sort("userid")

而如果sort前没有同时出现limit和repartition,sort的结果就是有序的。

排查过程

直觉上如果sort结果完全无序,压根过不了spark社区,不可能发布,所以一定是在某些特定场景下才会出问题,故经过尝试首先确定了以上描述的特定错误场景。

那么它与正确场景有什么区别呢?例如hc.read.parquet().filter().select().sort()?打印它们的执行计划可以看到:

72 Sort [baiduid#87 ASC], true

73  Repartition 1024, true

74   Limit 100

75    Project [baiduid#87]

76     Filter (((trade1#96 = Automobile) && (trade2#97 = Default)) && (pdate#98 = 2015-06-03))

77      PhysicalRDD [baiduid#87,trade1#96,trade2#97,pdate#98], UnionRDD[63] at toString at NativeMethodAccessorImpl.java:-2

 

而正常情况下:

150 Sort [baiduid#267 ASC], true

151  Exchange (RangePartitioning 200)

152   Repartition 1024, true

153    Project [baiduid#267]

154     Filter (((trade1#276 = Automobile) && (trade2#277 = Default)) && (pdate#278 = 2015-06-03))

155      PhysicalRDD [baiduid#267,trade1#276,trade2#277,pdate#278], UnionRDD[203] at toString at NativeMethodAccessorImpl.java:-2

可以看到正常时多了一个Exchange步骤,但我们的代码里肯定没有显式调用它。

修复方式及原因总结

在execution/basicOperators.scala的Repartition添加如下代码:
 Scala |  copy code |? 
1
  override def outputPartitioning = {
2
    if (numPartitions == 1) SinglePartition
3
    else UnknownPartitioning(numPartitions)
4
  }

原因是如果没有override,该值=child.outputPartitioning,在此场景是Limit.outputParititioning = SinglePartition,意味着满足sort分布要求(但实际却是无序的)。

要想修复这个bug,还得对spark的执行过程有所了解,所以我们先来看它。

SparkSQL解析过程分析

我们使用hc.read.parquet()初始化一个dataframe时,此时需关注的是LogicalPlan:

 Scala |  copy code |? 
1
      sqlContext.baseRelationToDataFrame(
2
        new ParquetRelation2(
3
          globbedPaths.map(_.toString), None, None, Map.empty[String, String])(sqlContext))

即此时的logicalPlan = LogicalRelation(ParquetRelation2)

而filter、select、sort等操作,都会将之前的dataframe.logicalPlan作为child,再生成包含新的logicalPlan的新的DataFrame。在我们的问题示例代码里,拥有这些LogicalPlan(按生成的先后顺序):

  1. LogicalRelation(ParquetRelation2) — parquet生成
  2. Filter 3个
  3. Project
  4. Limit
  5. Repartition
  6. Sort  — 最终的LogicalPlan

假定我们此时通过针对Sort的dataframe.collect()触发spark执行,其实是触发了

queryExecution.executedPlan.executeCollect()的执行,这儿是对SQLContext内部类QueryExecution的lazy变量executedPlan的调用,并间接触发了一系列过程(为了描述方便,忽略了细节):

  1. analyzed = analyzer.execute(logical)
  2. optimizedPlan = optimizer.execute(withCachedData)
  3. sparkPlan = planner.plan(optimizedPlan).next()
  4. executedPlan = prepareForExecution.execute(sparkPlan)

(当然,如果调用的不是DataFrame api而是sql的话,还需要parseSQL过程,这里忽略了)

Analyzer过程

它会调用一堆filter chain来处理这些logicalPlan tree,通过以下代码可以看到:

 Bash |  copy code |? 
01
/* Analyzer.scala */
02
  lazy val batches: Seq[Batch] = Seq(
03
    Batch("Substitution", fixedPoint,
04
      CTESubstitution ::
05
      WindowsSubstitution ::
06
      Nil : _*),
07
    Batch("Resolution", fixedPoint,
08
      ResolveRelations ::
09
      ResolveReferences ::
10
      ResolveGroupingAnalytics ::
11
      ResolveSortReferences ::
12
      ResolveGenerate ::
13
      ResolveFunctions ::
14
      ExtractWindowExpressions ::
15
      GlobalAggregates ::
16
      UnresolvedHavingClauseAttributes ::
17
      TrimGroupingAliases ::
18
      typeCoercionRules ++
19
      extendedResolutionRules : _*)
20
  )
21
 
22
/* HiveTypeCoercion */
23
  val typeCoercionRules =
24
    PropagateTypes ::
25
    ConvertNaNs ::
26
    InConversion ::
27
    WidenTypes ::
28
    PromoteStrings ::
29
    DecimalPrecision ::
30
    BooleanComparisons ::
31
    BooleanCasts ::
32
    StringToIntegralCasts ::
33
    FunctionArgumentConversion ::
34
    CaseWhenCoercion ::
35
    Division ::
36
    PropagateTypes ::
37
    ExpectedInputConversion ::
38
    Nil
39
 
40
/* SQLContext.scala */
41
    new Analyzer(catalog, functionRegistry, conf) {
42
      override val extendedResolutionRules =
43
        ExtractPythonUdfs ::
44
        sources.PreInsertCastAndRename ::
45
        Nil
46
 
47
      override val extendedCheckRules = Seq(
48
        sources.PreWriteCheck(catalog)
49
      )
50
    }

 

该过程主要将各个table filed绑定到真正的table结构上去,也会发现field错误、类型错误、类型适配等。但整体而言,它只是完成分析过程,不会改变执行计划。

Optimizer过程

故名思议,这一阶段会进行优化,如果sql或api调用写的太烂,这一阶段完成,可能会跟原始语句有较大变化。但本质上,它只是把本地可以得出结论的语句优化掉(例如针对非nullable的字段判断is null,那肯定是true的),或者把可合并的合并了(例如多个filter、select等),也会稍微调整filter、union等的顺序。

 Scala |  copy code |? 
01
  val batches =
02
    // SubQueries are only needed for analysis and can be removed before execution.
03
    Batch("Remove SubQueries", FixedPoint(100),
04
      EliminateSubQueries) ::
05
    Batch("Operator Reordering", FixedPoint(100),
06
      UnionPushdown,
07
      CombineFilters,
08
      PushPredicateThroughProject,
09
      PushPredicateThroughJoin,
10
      PushPredicateThroughGenerate,
11
      ColumnPruning,
12
      ProjectCollapsing,
13
      CombineLimits) ::
14
    Batch("ConstantFolding", FixedPoint(100),
15
      NullPropagation,
16
      OptimizeIn,
17
      ConstantFolding,
18
      LikeSimplification,
19
      BooleanSimplification,
20
      SimplifyFilters,
21
      SimplifyCasts,
22
      SimplifyCaseConversionExpressions) ::
23
    Batch("Decimal Optimizations", FixedPoint(100),
24
      DecimalAggregates) ::
25
    Batch("LocalRelation", FixedPoint(100),
26
      ConvertToLocalRelation) :: Nil

prepareForExcution阶段

它只做了一件很关键的事情:

 Scala |  copy code |? 
1
    val batches =
2
      Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil

在这个EnsureRequirements类里,会检查children们的partitioner是否匹配要求的Distribution!我们的错误示例中,就是因为Repartition错误的设置了自己的outputPartition才出的问题。继续往下看。

Partitioning与Distribution的关系

EnsureRequirements类的作用:

 * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
 * of input data meets the
 * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
 * each operator by inserting [[Exchange]] Operators where required.  Also ensure that the
 * required input partition ordering requirements are met.
确保paritition的input data满足Distribution的要求,以及排序要求。
先来看下什么是Distribution要求吧:
  • Distribution 基类
 * Specifies how tuples that share common expressions will be distributed when a query is executed
描述了当一个query在一组服务器并行执行后,拥有相同expressions的tuples会如何分布。
 * in parallel on many machines.  Distribution can be used to refer to two distinct physical
 * properties:
可被用于两种物理属性:
 *  – Inter-node partitioning of data: In this case the distribution describes how tuples are
 *    partitioned across physical machines in a cluster.  Knowing this property allows some
 *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
节点内的分片:在这种情况下,描述了tuples在物理集群里是如何分片的。了解这个属性后,一些算子(例如aggregate)就可以在分片里执行操作,而非集群规模。
 *  – Intra-partition ordering of data: In this case the distribution describes guarantees made
 *    about how tuples are distributed within a single partition.
节点间的分片:在这种情况下,描述了tuples在一个分片内,是如何分布的
  • UnspecifiedDistribution 直接继承Distribution,代表没有顺序和分布的保证
  • AllTuples 代表只有一个分片
  • ClusteredDistribution 代表相同“key”会放到同一个分片,或是连续分布在一个仅有的partition
  • OrderedDistribution 代表相同“key”会在同一分片,且是连续分布的
再看下什么是Partition:
  • Partitioning 基类
  • UnknownPartitioning
  • SinglePartition
  • BroadcastPartitioning
  • HashPartitioning
  • RangePartitioning
从partitioning.scala里可以看到不同的partition方式都满足那些Distribution要求:
  • UnknownPartitioning.satisfies(UnspecifiedDistribution) == true
  • SinglePartition.satisfies(ALL) == true
  • BroadcastPartitioning.satisfies(ALL) == true
  • HashPartitioning.satisfies(UnspecifiedDistribution) == true
  • HashPartitioning.satisfies(ClusteredDistribution) 在hash的expression是required的子集时 == true
  • RangePartitioning.satisfies(UnspecifiedDistribution) == true
  • RangePartitioning.satisfies(OrderedDistribution) 在排序fields 前者包含后者,或后者包含前者时,且顺序一致时 == true
  • RangePartitioning.satisfies(ClusteredDistribution) 在Range的expression是required的子集时 == true
  • 其他都是false

针对每个child,会计算是否满足distribution要求:val valid = child.outputPartitioning.satisfies(required)

并且打印如下log:(current是child.outputPartitioning)

15/11/12 15:10:03 DEBUG EnsureRequirements: Valid distribution,required: OrderedDistribution(List(userid#0 ASC)) current: SinglePartition

从上面可以看到,只有SinglePartition和RangePartitioning才满足已排序分布的要求,而我们的错误示例里,居然已经是SinglePartition了!

在org.apache.spark.sql.execution.Limit里,可以看到其override def outputPartitioning: Partitioning = SinglePartition,即只要limit操作,就会变成一个分片,这也make sense。

但org.apache.spark.sql.execution.Repartition里,由于没有override这个method,所以继承的是父类的值:override def outputPartitioning: Partitioning = child.outputPartitioning!而它的child是Limit……,所以repartition完了,该值也是SinglePartition了,满足sort排序的要求……(但实际是不满足的)

将Repartition的outputpart修改为UnknownPartitioning就可以了。这时由于不满足分布要求,EnsureRequirement会插入Exchange执行计划。

Exchange类

* Performs a shuffle that will result in the desired `newPartitioning`.  Optionally sorts each

* resulting partition based on expressions from the partition key.  It is invalid to construct an

* exchange operator with a `newOrdering` that cannot be calculated using the partitioning key.

该类会通过shuffle获得新的分片。可选的按照partitionKey对结果分片进行排序。使用不可作用于partitionKey的newOrdering,来构造一个exchange operator是不合法的。

 

该类的doExecute()方法,分别针对HashPartitioning、RangePartitioning、SinglePartition进行处理。在这些case的必要情况下,会提前把待shuffle的对象作copy,再发送给externsorter。

 

 

* Determines whether records must be defensively copied before being sent to the shuffle.

决定在进行shuffle前是否需要保守地复制。

* Several of Spark’s shuffle components will buffer deserialized Java objects in memory. The

一些spark的shuffle组件会在内存中,缓存一定的反序列化后的java对象。

* shuffle code assumes that objects are immutable and hence does not perform its own defensive

这些shuffle代码假设对象是不可变的,所以不自行进行保守地复制。

* copying. In Spark SQL, however, operators’ iterators return the same mutable `Row` object. In

但是在sparksql里,算子的迭代器返回了可变的Row对象。

* order to properly shuffle the output of these operators, we need to perform our own copying

为了正确排序这些sparksql算子的output,我们需要先把数据copy出来。

* prior to sending records to the shuffle. This copying is expensive, so we try to avoid it

这些copy是高代价的,所以要尽量避免它。

* whenever possible. This method encapsulates the logic for choosing when to copy.

这个方法封装了是否需要copy的逻辑。

*

* In the long run, we might want to push this logic into core’s shuffle APIs so that we don’t

* have to rely on knowledge of core internals here in SQL.

在未来,这部分逻辑会移入核心shuffle API。

*

* See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.

 

One Comment

  1. YC says:

    “请考虑,如果hc.read.parquet().filter().select().sort().toJavaRDD.mapPartitions会如何呢?

    这时同样会生成两个job,且都是从hdfs读取数据了~~ 因为第二个job的sort前面没有shuffle dependency,没有办法复用第一个job的stage了。”

    大神,我想问一下,为什么没有shuffle dependency就不会复用???
    还有RDD的只读性,底层实现是不是每个新rdd都是在新的内存空间上new出来的?如果是的,应该是可以复用的吧~

Leave a Reply