上一篇文章从pushdown filter的角度分析了spark与parquet的结合方式,由于其中涉及不少类和数据结构,补充如下。而Dremel论文里已详细解释了repetitionLevel、definitionLevel和value SerDe协议,不再赘述。下面主要从代码实现角度。

ParquetRecordPushDownFilter

在parquet的加载过程中,维持两颗树,一颗是与requested schema对应的fields tree,另一颗是与filters对应的predicate tree。其加载过程就是不断从解压后的二级制流里获取数据,将其对应到上面两颗树的过程。

predicate tree比较简单,如上图中predicate示例,在逻辑上由filterPredicate树和其叶子节点组成的valueInspector数组构成。每读入一个primitive value(可以看到pushdown filter不支持array、map、struct操作),就获取其对应的inspectors,依次计算boolean值存储起来,如下图所示。

在一个record读取完成后,由FilteringRecordMaterializer.getCurrentRecord()触发,递归计算predicate tree的最终结果,若未通过则抛弃缓存中的record值。

在叶子节点的读取过程中,同时也会更新fields tree对应的数据缓存,如下图,针对每个record维持了一个SpecificMutableRow 。这时有两种叶子节点,一种tree level = 1是root下直接的field,如下图中的short节点;另一种tree level > 1,是nested的子节点,如下图array里下的primitive值。对于前者直接在SpecificMutableRow下有对应槽位,而后者又有一个临时的ArrayBuf缓存,仅当该field递归读完后,由end()触发,才会set回SpecificMutableRow的槽位里。

读取完record并且通过filters之后,由CatalystRecordMaterializer.getCurrentRecord()触发将缓存中的数据,装配为spark所需要的row对象,这里涉及codegen过程,虽然有code缓存,但应该仍然是挺耗时的操作。

ParquetRecordPushDownFilter-datastruct

Leave a Reply