Archive for 一月, 2016

之前提到parquet.block.size所控制的parquet row group大小是一个需要调优的spark参数。其中重要一点,就是控制任务的并发度。

在Hadoop里,任务的并发默认是以hdfs block为单位的,而Spark里多了一种选择,即以RowGroup为基本单位。以下将对比spark 1.4和我厂根据1.5打的patch版本,分析两者的实现方式和优劣。

在调用HiveContext.read.parquet(path)时,会触发ParquetRelation2对象生成SqlNewHadoopRDD对象,并覆盖其中getPartitions()方法,它是本次分析的关键入口。

Spark 1.4社区版

该版本的SqlNewHadoopRDD.getPartitions()里,使用FilteringParquetRowInputFormat类,在spark.sql.parquet.cacheMetadata=true(默认)时,会使用cachedStatus和cachedFooters缓存,覆盖父类的listStatus()和getFooters()方法。后者会触发对parquet footers的扫描

 Scala |  copy code |? 
01
        // Overridden so we can inject our own cached files statuses.
02
        override def getPartitions: Array[SparkPartition] = {
03
          val inputFormat = if (cacheMetadata) {
04
            new FilteringParquetRowInputFormat {
05
              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
06
              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
07
            }
08
          } else {
09
            new FilteringParquetRowInputFormat
10
          }
11
 
12
          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
13
          val rawSplits = inputFormat.getSplits(jobContext)
14
 
15
          Array.tabulate[SparkPartition](rawSplits.size) { i =>
16
            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
17
          }
18
        }

如下图,在默认配置时,是以hdfsBlock为单位,进行task调度的。但可以通过设置parquet.task.side.metadata=false,使以rowGroup为单位调度(当group太小时,会合并,保证满足minsize)。但即使采用taskSideMeta,默认情况下,这时也读取了footers信息!而这是不必要的。
下图包含3个部分:
  • 中间流程:FilteringParquetRowInputFormat.getSplits(),在ParquetRelation2.buildScan()所构建SqlNewHadoopRDD的getPartitions()里被调用。
  • 左边流程:spark.sql.parquet.cacheMetadata=true(默认),使用getTaskSideSplits()。
  • 右边流程:spark.sql.parquet.cacheMetadata=fasle,使用getClientSideSplits()。

图中蓝色标识了默认分支。

getSplits

这时会产生一个问题,当默认情况下,一个rowGroup横跨多个splits时,怎么办?即可能有多个executor都收到这个RowGroup的处理请求,并分别逻辑持有一部分block数据。

如下图所示,当每个executor收到task对应的split信息时,读取所在文件的footer meta,拿到所有的rowGroups。用split指定的blocks range,去圈定应该自己处理的rowGroups,包括两种:

  • 完全在blocks range里的,肯定是自己处理
  • rowGroup中点在range里的,也是自己处理

getSplits-executor

为什么采取中点呢?在合理设置的parquet.block.size和hdfs.block.size时,两者的值应该比较接近,那么当一个hdfs block持有row group中点时,它至少拥有一多半的数据。从而保证仅有一个task会处理该rowGroup。

如下是parquet-format包里ParquetMetadataConverter.java的关键代码:

 Scala |  copy code |? 
01
  static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
02
    List<RowGroup> rowGroups = metaData.getRow_groups();
03
    List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
04
    for (RowGroup rowGroup : rowGroups) {
05
      long totalSize = 0;
06
      long startIndex = getOffset(rowGroup.getColumns().get(0));
07
      for (ColumnChunk col : rowGroup.getColumns()) {
08
        totalSize += col.getMeta_data().getTotal_compressed_size();
09
      }
10
      long midPoint = startIndex + totalSize / 2;
11
      if (filter.contains(midPoint)) {
12
        newRowGroups.add(rowGroup);
13
      }
14
    }
15
    metaData.setRow_groups(newRowGroups);
16
    return metaData;
17
  }

近似社区1.5版本

在构造SqlNewHadoopRDD时,不再使用FilteringParquetRowInputFormat而是直接使用ParquetInputFormat,并覆盖其listStatus方法,在spark.sql.parquet.cacheMetadata=true(默认)时,使用cachedStatuses。SqlNewHadoopRDD.getPartitions()触发ParquetInputFormat.getSplits()方法。

默认即parquet.task.side.metadata=true时,直接调用hadoop的FileInputFormat.getSplits()方法,即与hadoop的分片方式一致。

分片大小由以下3者决定:

  • 默认情况就是该hdfs文件的blockSize。
  • 如果设置maxSize比blockSize小,则可以让一个block被多个tasks处理(在executor端,通过上面的方式,保证rowGroup单次处理)。
  • 如果设置的minSize比blockSize大,则可以合并多个blocks由一个task处理。可以看出来,这也足够灵活控制并发了。

以下是hadoop-core里FileInputFormat.computeSplitSize代码:

 Scala |  copy code |? 
1
  protected long computeSplitSize(long goalSize, long minSize,
2
                                       long blockSize) {
3
    return Math.max(minSize, Math.min(goalSize, blockSize));
4
  }

而如果设置了parquet.task.side.metadata=false,则回到社区1.4版本的方式,由driver端扫描footers,根据rowGroups计算splits。

Parquet作为目前SparkSQL默认的存储方式,号称在性能与存储空间等方面达到较好地平衡。但为了达到高性能,还需要进一步调优,其中parquet.block.size是经常提到的一个配置。从字面看,它控制了parquet block的数据大小,但不同的block size会带来怎样的红利、是否适合业务要求呢?parquet支持多种encoding和compress方式,这里的block size是encoded and compressed size吗?

前一个话题较大,后续再讨论,但以我的经验而言,并不是像官方文档所说,1GB就一定比256MB好,得看应用场景。这里主要通过对parquet和spark源码的分析,阐释第二个问题。

先给结论

parquet.block.size控制的是parquet编码后、encoding后、大部分Pages compressed后的数据大小。但是是粗略的size,因为check mem时:

  • 可能有部分数据在encoder缓存中,未计算在内
  • 可能采用原始数据大小,而实际写入的是dict编码后的大小
  • 最后一个page的数据还未压缩

Parquet文件和LOG分析

下面示例中,parquet.block.size=256MB, parquet.page.size=1MB,都是默认值。在parquet里,经常混用block和row group的概念,可以认为两者相等。但需注意,这里的block与hdfs.block是两码事。

使用parquet tools可以看到meta信息:

row group 1: RC:9840100 TS:698748990 OFFSET:4

其中RC是RowCount,TS是压缩前的TotalByteSize,OFFSET是该row group的StartingPos。可以看到TS达到666MB,远超配置的256MB。

查看写入日志:

Jan 7, 2016 2:03:58 AM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: mem size 268,650,005 > 268,435,456: flushing 9,418,847 records to disk.

Jan 7, 2016 2:03:58 AM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 267,733,756

Jan 7, 2016 2:03:59 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 151,651,009B for [xxx] BINARY: 9,418,847 values, 336,901,990B raw, 151,629,488B comp, 324 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY, PLAIN], dic { 25,702 entries, 925,241B raw, 25,702B comp}

可以看到当内存用量略超过parquet.block.size时,会触发block的flush。而raw大小一般会远超parquet.block.size。

从这些表象,也可以猜测,block size应该控制的是compressed后的数据大小吧,但为了得到更加精确和确定的答案,还是看源码。

源码解释

在调用df.write.parquet()写入数据时,包括以下关键步骤(以下涉及spark和parquet-mr代码):

Spark-parquet-write-2

Spark-parquet-write从上图可以看到,check时的内存是所有columns buffered size之和,那这些项都是什么含义呢?还是先给结论:

  • repetition level、definition level、data对应最后一个page(还没真的形成page)的size,是未压缩的内存消耗大小。
  • pageWriter.getMemSize 是ColumnChunkPageWriter.getMemSize,即buf.size(),是之前flush完的pages形成的压缩后的二进制字节流的大小。

 Scala |  copy code |? 
1
  @Override
2
  public long getBufferedSizeInMemory() {
3
    return repetitionLevelColumn.getBufferedSize()
4
        + definitionLevelColumn.getBufferedSize()
5
        + dataColumn.getBufferedSize()
6
        + pageWriter.getMemSize();
7
  }

我们先看r level、d level和data size,对应上面代码的前3项,这个比较好理解。

其中,repetitionLevelColumn和definitionLevelColumn根据maxLevel值,可能是RunLengthBitPackingHybridValuesWriter或DevNullValuesWriter对象;dataColumn根据value type由ParquetProperties.getValuesWriter决定,例如逻辑类型string对应到底层数据type binary,一般会使用PlainBinaryDictionaryValuesWriter对象。以下举几个例子。

RunLengthBitPackingHybridValuesWriter的内存消耗实际发生在encoder里,所以直接返回其encoder.size,即RunLengthBitPackingHybridEncoder.getBufferedSize()。通过后者的writeInt(),可以看到并不是每个value都立刻写入baos内存(调用writeOrAppendBitPackedRun方法),之外还有一些缓存未计算,但误差较小。

 Scala |  copy code |? 
01
  public void writeInt(int value) throws IOException {
02
    if (value == previousValue) {
03
      // keep track of how many times we've seen this value
04
      // consecutively
05
      ++repeatCount;
06
 
07
      if (repeatCount >= 8) {
08
        // we've seen this at least 8 times, we're
09
        // certainly going to write an rle−run,
10
        // so just keep on counting repeats for now
11
        return;
12
      }
13
    } else {
14
      // This is a new value, check if it signals the end of
15
      // an rle−run
16
      if (repeatCount >= 8) {
17
        // it does! write an rle−run
18
        writeRleRun();
19
      }
20
 
21
      // this is a new value so we've only seen it once
22
      repeatCount = 1;
23
      // start tracking this value for repeats
24
      previousValue = value;
25
    }
26
 
27
    // We have not seen enough repeats to justify an rle−run yet,
28
    // so buffer this value in case we decide to write a bit−packed−run
29
    bufferedValues[numBufferedValues] = value;
30
    ++numBufferedValues;
31
 
32
    if (numBufferedValues == 8) {
33
      // we've encountered less than 8 repeated values, so
34
      // either start a new bit−packed−run or append to the
35
      // current bit−packed−run
36
      writeOrAppendBitPackedRun();
37
    }
38
  }

PlainBinaryDictionaryValuesWriter等需形成dictionary的writers,使用的不是编码后的value内存消耗(这时就是dict index了),而是原始写入的value bytes size。因为当dict编码不合适时(例如重复率没有那么高),会fallback回原始方式,这时要防止page过大。

 Scala |  copy code |? 
1
  @Override
2
  public long getBufferedSize() {
3
    // use raw data size to decide if we want to flush the page
4
    // so the acutual size of the page written could be much more smaller
5
    // due to dictionary encoding. This prevents page being to big when fallback happens.
6
    return rawDataByteSize;
7
  }

从上面可以看到,check时内存的计算方式,其实是比较粗略的。但计算的size确实是parquet编码(已形成repetition level和definition level),且经过encode了。那压缩发生在此之后吗?(图太大,建议点开看)

Spark-parquet-write
从上面的流程图可以看到,一旦buffered size超过parquet.block.size,就会触发flush,把最后一个分片的内容调用writePage方法写出,将一个page里所有rows的repetition、definition和data以二进制的形式拼接在一起,并针对字节流进行压缩。

还有一个需要注意的点,即dictionary对象是在flush时生成的,其内存消耗也没有计算在check mem里。不过dict的大小由配置决定,一般也就是MB级别,不会太大。

再来看下ColumnChunkPageWriter.getMemSize,为什么说它是编码、压缩后的pages字节流大小呢?

这就得结合着parquet.page.size了。这个值会被compress实现类和ColumnWriteImpl类使用。这里主要关注后者,在该类的每个writeXXX方法的最后都会调用accountForValueWritten方法,该方法检查当前column的内存消耗是否达到page.size,如果达到就也会调用writePage方法。而后者在上图可以看到,会形成字节流并压缩。所以,也可以明白,为什么parquet的document里面说,page是encoding和compress的unit了。

 Scala |  copy code |? 
01
  private void accountForValueWritten() {
02
    ++ valueCount;
03
    if (valueCount > valueCountForNextSizeCheck) {
04
      // not checking the memory used for every value
05
      long memSize = repetitionLevelColumn.getBufferedSize()
06
          + definitionLevelColumn.getBufferedSize()
07
          + dataColumn.getBufferedSize();
08
      if (memSize > pageSizeThreshold) {
09
        // we will write the current page and check again the size at the predicted middle of next page
10
        valueCountForNextSizeCheck = valueCount / 2;
11
        writePage();
12
      } else {
13
        // not reached the threshold, will check again midway
14
        valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
15
      }
16
    }
17
  }

从上面还可以看到,一个完整block在写入的时候,是需要全部存放在一个executor内存里的(其实读取时也一样),所以设置时,也需要考虑下自己executor的内存是否充沛。

总结

可以粗略的认为parquet.block.size控制的是最终写入磁盘的数据大小。

由于读写都是以block为单位的,较大的block size在一定的数据量下,会减少磁盘寻址,带来较高的IO吞吐。但需要注意的是,其解压后的大小可能是磁盘大小的好几倍(我们几乎达到10倍),太大也会使后续计算时内存压力变大。

参考资料

  • https://parquet.apache.org/documentation/latest/
  • http://www.infoq.com/cn/articles/in-depth-analysis-of-parquet-column-storage-format
  • http://my.oschina.net/jhone/blog/517918
  • http://tajo.apache.org/docs/0.8.0/table_management/parquet.html