本文通过一个实例,介绍了混用SparkSQL与SparkCore时的任务解析过程。

实例伪码

hc.read.parquet(paths).filter(…).select(…).groupBy(all fileds).count().sort(…).toJavaRDD
     .mapPartitionsToPair(…).coalesce().reduceByKey(…).collectAsMap()

SparkUI截图

在最初观察以下UI时,有几个疑问:
  1. 为什么貌似“toJavaRDD”时,生成了一个独立的Job6?
  2. 为什么job7里,仅跳过了stage8(应该是复用了stage6),而不跳过stage9呢?
  3. 为什么job7里,stage9与stage7的执行不一致?stage7包含了3个mapPartitionsRDD调用,而stage9仅包含一个

直接解答以上问题,还比较容易。但真正理解spark对dataframe和core的任务解析过程,才能对该问题有一个完整的解答。

9E59B789-DDA2-495F-9998-4B85648C69C2
Job 6:
008DEE8C-F1E6-4546-9640-55F00EE1D77D
Job 7:
A9EE2B86-463F-4F6C-B296-1692C578CF48

解析全览

以下列出了从编写代码到形成RDD的过程,并简单给出了Job形式的时间点。图较大,可以点击看原图。

  1. 白色图标代表coding时的API。
  2. 灰色代表code背后的逻辑概念,sparkSQL范畴里的DataFrame和LogicalPlan,以及SparkCore里的RDD,这些东西在编码时生成。
  3. 蓝色是SparkSQL对logicalPlan进行analyze、optimize、plan后生成的物理执行计划。
  4. 黄色是prepareForExecution阶段,在上一步物理计划基础上,又添加形成的最终物理执行计划。
B2FA598C-82C6-4639-9D0A-8944560CE591

在我们调用spark API时,背后发生了什么呢?

这个问题得分开看。

在SparkCore里,比较简单,可以理解为每个API都在之前RDD的基础上形成新的RDD,如全览图“主Job RDDs”一列下半段所示。

但SparkSQL里,就稍有不同,它的数据抽象是比RDD更高层次的DataFrame,即每个API都在之前DF的基础上生成新的DF。而DF的核心是LogicalPlan,它描述了plan的依赖关系、partition、Distribution等。如全览图“DataFrame”和“LogicalPlan”两列所示。

但不管RDD还是DataFrame,都是lazy的,只有在调用collect、save这样的方法时,才会真正触发执行。

toJavaRDD的效果

调用该方法时,会触发dataframe的解析(全览图标注为第1步):

 Scala |  copy code |? 
1
lazy val rdd: RDD[Row] = {
2
  // use a local variable to make sure the map closure doesn't capture the whole DataFrame
3
  val schema = this.schema
4
  queryExecution.executedPlan.execute().mapPartitions { rows =>
5
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
6
    rows.map(converter(_).asInstanceOf[Row])
7
  }
8
}

上面的queryExecution.executedPlan会触发以下一系列动作(注意,不包含execute()调用),完成语法解析、类型适配、优化等任务,最重要的是,会把逻辑计划真正翻译为物理执行计划!在planner.plan()完成后,会生成全览图里execution.SparkPlan蓝色部分;prepareForExecution结束后,会生成execution.SparkPlan黄色部分(全览图标注为第2、3步)。

20426B48-26EC-47CE-982A-23408377025A

plan.execute()调用的效果

这时会在driver端,递归的触发物理执行计划的doExecute()方法,这些方法一般都是返回对应的RDD。但在这个case里,由于调用了sort方法,生成了RangePartitioning对应的Exchange计划,为了实现排序后数据的均匀分布,spark会生成一个子job,对排序所依赖的RDD进行抽样,也就是说,会额外生成“Sort抽样子Job RDDs”一列,并由以下代码触发job的执行:

 Scala |  copy code |? 
01
/*Partitioner.RangePartitioner */
02
 
03
  def sketch[: ClassTag](
04
      rdd: RDD[K],
05
      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
06
    val shift = rdd.id
07
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
08
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
09
      val seed = byteswap32(idx ^ (shift << 16))
10
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
11
        iter, sampleSizePerPartition, seed)
12
      Iterator((idx, n, sample))
13
    }.collect()
14
    val numItems = sketched.map(_._2.toLong).sum
15
    (numItems, sketched)
16
  }

该job对应UI上的Job6,而且由于该子job是提前执行的,所以能看到它的job id较小。

该步骤触发子job只是附带效果,真正的目的是完成主job物理计划到RDD的转化,全览图中,主子RDDs其实有很大一部分是重用的。原因是,在ExternalSort之前的Exchange里,childRdd = child.execute(),该rdd既被RangePartitioner使用,也被返回的ShuffledRDD使用。所以是一致的。

更详细地看下job6和7的RDD编号:

263C92DB-EC5D-4257-88E1-3C7A97AEA12C

  1. #279(含)之前的RDD都是主子job复用的
  2. 子job的RDD号比主job的小,所以子job确实是先调度的
RDD.id是在初始化时生成的,所以代表着,以上的RDD也按数字顺序生成。
 Scala |  copy code |? 
1
  protected override def doExecute(): RDD[Row] = attachTree(this"sort") {
2
    child.execute().mapPartitions( { iterator =>

由于execute都是递归调用的,所以可以保证子child先执行,其对应的RDD先生成。

rdd.collect()调用的效果

终于轮到正主来了。它的执行就比较简单了,生成ResultStage,并递归触发所依赖的两个ShuffleStage先执行。

问题解答

  1. 为什么貌似“toJavaRDD”时,生成了一个独立的Job6?
    1. 因为sort触发了子job,对数据进行抽样,以实现排序后更均匀的分布
  2. 为什么job7里,仅跳过了stage8(应该是复用了stage6),而不跳过stage9呢?
    1. stage 6和stage 8的执行任务是一致的,但stage 7和stage 9其实是两码事,具体如下:
  • stage 6:hc.read.parquet(paths).filter(…).select(…)  + groupBy(all fileds).count()的前半段
  • stage 7:groupBy(all fileds).count() 后半段,以及抽样过程,阐述RangePartitioner
  • stage 8:被跳过,复用了stage6
  • stage 9:groupBy(all fileds).count() 后半段 + sort的前半段
  • stage 10:sort(…).toJavaRDD.mapPartitionsToPair(…).coalesce() + reduceByKey(…)的前半段
  1. 为什么job7里,stage9与stage7的执行不一致?stage7包含了3个mapPartitionsRDD调用,而stage9仅包含一个
    1. 解答与上面一样

经验与教训

1. 请考虑,如果hc.read.parquet().filter().select().sort().toJavaRDD.mapPartitions会如何呢?

这时同样会生成两个job,且都是从hdfs读取数据了~~ 因为第二个job的sort前面没有shuffle dependency,没有办法复用第一个job的stage了。

2. df.sort与rdd.repartitionAndSort的方法选择上,之前认为sparksql进行了很多数据结构和执行计划方面的优化,sort的性能可能更好。但分析后发现,它还会做一个sample操作,所以哪个性能更高,还真不好说了。至少在我们的场景下,两者性能持平。而鉴于sort上面的小坑,倾向于使用后者。

Leave a Reply