Archive for the ‘spark’ Category

本文尝试用spark自带example streaming.NetworkWordCount为例,解释spark streaming 1.6.0的执行流程。

例子代码如下:

 Scala |  copy code |? 
01
    // Create the context with a 1 second batch size
02
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
03
    val ssc = new StreamingContext(sparkConf, Minutes(1))
04
 
05
    // Create a socket stream on target ip:port and count the
06
    // words in input stream of \n delimited text (eg. generated by 'nc')
07
    // Note that no duplication in storage level only for running locally.
08
    // Replication necessary in distributed scenario for fault tolerance.
09
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
10
    val words = lines.flatMap(_.split(" "))
11
    val wordCounts = words.map(=> (x, 1)).reduceByKey(_ + _)
12
    wordCounts.print()
13
    ssc.start()
14
    ssc.awaitTermination()

Streaming的核心是DStream类,即discretized streams。以上的socketTextStream、flatMap、map、reduceByKey、print方法分别生成了5个子类实例:SocketInputDStream<-FlatMappedDStream<-MappedDStream<-ShuffledDStream<-ForEachDStream,前面是后面的parent,可以类比RDD的依赖关系。另外,首尾分别是input和output stream。

但截至到wordCounts.print(),spark集群并没有任何实质性的操作呢,直到ssc.start(),它会触发一系列的初始化,其中包括input stream的数据输入,并使用类似RDD的方式,以outputStream为入口依次触发parent的执行。将分如下几个部分详细说明:

  1. start触发的初始化过程,即input stream任务如何提交至executors?
  2. input stream blocks的产生过程,即数据是如何被读入和缓存的?
  3. input stream如何触发后续的transform、output操作的?

Start触发的初始化过程

SparkStreaming

 

当运行在集群模式下时,driver仅仅初始化job,input的真正读取者是worker节点。如上所示,StreamingContext.start()启动streaming-start子线程,触发JobScheduler.start(),进而触发两个关键实例的start方法:ReceiverTracker.start()和JobGenerator.start()。前者会生成并异步提交input Receiver job,后者以定时器监听duration超时、生成的后续处理任务。先仅关注前者。

核心方法是ReceiverTrackerEndpoint.startReceiver(receiver, scheduledLocations)方法,它调用ssc.sparkContext.submitJob提交Job,传入的RDD是仅包含receiver信息的单元素RDD,processPartition回调方法如下,在worker端取出receiver信息,并初始化ReceiverSupervisorImpl对象,开始读取数据。

 Scala |  copy code |? 
01
      // Function to start the receiver on the worker node
02
      val startReceiverFunc: Iterator[Receiver[_]] =&gt; Unit =
03
        (iterator: Iterator[Receiver[_]]) =&gt; {
04
          if (!iterator.hasNext) {
05
            throw new SparkException(
06
              "Could not start receiver as object not found.")
07
          }
08
          // flykobe: worker拿到RDD,里面仅包含一个元素,就是receiver
09
          // 包装并且启动数据接收
10
          if (TaskContext.get().attemptNumber() == 0) {
11
            val receiver = iterator.next()
12
            assert(iterator.hasNext == false)
13
            val supervisor = new ReceiverSupervisorImpl(
14
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
15
            supervisor.start()
16
            supervisor.awaitTermination()
17
          } else {
18
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
19
          }
20
        }

注意,一个app可以有多个input streams,所以可以在该阶段触发生成多个Jobs(如何调度这些Jobs,就是后面JobScheduler等的事情了)。

Input stream blocks的产生过程

Spark Streaming的处理模型是小批量、伪实时,所以需要对读入的数据进行缓存,每隔一段时间生成一个blocks,再每隔duration时间,由这些blocks组成RDDs。

SparkStreaming-1

 

有两种方式blocks,一种是Receiver单条写入,框架处理底层细节,例如SocketInputDStream、KafkaInputDStream;另一种是Receiver直接形成block,自行控制缓存等。

以SocketInputDStream为例,它的receiver阻塞读取socket数据,每接收到一行,就调用Receiver.store()方法进行存储,进而触发上图左边的逻辑。

首先将数据暂存在BlockGenerator.currentBuffer里,BlockGenerator定时器子线程每隔spark.streaming.blockInterval时间,就调用updateCurrentBuffer()将buffer里的数据形成block,并put进blocksForPushing队列。以上可视为queue的生产过程。

BlockGenerator还有一个blockPushingThread子线程,使用keepPushingBlocks()方法阻塞监听该queue,一旦有block产生,就调用ReceiverSupervisorImpl.pushAndReportBlock()方法,按照storage level交由BlockManager.doPut存储在内存 and/or 磁盘上,在此期间可能会写WAL日志。随后通过AKKA向master的ReceiverTracker发送AddBlock消息,由ReceivedBlockTracker.addBlock()方法建立block与input streaming的关联关系。

这里需要注意的是,block的产生间隔是由spark.streaming.blockInterval控制的,默认是200ms,而非duration参数。

完成以上步骤后,worker节点里保存了blocks的数据,而master节点存储了blocks列表。

Input stream如何触发后续操作

示例里没有window操作,处理间隔仅有duration控制,相对简单。

SparkStreaming-2

 

前面提到JobGenerator的定时器线程,它每隔ssc.graph.batchDuration.milliseconds执行一次,简单生成GenerateJobs消息,由JobGenerator EventLoop子线程处理,触发JobGenerator.generateJobs()。

它首先调用ReceivedBlockTracker.allocateBlocksToBatch()方法,按需写入master端的WAL日志,并组装起(time => (streamID=>blocks, …) ),保存到实例属性timeToAllocatedBlocks HashMap里。

随后进入关键步骤,由于一个app可以有多个output streams,故调用DStreamGraph.generateJobs()依次触发。以这儿的ForEachDStream为例,它的generateJob方法触发生成parent RDD,继续触发parent.compute()方法,最终导致SocketInputDStream按需将duration里的blocks形成RDDs:

 Scala |  copy code |? 
01
 /**
02
   * Generates RDDs with blocks received by the receiver of this stream. */
03
  override def compute(validTime: Time): Option[RDD[T]] = {
04
    val blockRDD = {
05
 
06
      if (validTime &lt; graph.startTime) {
07
        // If this is called for any time before the start time of the context,
08
        // then this returns an empty RDD. This may happen when recovering from a
09
        // driver failure without any write ahead log to recover pre−failure data.
10
        new BlockRDD[T](ssc.sc, Array.empty)
11
      } else {
12
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
13
        // for this batch
14
        val receiverTracker = ssc.scheduler.receiverTracker
15
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
16
 
17
        // Register the input blocks information into InputInfoTracker
18
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
19
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
20
 
21
        // Create the BlockRDD
22
        createBlockRDD(validTime, blockInfos)
23
      }
24
    }
25
    Some(blockRDD)
26
  }

截止到这儿,已经把blocks关联到RDD了。随后调用JobScheduler.submitJobSet()异步提交Job即可。

以上大体描述了spark streaming的job触发过程,但如何对input replication、怎么触发checkpoint、WAL日志如何使用等尚未涉及。

上一篇文章从pushdown filter的角度分析了spark与parquet的结合方式,由于其中涉及不少类和数据结构,补充如下。而Dremel论文里已详细解释了repetitionLevel、definitionLevel和value SerDe协议,不再赘述。下面主要从代码实现角度。

ParquetRecordPushDownFilter

在parquet的加载过程中,维持两颗树,一颗是与requested schema对应的fields tree,另一颗是与filters对应的predicate tree。其加载过程就是不断从解压后的二级制流里获取数据,将其对应到上面两颗树的过程。

predicate tree比较简单,如上图中predicate示例,在逻辑上由filterPredicate树和其叶子节点组成的valueInspector数组构成。每读入一个primitive value(可以看到pushdown filter不支持array、map、struct操作),就获取其对应的inspectors,依次计算boolean值存储起来,如下图所示。

在一个record读取完成后,由FilteringRecordMaterializer.getCurrentRecord()触发,递归计算predicate tree的最终结果,若未通过则抛弃缓存中的record值。

在叶子节点的读取过程中,同时也会更新fields tree对应的数据缓存,如下图,针对每个record维持了一个SpecificMutableRow 。这时有两种叶子节点,一种tree level = 1是root下直接的field,如下图中的short节点;另一种tree level > 1,是nested的子节点,如下图array里下的primitive值。对于前者直接在SpecificMutableRow下有对应槽位,而后者又有一个临时的ArrayBuf缓存,仅当该field递归读完后,由end()触发,才会set回SpecificMutableRow的槽位里。

读取完record并且通过filters之后,由CatalystRecordMaterializer.getCurrentRecord()触发将缓存中的数据,装配为spark所需要的row对象,这里涉及codegen过程,虽然有code缓存,但应该仍然是挺耗时的操作。

ParquetRecordPushDownFilter-datastruct

Pushdown在传统关系型数据库里,是一种常用的查询优化手段,在分布式计算领域,由于数据跨节点传输的代价更大,故pushdown也变得尤为重要。可能由于该手段极为常见,反而找不到对其的准确定义。如果将计算过程视为一颗树,则可以简单理解为尽量将计算下沉至叶子节点,尽早过滤掉不需要的数据,从而减少上层计算的数据量。

本文尝试解读基于Parquet存储的SparkSQL pushdown实现,spark版本1.6.0,parquet版本1.7.0。非分布式计算科班出身,更多是个人思考,谬误处请指正。

面对一个查询请求,如何实现pushdown优化呢?可粗略分为两个阶段,首先找出可以pushdown的操作组合,随后在数据加载阶段予以执行。前者属于执行计划范畴,后者可纳入底层数据结构的实现。

Pushdown之执行计划

SparkSQL的输入可以是DataFrame的算子组合,也可以是sql语句,本质上其解析过程是一样的,即语法、语义的解析(Parser),优化(Optimizer),生成逻辑和物理执行计划等,串联过程在sparkexecution.QueryExecution类里。

考虑df.select(a, b, c).filter(by a).filter(by b).select(c).filter(by c)这样的查询,在optimizer阶段,需要合并多个filters(CombineFilters),并调整算子间的顺序,例如将部分filter移到select等前面(PushPredicateThroughAggregate/Generate/Join/Project)。

以较为简单的PushPredicateThroughProjection为例
优化前,sparkPlan chain是:Filter(condition, … ) ->Projection(fields, …) -> … (注:右侧plan更早,是左侧的child)。
实现为:
  1. 将projection的fields转化为map(attribute => child), e.g., ‘SELECT a + b AS c, d …’ produces Map(c -> a + b).
  2. 将filter的condition分为deterministic和nondeterministic
    1. 如果nondeterministic为空,代表所有filter condition都是确定的,可以全部pushdown,转化后为:Projection(fields, …) -> Filter(将condition里出现的alias转化为实际child, …) -> …
    2. 如果deterministic为空,代表所有都无法pushdown,直接返回,chain不变
    3. 如果两者都非空,则仅pushdown deterministic condition,转化后为:Filter(nondeterministic, … ) -> Project(fields, …) -> Filter(pushedCondition, …) -> …

对于更为复杂的查询,可参看下面带pushdown字眼的rule实现。但总而言之,在optimizer完成后,可能会产生与输入不一样的filters(可能是多个,不一定连续),并且尽量前置

 Scala |  copy code |? 
01
object DefaultOptimizer extends Optimizer {
02
  val batches =
03
    // SubQueries are only needed for analysis and can be removed before execution.
04
    Batch("Remove SubQueries", FixedPoint(100),
05
      EliminateSubQueries) ::
06
    Batch("Aggregate", FixedPoint(100),
07
      ReplaceDistinctWithAggregate,
08
      RemoveLiteralFromGroupExpressions) ::
09
    Batch("Operator Optimizations", FixedPoint(100),
10
      // Operator push down
11
      SetOperationPushDown,
12
      SamplePushDown,
13
      PushPredicateThroughJoin,
14
      PushPredicateThroughProject,
15
      PushPredicateThroughGenerate,
16
      PushPredicateThroughAggregate,
17
      ColumnPruning,
18
      // Operator combine
19
      ProjectCollapsing,
20
      CombineFilters,
21
      CombineLimits,
22
      // Constant folding
23
      NullPropagation,
24
      OptimizeIn,
25
      ConstantFolding,
26
      LikeSimplification,
27
      BooleanSimplification,
28
      RemoveDispensableExpressions,
29
      SimplifyFilters,
30
      SimplifyCasts,
31
      SimplifyCaseConversionExpressions) ::
32
    Batch("Decimal Optimizations", FixedPoint(100),
33
      DecimalAggregates) ::
34
    Batch("LocalRelation", FixedPoint(100),
35
      ConvertToLocalRelation) :: Nil
36
}

由于pushdown最终需要在数据源上执行,所以在SparkPlanner的rule DataSourceStrategy里需要找到plan chain中所有可以deterministic的filters,并传递给Parquet使用,如下代码所示。

需要说明的是PhysicalOperation,它利用scala的unapply解包方法,调用collectProjectsAndFilters()将plan chain解析为(Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression])四元组,这里我们主要关注第二个元素Seq[Expression],它是所有filters的sequence。

从代码可以见,由于涉及partition keys的filters可以通过忽略不相关分片更高效的过滤,所以pushedFilters仅包括不涉及partition keys的filters。

 Scala |  copy code |? 
01
02
    // Scanning partitioned HadoopFsRelation
03
    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
04
        if t.partitionSpec.partitionColumns.nonEmpty =&amp;amp;gt;
05
      // We divide the filter expressions into 3 parts
06
      val partitionColumns = AttributeSet(
07
        t.partitionColumns.map(=&amp;amp;gt; l.output.find(_.name == c.name).get))
08
 
09
      // Only pruning the partition keys
10
      val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
11
 
12
      // Only pushes down predicates that do not reference partition keys.
13
      val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
14
 
15
      // Predicates with both partition keys and attributes
16
      val combineFilters = filters.toSet −− partitionFilters.toSet −− pushedFilters.toSet
17
 
18
      val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
19
 
20
      logInfo {
21
        val total = t.partitionSpec.partitions.length
22
        val selected = selectedPartitions.length
23
        val percentPruned = (1 − selected.toDouble / total.toDouble) * 100
24
        s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
25
      }
26
 
27
      val scan = buildPartitionedTableScan(
28
        l,
29
        projects,
30
        pushedFilters,
31
        t.partitionSpec.partitionColumns,
32
        selectedPartitions)
33
 
34
      combineFilters
35
        .reduceLeftOption(expressions.And)
36
        .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
37

需要注意,在scan执行后,还需要执行combineFilters,它由既包含partition keys、又包含非partition keys的filters组成。

上面仅是找到所有filters的序列,但尚未判断哪些可以被pushdown,这是通过buildPartitionedTableScan触发判断的,调用链如下:

  1. 以上获取到备选pushedFilters变量
  2. 调用buildPartitionedTableScan方法,传递pushedFilters作为filters参数
    1. 生成scanBuild(requiredColumns: Seq[Attribute], filters: Array[Filter])方法,注意此filters非彼filters
    2. 调用pruneFilterProject方法,继续传递pushedFilters作为filterPredicates参数
      1. 调用pruneFilterProjectRaw方法,对scanBuild进行wrap,并继续传递pushedFilters作为filterPredicates参数
        1. 调用selectFilters方法,传入翻译为底层field names的pushedFilters作为predicates参数,返回值也叫pushedFilters,为了区分,记为realPushedFilters
          1. 遍历pushedFilters,依次调用translateFilter方法,将支持pushdown的catalyst predicate操作转变为data source的filter操作(支持=,>,<,>=,<=,and,or,not,字符串startsWith、endsWith、contain等)
        2. realPushedFilters传递给scanBuild方法,作为filter参数

以上的realPushedFilters才是spark认为可以被pushdown的filters组合。在scanBuild方法里,调用ParquetRelation.buildInternalScan方法,将realPushedFilters用FilterApi.And方法连接起来、组成一个Filter对象,再调用ParquetInputFormat.setFilterPredicate()设置为parquet.private.read.filter.predicate,从而使后面的parquet解析过程可以真正执行push down filter操作(注:Parquet pushDown不支持字符串操作,所以相关filter在这一步会被忽略)。

注意,传递给parquet的pushdown filters都是org.apache.parquet.filter2.predicate下的FilterPredicate子类对象。

Pushdown之底层执行

push down filters在Parquet里有两个应用场景:基于statistic数据的row group filter,以及record filter。前者可跳过大量数据,效率高;后者实际是在解压缩、反序列化之后进行的,仅能降低数据加载阶段产出的数据量。

RowGroupFilter

数据加载阶段task的执行由SqlNewHadoopRDD.compute()触发,针对Parquet文件,生成ParquetRecordReader对象(spark 1.6里针对flat schema有其他优化),并调用其initialize方法。在开启taskSideMeta时,在execution端执行row group filter操作(taskSideMeta=false的情况参考BLOG)。

 Scala |  copy code |? 
1
// if task.side.metadata is set, rowGroupOffsets is null
2
    if (rowGroupOffsets == null) {
3
      // then we need to apply the predicate push down filter
4
      footer = readFooter(configuration, path, range(split.getStart(), split.getEnd()));
5
      MessageType fileSchema = footer.getFileMetaData().getSchema();
6
      Filter filter = getFilter(configuration);
7
      filteredBlocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
8
   }

通过getFilter方法可将parquet.private.read.filter.predicate配置的pushDownFilters解析出来,并包装为FilterPredicateCompat对象,供filterRowGroups使用。后者使用Visitor模式,触发RowGroupFilter.visit方法,调用StatisticsFilter.canDrop()针对每个parquet block(即rowGroup)依次执行predicate操作。

如下是RowGroupFilter.visit代码,仅通过rowGroupFilters的blocks才有机会被解压、反序列化、继续操作:

 Scala |  copy code |? 
01
 
02
  @Override
03
  public List&amp;amp;lt;BlockMetaData&amp;amp;gt; visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) {
04
    FilterPredicate filterPredicate = filterPredicateCompat.getFilterPredicate();
05
 
06
    // check that the schema of the filter matches the schema of the file
07
    SchemaCompatibilityValidator.validate(filterPredicate, schema);
08
 
09
    List&amp;amp;lt;BlockMetaData&amp;amp;gt; filteredBlocks = new ArrayList&amp;amp;lt;BlockMetaData&amp;amp;gt;();
10
 
11
    for (BlockMetaData block : blocks) {
12
      if (!StatisticsFilter.canDrop(filterPredicate, block.getColumns())) {
13
        filteredBlocks.add(block);
14
      }
15
    }
16
 
17
    return filteredBlocks;
18
  }
19

StatisticsFilter.canDrop也是用的Visitor模式,触发StatisticsFilter.visit()重载方法,以最简单的equal过滤为例,可见仅当Parquet文件包含statistic信息时才有效。

 Scala |  copy code |? 
01
02
@Override
03
  public &amp;amp;lt;extends Comparable&amp;amp;lt;T&amp;amp;gt;&amp;amp;gt; Boolean visit(Eq&amp;amp;lt;T&amp;amp;gt; eq) {
04
    Column&amp;amp;lt;T&amp;amp;gt; filterColumn = eq.getColumn();
05
    T value = eq.getValue();
06
    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
07
    Statistics&amp;amp;lt;T&amp;amp;gt; stats = columnChunk.getStatistics();
08
 
09
    if (stats.isEmpty()) {
10
      // we have no statistics available, we cannot drop any chunks
11
      return false;
12
    }
13
 
14
    if (value == null) {
15
      // we are looking for records where v eq(null)
16
      // so drop if there are no nulls in this chunk
17
      return !hasNulls(columnChunk);
18
    }
19
 
20
    if (isAllNulls(columnChunk)) {
21
      // we are looking for records where v eq(someNonNull)
22
      // and this is a column of all nulls, so drop it
23
      return true;
24
    }
25
 
26
    // drop if value &amp;amp;lt; min || value &amp;amp;gt; max
27
    return value.compareTo(stats.genericGetMin()) &amp;amp;lt; 0 || value.compareTo(stats.genericGetMax()) &amp;amp;gt; 0;
28
  }
29

Record Filter

通过RowGroupFilter的parquet blocks,通过InternalParquetRecordReader进行读取,涉及到的核心类如下:

ParquetRecordFilter

Parquet代码里,大量使用Visitor模式,下面需要注意。

在MessageColumnIO.getRecordReader()里,构造了三个关键对象:

  1. FilteringRecordMaterializer,在一个record读取完毕后,会调用它的getCurrentRecord方法,检查predicate是否通过。该类有一个关键属性FilteringGroupConverter对象
  2. ColumnReadStoreImpl,实际读取column的入口类,并且将FilteringGroupConverter对象传递给它
  3. RecordReaderImplementation,它的read方法触发底层读取、解压、反序列化等操作,并在一个record完成后,触发FilteringRecordMaterializer.getCurrentRecord方法

另外,RecordReaderImplementation在初始化时,会针对每一个叶子节点(即存储primitive值的地方)生成对应的ColumnReaderImpl对象,后者包含FilteringPrimitiveConverter属性,是对读入的每个value执行predicate的对象,基于ColumnReadStoreImpl.FilteringGroupConverter获得。

在RecordReaderImplementation.read()里,每读入一个r、d、value tuple,就会调用ColumnReaderImpl.writeCurrentValueToConverter(),进而触发调用对属性binding的writeValue方法的调用,根据value的类型,触发FilteringPrimitiveConverter对应addXXX方法。以boolean类型为例,会执行所有valueInspectors即predicate操作:

 Scala |  copy code |? 
1
  @Override
2
  public void addBoolean(boolean value) {
3
    for (ValueInspector valueInspector : valueInspectors) {
4
      valueInspector.update(value);
5
    }
6
    delegate.addBoolean(value);
7
  }

valueInspectors是由IncrementallyUpdatedFilterPredicateBuilder生成的,其update方法会父类ValueInspector的setResult方法,设置当前predicate对象的result和isKnown两个boolean标记位。

 Scala |  copy code |? 
01
    if (clazz.equals(Boolean.class)) {                                                                      
02
      if (pred.getValue() == null) {
03
        valueInspector = new ValueInspector() {                                                             
04
          @Override
05
          public void updateNull() {                                                                        
06
            setResult(true);                                                                                
07
          }                                                                                                 
08
          
09
          @Override
10
          public void update(boolean value) {                                                               
11
            setResult(false);                                                                               
12
          }                                                                                                 
13
        };
14
      } else {
15
        final boolean target = (Boolean) (Object) pred.getValue();                                          
16
        
17
        valueInspector = new ValueInspector() {                                                             
18
          @Override
19
          public void updateNull() {                                                                        
20
            setResult(false);                                                                               
21
          }                                                                                                 
22
          
23
          @Override
24
          public void update(boolean value) {                                                               
25
            setResult(value == target);                                                                     
26
          }                                                                                                 
27
        };                                                                                                  
28
      }                                                                                                     
29
    }

RecordReaderImplementation.read()里,当完整读完一个record后,调用FilteringRecordMaterializer.getCurrentRecord方法,判断是否通过了所有的pushdown filters,若未通过则read方法返回null。具体而言,最终判断predicate是否通过,是要把inspection数组里所有result结果,通过and、or重新串联起来的,流程如下:

  1. RecordReaderImplementation.read()在完整读取一个record后,调用FilteringRecordMaterializer.getCurrentRecord()
    1. 调用IncrementallyUpdatedFilterPredicateEvaluator.evaluate(predicate根节点)
      1. Visitor模式调用predicate.accept(instance),instance完全是为了使用visitor模式构造出来的空IncrementallyUpdatedFilterPredicateEvaluator对象
        1. 调用IncrementallyUpdatedFilterPredicateEvaluator.visit()重载方法,可接受IncrementallyUpdatedFilterPredicate.And/Or或ValueInspector对象作为参数
        2. 如果是And、Or,则利用left、right IncrementallyUpdatedFilterPredicateEvaluator对象生成结果
        3. 如果是ValueInspector对象,则返回之前生成的result bool值

至此,Parquet完成了record级别的pushdown。

虽然Parquet等列式存储号称可提升性能N倍,但在实际中,正如新近发布的Apache Arrow指出的,其本身的序列化、反序列化耗时占比可达70%。Spark 1.6中,针对flat schema进行了一些优化,本文尝试源码级别的解读。

在SqlNewHadoopRDD类中,若开启了spark.sql.parquet.enableUnsafeRowRecordReader、数据源是parquet格式,且tryInitialize成功的话,就会采用UnsafeRowParquetRecordReader替代之前的parquet-hadoop jar提供的ParquetRecordReader类,作为parquet文件解析的入口。

在tryInitialize中,首先调用parent方法,处理了task.side.metadata不同设置时,对footer meta的处理。随后才是检查当前schema是否满足特殊处理的要求:

  • 不可包含非primitive类型 或 repeated类型(例如 struct、array、map)
  • 由于parquet里有type和original type的概念,需要再对original检查,original仅为null、decimal、utf8或date
  • 不支持original type为decimal,且精度超过18的数字
  • 不支持int96 type
  • 不支持schema演进

接下来loadBatch方法就很关键了。

首先调用ParquetFileReader.readNextRowGroup()读入未解压、未序列化的数据,会一次性读入一个row group里涉及到的所有columns,这也是压缩和序列化的单位。

随后,loadBatch进行批量加载。与ParquetRecordReader相比,它最大的区别之一应该在于批量反序列化。可以想象,此时内存中有一个row group的n个columns数据,针对每个column一次性解压缩后,每轮最多反序列化64个values存放到缓存里。

 Scala |  copy code |? 
01
  private boolean loadBatch() throws IOException {
02
    // no more records left
03
    if (rowsReturned &gt;= totalRowCount) { return false; }
04
    checkEndOfRowGroup();
05
 
06
    int num = (int)Math.min(rows.length, totalCountLoadedSoFar − rowsReturned);
07
    rowsReturned += num;
08
 
09
    if (containsVarLenFields) {
10
      for (int i = 0; i &lt; rowWriters.length; ++i) {
11
        rowWriters[i].holder().resetTo(fixedSizeBytes);
12
      }
13
    }
14
 
15
    for (int i = 0; i &lt; columnReaders.length; ++i) {
16
      switch (columnReaders[i].descriptor.getType()) {
17
        case BOOLEAN:
18
          decodeBooleanBatch(i, num);
19
          break;
20
        case INT32:
21
          if (originalTypes[i] == OriginalType.DECIMAL) {
22
            decodeIntAsDecimalBatch(i, num);
23
          } else {
24
            decodeIntBatch(i, num);
25
          }
26
          break;
27
        case INT64:
28
          Preconditions.checkState(originalTypes[i] == null
29
              || originalTypes[i] == OriginalType.DECIMAL,
30
              "Unexpected original type: " + originalTypes[i]);
31
          decodeLongBatch(i, num);
32
          break;
33
        case FLOAT:
34
          decodeFloatBatch(i, num);
35
          break;
36
        case DOUBLE:
37
          decodeDoubleBatch(i, num);
38
          break;
39
        case BINARY:
40
          decodeBinaryBatch(i, num);
41
          break;
42
        case FIXED_LEN_BYTE_ARRAY:
43
          Preconditions.checkState(originalTypes[i] == OriginalType.DECIMAL,
44
              "Unexpected original type: " + originalTypes[i]);
45
          decodeFixedLenArrayAsDecimalBatch(i, num);
46
          break;
47
        case INT96:
48
          throw new IOException("Unsupported " + columnReaders[i].descriptor.getType());
49
      }
50
      numBatched = num;
51
      batchIdx = 0;
52
    }
53
    return true;
54
  }

另一个重要区别是,这里使用unsafeRow,backed by raw memory instead of Java objects。每行包含numFields个tuples,一个tuple对应parquet的一列,每个tuple又由3部分组成:

[null bit set] [values] [variable length portion]

The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores one bit per field.以bit位记录列的null情况,8字节对齐。

In the `values` region, we store one 8-byte word per field. For fields that hold fixed-length primitive types, such as long, double, or int, we store the value directly in the word. For fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the base address of the row) that points to the beginning of the variable-length field, and length (they are combined into a long).

values部分则采用了经典的变长存储方法。对于基本数字类型,直接存储在value字段里;其他变长类型,则在value里存储地址偏移量,而在第三部分variable length portion里存储真正的数据。

通过这种unsafe的方式,一方面可以减少GC的消耗。另一方面,也减少了java对象wrap的冗余存储,由于parquet flat格式里,有很多small size数据,这方面的优化对应内存用量应该也有比较大的效果。

最后结合UnsafeRow的get方法,看下内存复用程度。由于我不是Java出身,措辞可能不专业,如果有更好的解释方式,请留言。

针对基本数据类型,例如int直接返回变量本身(因为基本类型本身的内存是不可复用的):

 Scala |  copy code |? 
1
  @Override
2
  public int getInt(int ordinal) {
3
    assertIndexIsValid(ordinal);
4
    return Platform.getInt(baseObject, getFieldOffset(ordinal));
5
  }

而针对可修改的binary,则返回内存copy,以免row被后续mapPartitions等处理时修改了:

 Scala |  copy code |? 
01
02
  @Override
03
  public byte[] getBinary(int ordinal) {
04
    if (isNullAt(ordinal)) {
05
      return null;
06
    } else {
07
      final long offsetAndSize = getLong(ordinal);
08
      final int offset = (int) (offsetAndSize &gt;&gt; 32);
09
      final int size = (int) offsetAndSize;
10
      final byte[] bytes = new byte[size];
11
      Platform.copyMemory(
12
        baseObject,
13
        baseOffset + offset,
14
        bytes,
15
        Platform.BYTE_ARRAY_OFFSET,
16
        size
17
      );
18
      return bytes;
19
    }
20
  }

另外,没有看到unsaferow的内存何时释放。

Spark1.6在2016年1月发布,其中包含内存模型的优化,将之前shuffle、storage、other内存静态配置的方式,修改为动态抢占式。其对应设计稿中,包含了多种折衷和思考,对于理解新老版本内存模型都很有好处,故翻译如下。其中部分专有名词维持英文。

英文原文地址:https://issues.apache.org/jira/secure/attachment/12765646/unified-memory-management-spark-10000.pdf

Unified Memory Management in Spark 1.6

本文提出一种新的Spark内存管理模型,打破了老版本中execution和storage内存空间之间不可逾越的边界。Execution可以借用空闲的storage内存,反之亦然。而当内存压力上升时,被借用的内存会被回收。但考虑到实现复杂度,不会回收被借用于exection的内存(即仅会将execution借给storage的内存,回收还给execution)。

概览

Spark的内存可以大体归为两类:execution和storage。前者包括shuffles、joins、sorts和aggregations所需内存,后者包括cache和节点间数据传输所需内存。

在Spark 1.5和之前版本里,两者是静态配置的,不支持借用。这种方式有如下限制:

  • 不存在普适的默认配置
  • 内存配置的调优,需要使用者了解spark内部原理
  • (默认配置下)即使application无需cache,也只能使用很小一部分内存

我们的目标是,通过内存空间的融合,消除以上限制。

最终结果是提供更好的性能,且无须使用者调优就可以获得较优的内存使用。而且,由于无须静态配置内存分配方式,可以使用一个application支撑多种不同负载,而不会导致过多的spill。

老版本的内存管理

老版本Spark采用静态分配的内存空间,即将全部内存划分为3个独立区块,通过Spark配置设定每个区块相对JVM heap的百分比。

Execution:用于缓存shuffle、join、sort和aggregation的临时数据,通过spark.shuffle.memoryFraction(默认0.2)配置。

Storage:主要用于缓存未来可能会访问的数据,但也用于broadcast和发送较大的task结果(缓存和部分网络应用),通过spark.storage.memoryFraction(默认0.6)配置。

Other:剩余内存主要用于用户代码和spark内部meta数据。由于该类型内存不可控(默认0.2),故本文不再予以讨论。

在可控内存区块,一旦达到配置上限,内存中的数据会被spill至磁盘。而storage区块的数据(在ONLY_MEM配置时)甚至会被抛弃。不论哪种情况,都会导致IO或重算,造成性能下降。

内存配置详解

为了降低OOM风险,例如应对超大数据单元,Spark内存管理过于小心了 ,引入了safety配置解决数据倾斜。内存相关配置如下:

spark.shuffle.memoryFraction(default0.2)

spark.shuffle.safetyFraction(default0.8)

spark.storage.memoryFraction(default0.6)

spark.storage.safetyFraction(default0.9)

spark.storage.unrollFraction(default0.2)

即execution内存最多仅占JVM heap的0.2*0.8=16%!对于无需cache数据的应用,大部分heap内存都被浪费了,而(shuffle等)中间数据却被频繁spill到磁盘并读取。unroll相关配置将在后面介绍。

Execution内存管理

execution内存被分配给JVM里的多个task线程。与上面不同的是,task间的execution内存分配是动态的,如果没有其他tasks存在,Spark允许一个task占用所有可用execution内存。

有以下几个内存相关类:

ShuffleMemoryManager负责全局计数和内存调度(policy enforcement)。它是核心仲裁者,根据task当前内存用量决定如何进行分配。一个JVM里仅有一个实例。

TaskMemoryManager负责每个task的内存分配和跟踪。它通过page table追踪on-heap内存块,task退出时它若检查到有page未释放则会抛出异常。它使用ExecutorMemoryManager真正执行分配和释放。每个task一个实例。

ExecutorMemoryManager负责具体实现,它管理on-heap和off-heap内存,并实现了一个weak reference pool支持跨tasks的空闲页复用。一个JVM里仅有一个实例。

这几个类如下进行交互。一旦task需要分配较大内存,它首先向ShuffleMemoryManager申请X bytes。如果申请可以被满足,task会向TaskMemoryManager要求分配X bytes内存。后者更新自己的page table,并且要求ExecutorMemoryManager真正分配内存。

内存分配策略

每个task可以向ShuffleMemoryManager申请1/N的execution内存,N是当前存在的tasks数量(通过cores配置)。如果内存申请无法满足,可能会导致数据spill从而释放一些内存。依赖于不同的操作类型,该task随后可能继续尝试,或要求分配较小一些的内存。

为了避免过多spill,task仅当已分配到1/(2N)内存后才会spill。如果还没达到1/(2N)就内存不足了,那么该task会阻塞等待其他tasks释放内存。否则,如果先启动的task占用过多内存,会导致后启动的tasks持续spill。

例子:某executor先启动一个task A,并在task B启动前快速占用了所有可用内存。(B启动后)N变成2,task B会阻塞直到task A spill,自己可获得1/(2N)=1/4的execution内存。而一旦task B获取到了1/4的内存,A和B就都有可能spill了。

注意:直到A无法获得更多内存,它才会spill。与此同时,由于内存被占用,其他新tasks可能被饿死。可用通过一些强制spill机制予以规避,但不在本文讨论范围。

Storage内存管理

Storage内存通过BlockManager管理,虽然主要用于缓存RDD分片,它也被用于torrent broadcast和将较大的结果数据发送给driver。

Storage level

每个block都设置了一个storage level,用于指定是存储在内存、磁盘还是off-heap。Block也可以设置为存储在内存和磁盘,当内存不足的时候就可以转移至磁盘了。

Storage level也指定是否以序列化的方式进行存储。需要注意MEMORY_AND_DISK_SER,即内存数据也是以序列化方式存储的,故当需要转移至磁盘时无需额外序列化了,这种配置时,内存回收代价较低。

回收(evict)策略

老版本的回收策略基本就是LRU,且仅用于内存存储数据。有两个例外需要注意。首先,永远不会为了保存当前RDD的新blocks而回收其已存在的blocks。其次,如果unrolling失败,那所涉及的新block直接被回收。

Unrolling

如果BlockManager收到需存储在内存中的iterator,需要将其unroll为数组。但由于迭代数据可能较大、无法全部放入内存,BlockManager需要逐步unroll以避免OOM,并间歇性的检查内存是否足够。

Unroll所需内存消耗storage空间。如果当前没有缓存数据,unrolling可占用全部storage空间。否则,unrolling内存上限由spark.storage.unrollFraction(默认0.2)配置。注意,这些sub-region(unrolling、cache等)都不是静态保留,而是随着抛弃现存blocks而冬天变化的。

如果在抛弃现存blocks后unrolling还是失败了,BlockManager会直接把unrolling对应的block存储到磁盘里。

设计方案

下面给出设计方案,并讨论折衷点。这里仅涉及high level内存管理策略。

建议

Execution和storage的内存空间可交叉。如果execution内存不足,可以借用storage的空闲空间,反之亦然。被借用的内存可以随时被回收,但考虑到实现复杂度,第一版设计里,借给execution的内存用于不会被强制回收。将引入以下新配置:

  • spark.memory.fraction(默认0.75):用于execution和storage的堆内存比例。该值越低,越容易发生spill和缓存数据回收。该配置实际上也限定了OTHER内存大小,以及偶发超大record的内存消耗。
  • spark.memory.storageFraction(默认0.5):spark.memory.fraction中storage内存的比例。缓存数据仅在该内存超限时回收。
  • spark.memory.useLegacyMode(默认false):是否使用老版本的内存管理模型。老版本将堆内存分为固定大小的区块,在未调优时可能导致过度spill。以下deprecated内存配置仅该配置打开时生效:

    • spark.storage.memoryFraction
    • spark.storage.safetyFraction
    • spark.storage.unrollFraction
    • spark.shuffle.memoryFraction
    • spark.shuffle.safetyFraction

以下将讨论折衷。

如何处理内存压力

由于execution和storage共享同一块内存空间,在内存压力上升导致竞争时,我们必须明确内存回收规则。有以下三种可能的选择:

  • 优先回收execution内存
  • 优先回收storage内存
  • 不回收内存

内存回收代价

Storage内存的回收依赖于storage level。MEMORY_ONLY的数据回收代价最大,因为后续的访问将导致重算。MEMORY_AND_DISK_SER代价最小,因为内存回收过程不涉及序列化,仅增加了I/O。

Execution内存的回收场景比较单一,所有被回收的数据将被spill到磁盘,不涉及重算。而且随着最近新增的unsafe功能,execution数据大多以紧凑格式存储,序列化成本也较低。

但有一点需要注意的是,被spill到磁盘的数据必须被回读,所以虽然没有重算,但execution内存回收仍有一定成本。

实现复杂度

Storage回收比较简单,基于现有机制就可将数据落地磁盘。但execution回收较为复杂,有以下方式:

  • 为所有execution内存块注册一个spill callback
  • 修改回读(poll)和spill以实现联动

每种方式,我们都需要关注是否有一个页存储了cache数据(我的理解是,都需要跟踪存储位置)。而且,一些操作还假设至少有一个page内存可用,如果我们强制spill execution内存,就需要小心、以免回收掉保留内存,而把这些操作饿死。

另外,当回收execution内存时,还需要解决如何处理即将被cache的blocks的问题。最简单的方式是等待直到有足够的内存被释放了,但这可能会导致死锁,尤其是待缓存块还依赖内存中exection数据的时候(例如,persist shuffled数据的时候)。一种替代方案是,将待缓存块直接落地磁盘,随后一旦有空闲内存再读取出来。为了避免落地所有待缓存块,可预留少量(例如5%的堆)内存用于存储一部分待缓存块。

但考虑到复杂度,第一版中暂不实现。如果今后证明该功能确实重要,再考虑升级。

选型

最终选择回收缓存数据。由于设计目的是期望提供更多的execution内存,而强制回收execution内存于事无补。另外该选择也更容易实现。

但如果允许所有回收缓存数据,那对于依赖缓存的applications也会有显著影响。所以,我们也需要为缓存块保留一部分内存。

最小保留(Minimum reservations)

不论是回收缓存块还是spill execution内存,都需要允许使用者保留一部分内存以免任一方被饿死(注:因为回收和spill都会释放出空闲的、可被借用的内存)。有动态和静态两者设置方法。老版本为execution和storage都提供了静态保留方式,不足之处仍然是割裂。

这次新的设计里提供动态最小预留内存的方式。即storage可以借用execution内存,但一旦后者需要就会立即被回收并归还。反之亦然,但有一个例外,如果application尝试缓存数据块,但所有内存都用于execution了,那我们不会回收execution内存,而是直接将待缓存数据块落地磁盘。

另外,我们还需为OTHER用途设置内存,老版本默认是20%的堆空间,且在application执行期不变。本次设计不予以改变,但配置方式改为通过spark.memory.fraction(默认0.75),即剩余空间用于OTHER。

其他设计方式

以上分析了相关技术折衷点,这里再对比其他一些方面。

在所有新设计里(A-C),execution和storage内存之和都由spark.memory.fraction限制。另外,为了后向兼容性,使用者可开启spark.memory.useLegacyMode以使用老版本。

(X)Existing behavior。将execution和storage内存分别限制在独立空间,不允许交叉。各自内存空间大小是静态配置的。用于Spark1.5及之前版本。

(A)Evict cached blocks,full fluidity。Execution和storage共享统一的内存空间。当内存压力大时,回收缓存数据。仅当抢占storage后依然内存不足时,execution才会spill。

(B)Evict cached blocks,static storage reservation。与A类似,但为storage保留一定内存不可被借用,缓存块仅当超出该空间时才会被回收和借用。通过spark.memory.storageFraction(默认0.0)进行配置,执行期不可变。

(C)Evict cached blocks,dynamic storage reservation。与B类似,但storage的保留内存也是动态可变的。区别是当内存空闲时,execution可以最大限量的借用。这也是我们选择的方案。

未选择A的原因是,它没有解决多租赁(multi-tenancy)和严重依赖缓存的application问题。所以提出B,添加了保留内存。

B的问题是,保留内存在某些场景下仍然需要使用者配置。在共享环境里(注:一个application支撑多处理请求时)默认值0并不奏效;通过较大的shuffle,一个使用者可能回收另一个使用者的缓存数据。另外,不论设置什么非0的配置,都会导致内存块割裂。例如,为了避免共享环境下潜在的性能倒退,可能设置该值为0.6(旧版本默认storage比例),从而导致execution内存仅占0.4*0.75 = 0.3的堆空间,并不比之前好,尤其是根本无需缓存数据时。

C没有设立强制的内存隔离。当storage空间有空闲时,execution可以借用。唯一的问题是,当storage需要缓存数据,但内存都被execution占用时怎么办?初始版本直接将待缓存数据落地磁盘。

C也是唯一满足如下要求的:

  • Storage内存没有上限(X不满足)
  • Execution内存没有上限(X和B不满足)
  • 可保证最小storage空间(A不满足)

所以我们选择了方案C

(注,后面是实现层面,看代码更合适,故不翻译了)

========================

最后总结下,在新版本且legacy=false时,有如下内存借用规则:

  • execution可借用所有空闲的storage内存
  • execution也可以收回storage向自己借用的内存
  • 但storage已使用的、属于storage的内存,不可被借用
  • storage可借用execution空闲内存
  • 但storage无法收回已经被execution借用的内存

故可以看到规则整体倾向于execution,尽量保证shuffle内存充沛。

另外,该模型不涉及内存实际的分配与释放,仍是由JVM保证的。它做的是内存计数器。

之前提到parquet.block.size所控制的parquet row group大小是一个需要调优的spark参数。其中重要一点,就是控制任务的并发度。

在Hadoop里,任务的并发默认是以hdfs block为单位的,而Spark里多了一种选择,即以RowGroup为基本单位。以下将对比spark 1.4和我厂根据1.5打的patch版本,分析两者的实现方式和优劣。

在调用HiveContext.read.parquet(path)时,会触发ParquetRelation2对象生成SqlNewHadoopRDD对象,并覆盖其中getPartitions()方法,它是本次分析的关键入口。

Spark 1.4社区版

该版本的SqlNewHadoopRDD.getPartitions()里,使用FilteringParquetRowInputFormat类,在spark.sql.parquet.cacheMetadata=true(默认)时,会使用cachedStatus和cachedFooters缓存,覆盖父类的listStatus()和getFooters()方法。后者会触发对parquet footers的扫描

 Scala |  copy code |? 
01
        // Overridden so we can inject our own cached files statuses.
02
        override def getPartitions: Array[SparkPartition] = {
03
          val inputFormat = if (cacheMetadata) {
04
            new FilteringParquetRowInputFormat {
05
              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
06
              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
07
            }
08
          } else {
09
            new FilteringParquetRowInputFormat
10
          }
11
 
12
          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
13
          val rawSplits = inputFormat.getSplits(jobContext)
14
 
15
          Array.tabulate[SparkPartition](rawSplits.size) { i =&gt;
16
            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
17
          }
18
        }

如下图,在默认配置时,是以hdfsBlock为单位,进行task调度的。但可以通过设置parquet.task.side.metadata=false,使以rowGroup为单位调度(当group太小时,会合并,保证满足minsize)。但即使采用taskSideMeta,默认情况下,这时也读取了footers信息!而这是不必要的。
下图包含3个部分:
  • 中间流程:FilteringParquetRowInputFormat.getSplits(),在ParquetRelation2.buildScan()所构建SqlNewHadoopRDD的getPartitions()里被调用。
  • 左边流程:spark.sql.parquet.cacheMetadata=true(默认),使用getTaskSideSplits()。
  • 右边流程:spark.sql.parquet.cacheMetadata=fasle,使用getClientSideSplits()。

图中蓝色标识了默认分支。

getSplits

这时会产生一个问题,当默认情况下,一个rowGroup横跨多个splits时,怎么办?即可能有多个executor都收到这个RowGroup的处理请求,并分别逻辑持有一部分block数据。

如下图所示,当每个executor收到task对应的split信息时,读取所在文件的footer meta,拿到所有的rowGroups。用split指定的blocks range,去圈定应该自己处理的rowGroups,包括两种:

  • 完全在blocks range里的,肯定是自己处理
  • rowGroup中点在range里的,也是自己处理

getSplits-executor

为什么采取中点呢?在合理设置的parquet.block.size和hdfs.block.size时,两者的值应该比较接近,那么当一个hdfs block持有row group中点时,它至少拥有一多半的数据。从而保证仅有一个task会处理该rowGroup。

如下是parquet-format包里ParquetMetadataConverter.java的关键代码:

 Scala |  copy code |? 
01
  static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
02
    List&lt;RowGroup&gt; rowGroups = metaData.getRow_groups();
03
    List&lt;RowGroup&gt; newRowGroups = new ArrayList&lt;RowGroup&gt;();
04
    for (RowGroup rowGroup : rowGroups) {
05
      long totalSize = 0;
06
      long startIndex = getOffset(rowGroup.getColumns().get(0));
07
      for (ColumnChunk col : rowGroup.getColumns()) {
08
        totalSize += col.getMeta_data().getTotal_compressed_size();
09
      }
10
      long midPoint = startIndex + totalSize / 2;
11
      if (filter.contains(midPoint)) {
12
        newRowGroups.add(rowGroup);
13
      }
14
    }
15
    metaData.setRow_groups(newRowGroups);
16
    return metaData;
17
  }

近似社区1.5版本

在构造SqlNewHadoopRDD时,不再使用FilteringParquetRowInputFormat而是直接使用ParquetInputFormat,并覆盖其listStatus方法,在spark.sql.parquet.cacheMetadata=true(默认)时,使用cachedStatuses。SqlNewHadoopRDD.getPartitions()触发ParquetInputFormat.getSplits()方法。

默认即parquet.task.side.metadata=true时,直接调用hadoop的FileInputFormat.getSplits()方法,即与hadoop的分片方式一致。

分片大小由以下3者决定:

  • 默认情况就是该hdfs文件的blockSize。
  • 如果设置maxSize比blockSize小,则可以让一个block被多个tasks处理(在executor端,通过上面的方式,保证rowGroup单次处理)。
  • 如果设置的minSize比blockSize大,则可以合并多个blocks由一个task处理。可以看出来,这也足够灵活控制并发了。

以下是hadoop-core里FileInputFormat.computeSplitSize代码:

 Scala |  copy code |? 
1
  protected long computeSplitSize(long goalSize, long minSize,
2
                                       long blockSize) {
3
    return Math.max(minSize, Math.min(goalSize, blockSize));
4
  }

而如果设置了parquet.task.side.metadata=false,则回到社区1.4版本的方式,由driver端扫描footers,根据rowGroups计算splits。

Parquet作为目前SparkSQL默认的存储方式,号称在性能与存储空间等方面达到较好地平衡。但为了达到高性能,还需要进一步调优,其中parquet.block.size是经常提到的一个配置。从字面看,它控制了parquet block的数据大小,但不同的block size会带来怎样的红利、是否适合业务要求呢?parquet支持多种encoding和compress方式,这里的block size是encoded and compressed size吗?

前一个话题较大,后续再讨论,但以我的经验而言,并不是像官方文档所说,1GB就一定比256MB好,得看应用场景。这里主要通过对parquet和spark源码的分析,阐释第二个问题。

先给结论

parquet.block.size控制的是parquet编码后、encoding后、大部分Pages compressed后的数据大小。但是是粗略的size,因为check mem时:

  • 可能有部分数据在encoder缓存中,未计算在内
  • 可能采用原始数据大小,而实际写入的是dict编码后的大小
  • 最后一个page的数据还未压缩

Parquet文件和LOG分析

下面示例中,parquet.block.size=256MB, parquet.page.size=1MB,都是默认值。在parquet里,经常混用block和row group的概念,可以认为两者相等。但需注意,这里的block与hdfs.block是两码事。

使用parquet tools可以看到meta信息:

row group 1: RC:9840100 TS:698748990 OFFSET:4

其中RC是RowCount,TS是压缩前的TotalByteSize,OFFSET是该row group的StartingPos。可以看到TS达到666MB,远超配置的256MB。

查看写入日志:

Jan 7, 2016 2:03:58 AM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: mem size 268,650,005 > 268,435,456: flushing 9,418,847 records to disk.

Jan 7, 2016 2:03:58 AM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 267,733,756

Jan 7, 2016 2:03:59 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 151,651,009B for [xxx] BINARY: 9,418,847 values, 336,901,990B raw, 151,629,488B comp, 324 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY, PLAIN], dic { 25,702 entries, 925,241B raw, 25,702B comp}

可以看到当内存用量略超过parquet.block.size时,会触发block的flush。而raw大小一般会远超parquet.block.size。

从这些表象,也可以猜测,block size应该控制的是compressed后的数据大小吧,但为了得到更加精确和确定的答案,还是看源码。

源码解释

在调用df.write.parquet()写入数据时,包括以下关键步骤(以下涉及spark和parquet-mr代码):

Spark-parquet-write-2

Spark-parquet-write从上图可以看到,check时的内存是所有columns buffered size之和,那这些项都是什么含义呢?还是先给结论:

  • repetition level、definition level、data对应最后一个page(还没真的形成page)的size,是未压缩的内存消耗大小。
  • pageWriter.getMemSize 是ColumnChunkPageWriter.getMemSize,即buf.size(),是之前flush完的pages形成的压缩后的二进制字节流的大小。

 Scala |  copy code |? 
1
  @Override
2
  public long getBufferedSizeInMemory() {
3
    return repetitionLevelColumn.getBufferedSize()
4
        + definitionLevelColumn.getBufferedSize()
5
        + dataColumn.getBufferedSize()
6
        + pageWriter.getMemSize();
7
  }

我们先看r level、d level和data size,对应上面代码的前3项,这个比较好理解。

其中,repetitionLevelColumn和definitionLevelColumn根据maxLevel值,可能是RunLengthBitPackingHybridValuesWriter或DevNullValuesWriter对象;dataColumn根据value type由ParquetProperties.getValuesWriter决定,例如逻辑类型string对应到底层数据type binary,一般会使用PlainBinaryDictionaryValuesWriter对象。以下举几个例子。

RunLengthBitPackingHybridValuesWriter的内存消耗实际发生在encoder里,所以直接返回其encoder.size,即RunLengthBitPackingHybridEncoder.getBufferedSize()。通过后者的writeInt(),可以看到并不是每个value都立刻写入baos内存(调用writeOrAppendBitPackedRun方法),之外还有一些缓存未计算,但误差较小。

 Scala |  copy code |? 
01
  public void writeInt(int value) throws IOException {
02
    if (value == previousValue) {
03
      // keep track of how many times we've seen this value
04
      // consecutively
05
      ++repeatCount;
06
 
07
      if (repeatCount &amp;amp;amp;amp;gt;= 8) {
08
        // we've seen this at least 8 times, we're
09
        // certainly going to write an rle−run,
10
        // so just keep on counting repeats for now
11
        return;
12
      }
13
    } else {
14
      // This is a new value, check if it signals the end of
15
      // an rle−run
16
      if (repeatCount &amp;amp;amp;amp;gt;= 8) {
17
        // it does! write an rle−run
18
        writeRleRun();
19
      }
20
 
21
      // this is a new value so we've only seen it once
22
      repeatCount = 1;
23
      // start tracking this value for repeats
24
      previousValue = value;
25
    }
26
 
27
    // We have not seen enough repeats to justify an rle−run yet,
28
    // so buffer this value in case we decide to write a bit−packed−run
29
    bufferedValues[numBufferedValues] = value;
30
    ++numBufferedValues;
31
 
32
    if (numBufferedValues == 8) {
33
      // we've encountered less than 8 repeated values, so
34
      // either start a new bit−packed−run or append to the
35
      // current bit−packed−run
36
      writeOrAppendBitPackedRun();
37
    }
38
  }

PlainBinaryDictionaryValuesWriter等需形成dictionary的writers,使用的不是编码后的value内存消耗(这时就是dict index了),而是原始写入的value bytes size。因为当dict编码不合适时(例如重复率没有那么高),会fallback回原始方式,这时要防止page过大。

 Scala |  copy code |? 
1
  @Override
2
  public long getBufferedSize() {
3
    // use raw data size to decide if we want to flush the page
4
    // so the acutual size of the page written could be much more smaller
5
    // due to dictionary encoding. This prevents page being to big when fallback happens.
6
    return rawDataByteSize;
7
  }

从上面可以看到,check时内存的计算方式,其实是比较粗略的。但计算的size确实是parquet编码(已形成repetition level和definition level),且经过encode了。那压缩发生在此之后吗?(图太大,建议点开看)

Spark-parquet-write
从上面的流程图可以看到,一旦buffered size超过parquet.block.size,就会触发flush,把最后一个分片的内容调用writePage方法写出,将一个page里所有rows的repetition、definition和data以二进制的形式拼接在一起,并针对字节流进行压缩。

还有一个需要注意的点,即dictionary对象是在flush时生成的,其内存消耗也没有计算在check mem里。不过dict的大小由配置决定,一般也就是MB级别,不会太大。

再来看下ColumnChunkPageWriter.getMemSize,为什么说它是编码、压缩后的pages字节流大小呢?

这就得结合着parquet.page.size了。这个值会被compress实现类和ColumnWriteImpl类使用。这里主要关注后者,在该类的每个writeXXX方法的最后都会调用accountForValueWritten方法,该方法检查当前column的内存消耗是否达到page.size,如果达到就也会调用writePage方法。而后者在上图可以看到,会形成字节流并压缩。所以,也可以明白,为什么parquet的document里面说,page是encoding和compress的unit了。

 Scala |  copy code |? 
01
  private void accountForValueWritten() {
02
    ++ valueCount;
03
    if (valueCount &amp;gt; valueCountForNextSizeCheck) {
04
      // not checking the memory used for every value
05
      long memSize = repetitionLevelColumn.getBufferedSize()
06
          + definitionLevelColumn.getBufferedSize()
07
          + dataColumn.getBufferedSize();
08
      if (memSize &amp;gt; pageSizeThreshold) {
09
        // we will write the current page and check again the size at the predicted middle of next page
10
        valueCountForNextSizeCheck = valueCount / 2;
11
        writePage();
12
      } else {
13
        // not reached the threshold, will check again midway
14
        valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
15
      }
16
    }
17
  }

从上面还可以看到,一个完整block在写入的时候,是需要全部存放在一个executor内存里的(其实读取时也一样),所以设置时,也需要考虑下自己executor的内存是否充沛。

总结

可以粗略的认为parquet.block.size控制的是最终写入磁盘的数据大小。

由于读写都是以block为单位的,较大的block size在一定的数据量下,会减少磁盘寻址,带来较高的IO吞吐。但需要注意的是,其解压后的大小可能是磁盘大小的好几倍(我们几乎达到10倍),太大也会使后续计算时内存压力变大。

参考资料

  • https://parquet.apache.org/documentation/latest/
  • http://www.infoq.com/cn/articles/in-depth-analysis-of-parquet-column-storage-format
  • http://my.oschina.net/jhone/blog/517918
  • http://tajo.apache.org/docs/0.8.0/table_management/parquet.html

问题描述

我们有一个基于Spark的项目,接收用户的输入条件,生成spark job。其中有一段逻辑是df.groupby(cols).agg(…),平时cols都是非空的,还可以正常执行,但有一天,用户选择了一个空的cols,job就hang在那儿了~

假设stage 1对应shuffle stage,stage 2对应后面的result stage,那么stage 1可以顺利跑完,且生成了默认的200个output splits。而stage 2里,199个都可以正常完成,只有一个task,每跑必挂。fail的output:

FetchFailed(BlockManagerId(42, host1, 15656), shuffleId=0, mapId=15, reduceId=169, message=org.apache.spark.shuffle.FetchFailedException: Failed to connect to host1

问题分析

从上面的日志,看起来像是stage 2的executor请求shuffle output时挂掉,但这时shuffle server在做什么呢?从spark UI里可以看到该fail的shuffle server在stage 2里也有执行task,并且也fail了!

登录到executor的container里,可以看到其gc日志,young和old都基本使用了100%,达到15GB左右,说明数据量非常大,且都是active的数据,无法被flush掉。但stage 1总共的shuffle output也不过8.9GB。

再细看该executor的stderr log,可以看到大量生成shuffle output请求的日志,累计也差不多8.9GB了!

15/12/15 13:17:30 DEBUG storage.ShuffleBlockFetcherIterator: Creating fetch request of 97451434 at BlockManagerId 

为什么会有如此大的数据倾斜发生呢?

原因定位

看一下spark里agg的实现,最终形成物理执行计划时,对应execution/Aggregate,它对shuffle的影响如下:

 Scala |  copy code |? 
01
  override def requiredChildDistribution: List[Distribution] = {
02
    if (partial) {
03
      UnspecifiedDistribution :: Nil
04
    } else {
05
      if (groupingExpressions == Nil) {
06
        AllTuples :: Nil
07
      } else {
08
        ClusteredDistribution(groupingExpressions) :: Nil
09
      }
10
    }
11
  }

原因不言而明,这时groupingExp为空,于是需要AllTuples的分布,即所有数据在一个分片上!

经验教训

在数据量超过一个节点承受范围的时候,不要直接使用DataFrame.agg,好歹使用DataFrame.groupBy().agg(),且要注意groupBy的cols一定非空!否则只能开多多的资源,让其他executor站旁边看热闹了,造成极大的资源浪费,还会使任务有高概率挂掉。

本文通过一个实例,介绍了混用SparkSQL与SparkCore时的任务解析过程。

实例伪码

hc.read.parquet(paths).filter(…).select(…).groupBy(all fileds).count().sort(…).toJavaRDD
     .mapPartitionsToPair(…).coalesce().reduceByKey(…).collectAsMap()

SparkUI截图

在最初观察以下UI时,有几个疑问:
  1. 为什么貌似“toJavaRDD”时,生成了一个独立的Job6?
  2. 为什么job7里,仅跳过了stage8(应该是复用了stage6),而不跳过stage9呢?
  3. 为什么job7里,stage9与stage7的执行不一致?stage7包含了3个mapPartitionsRDD调用,而stage9仅包含一个

直接解答以上问题,还比较容易。但真正理解spark对dataframe和core的任务解析过程,才能对该问题有一个完整的解答。

9E59B789-DDA2-495F-9998-4B85648C69C2
Job 6:
008DEE8C-F1E6-4546-9640-55F00EE1D77D
Job 7:
A9EE2B86-463F-4F6C-B296-1692C578CF48

解析全览

以下列出了从编写代码到形成RDD的过程,并简单给出了Job形式的时间点。图较大,可以点击看原图。

  1. 白色图标代表coding时的API。
  2. 灰色代表code背后的逻辑概念,sparkSQL范畴里的DataFrame和LogicalPlan,以及SparkCore里的RDD,这些东西在编码时生成。
  3. 蓝色是SparkSQL对logicalPlan进行analyze、optimize、plan后生成的物理执行计划。
  4. 黄色是prepareForExecution阶段,在上一步物理计划基础上,又添加形成的最终物理执行计划。
B2FA598C-82C6-4639-9D0A-8944560CE591

在我们调用spark API时,背后发生了什么呢?

这个问题得分开看。

在SparkCore里,比较简单,可以理解为每个API都在之前RDD的基础上形成新的RDD,如全览图“主Job RDDs”一列下半段所示。

但SparkSQL里,就稍有不同,它的数据抽象是比RDD更高层次的DataFrame,即每个API都在之前DF的基础上生成新的DF。而DF的核心是LogicalPlan,它描述了plan的依赖关系、partition、Distribution等。如全览图“DataFrame”和“LogicalPlan”两列所示。

但不管RDD还是DataFrame,都是lazy的,只有在调用collect、save这样的方法时,才会真正触发执行。

toJavaRDD的效果

调用该方法时,会触发dataframe的解析(全览图标注为第1步):

 Scala |  copy code |? 
1
lazy val rdd: RDD[Row] = {
2
  // use a local variable to make sure the map closure doesn't capture the whole DataFrame
3
  val schema = this.schema
4
  queryExecution.executedPlan.execute().mapPartitions { rows =&gt;
5
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
6
    rows.map(converter(_).asInstanceOf[Row])
7
  }
8
}

上面的queryExecution.executedPlan会触发以下一系列动作(注意,不包含execute()调用),完成语法解析、类型适配、优化等任务,最重要的是,会把逻辑计划真正翻译为物理执行计划!在planner.plan()完成后,会生成全览图里execution.SparkPlan蓝色部分;prepareForExecution结束后,会生成execution.SparkPlan黄色部分(全览图标注为第2、3步)。

20426B48-26EC-47CE-982A-23408377025A

plan.execute()调用的效果

这时会在driver端,递归的触发物理执行计划的doExecute()方法,这些方法一般都是返回对应的RDD。但在这个case里,由于调用了sort方法,生成了RangePartitioning对应的Exchange计划,为了实现排序后数据的均匀分布,spark会生成一个子job,对排序所依赖的RDD进行抽样,也就是说,会额外生成“Sort抽样子Job RDDs”一列,并由以下代码触发job的执行:

 Scala |  copy code |? 
01
/*Partitioner.RangePartitioner */
02
 
03
  def sketch[: ClassTag](
04
      rdd: RDD[K],
05
      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
06
    val shift = rdd.id
07
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
08
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =&gt;
09
      val seed = byteswap32(idx ^ (shift &lt;&lt; 16))
10
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
11
        iter, sampleSizePerPartition, seed)
12
      Iterator((idx, n, sample))
13
    }.collect()
14
    val numItems = sketched.map(_._2.toLong).sum
15
    (numItems, sketched)
16
  }

该job对应UI上的Job6,而且由于该子job是提前执行的,所以能看到它的job id较小。

该步骤触发子job只是附带效果,真正的目的是完成主job物理计划到RDD的转化,全览图中,主子RDDs其实有很大一部分是重用的。原因是,在ExternalSort之前的Exchange里,childRdd = child.execute(),该rdd既被RangePartitioner使用,也被返回的ShuffledRDD使用。所以是一致的。

更详细地看下job6和7的RDD编号:

263C92DB-EC5D-4257-88E1-3C7A97AEA12C

  1. #279(含)之前的RDD都是主子job复用的
  2. 子job的RDD号比主job的小,所以子job确实是先调度的
RDD.id是在初始化时生成的,所以代表着,以上的RDD也按数字顺序生成。
 Scala |  copy code |? 
1
  protected override def doExecute(): RDD[Row] = attachTree(this"sort") {
2
    child.execute().mapPartitions( { iterator =&gt;

由于execute都是递归调用的,所以可以保证子child先执行,其对应的RDD先生成。

rdd.collect()调用的效果

终于轮到正主来了。它的执行就比较简单了,生成ResultStage,并递归触发所依赖的两个ShuffleStage先执行。

问题解答

  1. 为什么貌似“toJavaRDD”时,生成了一个独立的Job6?
    1. 因为sort触发了子job,对数据进行抽样,以实现排序后更均匀的分布
  2. 为什么job7里,仅跳过了stage8(应该是复用了stage6),而不跳过stage9呢?
    1. stage 6和stage 8的执行任务是一致的,但stage 7和stage 9其实是两码事,具体如下:
  • stage 6:hc.read.parquet(paths).filter(…).select(…)  + groupBy(all fileds).count()的前半段
  • stage 7:groupBy(all fileds).count() 后半段,以及抽样过程,阐述RangePartitioner
  • stage 8:被跳过,复用了stage6
  • stage 9:groupBy(all fileds).count() 后半段 + sort的前半段
  • stage 10:sort(…).toJavaRDD.mapPartitionsToPair(…).coalesce() + reduceByKey(…)的前半段
  1. 为什么job7里,stage9与stage7的执行不一致?stage7包含了3个mapPartitionsRDD调用,而stage9仅包含一个
    1. 解答与上面一样

经验与教训

1. 请考虑,如果hc.read.parquet().filter().select().sort().toJavaRDD.mapPartitions会如何呢?

这时同样会生成两个job,且都是从hdfs读取数据了~~ 因为第二个job的sort前面没有shuffle dependency,没有办法复用第一个job的stage了。

2. df.sort与rdd.repartitionAndSort的方法选择上,之前认为sparksql进行了很多数据结构和执行计划方面的优化,sort的性能可能更好。但分析后发现,它还会做一个sample操作,所以哪个性能更高,还真不好说了。至少在我们的场景下,两者性能持平。而鉴于sort上面的小坑,倾向于使用后者。

最近在做一个spark应用程序验证时,发现一个奇怪的事情,抽样处理后再sort的数据居然是无序的!最后由公司内负责spark的同学予以修复,但在追查的过程中我也恰好看了看SparkSQL的处理过程,理解了SparkSQL里DataFrame的解析、partitioner与Distribution之间的关系。

问题描述

仅当如下代码时,才会出现sort无效的情况:

 Python |  copy code |? 
1
df = hc.read.parquet(path).filter("trade1='Automobile'").filter("trade2='Default'")\
2
.filter("pdate='2015−06−03'").select("userid").limit(count).repartition(1024)\
3
.sort("userid")

而如果sort前没有同时出现limit和repartition,sort的结果就是有序的。

排查过程

直觉上如果sort结果完全无序,压根过不了spark社区,不可能发布,所以一定是在某些特定场景下才会出问题,故经过尝试首先确定了以上描述的特定错误场景。

那么它与正确场景有什么区别呢?例如hc.read.parquet().filter().select().sort()?打印它们的执行计划可以看到:

72 Sort [baiduid#87 ASC], true

73  Repartition 1024, true

74   Limit 100

75    Project [baiduid#87]

76     Filter (((trade1#96 = Automobile) && (trade2#97 = Default)) && (pdate#98 = 2015-06-03))

77      PhysicalRDD [baiduid#87,trade1#96,trade2#97,pdate#98], UnionRDD[63] at toString at NativeMethodAccessorImpl.java:-2

 

而正常情况下:

150 Sort [baiduid#267 ASC], true

151  Exchange (RangePartitioning 200)

152   Repartition 1024, true

153    Project [baiduid#267]

154     Filter (((trade1#276 = Automobile) && (trade2#277 = Default)) && (pdate#278 = 2015-06-03))

155      PhysicalRDD [baiduid#267,trade1#276,trade2#277,pdate#278], UnionRDD[203] at toString at NativeMethodAccessorImpl.java:-2

可以看到正常时多了一个Exchange步骤,但我们的代码里肯定没有显式调用它。

修复方式及原因总结

在execution/basicOperators.scala的Repartition添加如下代码:
 Scala |  copy code |? 
1
  override def outputPartitioning = {
2
    if (numPartitions == 1) SinglePartition
3
    else UnknownPartitioning(numPartitions)
4
  }

原因是如果没有override,该值=child.outputPartitioning,在此场景是Limit.outputParititioning = SinglePartition,意味着满足sort分布要求(但实际却是无序的)。

要想修复这个bug,还得对spark的执行过程有所了解,所以我们先来看它。

SparkSQL解析过程分析

我们使用hc.read.parquet()初始化一个dataframe时,此时需关注的是LogicalPlan:

 Scala |  copy code |? 
1
      sqlContext.baseRelationToDataFrame(
2
        new ParquetRelation2(
3
          globbedPaths.map(_.toString), None, None, Map.empty[String, String])(sqlContext))

即此时的logicalPlan = LogicalRelation(ParquetRelation2)

而filter、select、sort等操作,都会将之前的dataframe.logicalPlan作为child,再生成包含新的logicalPlan的新的DataFrame。在我们的问题示例代码里,拥有这些LogicalPlan(按生成的先后顺序):

  1. LogicalRelation(ParquetRelation2) — parquet生成
  2. Filter 3个
  3. Project
  4. Limit
  5. Repartition
  6. Sort  — 最终的LogicalPlan

假定我们此时通过针对Sort的dataframe.collect()触发spark执行,其实是触发了

queryExecution.executedPlan.executeCollect()的执行,这儿是对SQLContext内部类QueryExecution的lazy变量executedPlan的调用,并间接触发了一系列过程(为了描述方便,忽略了细节):

  1. analyzed = analyzer.execute(logical)
  2. optimizedPlan = optimizer.execute(withCachedData)
  3. sparkPlan = planner.plan(optimizedPlan).next()
  4. executedPlan = prepareForExecution.execute(sparkPlan)

(当然,如果调用的不是DataFrame api而是sql的话,还需要parseSQL过程,这里忽略了)

Analyzer过程

它会调用一堆filter chain来处理这些logicalPlan tree,通过以下代码可以看到:

 Bash |  copy code |? 
01
/* Analyzer.scala */
02
  lazy val batches: Seq[Batch] = Seq(
03
    Batch("Substitution", fixedPoint,
04
      CTESubstitution ::
05
      WindowsSubstitution ::
06
      Nil : _*),
07
    Batch("Resolution", fixedPoint,
08
      ResolveRelations ::
09
      ResolveReferences ::
10
      ResolveGroupingAnalytics ::
11
      ResolveSortReferences ::
12
      ResolveGenerate ::
13
      ResolveFunctions ::
14
      ExtractWindowExpressions ::
15
      GlobalAggregates ::
16
      UnresolvedHavingClauseAttributes ::
17
      TrimGroupingAliases ::
18
      typeCoercionRules ++
19
      extendedResolutionRules : _*)
20
  )
21
 
22
/* HiveTypeCoercion */
23
  val typeCoercionRules =
24
    PropagateTypes ::
25
    ConvertNaNs ::
26
    InConversion ::
27
    WidenTypes ::
28
    PromoteStrings ::
29
    DecimalPrecision ::
30
    BooleanComparisons ::
31
    BooleanCasts ::
32
    StringToIntegralCasts ::
33
    FunctionArgumentConversion ::
34
    CaseWhenCoercion ::
35
    Division ::
36
    PropagateTypes ::
37
    ExpectedInputConversion ::
38
    Nil
39
 
40
/* SQLContext.scala */
41
    new Analyzer(catalog, functionRegistry, conf) {
42
      override val extendedResolutionRules =
43
        ExtractPythonUdfs ::
44
        sources.PreInsertCastAndRename ::
45
        Nil
46
 
47
      override val extendedCheckRules = Seq(
48
        sources.PreWriteCheck(catalog)
49
      )
50
    }

 

该过程主要将各个table filed绑定到真正的table结构上去,也会发现field错误、类型错误、类型适配等。但整体而言,它只是完成分析过程,不会改变执行计划。

Optimizer过程

故名思议,这一阶段会进行优化,如果sql或api调用写的太烂,这一阶段完成,可能会跟原始语句有较大变化。但本质上,它只是把本地可以得出结论的语句优化掉(例如针对非nullable的字段判断is null,那肯定是true的),或者把可合并的合并了(例如多个filter、select等),也会稍微调整filter、union等的顺序。

 Scala |  copy code |? 
01
  val batches =
02
    // SubQueries are only needed for analysis and can be removed before execution.
03
    Batch("Remove SubQueries", FixedPoint(100),
04
      EliminateSubQueries) ::
05
    Batch("Operator Reordering", FixedPoint(100),
06
      UnionPushdown,
07
      CombineFilters,
08
      PushPredicateThroughProject,
09
      PushPredicateThroughJoin,
10
      PushPredicateThroughGenerate,
11
      ColumnPruning,
12
      ProjectCollapsing,
13
      CombineLimits) ::
14
    Batch("ConstantFolding", FixedPoint(100),
15
      NullPropagation,
16
      OptimizeIn,
17
      ConstantFolding,
18
      LikeSimplification,
19
      BooleanSimplification,
20
      SimplifyFilters,
21
      SimplifyCasts,
22
      SimplifyCaseConversionExpressions) ::
23
    Batch("Decimal Optimizations", FixedPoint(100),
24
      DecimalAggregates) ::
25
    Batch("LocalRelation", FixedPoint(100),
26
      ConvertToLocalRelation) :: Nil

prepareForExcution阶段

它只做了一件很关键的事情:

 Scala |  copy code |? 
1
    val batches =
2
      Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil

在这个EnsureRequirements类里,会检查children们的partitioner是否匹配要求的Distribution!我们的错误示例中,就是因为Repartition错误的设置了自己的outputPartition才出的问题。继续往下看。

Partitioning与Distribution的关系

EnsureRequirements类的作用:

 * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
 * of input data meets the
 * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
 * each operator by inserting [[Exchange]] Operators where required.  Also ensure that the
 * required input partition ordering requirements are met.
确保paritition的input data满足Distribution的要求,以及排序要求。
先来看下什么是Distribution要求吧:
  • Distribution 基类
 * Specifies how tuples that share common expressions will be distributed when a query is executed
描述了当一个query在一组服务器并行执行后,拥有相同expressions的tuples会如何分布。
 * in parallel on many machines.  Distribution can be used to refer to two distinct physical
 * properties:
可被用于两种物理属性:
 *  – Inter-node partitioning of data: In this case the distribution describes how tuples are
 *    partitioned across physical machines in a cluster.  Knowing this property allows some
 *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
节点内的分片:在这种情况下,描述了tuples在物理集群里是如何分片的。了解这个属性后,一些算子(例如aggregate)就可以在分片里执行操作,而非集群规模。
 *  – Intra-partition ordering of data: In this case the distribution describes guarantees made
 *    about how tuples are distributed within a single partition.
节点间的分片:在这种情况下,描述了tuples在一个分片内,是如何分布的
  • UnspecifiedDistribution 直接继承Distribution,代表没有顺序和分布的保证
  • AllTuples 代表只有一个分片
  • ClusteredDistribution 代表相同“key”会放到同一个分片,或是连续分布在一个仅有的partition
  • OrderedDistribution 代表相同“key”会在同一分片,且是连续分布的
再看下什么是Partition:
  • Partitioning 基类
  • UnknownPartitioning
  • SinglePartition
  • BroadcastPartitioning
  • HashPartitioning
  • RangePartitioning
从partitioning.scala里可以看到不同的partition方式都满足那些Distribution要求:
  • UnknownPartitioning.satisfies(UnspecifiedDistribution) == true
  • SinglePartition.satisfies(ALL) == true
  • BroadcastPartitioning.satisfies(ALL) == true
  • HashPartitioning.satisfies(UnspecifiedDistribution) == true
  • HashPartitioning.satisfies(ClusteredDistribution) 在hash的expression是required的子集时 == true
  • RangePartitioning.satisfies(UnspecifiedDistribution) == true
  • RangePartitioning.satisfies(OrderedDistribution) 在排序fields 前者包含后者,或后者包含前者时,且顺序一致时 == true
  • RangePartitioning.satisfies(ClusteredDistribution) 在Range的expression是required的子集时 == true
  • 其他都是false

针对每个child,会计算是否满足distribution要求:val valid = child.outputPartitioning.satisfies(required)

并且打印如下log:(current是child.outputPartitioning)

15/11/12 15:10:03 DEBUG EnsureRequirements: Valid distribution,required: OrderedDistribution(List(userid#0 ASC)) current: SinglePartition

从上面可以看到,只有SinglePartition和RangePartitioning才满足已排序分布的要求,而我们的错误示例里,居然已经是SinglePartition了!

在org.apache.spark.sql.execution.Limit里,可以看到其override def outputPartitioning: Partitioning = SinglePartition,即只要limit操作,就会变成一个分片,这也make sense。

但org.apache.spark.sql.execution.Repartition里,由于没有override这个method,所以继承的是父类的值:override def outputPartitioning: Partitioning = child.outputPartitioning!而它的child是Limit……,所以repartition完了,该值也是SinglePartition了,满足sort排序的要求……(但实际是不满足的)

将Repartition的outputpart修改为UnknownPartitioning就可以了。这时由于不满足分布要求,EnsureRequirement会插入Exchange执行计划。

Exchange类

* Performs a shuffle that will result in the desired `newPartitioning`.  Optionally sorts each

* resulting partition based on expressions from the partition key.  It is invalid to construct an

* exchange operator with a `newOrdering` that cannot be calculated using the partitioning key.

该类会通过shuffle获得新的分片。可选的按照partitionKey对结果分片进行排序。使用不可作用于partitionKey的newOrdering,来构造一个exchange operator是不合法的。

 

该类的doExecute()方法,分别针对HashPartitioning、RangePartitioning、SinglePartition进行处理。在这些case的必要情况下,会提前把待shuffle的对象作copy,再发送给externsorter。

 

 

* Determines whether records must be defensively copied before being sent to the shuffle.

决定在进行shuffle前是否需要保守地复制。

* Several of Spark’s shuffle components will buffer deserialized Java objects in memory. The

一些spark的shuffle组件会在内存中,缓存一定的反序列化后的java对象。

* shuffle code assumes that objects are immutable and hence does not perform its own defensive

这些shuffle代码假设对象是不可变的,所以不自行进行保守地复制。

* copying. In Spark SQL, however, operators’ iterators return the same mutable `Row` object. In

但是在sparksql里,算子的迭代器返回了可变的Row对象。

* order to properly shuffle the output of these operators, we need to perform our own copying

为了正确排序这些sparksql算子的output,我们需要先把数据copy出来。

* prior to sending records to the shuffle. This copying is expensive, so we try to avoid it

这些copy是高代价的,所以要尽量避免它。

* whenever possible. This method encapsulates the logic for choosing when to copy.

这个方法封装了是否需要copy的逻辑。

*

* In the long run, we might want to push this logic into core’s shuffle APIs so that we don’t

* have to rely on knowledge of core internals here in SQL.

在未来,这部分逻辑会移入核心shuffle API。

*

* See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.