Parquet作为目前SparkSQL默认的存储方式,号称在性能与存储空间等方面达到较好地平衡。但为了达到高性能,还需要进一步调优,其中parquet.block.size是经常提到的一个配置。从字面看,它控制了parquet block的数据大小,但不同的block size会带来怎样的红利、是否适合业务要求呢?parquet支持多种encoding和compress方式,这里的block size是encoded and compressed size吗?

前一个话题较大,后续再讨论,但以我的经验而言,并不是像官方文档所说,1GB就一定比256MB好,得看应用场景。这里主要通过对parquet和spark源码的分析,阐释第二个问题。

先给结论

parquet.block.size控制的是parquet编码后、encoding后、大部分Pages compressed后的数据大小。但是是粗略的size,因为check mem时:

  • 可能有部分数据在encoder缓存中,未计算在内
  • 可能采用原始数据大小,而实际写入的是dict编码后的大小
  • 最后一个page的数据还未压缩

Parquet文件和LOG分析

下面示例中,parquet.block.size=256MB, parquet.page.size=1MB,都是默认值。在parquet里,经常混用block和row group的概念,可以认为两者相等。但需注意,这里的block与hdfs.block是两码事。

使用parquet tools可以看到meta信息:

row group 1: RC:9840100 TS:698748990 OFFSET:4

其中RC是RowCount,TS是压缩前的TotalByteSize,OFFSET是该row group的StartingPos。可以看到TS达到666MB,远超配置的256MB。

查看写入日志:

Jan 7, 2016 2:03:58 AM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: mem size 268,650,005 > 268,435,456: flushing 9,418,847 records to disk.

Jan 7, 2016 2:03:58 AM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 267,733,756

Jan 7, 2016 2:03:59 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 151,651,009B for [xxx] BINARY: 9,418,847 values, 336,901,990B raw, 151,629,488B comp, 324 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY, PLAIN], dic { 25,702 entries, 925,241B raw, 25,702B comp}

可以看到当内存用量略超过parquet.block.size时,会触发block的flush。而raw大小一般会远超parquet.block.size。

从这些表象,也可以猜测,block size应该控制的是compressed后的数据大小吧,但为了得到更加精确和确定的答案,还是看源码。

源码解释

在调用df.write.parquet()写入数据时,包括以下关键步骤(以下涉及spark和parquet-mr代码):

Spark-parquet-write-2

Spark-parquet-write从上图可以看到,check时的内存是所有columns buffered size之和,那这些项都是什么含义呢?还是先给结论:

  • repetition level、definition level、data对应最后一个page(还没真的形成page)的size,是未压缩的内存消耗大小。
  • pageWriter.getMemSize 是ColumnChunkPageWriter.getMemSize,即buf.size(),是之前flush完的pages形成的压缩后的二进制字节流的大小。

 Scala |  copy code |? 
1
  @Override
2
  public long getBufferedSizeInMemory() {
3
    return repetitionLevelColumn.getBufferedSize()
4
        + definitionLevelColumn.getBufferedSize()
5
        + dataColumn.getBufferedSize()
6
        + pageWriter.getMemSize();
7
  }

我们先看r level、d level和data size,对应上面代码的前3项,这个比较好理解。

其中,repetitionLevelColumn和definitionLevelColumn根据maxLevel值,可能是RunLengthBitPackingHybridValuesWriter或DevNullValuesWriter对象;dataColumn根据value type由ParquetProperties.getValuesWriter决定,例如逻辑类型string对应到底层数据type binary,一般会使用PlainBinaryDictionaryValuesWriter对象。以下举几个例子。

RunLengthBitPackingHybridValuesWriter的内存消耗实际发生在encoder里,所以直接返回其encoder.size,即RunLengthBitPackingHybridEncoder.getBufferedSize()。通过后者的writeInt(),可以看到并不是每个value都立刻写入baos内存(调用writeOrAppendBitPackedRun方法),之外还有一些缓存未计算,但误差较小。

 Scala |  copy code |? 
01
  public void writeInt(int value) throws IOException {
02
    if (value == previousValue) {
03
      // keep track of how many times we've seen this value
04
      // consecutively
05
      ++repeatCount;
06
 
07
      if (repeatCount >= 8) {
08
        // we've seen this at least 8 times, we're
09
        // certainly going to write an rle−run,
10
        // so just keep on counting repeats for now
11
        return;
12
      }
13
    } else {
14
      // This is a new value, check if it signals the end of
15
      // an rle−run
16
      if (repeatCount >= 8) {
17
        // it does! write an rle−run
18
        writeRleRun();
19
      }
20
 
21
      // this is a new value so we've only seen it once
22
      repeatCount = 1;
23
      // start tracking this value for repeats
24
      previousValue = value;
25
    }
26
 
27
    // We have not seen enough repeats to justify an rle−run yet,
28
    // so buffer this value in case we decide to write a bit−packed−run
29
    bufferedValues[numBufferedValues] = value;
30
    ++numBufferedValues;
31
 
32
    if (numBufferedValues == 8) {
33
      // we've encountered less than 8 repeated values, so
34
      // either start a new bit−packed−run or append to the
35
      // current bit−packed−run
36
      writeOrAppendBitPackedRun();
37
    }
38
  }

PlainBinaryDictionaryValuesWriter等需形成dictionary的writers,使用的不是编码后的value内存消耗(这时就是dict index了),而是原始写入的value bytes size。因为当dict编码不合适时(例如重复率没有那么高),会fallback回原始方式,这时要防止page过大。

 Scala |  copy code |? 
1
  @Override
2
  public long getBufferedSize() {
3
    // use raw data size to decide if we want to flush the page
4
    // so the acutual size of the page written could be much more smaller
5
    // due to dictionary encoding. This prevents page being to big when fallback happens.
6
    return rawDataByteSize;
7
  }

从上面可以看到,check时内存的计算方式,其实是比较粗略的。但计算的size确实是parquet编码(已形成repetition level和definition level),且经过encode了。那压缩发生在此之后吗?(图太大,建议点开看)

Spark-parquet-write
从上面的流程图可以看到,一旦buffered size超过parquet.block.size,就会触发flush,把最后一个分片的内容调用writePage方法写出,将一个page里所有rows的repetition、definition和data以二进制的形式拼接在一起,并针对字节流进行压缩。

还有一个需要注意的点,即dictionary对象是在flush时生成的,其内存消耗也没有计算在check mem里。不过dict的大小由配置决定,一般也就是MB级别,不会太大。

再来看下ColumnChunkPageWriter.getMemSize,为什么说它是编码、压缩后的pages字节流大小呢?

这就得结合着parquet.page.size了。这个值会被compress实现类和ColumnWriteImpl类使用。这里主要关注后者,在该类的每个writeXXX方法的最后都会调用accountForValueWritten方法,该方法检查当前column的内存消耗是否达到page.size,如果达到就也会调用writePage方法。而后者在上图可以看到,会形成字节流并压缩。所以,也可以明白,为什么parquet的document里面说,page是encoding和compress的unit了。

 Scala |  copy code |? 
01
  private void accountForValueWritten() {
02
    ++ valueCount;
03
    if (valueCount > valueCountForNextSizeCheck) {
04
      // not checking the memory used for every value
05
      long memSize = repetitionLevelColumn.getBufferedSize()
06
          + definitionLevelColumn.getBufferedSize()
07
          + dataColumn.getBufferedSize();
08
      if (memSize > pageSizeThreshold) {
09
        // we will write the current page and check again the size at the predicted middle of next page
10
        valueCountForNextSizeCheck = valueCount / 2;
11
        writePage();
12
      } else {
13
        // not reached the threshold, will check again midway
14
        valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
15
      }
16
    }
17
  }

从上面还可以看到,一个完整block在写入的时候,是需要全部存放在一个executor内存里的(其实读取时也一样),所以设置时,也需要考虑下自己executor的内存是否充沛。

总结

可以粗略的认为parquet.block.size控制的是最终写入磁盘的数据大小。

由于读写都是以block为单位的,较大的block size在一定的数据量下,会减少磁盘寻址,带来较高的IO吞吐。但需要注意的是,其解压后的大小可能是磁盘大小的好几倍(我们几乎达到10倍),太大也会使后续计算时内存压力变大。

参考资料

  • https://parquet.apache.org/documentation/latest/
  • http://www.infoq.com/cn/articles/in-depth-analysis-of-parquet-column-storage-format
  • http://my.oschina.net/jhone/blog/517918
  • http://tajo.apache.org/docs/0.8.0/table_management/parquet.html

One Comment

  1. Parquet 文件格式 – 驴和羊 says:

    […] Parquet.block.size控制的是压缩前or压缩后的数据大小? […]

Leave a Reply