Archive for 三月, 2016

本文尝试用spark自带example streaming.NetworkWordCount为例,解释spark streaming 1.6.0的执行流程。

例子代码如下:

 Scala |  copy code |? 
01
    // Create the context with a 1 second batch size
02
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
03
    val ssc = new StreamingContext(sparkConf, Minutes(1))
04
 
05
    // Create a socket stream on target ip:port and count the
06
    // words in input stream of \n delimited text (eg. generated by 'nc')
07
    // Note that no duplication in storage level only for running locally.
08
    // Replication necessary in distributed scenario for fault tolerance.
09
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
10
    val words = lines.flatMap(_.split(" "))
11
    val wordCounts = words.map(=> (x, 1)).reduceByKey(_ + _)
12
    wordCounts.print()
13
    ssc.start()
14
    ssc.awaitTermination()

Streaming的核心是DStream类,即discretized streams。以上的socketTextStream、flatMap、map、reduceByKey、print方法分别生成了5个子类实例:SocketInputDStream<-FlatMappedDStream<-MappedDStream<-ShuffledDStream<-ForEachDStream,前面是后面的parent,可以类比RDD的依赖关系。另外,首尾分别是input和output stream。

但截至到wordCounts.print(),spark集群并没有任何实质性的操作呢,直到ssc.start(),它会触发一系列的初始化,其中包括input stream的数据输入,并使用类似RDD的方式,以outputStream为入口依次触发parent的执行。将分如下几个部分详细说明:

  1. start触发的初始化过程,即input stream任务如何提交至executors?
  2. input stream blocks的产生过程,即数据是如何被读入和缓存的?
  3. input stream如何触发后续的transform、output操作的?

Start触发的初始化过程

SparkStreaming

 

当运行在集群模式下时,driver仅仅初始化job,input的真正读取者是worker节点。如上所示,StreamingContext.start()启动streaming-start子线程,触发JobScheduler.start(),进而触发两个关键实例的start方法:ReceiverTracker.start()和JobGenerator.start()。前者会生成并异步提交input Receiver job,后者以定时器监听duration超时、生成的后续处理任务。先仅关注前者。

核心方法是ReceiverTrackerEndpoint.startReceiver(receiver, scheduledLocations)方法,它调用ssc.sparkContext.submitJob提交Job,传入的RDD是仅包含receiver信息的单元素RDD,processPartition回调方法如下,在worker端取出receiver信息,并初始化ReceiverSupervisorImpl对象,开始读取数据。

 Scala |  copy code |? 
01
      // Function to start the receiver on the worker node
02
      val startReceiverFunc: Iterator[Receiver[_]] =&gt; Unit =
03
        (iterator: Iterator[Receiver[_]]) =&gt; {
04
          if (!iterator.hasNext) {
05
            throw new SparkException(
06
              "Could not start receiver as object not found.")
07
          }
08
          // flykobe: worker拿到RDD,里面仅包含一个元素,就是receiver
09
          // 包装并且启动数据接收
10
          if (TaskContext.get().attemptNumber() == 0) {
11
            val receiver = iterator.next()
12
            assert(iterator.hasNext == false)
13
            val supervisor = new ReceiverSupervisorImpl(
14
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
15
            supervisor.start()
16
            supervisor.awaitTermination()
17
          } else {
18
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
19
          }
20
        }

注意,一个app可以有多个input streams,所以可以在该阶段触发生成多个Jobs(如何调度这些Jobs,就是后面JobScheduler等的事情了)。

Input stream blocks的产生过程

Spark Streaming的处理模型是小批量、伪实时,所以需要对读入的数据进行缓存,每隔一段时间生成一个blocks,再每隔duration时间,由这些blocks组成RDDs。

SparkStreaming-1

 

有两种方式blocks,一种是Receiver单条写入,框架处理底层细节,例如SocketInputDStream、KafkaInputDStream;另一种是Receiver直接形成block,自行控制缓存等。

以SocketInputDStream为例,它的receiver阻塞读取socket数据,每接收到一行,就调用Receiver.store()方法进行存储,进而触发上图左边的逻辑。

首先将数据暂存在BlockGenerator.currentBuffer里,BlockGenerator定时器子线程每隔spark.streaming.blockInterval时间,就调用updateCurrentBuffer()将buffer里的数据形成block,并put进blocksForPushing队列。以上可视为queue的生产过程。

BlockGenerator还有一个blockPushingThread子线程,使用keepPushingBlocks()方法阻塞监听该queue,一旦有block产生,就调用ReceiverSupervisorImpl.pushAndReportBlock()方法,按照storage level交由BlockManager.doPut存储在内存 and/or 磁盘上,在此期间可能会写WAL日志。随后通过AKKA向master的ReceiverTracker发送AddBlock消息,由ReceivedBlockTracker.addBlock()方法建立block与input streaming的关联关系。

这里需要注意的是,block的产生间隔是由spark.streaming.blockInterval控制的,默认是200ms,而非duration参数。

完成以上步骤后,worker节点里保存了blocks的数据,而master节点存储了blocks列表。

Input stream如何触发后续操作

示例里没有window操作,处理间隔仅有duration控制,相对简单。

SparkStreaming-2

 

前面提到JobGenerator的定时器线程,它每隔ssc.graph.batchDuration.milliseconds执行一次,简单生成GenerateJobs消息,由JobGenerator EventLoop子线程处理,触发JobGenerator.generateJobs()。

它首先调用ReceivedBlockTracker.allocateBlocksToBatch()方法,按需写入master端的WAL日志,并组装起(time => (streamID=>blocks, …) ),保存到实例属性timeToAllocatedBlocks HashMap里。

随后进入关键步骤,由于一个app可以有多个output streams,故调用DStreamGraph.generateJobs()依次触发。以这儿的ForEachDStream为例,它的generateJob方法触发生成parent RDD,继续触发parent.compute()方法,最终导致SocketInputDStream按需将duration里的blocks形成RDDs:

 Scala |  copy code |? 
01
 /**
02
   * Generates RDDs with blocks received by the receiver of this stream. */
03
  override def compute(validTime: Time): Option[RDD[T]] = {
04
    val blockRDD = {
05
 
06
      if (validTime &lt; graph.startTime) {
07
        // If this is called for any time before the start time of the context,
08
        // then this returns an empty RDD. This may happen when recovering from a
09
        // driver failure without any write ahead log to recover pre−failure data.
10
        new BlockRDD[T](ssc.sc, Array.empty)
11
      } else {
12
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
13
        // for this batch
14
        val receiverTracker = ssc.scheduler.receiverTracker
15
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
16
 
17
        // Register the input blocks information into InputInfoTracker
18
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
19
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
20
 
21
        // Create the BlockRDD
22
        createBlockRDD(validTime, blockInfos)
23
      }
24
    }
25
    Some(blockRDD)
26
  }

截止到这儿,已经把blocks关联到RDD了。随后调用JobScheduler.submitJobSet()异步提交Job即可。

以上大体描述了spark streaming的job触发过程,但如何对input replication、怎么触发checkpoint、WAL日志如何使用等尚未涉及。

上一篇文章从pushdown filter的角度分析了spark与parquet的结合方式,由于其中涉及不少类和数据结构,补充如下。而Dremel论文里已详细解释了repetitionLevel、definitionLevel和value SerDe协议,不再赘述。下面主要从代码实现角度。

ParquetRecordPushDownFilter

在parquet的加载过程中,维持两颗树,一颗是与requested schema对应的fields tree,另一颗是与filters对应的predicate tree。其加载过程就是不断从解压后的二级制流里获取数据,将其对应到上面两颗树的过程。

predicate tree比较简单,如上图中predicate示例,在逻辑上由filterPredicate树和其叶子节点组成的valueInspector数组构成。每读入一个primitive value(可以看到pushdown filter不支持array、map、struct操作),就获取其对应的inspectors,依次计算boolean值存储起来,如下图所示。

在一个record读取完成后,由FilteringRecordMaterializer.getCurrentRecord()触发,递归计算predicate tree的最终结果,若未通过则抛弃缓存中的record值。

在叶子节点的读取过程中,同时也会更新fields tree对应的数据缓存,如下图,针对每个record维持了一个SpecificMutableRow 。这时有两种叶子节点,一种tree level = 1是root下直接的field,如下图中的short节点;另一种tree level > 1,是nested的子节点,如下图array里下的primitive值。对于前者直接在SpecificMutableRow下有对应槽位,而后者又有一个临时的ArrayBuf缓存,仅当该field递归读完后,由end()触发,才会set回SpecificMutableRow的槽位里。

读取完record并且通过filters之后,由CatalystRecordMaterializer.getCurrentRecord()触发将缓存中的数据,装配为spark所需要的row对象,这里涉及codegen过程,虽然有code缓存,但应该仍然是挺耗时的操作。

ParquetRecordPushDownFilter-datastruct

Pushdown在传统关系型数据库里,是一种常用的查询优化手段,在分布式计算领域,由于数据跨节点传输的代价更大,故pushdown也变得尤为重要。可能由于该手段极为常见,反而找不到对其的准确定义。如果将计算过程视为一颗树,则可以简单理解为尽量将计算下沉至叶子节点,尽早过滤掉不需要的数据,从而减少上层计算的数据量。

本文尝试解读基于Parquet存储的SparkSQL pushdown实现,spark版本1.6.0,parquet版本1.7.0。非分布式计算科班出身,更多是个人思考,谬误处请指正。

面对一个查询请求,如何实现pushdown优化呢?可粗略分为两个阶段,首先找出可以pushdown的操作组合,随后在数据加载阶段予以执行。前者属于执行计划范畴,后者可纳入底层数据结构的实现。

Pushdown之执行计划

SparkSQL的输入可以是DataFrame的算子组合,也可以是sql语句,本质上其解析过程是一样的,即语法、语义的解析(Parser),优化(Optimizer),生成逻辑和物理执行计划等,串联过程在sparkexecution.QueryExecution类里。

考虑df.select(a, b, c).filter(by a).filter(by b).select(c).filter(by c)这样的查询,在optimizer阶段,需要合并多个filters(CombineFilters),并调整算子间的顺序,例如将部分filter移到select等前面(PushPredicateThroughAggregate/Generate/Join/Project)。

以较为简单的PushPredicateThroughProjection为例
优化前,sparkPlan chain是:Filter(condition, … ) ->Projection(fields, …) -> … (注:右侧plan更早,是左侧的child)。
实现为:
  1. 将projection的fields转化为map(attribute => child), e.g., ‘SELECT a + b AS c, d …’ produces Map(c -> a + b).
  2. 将filter的condition分为deterministic和nondeterministic
    1. 如果nondeterministic为空,代表所有filter condition都是确定的,可以全部pushdown,转化后为:Projection(fields, …) -> Filter(将condition里出现的alias转化为实际child, …) -> …
    2. 如果deterministic为空,代表所有都无法pushdown,直接返回,chain不变
    3. 如果两者都非空,则仅pushdown deterministic condition,转化后为:Filter(nondeterministic, … ) -> Project(fields, …) -> Filter(pushedCondition, …) -> …

对于更为复杂的查询,可参看下面带pushdown字眼的rule实现。但总而言之,在optimizer完成后,可能会产生与输入不一样的filters(可能是多个,不一定连续),并且尽量前置

 Scala |  copy code |? 
01
object DefaultOptimizer extends Optimizer {
02
  val batches =
03
    // SubQueries are only needed for analysis and can be removed before execution.
04
    Batch("Remove SubQueries", FixedPoint(100),
05
      EliminateSubQueries) ::
06
    Batch("Aggregate", FixedPoint(100),
07
      ReplaceDistinctWithAggregate,
08
      RemoveLiteralFromGroupExpressions) ::
09
    Batch("Operator Optimizations", FixedPoint(100),
10
      // Operator push down
11
      SetOperationPushDown,
12
      SamplePushDown,
13
      PushPredicateThroughJoin,
14
      PushPredicateThroughProject,
15
      PushPredicateThroughGenerate,
16
      PushPredicateThroughAggregate,
17
      ColumnPruning,
18
      // Operator combine
19
      ProjectCollapsing,
20
      CombineFilters,
21
      CombineLimits,
22
      // Constant folding
23
      NullPropagation,
24
      OptimizeIn,
25
      ConstantFolding,
26
      LikeSimplification,
27
      BooleanSimplification,
28
      RemoveDispensableExpressions,
29
      SimplifyFilters,
30
      SimplifyCasts,
31
      SimplifyCaseConversionExpressions) ::
32
    Batch("Decimal Optimizations", FixedPoint(100),
33
      DecimalAggregates) ::
34
    Batch("LocalRelation", FixedPoint(100),
35
      ConvertToLocalRelation) :: Nil
36
}

由于pushdown最终需要在数据源上执行,所以在SparkPlanner的rule DataSourceStrategy里需要找到plan chain中所有可以deterministic的filters,并传递给Parquet使用,如下代码所示。

需要说明的是PhysicalOperation,它利用scala的unapply解包方法,调用collectProjectsAndFilters()将plan chain解析为(Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression])四元组,这里我们主要关注第二个元素Seq[Expression],它是所有filters的sequence。

从代码可以见,由于涉及partition keys的filters可以通过忽略不相关分片更高效的过滤,所以pushedFilters仅包括不涉及partition keys的filters。

 Scala |  copy code |? 
01
02
    // Scanning partitioned HadoopFsRelation
03
    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
04
        if t.partitionSpec.partitionColumns.nonEmpty =&amp;amp;gt;
05
      // We divide the filter expressions into 3 parts
06
      val partitionColumns = AttributeSet(
07
        t.partitionColumns.map(=&amp;amp;gt; l.output.find(_.name == c.name).get))
08
 
09
      // Only pruning the partition keys
10
      val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
11
 
12
      // Only pushes down predicates that do not reference partition keys.
13
      val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
14
 
15
      // Predicates with both partition keys and attributes
16
      val combineFilters = filters.toSet −− partitionFilters.toSet −− pushedFilters.toSet
17
 
18
      val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
19
 
20
      logInfo {
21
        val total = t.partitionSpec.partitions.length
22
        val selected = selectedPartitions.length
23
        val percentPruned = (1 − selected.toDouble / total.toDouble) * 100
24
        s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
25
      }
26
 
27
      val scan = buildPartitionedTableScan(
28
        l,
29
        projects,
30
        pushedFilters,
31
        t.partitionSpec.partitionColumns,
32
        selectedPartitions)
33
 
34
      combineFilters
35
        .reduceLeftOption(expressions.And)
36
        .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
37

需要注意,在scan执行后,还需要执行combineFilters,它由既包含partition keys、又包含非partition keys的filters组成。

上面仅是找到所有filters的序列,但尚未判断哪些可以被pushdown,这是通过buildPartitionedTableScan触发判断的,调用链如下:

  1. 以上获取到备选pushedFilters变量
  2. 调用buildPartitionedTableScan方法,传递pushedFilters作为filters参数
    1. 生成scanBuild(requiredColumns: Seq[Attribute], filters: Array[Filter])方法,注意此filters非彼filters
    2. 调用pruneFilterProject方法,继续传递pushedFilters作为filterPredicates参数
      1. 调用pruneFilterProjectRaw方法,对scanBuild进行wrap,并继续传递pushedFilters作为filterPredicates参数
        1. 调用selectFilters方法,传入翻译为底层field names的pushedFilters作为predicates参数,返回值也叫pushedFilters,为了区分,记为realPushedFilters
          1. 遍历pushedFilters,依次调用translateFilter方法,将支持pushdown的catalyst predicate操作转变为data source的filter操作(支持=,>,<,>=,<=,and,or,not,字符串startsWith、endsWith、contain等)
        2. realPushedFilters传递给scanBuild方法,作为filter参数

以上的realPushedFilters才是spark认为可以被pushdown的filters组合。在scanBuild方法里,调用ParquetRelation.buildInternalScan方法,将realPushedFilters用FilterApi.And方法连接起来、组成一个Filter对象,再调用ParquetInputFormat.setFilterPredicate()设置为parquet.private.read.filter.predicate,从而使后面的parquet解析过程可以真正执行push down filter操作(注:Parquet pushDown不支持字符串操作,所以相关filter在这一步会被忽略)。

注意,传递给parquet的pushdown filters都是org.apache.parquet.filter2.predicate下的FilterPredicate子类对象。

Pushdown之底层执行

push down filters在Parquet里有两个应用场景:基于statistic数据的row group filter,以及record filter。前者可跳过大量数据,效率高;后者实际是在解压缩、反序列化之后进行的,仅能降低数据加载阶段产出的数据量。

RowGroupFilter

数据加载阶段task的执行由SqlNewHadoopRDD.compute()触发,针对Parquet文件,生成ParquetRecordReader对象(spark 1.6里针对flat schema有其他优化),并调用其initialize方法。在开启taskSideMeta时,在execution端执行row group filter操作(taskSideMeta=false的情况参考BLOG)。

 Scala |  copy code |? 
1
// if task.side.metadata is set, rowGroupOffsets is null
2
    if (rowGroupOffsets == null) {
3
      // then we need to apply the predicate push down filter
4
      footer = readFooter(configuration, path, range(split.getStart(), split.getEnd()));
5
      MessageType fileSchema = footer.getFileMetaData().getSchema();
6
      Filter filter = getFilter(configuration);
7
      filteredBlocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
8
   }

通过getFilter方法可将parquet.private.read.filter.predicate配置的pushDownFilters解析出来,并包装为FilterPredicateCompat对象,供filterRowGroups使用。后者使用Visitor模式,触发RowGroupFilter.visit方法,调用StatisticsFilter.canDrop()针对每个parquet block(即rowGroup)依次执行predicate操作。

如下是RowGroupFilter.visit代码,仅通过rowGroupFilters的blocks才有机会被解压、反序列化、继续操作:

 Scala |  copy code |? 
01
 
02
  @Override
03
  public List&amp;amp;lt;BlockMetaData&amp;amp;gt; visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) {
04
    FilterPredicate filterPredicate = filterPredicateCompat.getFilterPredicate();
05
 
06
    // check that the schema of the filter matches the schema of the file
07
    SchemaCompatibilityValidator.validate(filterPredicate, schema);
08
 
09
    List&amp;amp;lt;BlockMetaData&amp;amp;gt; filteredBlocks = new ArrayList&amp;amp;lt;BlockMetaData&amp;amp;gt;();
10
 
11
    for (BlockMetaData block : blocks) {
12
      if (!StatisticsFilter.canDrop(filterPredicate, block.getColumns())) {
13
        filteredBlocks.add(block);
14
      }
15
    }
16
 
17
    return filteredBlocks;
18
  }
19

StatisticsFilter.canDrop也是用的Visitor模式,触发StatisticsFilter.visit()重载方法,以最简单的equal过滤为例,可见仅当Parquet文件包含statistic信息时才有效。

 Scala |  copy code |? 
01
02
@Override
03
  public &amp;amp;lt;extends Comparable&amp;amp;lt;T&amp;amp;gt;&amp;amp;gt; Boolean visit(Eq&amp;amp;lt;T&amp;amp;gt; eq) {
04
    Column&amp;amp;lt;T&amp;amp;gt; filterColumn = eq.getColumn();
05
    T value = eq.getValue();
06
    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
07
    Statistics&amp;amp;lt;T&amp;amp;gt; stats = columnChunk.getStatistics();
08
 
09
    if (stats.isEmpty()) {
10
      // we have no statistics available, we cannot drop any chunks
11
      return false;
12
    }
13
 
14
    if (value == null) {
15
      // we are looking for records where v eq(null)
16
      // so drop if there are no nulls in this chunk
17
      return !hasNulls(columnChunk);
18
    }
19
 
20
    if (isAllNulls(columnChunk)) {
21
      // we are looking for records where v eq(someNonNull)
22
      // and this is a column of all nulls, so drop it
23
      return true;
24
    }
25
 
26
    // drop if value &amp;amp;lt; min || value &amp;amp;gt; max
27
    return value.compareTo(stats.genericGetMin()) &amp;amp;lt; 0 || value.compareTo(stats.genericGetMax()) &amp;amp;gt; 0;
28
  }
29

Record Filter

通过RowGroupFilter的parquet blocks,通过InternalParquetRecordReader进行读取,涉及到的核心类如下:

ParquetRecordFilter

Parquet代码里,大量使用Visitor模式,下面需要注意。

在MessageColumnIO.getRecordReader()里,构造了三个关键对象:

  1. FilteringRecordMaterializer,在一个record读取完毕后,会调用它的getCurrentRecord方法,检查predicate是否通过。该类有一个关键属性FilteringGroupConverter对象
  2. ColumnReadStoreImpl,实际读取column的入口类,并且将FilteringGroupConverter对象传递给它
  3. RecordReaderImplementation,它的read方法触发底层读取、解压、反序列化等操作,并在一个record完成后,触发FilteringRecordMaterializer.getCurrentRecord方法

另外,RecordReaderImplementation在初始化时,会针对每一个叶子节点(即存储primitive值的地方)生成对应的ColumnReaderImpl对象,后者包含FilteringPrimitiveConverter属性,是对读入的每个value执行predicate的对象,基于ColumnReadStoreImpl.FilteringGroupConverter获得。

在RecordReaderImplementation.read()里,每读入一个r、d、value tuple,就会调用ColumnReaderImpl.writeCurrentValueToConverter(),进而触发调用对属性binding的writeValue方法的调用,根据value的类型,触发FilteringPrimitiveConverter对应addXXX方法。以boolean类型为例,会执行所有valueInspectors即predicate操作:

 Scala |  copy code |? 
1
  @Override
2
  public void addBoolean(boolean value) {
3
    for (ValueInspector valueInspector : valueInspectors) {
4
      valueInspector.update(value);
5
    }
6
    delegate.addBoolean(value);
7
  }

valueInspectors是由IncrementallyUpdatedFilterPredicateBuilder生成的,其update方法会父类ValueInspector的setResult方法,设置当前predicate对象的result和isKnown两个boolean标记位。

 Scala |  copy code |? 
01
    if (clazz.equals(Boolean.class)) {                                                                      
02
      if (pred.getValue() == null) {
03
        valueInspector = new ValueInspector() {                                                             
04
          @Override
05
          public void updateNull() {                                                                        
06
            setResult(true);                                                                                
07
          }                                                                                                 
08
          
09
          @Override
10
          public void update(boolean value) {                                                               
11
            setResult(false);                                                                               
12
          }                                                                                                 
13
        };
14
      } else {
15
        final boolean target = (Boolean) (Object) pred.getValue();                                          
16
        
17
        valueInspector = new ValueInspector() {                                                             
18
          @Override
19
          public void updateNull() {                                                                        
20
            setResult(false);                                                                               
21
          }                                                                                                 
22
          
23
          @Override
24
          public void update(boolean value) {                                                               
25
            setResult(value == target);                                                                     
26
          }                                                                                                 
27
        };                                                                                                  
28
      }                                                                                                     
29
    }

RecordReaderImplementation.read()里,当完整读完一个record后,调用FilteringRecordMaterializer.getCurrentRecord方法,判断是否通过了所有的pushdown filters,若未通过则read方法返回null。具体而言,最终判断predicate是否通过,是要把inspection数组里所有result结果,通过and、or重新串联起来的,流程如下:

  1. RecordReaderImplementation.read()在完整读取一个record后,调用FilteringRecordMaterializer.getCurrentRecord()
    1. 调用IncrementallyUpdatedFilterPredicateEvaluator.evaluate(predicate根节点)
      1. Visitor模式调用predicate.accept(instance),instance完全是为了使用visitor模式构造出来的空IncrementallyUpdatedFilterPredicateEvaluator对象
        1. 调用IncrementallyUpdatedFilterPredicateEvaluator.visit()重载方法,可接受IncrementallyUpdatedFilterPredicate.And/Or或ValueInspector对象作为参数
        2. 如果是And、Or,则利用left、right IncrementallyUpdatedFilterPredicateEvaluator对象生成结果
        3. 如果是ValueInspector对象,则返回之前生成的result bool值

至此,Parquet完成了record级别的pushdown。