之前提到parquet.block.size所控制的parquet row group大小是一个需要调优的spark参数。其中重要一点,就是控制任务的并发度。

在Hadoop里,任务的并发默认是以hdfs block为单位的,而Spark里多了一种选择,即以RowGroup为基本单位。以下将对比spark 1.4和我厂根据1.5打的patch版本,分析两者的实现方式和优劣。

在调用HiveContext.read.parquet(path)时,会触发ParquetRelation2对象生成SqlNewHadoopRDD对象,并覆盖其中getPartitions()方法,它是本次分析的关键入口。

Spark 1.4社区版

该版本的SqlNewHadoopRDD.getPartitions()里,使用FilteringParquetRowInputFormat类,在spark.sql.parquet.cacheMetadata=true(默认)时,会使用cachedStatus和cachedFooters缓存,覆盖父类的listStatus()和getFooters()方法。后者会触发对parquet footers的扫描

 Scala |  copy code |? 
01
        // Overridden so we can inject our own cached files statuses.
02
        override def getPartitions: Array[SparkPartition] = {
03
          val inputFormat = if (cacheMetadata) {
04
            new FilteringParquetRowInputFormat {
05
              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
06
              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
07
            }
08
          } else {
09
            new FilteringParquetRowInputFormat
10
          }
11
 
12
          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
13
          val rawSplits = inputFormat.getSplits(jobContext)
14
 
15
          Array.tabulate[SparkPartition](rawSplits.size) { i =>
16
            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
17
          }
18
        }

如下图,在默认配置时,是以hdfsBlock为单位,进行task调度的。但可以通过设置parquet.task.side.metadata=false,使以rowGroup为单位调度(当group太小时,会合并,保证满足minsize)。但即使采用taskSideMeta,默认情况下,这时也读取了footers信息!而这是不必要的。
下图包含3个部分:
  • 中间流程:FilteringParquetRowInputFormat.getSplits(),在ParquetRelation2.buildScan()所构建SqlNewHadoopRDD的getPartitions()里被调用。
  • 左边流程:spark.sql.parquet.cacheMetadata=true(默认),使用getTaskSideSplits()。
  • 右边流程:spark.sql.parquet.cacheMetadata=fasle,使用getClientSideSplits()。

图中蓝色标识了默认分支。

getSplits

这时会产生一个问题,当默认情况下,一个rowGroup横跨多个splits时,怎么办?即可能有多个executor都收到这个RowGroup的处理请求,并分别逻辑持有一部分block数据。

如下图所示,当每个executor收到task对应的split信息时,读取所在文件的footer meta,拿到所有的rowGroups。用split指定的blocks range,去圈定应该自己处理的rowGroups,包括两种:

  • 完全在blocks range里的,肯定是自己处理
  • rowGroup中点在range里的,也是自己处理

getSplits-executor

为什么采取中点呢?在合理设置的parquet.block.size和hdfs.block.size时,两者的值应该比较接近,那么当一个hdfs block持有row group中点时,它至少拥有一多半的数据。从而保证仅有一个task会处理该rowGroup。

如下是parquet-format包里ParquetMetadataConverter.java的关键代码:

 Scala |  copy code |? 
01
  static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
02
    List<RowGroup> rowGroups = metaData.getRow_groups();
03
    List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
04
    for (RowGroup rowGroup : rowGroups) {
05
      long totalSize = 0;
06
      long startIndex = getOffset(rowGroup.getColumns().get(0));
07
      for (ColumnChunk col : rowGroup.getColumns()) {
08
        totalSize += col.getMeta_data().getTotal_compressed_size();
09
      }
10
      long midPoint = startIndex + totalSize / 2;
11
      if (filter.contains(midPoint)) {
12
        newRowGroups.add(rowGroup);
13
      }
14
    }
15
    metaData.setRow_groups(newRowGroups);
16
    return metaData;
17
  }

近似社区1.5版本

在构造SqlNewHadoopRDD时,不再使用FilteringParquetRowInputFormat而是直接使用ParquetInputFormat,并覆盖其listStatus方法,在spark.sql.parquet.cacheMetadata=true(默认)时,使用cachedStatuses。SqlNewHadoopRDD.getPartitions()触发ParquetInputFormat.getSplits()方法。

默认即parquet.task.side.metadata=true时,直接调用hadoop的FileInputFormat.getSplits()方法,即与hadoop的分片方式一致。

分片大小由以下3者决定:

  • 默认情况就是该hdfs文件的blockSize。
  • 如果设置maxSize比blockSize小,则可以让一个block被多个tasks处理(在executor端,通过上面的方式,保证rowGroup单次处理)。
  • 如果设置的minSize比blockSize大,则可以合并多个blocks由一个task处理。可以看出来,这也足够灵活控制并发了。

以下是hadoop-core里FileInputFormat.computeSplitSize代码:

 Scala |  copy code |? 
1
  protected long computeSplitSize(long goalSize, long minSize,
2
                                       long blockSize) {
3
    return Math.max(minSize, Math.min(goalSize, blockSize));
4
  }

而如果设置了parquet.task.side.metadata=false,则回到社区1.4版本的方式,由driver端扫描footers,根据rowGroups计算splits。

Leave a Reply