As mentioned before, the Parquet row group size controlled by parquet.block.size is a Spark parameter worth tuning. One important aspect of it is controlling task parallelism.

In Hadoop, task parallelism is by default based on HDFS blocks; Spark adds another option, using the row group as the basic unit. Below I compare community Spark 1.4 with our in-house build patched on top of 1.5, and look at how each implements this and their trade-offs.

When HiveContext.read.parquet(path) is called, the ParquetRelation2 object builds a SqlNewHadoopRDD and overrides its getPartitions() method, which is the key entry point for this analysis.

Community Spark 1.4

In this version, SqlNewHadoopRDD.getPartitions() uses the FilteringParquetRowInputFormat class. When spark.sql.parquet.cacheMetadata=true (the default), it overrides the parent class's listStatus() and getFooters() with the cachedStatuses and cachedFooters caches; the latter triggers a scan of the Parquet footers.

        // Overridden so we can inject our own cached files statuses.
        override def getPartitions: Array[SparkPartition] = {
          val inputFormat = if (cacheMetadata) {
            new FilteringParquetRowInputFormat {
              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
            }
          } else {
            new FilteringParquetRowInputFormat
          }

          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
          val rawSplits = inputFormat.getSplits(jobContext)

          Array.tabulate[SparkPartition](rawSplits.size) { i =>
            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
          }
        }

As the figure below shows, with the default configuration tasks are scheduled per HDFS block. Setting parquet.task.side.metadata=false switches scheduling to the row-group level (row groups that are too small are merged so that the minimum split size is met). Note, however, that even with task-side metadata (the default) the footers are still read at this point, which is unnecessary.
The figure has three parts:
  • Middle flow: FilteringParquetRowInputFormat.getSplits(), called from getPartitions() of the SqlNewHadoopRDD built by ParquetRelation2.buildScan().
  • Left flow: parquet.task.side.metadata=true (the default), which uses getTaskSideSplits().
  • Right flow: parquet.task.side.metadata=false, which uses getClientSideSplits().

The default branch is highlighted in blue.

getSplits

This raises a question: in the default setup, what happens when a row group spans multiple splits? Several executors may each receive a task whose split logically covers part of that row group's blocks.

As the next figure shows, when an executor receives its task's split, it reads the footer metadata of the corresponding file to get all row groups, then uses the split's block range to decide which row groups it should handle. Two kinds qualify:

  • row groups that lie entirely inside the block range, which obviously belong to this task
  • row groups whose midpoint falls inside the range

getSplits-executor

Why the midpoint? With sensible settings, parquet.block.size and the HDFS block size are close, so the HDFS block that holds a row group's midpoint holds more than half of its data. This guarantees that exactly one task processes each row group.

The key code is in ParquetMetadataConverter.java in the parquet-format package:

  static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
    List<RowGroup> rowGroups = metaData.getRow_groups();
    List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
    for (RowGroup rowGroup : rowGroups) {
      long totalSize = 0;
      long startIndex = getOffset(rowGroup.getColumns().get(0));
      for (ColumnChunk col : rowGroup.getColumns()) {
        totalSize += col.getMeta_data().getTotal_compressed_size();
      }
      long midPoint = startIndex + totalSize / 2;
      if (filter.contains(midPoint)) {
        newRowGroups.add(rowGroup);
      }
    }
    metaData.setRow_groups(newRowGroups);
    return metaData;
  }

Roughly community 1.5

When the SqlNewHadoopRDD is constructed, FilteringParquetRowInputFormat is no longer used; ParquetInputFormat is used directly, with its listStatus method overridden to use cachedStatuses when spark.sql.parquet.cacheMetadata=true (the default). SqlNewHadoopRDD.getPartitions() then triggers ParquetInputFormat.getSplits().

With the default parquet.task.side.metadata=true, this calls Hadoop's FileInputFormat.getSplits() directly, i.e. splits are computed exactly as in Hadoop.

The split size is determined by three things:

  • By default it is the HDFS file's block size.
  • If maxSize is set smaller than the block size, one block can be handled by several tasks (on the executor side, the midpoint rule above still guarantees each row group is processed exactly once).
  • If minSize is set larger than the block size, several blocks are merged into one task. As you can see, this is flexible enough to control parallelism; a configuration sketch follows the computeSplitSize code below.

Here is FileInputFormat.computeSplitSize from hadoop-core:

  protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }
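For completeness, a hedged sketch of the knobs discussed above, set through the Hadoop configuration (the property names assume Hadoop 2's mapreduce.* keys and parquet-mr's task-side-metadata switch; adjust to your versions), spark-shell style:

  val conf = sc.hadoopConfiguration

  // Fall back to driver-side (client-side) metadata: splits follow row groups, not HDFS blocks.
  conf.setBoolean("parquet.task.side.metadata", false)

  // Or keep task-side metadata and resize the splits directly:
  conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024)    // more, smaller tasks
  conf.setLong("mapreduce.input.fileinputformat.split.minsize", 512L * 1024 * 1024)   // fewer, larger tasks

  val df = sqlContext.read.parquet("/path/to/table")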

If parquet.task.side.metadata=false is set, the behavior falls back to the community 1.4 approach: the driver scans the footers and computes splits from the row groups.

Parquet, currently SparkSQL's default storage format, is said to strike a good balance between performance and storage footprint. To actually get that performance some tuning is still needed, and parquet.block.size is one of the settings that comes up most often. Literally it controls the size of a Parquet block, but what do different block sizes buy you, and which one suits your workload? Parquet supports several encodings and compression codecs, so is this block size the encoded and compressed size?

The first question is a big topic for another post; in my experience, and contrary to what the official documentation suggests, 1GB is not automatically better than 256MB, it depends on the use case. Here I use the Parquet and Spark source code to answer the second question.

The conclusion first

parquet.block.size controls the size of the data after Parquet has written the levels, encoded the values, and compressed most of the pages. It is only an approximate size, because at the time the memory check runs:

  • some data may still sit in encoder buffers and is not counted
  • for dictionary-encoded columns the raw value size is used, while what actually gets written is the dictionary-encoded size
  • the last page's data has not been compressed yet

Parquet file and log analysis

In the examples below, parquet.block.size=256MB and parquet.page.size=1MB, both defaults. Parquet uses "block" and "row group" more or less interchangeably; treat them as the same thing here. Note that this block has nothing to do with the HDFS block.
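Both sizes are read by the Parquet writer from the Hadoop configuration; a hedged sketch of setting them explicitly before a write (Spark 1.x, spark-shell style; the values shown are simply the defaults under discussion):

  val df = sqlContext.range(0, 1000000).toDF("id")   // any DataFrame; a toy one for the sketch
  sc.hadoopConfiguration.setInt("parquet.block.size", 256 * 1024 * 1024)  // row group target size
  sc.hadoopConfiguration.setInt("parquet.page.size", 1024 * 1024)         // page target size
  df.write.parquet("/tmp/parquet-out")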

parquet-tools shows the file's metadata:

row group 1: RC:9840100 TS:698748990 OFFSET:4

RC is the row count, TS the uncompressed total byte size, and OFFSET the row group's starting position. TS is about 666MB, far beyond the configured 256MB.

The write log shows:

Jan 7, 2016 2:03:58 AM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: mem size 268,650,005 > 268,435,456: flushing 9,418,847 records to disk.

Jan 7, 2016 2:03:58 AM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 267,733,756

Jan 7, 2016 2:03:59 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 151,651,009B for [xxx] BINARY: 9,418,847 values, 336,901,990B raw, 151,629,488B comp, 324 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY, PLAIN], dic { 25,702 entries, 925,241B raw, 25,702B comp}

So the block flush is triggered once the in-memory usage slightly exceeds parquet.block.size, while the raw size is usually far larger than parquet.block.size.

From these observations one can already guess that block size governs the size after compression, but for a precise, definite answer let's read the source.

Reading the source

Calling df.write.parquet() involves the following key steps (spanning both Spark and parquet-mr code):

Spark-parquet-write-2

As the figure above shows, the memory that is checked is the sum of the buffered sizes of all columns. What do those terms mean? Conclusions first:

  • The repetition-level, definition-level and data terms correspond to the last page (not yet materialized as a page): its uncompressed in-memory size.
  • pageWriter.getMemSize is ColumnChunkPageWriter.getMemSize, i.e. buf.size(): the size of the compressed binary stream formed by the pages flushed so far.

  @Override
  public long getBufferedSizeInMemory() {
    return repetitionLevelColumn.getBufferedSize()
        + definitionLevelColumn.getBufferedSize()
        + dataColumn.getBufferedSize()
        + pageWriter.getMemSize();
  }

Let's start with the r-level, d-level and data sizes, the first three terms above; these are straightforward.

repetitionLevelColumn and definitionLevelColumn are either RunLengthBitPackingHybridValuesWriter or DevNullValuesWriter objects, depending on the max level; dataColumn is chosen by ParquetProperties.getValuesWriter based on the value type. For example, the logical type string maps to the physical type binary and usually gets a PlainBinaryDictionaryValuesWriter. A few examples follow.

RunLengthBitPackingHybridValuesWriter's memory is actually consumed inside its encoder, so it simply returns the encoder's size, i.e. RunLengthBitPackingHybridEncoder.getBufferedSize(). Its writeInt() shows that not every value is immediately written into the in-memory baos (that happens in writeOrAppendBitPackedRun); a few values sit in small buffers and are not counted, but the error is small.

  public void writeInt(int value) throws IOException {
    if (value == previousValue) {
      // keep track of how many times we've seen this value
      // consecutively
      ++repeatCount;

      if (repeatCount >= 8) {
        // we've seen this at least 8 times, we're
        // certainly going to write an rle-run,
        // so just keep on counting repeats for now
        return;
      }
    } else {
      // This is a new value, check if it signals the end of
      // an rle-run
      if (repeatCount >= 8) {
        // it does! write an rle-run
        writeRleRun();
      }

      // this is a new value so we've only seen it once
      repeatCount = 1;
      // start tracking this value for repeats
      previousValue = value;
    }

    // We have not seen enough repeats to justify an rle-run yet,
    // so buffer this value in case we decide to write a bit-packed-run
    bufferedValues[numBufferedValues] = value;
    ++numBufferedValues;

    if (numBufferedValues == 8) {
      // we've encountered less than 8 repeated values, so
      // either start a new bit-packed-run or append to the
      // current bit-packed-run
      writeOrAppendBitPackedRun();
    }
  }

Writers that build a dictionary, such as PlainBinaryDictionaryValuesWriter, report not the encoded memory footprint (which by then would just be dictionary indices) but the raw size of the values written. The reason is that when dictionary encoding turns out to be a poor fit (e.g. not enough repetition) the writer falls back to plain encoding, and using the raw size prevents pages from becoming too large when that happens.

  @Override
  public long getBufferedSize() {
    // use raw data size to decide if we want to flush the page
    // so the acutual size of the page written could be much more smaller
    // due to dictionary encoding. This prevents page being to big when fallback happens.
    return rawDataByteSize;
  }

So the memory accounting at check time is fairly rough, but the size it measures is indeed taken after Parquet has formed the repetition and definition levels and encoded the values. Does compression happen after this point? (The figure is large; open it to view the details.)

Spark-parquet-write
The flow chart above shows that once the buffered size exceeds parquet.block.size, a flush is triggered: writePage is called for the trailing slice, the repetition levels, definition levels and data of all rows in the page are concatenated into one binary stream, and that stream is compressed.

One more thing worth noting: the dictionary objects are built at flush time, and their memory is not counted in the check either. The dictionary size is bounded by configuration, though, and is typically only a few MB.

Now back to ColumnChunkPageWriter.getMemSize: why is it the size of the encoded and compressed page byte stream?

This is where parquet.page.size comes in. The value is used by the compressor implementation and by ColumnWriterImpl; we care about the latter. Every writeXXX method in that class ends with a call to accountForValueWritten, which checks whether the column's memory usage has reached page.size and, if so, also calls writePage, which, as the figure shows, builds the byte stream and compresses it. This also explains why the Parquet documentation calls the page the unit of encoding and compression.

  private void accountForValueWritten() {
    ++ valueCount;
    if (valueCount > valueCountForNextSizeCheck) {
      // not checking the memory used for every value
      long memSize = repetitionLevelColumn.getBufferedSize()
          + definitionLevelColumn.getBufferedSize()
          + dataColumn.getBufferedSize();
      if (memSize > pageSizeThreshold) {
        // we will write the current page and check again the size at the predicted middle of next page
        valueCountForNextSizeCheck = valueCount / 2;
        writePage();
      } else {
        // not reached the threshold, will check again midway
        valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
      }
    }
  }

One more takeaway: a complete block has to fit in a single executor's memory while being written (the same holds for reading), so when picking a value, also check that your executors have enough memory.

Summary

Roughly speaking, parquet.block.size controls the size of the data that finally lands on disk.

Since reads and writes happen in block units, a larger block size means fewer disk seeks for a given amount of data and hence higher I/O throughput. Keep in mind, though, that the decompressed size can be several times the on-disk size (close to 10x in our case), so an overly large block also raises memory pressure in downstream computation.

References

  • https://parquet.apache.org/documentation/latest/
  • http://www.infoq.com/cn/articles/in-depth-analysis-of-parquet-column-storage-format
  • http://my.oschina.net/jhone/blog/517918
  • http://tajo.apache.org/docs/0.8.0/table_management/parquet.html

Problem

We have a Spark-based project that takes conditions from users and generates Spark jobs. One piece of logic is df.groupBy(cols).agg(...). Normally cols is non-empty and everything works, but one day a user picked an empty cols and the job simply hung.

Say stage 1 is the shuffle stage and stage 2 the result stage. Stage 1 finishes fine and produces the default 200 output splits. In stage 2, 199 tasks complete normally, but one task fails on every attempt. The failure output:

FetchFailed(BlockManagerId(42, host1, 15656), shuffleId=0, mapId=15, reduceId=169, message=org.apache.spark.shuffle.FetchFailedException: Failed to connect to host1

Analysis

From the log it looks like a stage 2 executor dies while requesting shuffle output, but what is the shuffle server doing at that moment? The Spark UI shows that the failing shuffle server is also running a stage 2 task, and that task fails too!

Logging into the executor's container, its GC log shows both the young and old generations essentially 100% full, around 15GB, meaning the data volume is huge and all of it is live, so nothing can be evicted. Yet stage 1's total shuffle output is only 8.9GB.

A closer look at that executor's stderr shows a large number of fetch-request log lines whose sizes also add up to roughly 8.9GB!

15/12/15 13:17:30 DEBUG storage.ShuffleBlockFetcherIterator: Creating fetch request of 97451434 at BlockManagerId 

Why is the data skewed so badly?

Root cause

Look at Spark's implementation of agg. The physical plan it produces is execution/Aggregate, whose effect on the shuffle is:

  override def requiredChildDistribution: List[Distribution] = {
    if (partial) {
      UnspecifiedDistribution :: Nil
    } else {
      if (groupingExpressions == Nil) {
        AllTuples :: Nil
      } else {
        ClusteredDistribution(groupingExpressions) :: Nil
      }
    }
  }

The cause is now obvious: groupingExpressions is empty, so the required distribution is AllTuples, i.e. all the data must end up in a single partition!

Lessons learned

When the data volume exceeds what one node can handle, do not call DataFrame.agg directly; at least use DataFrame.groupBy().agg(), and make sure the groupBy cols are non-empty! Otherwise, however many resources you allocate, all the other executors just stand around watching, wasting resources, and the job has a high chance of dying. A defensive sketch follows.
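A minimal defensive sketch (safeAgg is a hypothetical helper; adapt the aggregation spec to your needs):

  import org.apache.spark.sql.DataFrame

  // Refuse an empty grouping-column list: it would force an AllTuples distribution,
  // i.e. every row funneled into a single partition on a single executor.
  def safeAgg(df: DataFrame, cols: Seq[String], aggs: Map[String, String]): DataFrame = {
    require(cols.nonEmpty, "empty grouping columns => AllTuples => one giant partition")
    df.groupBy(cols.head, cols.tail: _*).agg(aggs)
  }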

This post uses one example to walk through how Spark resolves a job that mixes SparkSQL and Spark Core into tasks.

Pseudo-code of the example

hc.read.parquet(paths).filter(…).select(…).groupBy(all fields).count().sort(…).toJavaRDD
     .mapPartitionsToPair(…).coalesce().reduceByKey(…).collectAsMap()

Spark UI screenshots

Looking at the UI below for the first time raised a few questions:
  1. Why does "toJavaRDD" seemingly produce an independent Job 6?
  2. Why does Job 7 only skip stage 8 (presumably reusing stage 6) but not stage 9?
  3. Why does stage 9 in Job 7 differ from stage 7? Stage 7 contains three mapPartitionsRDD calls while stage 9 contains only one.

Answering these directly is not hard, but only by really understanding how Spark resolves DataFrame and core code into tasks can we answer them completely.

[Spark UI screenshot: jobs overview]
Job 6: [Spark UI screenshot]
Job 7: [Spark UI screenshot]

The full picture

The diagram below traces the whole path from user code to RDDs, with rough indications of when each job is created. The image is large; click through for the original.

  1. White icons are the APIs called while coding.
  2. Grey is the logical layer behind the code: DataFrame and LogicalPlan in SparkSQL, and RDD in Spark Core; these objects are created at coding time.
  3. Blue is the physical plan SparkSQL produces after analyzing, optimizing and planning the logical plan.
  4. Yellow is the prepareForExecution phase, which builds the final physical plan on top of the previous one.
[Diagram: full resolution overview]

What happens behind the scenes when we call the Spark APIs?

It depends.

In Spark Core it is simple: each API builds a new RDD on top of the previous one, as shown in the lower half of the "主Job RDDs" (main-job RDDs) column of the overview.

SparkSQL is slightly different: its data abstraction, DataFrame, sits a level above RDD, so each API builds a new DataFrame on top of the previous one. The core of a DataFrame is its LogicalPlan, which describes the plan's dependencies, partitioning, Distribution and so on; see the "DataFrame" and "LogicalPlan" columns of the overview.

Either way, RDDs and DataFrames are both lazy; nothing actually executes until an action such as collect or save is called.

What toJavaRDD does

Calling it triggers resolution of the DataFrame (step 1 in the overview):

lazy val rdd: RDD[Row] = {
  // use a local variable to make sure the map closure doesn't capture the whole DataFrame
  val schema = this.schema
  queryExecution.executedPlan.execute().mapPartitions { rows =>
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
    rows.map(converter(_).asInstanceOf[Row])
  }
}

The queryExecution.executedPlan above triggers the chain of actions below (note that this does not include the execute() call), covering analysis, type coercion, optimization and so on; most importantly, it translates the logical plan into an actual physical plan. After planner.plan() the blue execution.SparkPlan part of the overview exists; after prepareForExecution the yellow part exists (steps 2 and 3 in the overview).

[Diagram: the QueryExecution chain from analysis to executedPlan]
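A quick way to look at these plans yourself without triggering execution is DataFrame.explain (Spark 1.x); a small sketch, assuming df is the DataFrame from the pseudo-code above (before toJavaRDD):

  df.explain(true)                          // parsed, analyzed, optimized and physical plans
  println(df.queryExecution.executedPlan)   // just the final physical plan, after prepareForExecution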

What plan.execute() does

At this point the driver recursively invokes doExecute() on the physical plan nodes; these methods generally return the corresponding RDDs. In this case, however, because sort was called, an Exchange plan with RangePartitioning was generated. To spread the sorted data evenly, Spark launches a child job that samples the RDD the sort depends on; that is the extra "Sort抽样子Job RDDs" (sort-sampling child-job RDDs) column, and the job is triggered by the following code:

/* Partitioner.RangePartitioner */

  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2.toLong).sum
    (numItems, sketched)
  }

That job is Job 6 in the UI, and because the child job runs ahead of the main one, its job id is smaller.

Triggering the child job is only a side effect; the real purpose of this step is to turn the main job's physical plan into RDDs. In the overview, a large part of the RDDs is shared between the main and child jobs. The reason is that in the Exchange preceding ExternalSort, childRdd = child.execute(); that RDD is used both by the RangePartitioner and by the returned ShuffledRDD, so the two are consistent.

A closer look at the RDD ids of jobs 6 and 7:

[Spark UI screenshot: RDD ids of Job 6 and Job 7]

  1. RDDs up to and including #279 are shared by the main job and the child job.
  2. The child job's RDD ids are smaller than the main job's, so the child job really is scheduled first.
RDD.id is assigned at construction time, so the RDDs above were also created in numeric order.
  protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
    child.execute().mapPartitions( { iterator =>

Because execute() is called recursively, the child plan's execute runs first and its RDDs are created first.

What rdd.collect() does

Finally, the main event, which is actually simple: it creates the ResultStage and recursively triggers the two shuffle stages it depends on to run first.

Answers

  1. Why does "toJavaRDD" seemingly produce an independent Job 6?
    1. Because sort triggers a child job that samples the data, so that the sorted output is distributed more evenly.
  2. Why does Job 7 only skip stage 8 (which reuses stage 6) but not stage 9?
    1. Stage 6 and stage 8 do exactly the same work, but stage 7 and stage 9 are really two different things:
  • stage 6: hc.read.parquet(paths).filter(…).select(…) plus the first half of groupBy(all fields).count()
  • stage 7: the second half of groupBy(all fields).count(), plus the sampling that builds the RangePartitioner
  • stage 8: skipped, reusing stage 6
  • stage 9: the second half of groupBy(all fields).count() plus the first half of sort
  • stage 10: sort(…).toJavaRDD.mapPartitionsToPair(…).coalesce() plus the first half of reduceByKey(…)
  3. Why do stage 9 and stage 7 in Job 7 differ, with stage 7 containing three mapPartitionsRDD calls and stage 9 only one?
    1. Same answer as above.

Lessons learned

1. Consider: what happens with hc.read.parquet().filter().select().sort().toJavaRDD.mapPartitions?

This still produces two jobs, and both read from HDFS, because the sort in the second job has no shuffle dependency before it, so it cannot reuse any stage of the first job.

2. On choosing between df.sort and rdd.repartitionAndSortWithinPartitions: I used to assume that SparkSQL's data-structure and planning optimizations would make sort perform better, but the analysis above shows it also runs an extra sampling pass, so which one is faster is genuinely unclear. In our scenario the two perform about the same, and given the pitfall above I lean towards the latter. A sketch of the RDD route follows.
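A hedged sketch of the RDD-side alternative, assuming a pair RDD keyed by the sort key (spark-shell style, toy data):

  import org.apache.spark.HashPartitioner

  val pairs  = sc.parallelize(Seq(("b", 2L), ("a", 1L), ("c", 3L)))
  // Shuffle into 1024 partitions and sort within each partition during the same shuffle;
  // unlike df.sort there is no sampling job, but the order is per-partition, not global.
  val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(1024))
  sorted.count()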

Recently, while validating a Spark application, I ran into something odd: data that had been sampled and then sorted came out unordered! A colleague responsible for Spark at our company eventually fixed it, but chasing the bug was a good excuse to read through SparkSQL's processing pipeline and understand how DataFrame resolution, Partitioning and Distribution relate to each other.

Problem

The sort is only ineffective with code like the following:

df = hc.read.parquet(path).filter("trade1='Automobile'").filter("trade2='Default'")\
    .filter("pdate='2015-06-03'").select("userid").limit(count).repartition(1024)\
    .sort("userid")

If limit and repartition do not both appear before the sort, the result comes out correctly ordered.

Investigation

Intuitively, if sort were completely broken it would never have made it through the Spark community into a release, so it must only break in some specific scenario; experimenting narrowed it down to the case above.

So how does it differ from a working case, e.g. hc.read.parquet().filter().select().sort()? Printing the execution plans shows:

Sort [baiduid#87 ASC], true
 Repartition 1024, true
  Limit 100
   Project [baiduid#87]
    Filter (((trade1#96 = Automobile) && (trade2#97 = Default)) && (pdate#98 = 2015-06-03))
     PhysicalRDD [baiduid#87,trade1#96,trade2#97,pdate#98], UnionRDD[63] at toString at NativeMethodAccessorImpl.java:-2

And in the normal case:

Sort [baiduid#267 ASC], true
 Exchange (RangePartitioning 200)
  Repartition 1024, true
   Project [baiduid#267]
    Filter (((trade1#276 = Automobile) && (trade2#277 = Default)) && (pdate#278 = 2015-06-03))
     PhysicalRDD [baiduid#267,trade1#276,trade2#277,pdate#278], UnionRDD[203] at toString at NativeMethodAccessorImpl.java:-2

The normal case has an extra Exchange step, which our code certainly never calls explicitly.

The fix, and why it works

Add the following override to Repartition in execution/basicOperators.scala:
  override def outputPartitioning = {
    if (numPartitions == 1) SinglePartition
    else UnknownPartitioning(numPartitions)
  }

The reason: without this override the value equals child.outputPartitioning, which in this scenario is Limit.outputPartitioning = SinglePartition, and SinglePartition already satisfies the distribution sort requires (even though the data is actually unsorted).

Fixing this bug requires some understanding of Spark's execution pipeline, so let's walk through it first.

How SparkSQL resolves the query

When we initialize a DataFrame with hc.read.parquet(), the thing to pay attention to is the LogicalPlan:

      sqlContext.baseRelationToDataFrame(
        new ParquetRelation2(
          globbedPaths.map(_.toString), None, None, Map.empty[String, String])(sqlContext))

So at this point logicalPlan = LogicalRelation(ParquetRelation2).

filter, select, sort and the other operations each take the previous dataframe.logicalPlan as their child and produce a new DataFrame wrapping a new logicalPlan. The problem code therefore contains these LogicalPlans (in creation order):

  1. LogicalRelation(ParquetRelation2) (created by parquet)
  2. Filter, three of them
  3. Project
  4. Limit
  5. Repartition
  6. Sort (the final LogicalPlan)

Suppose we trigger execution by calling dataframe.collect() on the Sort DataFrame. What actually runs is

queryExecution.executedPlan.executeCollect(), i.e. the lazy val executedPlan of SQLContext's inner class QueryExecution, which indirectly triggers a chain of steps (details omitted for brevity):

  1. analyzed = analyzer.execute(logical)
  2. optimizedPlan = optimizer.execute(withCachedData)
  3. sparkPlan = planner.plan(optimizedPlan).next()
  4. executedPlan = prepareForExecution.execute(sparkPlan)

(If the entry point were SQL text rather than the DataFrame API, there would also be a parseSQL step, ignored here.)
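These four lazy vals can be inspected straight from a DataFrame, which is a convenient way to follow along (Spark 1.x; the output of course depends on your query):

  val qe = df.queryExecution       // df being the final Sort DataFrame from the example
  println(qe.analyzed)             // 1. analyzer.execute(logical)
  println(qe.optimizedPlan)        // 2. optimizer.execute(withCachedData)
  println(qe.sparkPlan)            // 3. planner.plan(optimizedPlan).next()
  println(qe.executedPlan)         // 4. prepareForExecution.execute(sparkPlan)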

The Analyzer phase

It runs a chain of rule batches over the logicalPlan tree, as the following code shows:

/* Analyzer.scala */
  lazy val batches: Seq[Batch] = Seq(
    Batch("Substitution", fixedPoint,
      CTESubstitution ::
      WindowsSubstitution ::
      Nil : _*),
    Batch("Resolution", fixedPoint,
      ResolveRelations ::
      ResolveReferences ::
      ResolveGroupingAnalytics ::
      ResolveSortReferences ::
      ResolveGenerate ::
      ResolveFunctions ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      UnresolvedHavingClauseAttributes ::
      TrimGroupingAliases ::
      typeCoercionRules ++
      extendedResolutionRules : _*)
  )

/* HiveTypeCoercion */
  val typeCoercionRules =
    PropagateTypes ::
    ConvertNaNs ::
    InConversion ::
    WidenTypes ::
    PromoteStrings ::
    DecimalPrecision ::
    BooleanComparisons ::
    BooleanCasts ::
    StringToIntegralCasts ::
    FunctionArgumentConversion ::
    CaseWhenCoercion ::
    Division ::
    PropagateTypes ::
    ExpectedInputConversion ::
    Nil

/* SQLContext.scala */
    new Analyzer(catalog, functionRegistry, conf) {
      override val extendedResolutionRules =
        ExtractPythonUdfs ::
        sources.PreInsertCastAndRename ::
        Nil

      override val extendedCheckRules = Seq(
        sources.PreWriteCheck(catalog)
      )
    }

 

This phase mainly binds each table field to the real table structure; it also detects unknown fields, type errors and the type coercions that are needed. Overall it only analyzes, it does not change the execution plan.

The Optimizer phase

As the name suggests, this phase optimizes. If the SQL or the API calls are badly written, the plan after this phase may look quite different from the original statements. In essence, though, it only folds away what can be decided locally (for example, an IS NULL predicate on a non-nullable field can be evaluated to a constant), merges what can be merged (multiple filters, projections, and so on), and slightly reorders filters, unions and the like.

  val batches =
    // SubQueries are only needed for analysis and can be removed before execution.
    Batch("Remove SubQueries", FixedPoint(100),
      EliminateSubQueries) ::
    Batch("Operator Reordering", FixedPoint(100),
      UnionPushdown,
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughJoin,
      PushPredicateThroughGenerate,
      ColumnPruning,
      ProjectCollapsing,
      CombineLimits) ::
    Batch("ConstantFolding", FixedPoint(100),
      NullPropagation,
      OptimizeIn,
      ConstantFolding,
      LikeSimplification,
      BooleanSimplification,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Decimal Optimizations", FixedPoint(100),
      DecimalAggregates) ::
    Batch("LocalRelation", FixedPoint(100),
      ConvertToLocalRelation) :: Nil

The prepareForExecution phase

It does just one crucial thing:

    val batches =
      Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil

The EnsureRequirements rule checks whether each child's Partitioning satisfies the required Distribution. Our broken example fails precisely because Repartition reports the wrong outputPartitioning. Read on.

Partitioning versus Distribution

What the EnsureRequirements class does:

 * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
 * of input data meets the
 * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
 * each operator by inserting [[Exchange]] Operators where required.  Also ensure that the
 * required input partition ordering requirements are met.
In short: it makes sure the partitioning of each operator's input data meets its Distribution requirement, as well as any ordering requirement, by inserting Exchange operators where needed.

First, what is a Distribution requirement?

  • Distribution, the base class. Its doc says it "specifies how tuples that share common expressions will be distributed when a query is executed in parallel on many machines" and that it can refer to two distinct physical properties:
    - Inter-node partitioning of data: how tuples are partitioned across the physical machines of a cluster. Knowing this lets some operators (e.g. Aggregate) perform partition-local operations instead of global ones.
    - Intra-partition ordering of data: guarantees about how tuples are laid out within a single partition.
  • UnspecifiedDistribution extends Distribution directly and guarantees nothing about ordering or placement.
  • AllTuples means there is only a single partition.
  • ClusteredDistribution means rows with the same "key" land in the same partition, or are laid out contiguously when there is only one partition.
  • OrderedDistribution means rows with the same "key" are in the same partition and laid out contiguously (i.e. the data is sorted).

Now the Partitioning side:

  • Partitioning, the base class
  • UnknownPartitioning
  • SinglePartition
  • BroadcastPartitioning
  • HashPartitioning
  • RangePartitioning

partitioning.scala shows which Partitioning satisfies which Distribution:

  • UnknownPartitioning.satisfies(UnspecifiedDistribution) == true
  • SinglePartition satisfies every Distribution
  • BroadcastPartitioning satisfies every Distribution
  • HashPartitioning.satisfies(UnspecifiedDistribution) == true
  • HashPartitioning.satisfies(ClusteredDistribution) == true when the hash expressions are a subset of the required clustering expressions
  • RangePartitioning.satisfies(UnspecifiedDistribution) == true
  • RangePartitioning.satisfies(OrderedDistribution) == true when one set of ordering fields contains the other, in the same order
  • RangePartitioning.satisfies(ClusteredDistribution) == true when the range expressions are a subset of the required clustering expressions
  • everything else is false

For each child, it computes whether the required distribution is satisfied: val valid = child.outputPartitioning.satisfies(required)

and logs the following (current is child.outputPartitioning):

15/11/12 15:10:03 DEBUG EnsureRequirements: Valid distribution,required: OrderedDistribution(List(userid#0 ASC)) current: SinglePartition

As the list above shows, only SinglePartition and RangePartitioning satisfy an ordered distribution, and in our broken example the partitioning is already SinglePartition!

In org.apache.spark.sql.execution.Limit you can see override def outputPartitioning: Partitioning = SinglePartition: any limit collapses the data into a single partition, which makes sense.

org.apache.spark.sql.execution.Repartition, however, does not override this method, so it inherits the parent's definition: override def outputPartitioning: Partitioning = child.outputPartitioning. Its child is Limit, so even after the repartition the value is still SinglePartition, which nominally satisfies sort's distribution requirement (while in reality it does not).

Changing Repartition's outputPartitioning to UnknownPartitioning fixes it: the distribution requirement is no longer satisfied, so EnsureRequirements inserts an Exchange into the plan.

The Exchange class

 * Performs a shuffle that will result in the desired `newPartitioning`.  Optionally sorts each
 * resulting partition based on expressions from the partition key.  It is invalid to construct an
 * exchange operator with a `newOrdering` that cannot be calculated using the partitioning key.

Exchange.doExecute() handles HashPartitioning, RangePartitioning and SinglePartition separately. Where necessary in those cases, the objects to be shuffled are defensively copied before being handed to the external sorter.

The doc comment on the helper that makes that decision explains the trade-off:

 * Determines whether records must be defensively copied before being sent to the shuffle.
 * Several of Spark’s shuffle components will buffer deserialized Java objects in memory. The
 * shuffle code assumes that objects are immutable and hence does not perform its own defensive
 * copying. In Spark SQL, however, operators’ iterators return the same mutable `Row` object. In
 * order to properly shuffle the output of these operators, we need to perform our own copying
 * prior to sending records to the shuffle. This copying is expensive, so we try to avoid it
 * whenever possible. This method encapsulates the logic for choosing when to copy.
 *
 * In the long run, we might want to push this logic into core’s shuffle APIs so that we don’t
 * have to rely on knowledge of core internals here in SQL.
 *
 * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.

 

Macro architecture: system-level design.

Pattern-Oriented Software Architecture (POSA), five volumes, the most comprehensive treatment I know (if a bit dated); excellent for building an architectural mindset. It was already out of print when I read it, so I bought a photocopy edition on Taobao; the translation is poor, but even guessing at the intent I learned a lot. The first three volumes are currently available on Amazon.

Beautiful Architecture: a set of architecture case studies, from enterprise systems to the internet, covering requirements analysis, technology selection, trade-offs and architecture design; the way the authors think and present is worth borrowing.

《架构实战 – 软件架构设计的过程》: not heavy on technology; instead it lays out the full architecture process with examples: how to start, which roles take part at each stage, what the deliverables are, how to validate them, and so on. For someone just starting out as an architect it at least provides a map, so nothing gets forgotten.

《思考软件，创新设计 – A端架构师的思考技术》, by Gao Huantang. When I was agonizing over what value a business-side architect brings and how to get really good, this book was a small but reasonably bright lamp. An easy read, worth a try.

Micro architecture: code-level design.

Refactoring: Improving the Design of Existing Code, by Martin Fowler; a must-read. It made me fall in love with refactoring, taught me what clean code looks like, and helped me improve by continuously polishing code.

Design Patterns: a classic you are expected to have read two or three times. First, it puts names on the small techniques we already use when coding; once you have that abstraction you can weigh implementation options in your head and pick a pattern. It also eases communication: say "producer-consumer" and everyone knows what you mean.

Clean Code: another book about what good code is.

Code Complete: a book many people swear by, and a huge one. Honestly, I have never read it cover to cover.

Architect or engineer, code is your bread and butter, so the books above deserve careful, repeated reading and should be internalized through actual coding.

A few more recommendations

Besides technology, an architect has to cover process, testability, people, resource coordination and more, so a few extra books.

The Mythical Man-Month: a classic worth reading whether or not all of it sinks in.

JUnit in Action, a practical penetration-testing guide, and the like: I have not finished them; the point is that an architect must understand and care about testing.

The Art of UNIX Programming: you may never write C, but the UNIX design philosophy and its insistence on simplicity and elegance are absolutely worth learning, and the book is genuinely funny.

The Pyramid Principle: as an architect you will write plenty of slides and documents and talk to a lot of people; this book taught me how to speak so that humans understand.

And one book for motivation, to keep us marching on while remembering the hackers' golden age:

Hackers: the first half is the golden age of hardware and free software; I could not get through the second half.

Finally, I believe a good architect, beyond being articulate, must know their own domain deeply, so besides these architecture books, keep digging into the technology you are already good at.

 

How-to: Tune Your Apache Spark Jobs (Part 1)

Learn techniques for tuning your Apache Spark jobs for optimal efficiency.

When you write Apache Spark code and page through the public APIs, you come across words like transformation, action, and RDD. Understanding Spark at this level is vital for writing Spark programs. Similarly, when things start to fail, or when you venture into the web UI to try to understand why your application is taking so long, you’re confronted with a new vocabulary of words like job, stage, and task. Understanding Spark at this level is vital for writing good Spark programs, and of course by good, I mean fast. To write a Spark program that will execute efficiently, it is very, very helpful to understand Spark’s underlying execution model.

In this post, you’ll learn the basics of how Spark programs are actually executed on a cluster. Then, you’ll get some practical recommendations about what Spark’s execution model means for writing efficient programs.

How Spark Executes Your Program

A Spark application consists of a single driver process and a set of executor processes scattered across nodes on the cluster.

The driver is the process that is in charge of the high-level control flow of work that needs to be done. The executor processes are responsible for executing this work, in the form of tasks, as well as for storing any data that the user chooses to cache. Both the driver and the executors typically stick around for the entire time the application is running, although dynamic resource allocation changes that for the latter. A single executor has a number of slots for running tasks, and will run many concurrently throughout its lifetime. Deploying these processes on the cluster is up to the cluster manager in use (YARN, Mesos, or Spark Standalone), but the driver and executor themselves exist in every Spark application.

At the top of the execution hierarchy are jobs. Invoking an action inside a Spark application triggers the launch of a Spark job to fulfill it. To decide what this job looks like, Spark examines the graph of RDDs on which that action depends and formulates an execution plan. This plan starts with the farthest-back RDDs—that is, those that depend on no other RDDs or reference already-cached data–and culminates in the final RDD required to produce the action’s results.

The execution plan consists of assembling the job’s transformations into stages. A stage corresponds to a collection of tasks that all execute the same code, each on a different subset of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data.

What determines whether data needs to be shuffled? Recall that an RDD comprises a fixed number of partitions, each of which comprises a number of records. For the RDDs returned by so-called narrow transformations like map and filter, the records required to compute the records in a single partition reside in a single partition in the parent RDD. Each object is only dependent on a single object in the parent. Operations like coalesce can result in a task processing multiple input partitions, but the transformation is still considered narrow because the input records used to compute any single output record can still only reside in a limited subset of the partitions.

However, Spark also supports transformations with wide dependencies such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition may reside in many partitions of the parent RDD. All of the tuples with the same key must end up in the same partition, processed by the same task. To satisfy these operations, Spark must execute a shuffle, which transfers data around the cluster and results in a new stage with a new set of partitions.

For example, consider the following code:
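The code snippet from the original post did not survive extraction; a hedged stand-in matching the description in the next paragraph (a few narrow transformations over a text file followed by one action), runnable in spark-shell where sc is predefined and the path is a placeholder:

  sc.textFile("hdfs:///some/file.txt")
    .flatMap(_.split(" "))          // narrow
    .map(_.toLowerCase)             // narrow
    .filter(_.startsWith("spark"))  // narrow
    .count()                        // the single action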

It executes a single action, which depends on a sequence of transformations on an RDD derived from a text file. This code would execute in a single stage, because none of the outputs of these three operations depend on data that can come from different partitions than their inputs.

In contrast, this code finds how many times each character appears in all the words that appear more than 1,000 times in a text file.
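Again the original snippet is missing; a hedged reconstruction of the computation described (word counts, filter to frequent words, then character counts), spark-shell style:

  val tokenized  = sc.textFile("hdfs:///some/file.txt").flatMap(_.split(" "))
  val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)             // first shuffle
  val filtered   = wordCounts.filter(_._2 >= 1000)
  val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1))
                           .reduceByKey(_ + _)                          // second shuffle
  charCounts.collect()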

This process would break down into three stages. The reduceByKey operations result in stage boundaries, because computing their outputs requires repartitioning the data by keys.

Here is a more complicated transformation graph including a join transformation with multiple dependencies.

The pink boxes show the resulting stage graph used to execute it.

At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by tasks in the child stage. Because they incur heavy disk and network I/O, stage boundaries can be expensive and should be avoided when possible. The number of data partitions in the parent stage may be different than the number of partitions in the child stage. Transformations that may trigger a stage boundary typically accept a numPartitions argument that determines how many partitions to split the data into in the child stage.

Just as the number of reducers is an important parameter in tuning MapReduce jobs, tuning the number of partitions at stage boundaries can often make or break an application’s performance. We’ll delve deeper into how to tune this number in a later section.

Picking the Right Operators

When trying to accomplish something with Spark, a developer can usually choose from many arrangements of actions and transformations that will produce the same results. However, not all these arrangements will result in the same performance: avoiding common pitfalls and picking the right arrangement can make a world of difference in an application’s performance. A few rules and insights will help you orient yourself when these choices come up.

Recent work in SPARK-5097 began stabilizing SchemaRDD, which will open up Spark’s Catalyst optimizer to programmers using Spark’s core APIs, allowing Spark to make some higher-level choices about which operators to use. When SchemaRDD becomes a stable component, users will be shielded from needing to make some of these decisions.

The primary goal when choosing an arrangement of operators is to reduce the number of shuffles and the amount of data shuffled. This is because shuffles are fairly expensive operations; all shuffle data must be written to disk and then transferred over the network. repartition, join, cogroup, and any of the *By or *ByKey transformations can result in shuffles. Not all these operations are equal, however, and a few of the most common performance pitfalls for novice Spark developers arise from picking the wrong one:

  • Avoid groupByKey when performing an associative reductive operation. For example, rdd.groupByKey().mapValues(_.sum) will produce the same results as rdd.reduceByKey(_ + _). However, the former will transfer the entire dataset across the network, while the latter will compute local sums for each key in each partition and combine those local sums into larger sums after shuffling.
  • Avoid reduceByKey when the input and output value types are different. For example, consider writing a transformation that finds all the unique strings corresponding to each key. One way would be to use map to transform each element into a Set and then combine the Sets with reduceByKey (see the first half of the sketch after this list):

    This code results in tons of unnecessary object creation because a new set must be allocated for each record. It’s better to use aggregateByKey, which performs the map-side aggregation more efficiently (second half of the sketch below):

  • Avoid the flatMap-join-groupBy pattern. When two datasets are already grouped by key and you want to join them and keep them grouped, you can just use cogroup. That avoids all the overhead associated with unpacking and repacking the groups.
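The inline snippets referenced in the two bullets above did not survive extraction; a hedged reconstruction of both approaches (spark-shell style, toy data):

  import scala.collection.mutable

  val rdd = sc.parallelize(Seq(("a", "x"), ("a", "y"), ("b", "x"), ("a", "x")))

  // Anti-pattern: reduceByKey with a different output type allocates a new Set per record.
  val viaReduce = rdd.map { case (k, v) => (k, Set(v)) }.reduceByKey(_ ++ _)

  // Better: aggregateByKey folds values into one mutable set per key, map-side first.
  val viaAggregate = rdd.aggregateByKey(mutable.HashSet.empty[String])(
    (set, v) => set += v,    // add a value to the partition-local set
    (s1, s2) => s1 ++= s2)   // merge the per-partition sets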

When Shuffles Don’t Happen

It’s also useful to be aware of the cases in which the above transformations will not result in shuffles. Spark knows to avoid a shuffle when a previous transformation has already partitioned the data according to the same partitioner. Consider the following flow:
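The original snippet is not preserved; a hedged reconstruction of the flow being discussed, using the names someRdd and someOtherRdd that appear in the text below (spark-shell style):

  val someRdd      = sc.parallelize(Seq("a", "b", "a"))
  val someOtherRdd = sc.parallelize(Seq("a", "c"))

  val rdd1 = someRdd.map(x => (x, 1)).reduceByKey(_ + _)       // first shuffle
  val rdd2 = someOtherRdd.map(x => (x, 1)).reduceByKey(_ + _)  // second shuffle
  val rdd3 = rdd1.join(rdd2)                                   // no extra shuffle if co-partitioned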

Because no partitioner is passed to reduceByKey, the default partitioner will be used, resulting in rdd1 and rdd2 both hash-partitioned. These two reduceByKeys will result in two shuffles. If the RDDs have the same number of partitions, the join will require no additional shuffling. Because the RDDs are partitioned identically, the set of keys in any single partition of rdd1 can only show up in a single partition of rdd2. Therefore, the contents of any single output partition of rdd3 will depend only on the contents of a single partition in rdd1 and single partition in rdd2, and a third shuffle is not required.

For example, if someRdd has four partitions, someOtherRdd has two partitions, and both the reduceByKeys use three partitions, the set of tasks that execute would look like:

What if rdd1 and rdd2 use different partitioners or use the default (hash) partitioner with different numbers of partitions?  In that case, only one of the rdds (the one with the fewer number of partitions) will need to be reshuffled for the join.

Same transformations, same inputs, different number of partitions:

One way to avoid shuffles when joining two datasets is to take advantage of broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor. A map transformation can then reference the hash table to do lookups.

When More Shuffles are Better

There is an occasional exception to the rule of minimizing the number of shuffles. An extra shuffle can be advantageous to performance when it increases parallelism. For example, if your data arrives in a few large unsplittable files, the partitioning dictated by the InputFormat might place large numbers of records in each partition, while not generating enough partitions to take advantage of all the available cores. In this case, invoking repartition with a high number of partitions (which will trigger a shuffle) after loading the data will allow the operations that come after it to leverage more of the cluster’s CPU.

Another instance of this exception can arise when using the reduce or aggregate action to aggregate data into the driver. When aggregating over a high number of partitions, the computation can quickly become bottlenecked on a single thread in the driver merging all the results together. To loosen the load on the driver, one can first use reduceByKey or aggregateByKey to carry out a round of distributed aggregation that divides the dataset into a smaller number of partitions. The values within each partition are merged with each other in parallel, before sending their results to the driver for a final round of aggregation. Take a look at treeReduce and treeAggregate for examples of how to do that. (Note that in 1.2, the most recent version at the time of this writing, these are marked as developer APIs, but SPARK-5430 seeks to add stable versions of them in core.)

This trick is especially useful when the aggregation is already grouped by a key. For example, consider an app that wants to count the occurrences of each word in a corpus and pull the results into the driver as a map.  One approach, which can be accomplished with the aggregate action, is to compute a local map at each partition and then merge the maps at the driver. The alternative approach, which can be accomplished with aggregateByKey, is to perform the count in a fully distributed way, and then simply collectAsMap the results to the driver.

Secondary Sort

Another important capability to be aware of is the repartitionAndSortWithinPartitions transformation. It’s a transformation that sounds arcane, but seems to come up in all sorts of strange situations. This transformation pushes sorting down into the shuffle machinery, where large amounts of data can be spilled efficiently and sorting can be combined with other operations.

For example, Apache Hive on Spark uses this transformation inside its join implementation. It also acts as a vital building block in the secondary sort pattern, in which you want to both group records by key and then, when iterating over the values that correspond to a key, have them show up in a particular order. This issue comes up in algorithms that need to group events by user and then analyze the events for each user based on the order they occurred in time. Taking advantage of repartitionAndSortWithinPartitions to do secondary sort currently requires a bit of legwork on the part of the user, but SPARK-3655 will simplify things vastly.

Conclusion

You should now have a good understanding of the basic factors involved in creating a performance-efficient Spark program! In Part 2, we’ll cover tuning resource requests, parallelism, and data structures.

Sandy Ryza is a Data Scientist at Cloudera, an Apache Spark committer, and an Apache Hadoop PMC member. He is a co-author of the O’Reilly Media book, Advanced Analytics with Spark.

How-to: Tune Your Apache Spark Jobs (Part 2)

In the conclusion to this series, learn how resource tuning, parallelism, and data representation affect Spark job performance.

In this post, we’ll finish what we started in “How to Tune Your Apache Spark Jobs (Part 1)”. I’ll try to cover pretty much everything you could care to know about making a Spark program run fast. In particular, you’ll learn about resource tuning, or configuring Spark to take advantage of everything the cluster has to offer. Then we’ll move to tuning parallelism, the most difficult as well as most important parameter in job performance. Finally, you’ll learn about representing the data itself, in the on-disk form which Spark will read (spoiler alert: use Apache Avro or Apache Parquet) as well as the in-memory format it takes as it’s cached or moves through the system.

Tuning Resource Allocation

The Spark user list is a litany of questions to the effect of “I have a 500-node cluster, but when I run my application, I see only two tasks executing at a time. HALP.” Given the number of parameters that control Spark’s resource utilization, these questions aren’t unfair, but in this section you’ll learn how to squeeze every last bit of juice out of your cluster. The recommendations and configurations here differ a little bit between Spark’s cluster managers (YARN, Mesos, and Spark Standalone), but we’re going to focus only on YARN, which Cloudera recommends to all users.

For some background on what it looks like to run Spark on YARN, check out my post on this topic.

The two main resources that Spark (and YARN) think about are CPU and memory. Disk and network I/O, of course, play a part in Spark performance as well, but neither Spark nor YARN currently do anything to actively manage them.

Every Spark executor in an application has the same fixed number of cores and same fixed heap size. The number of cores can be specified with the --executor-cores flag when invoking spark-submit, spark-shell, and pyspark from the command line, or by setting the spark.executor.cores property in the spark-defaults.conf file or on a SparkConf object. Similarly, the heap size can be controlled with the --executor-memory flag or the spark.executor.memory property. The cores property controls the number of concurrent tasks an executor can run. --executor-cores 5 means that each executor can run a maximum of five tasks at the same time. The memory property impacts the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations, and joins.

The --num-executors command-line flag or spark.executor.instances configuration property control the number of executors requested. Starting in CDH 5.4/Spark 1.3, you will be able to avoid setting this property by turning on dynamic allocation with the spark.dynamicAllocation.enabled property. Dynamic allocation enables a Spark application to request executors when there is a backlog of pending tasks and free up executors when idle.

It’s also important to think about how the resources requested by Spark will fit into what YARN has available. The relevant YARN properties are:

  • yarn.nodemanager.resource.memory-mb controls the maximum sum of memory used by the containers on each node.
  • yarn.nodemanager.resource.cpu-vcores controls the maximum sum of cores used by the containers on each node.

Asking for five executor cores will result in a request to YARN for five virtual cores. The memory requested from YARN is a little more complex for a couple reasons:

  • --executor-memory/spark.executor.memory controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor. It defaults to max(384, .07 * spark.executor.memory).
  • YARN may round the requested memory up a little. YARN’s yarn.scheduler.minimum-allocation-mb and yarn.scheduler.increment-allocation-mb properties control the minimum and increment request values respectively.

The following (not to scale with defaults) shows the hierarchy of memory properties in Spark and YARN:

And if that weren’t enough to think about, a few final concerns when sizing Spark executors:

  • The application master, which is a non-executor container with the special capability of requesting containers from YARN, takes up resources of its own that must be budgeted in. In yarn-client mode, it defaults to 1024MB and one vcore. In yarn-cluster mode, the application master runs the driver, so it’s often useful to bolster its resources with the --driver-memory and --driver-cores properties.
  • Running executors with too much memory often results in excessive garbage collection delays. 64GB is a rough guess at a good upper limit for a single executor.
  • I’ve noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor below that number.
  • Running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM. For example, broadcast variables need to be replicated once on each executor, so many small executors will result in many more copies of the data.

To hopefully make all of this a little more concrete, here’s a worked example of configuring a Spark app to use as much of the cluster as possible: Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes. Cloudera Manager helps by accounting for these and configuring these YARN properties automatically.

The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:

  • 63GB + the executor memory overhead won’t fit within the 63GB capacity of the NodeManagers.
  • The application master will take up a core on one of the nodes, meaning that there won’t be room for a 15-core executor on that node.
  • 15 cores per executor can lead to bad HDFS I/O throughput.

A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?

  • This config results in three executors on all nodes except for the one with the AM, which will have two executors.
  • --executor-memory was derived as (63/3 executors per node) = 21.  21 * 0.07 = 1.47.  21 – 1.47 ~ 19.
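For concreteness, the same numbers expressed as configuration (a sketch; they can equally be passed as --num-executors / --executor-cores / --executor-memory on the spark-submit command line):

  val conf = new org.apache.spark.SparkConf()
    .setAppName("tuned-app")
    .set("spark.executor.instances", "17")
    .set("spark.executor.cores", "5")
    .set("spark.executor.memory", "19g")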

Tuning Parallelism

Spark, as you have likely figured out by this point, is a parallel processing engine. What is maybe less obvious is that Spark is not a “magic” parallel processing engine, and is limited in its ability to figure out the optimal amount of parallelism. Every Spark stage has a number of tasks, each of which processes data sequentially. In tuning Spark jobs, this number is probably the single most important parameter in determining performance.

How is this number determined? The way Spark groups RDDs into stages is described in the previous post. (As a quick reminder, transformations like repartition and reduceByKey induce stage boundaries.) The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage. The number of partitions in an RDD is the same as the number of partitions in the RDD on which it depends, with a couple exceptions: the coalesce transformation allows creating an RDD with fewer partitions than its parent RDD, the union transformation creates an RDD with the sum of its parents’ number of partitions, and cartesian creates an RDD with their product.

What about RDDs with no parents? RDDs produced by textFile or hadoopFile have their partitions determined by the underlying MapReduce InputFormat that’s used. Typically there will be a partition for each HDFS block being read. Partitions for RDDs produced by parallelize come from the parameter given by the user, or spark.default.parallelism if none is given.

To determine the number of partitions in an RDD, you can always call rdd.partitions().size().

The primary concern is that the number of tasks will be too small. If there are fewer tasks than slots available to run them in, the stage won’t be taking advantage of all the CPU available.

A small number of tasks also mean that more memory pressure is placed on any aggregation operations that occur in each task. Any join, cogroup, or *ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort. join, cogroup, and groupByKey use these data structures in the tasks for the stages that are on the fetching side of the shuffles they trigger. reduceByKey and aggregateByKey use data structures in the tasks for the stages on both sides of the shuffles they trigger.

When the records destined for these aggregation operations do not easily fit in memory, some mayhem can ensue. First, holding many records in these data structures puts pressure on garbage collection, which can lead to pauses down the line. Second, when the records do not fit in memory, Spark will spill them to disk, which causes disk I/O and sorting. This overhead during large shuffles is probably the number one cause of job stalls I have seen at Cloudera customers.

So how do you increase the number of partitions? If the stage in question is reading from Hadoop, your options are:

  • Use the repartition transformation, which will trigger a shuffle.
  • Configure your InputFormat to create more splits.
  • Write the input data out to HDFS with a smaller block size.

If the stage is getting its input from another stage, the transformation that triggered the stage boundary will accept a numPartitions argument, such as the one sketched below.
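The inline example from the original post is missing; a representative sketch, where X is the partition count being tuned (spark-shell style):

  val X = 200
  val parentRdd = sc.textFile("/some/input").map(word => (word, 1))
  val counts = parentRdd.reduceByKey(_ + _, X)   // the stage after this shuffle will have X tasks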

What should “X” be? The most straightforward way to tune the number of partitions is experimentation: Look at the number of partitions in the parent RDD and then keep multiplying that by 1.5 until performance stops improving.

There is also a more principled way of calculating X, but it’s difficult to apply a priori because some of the quantities are difficult to calculate. I’m including it here not because it’s recommended for daily use, but because it helps with understanding what’s going on. The main goal is to run enough tasks so that the data destined for each task fits in the memory available to that task.

The memory available to each task is (spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores. Memory fraction and safety fraction default to 0.2 and 0.8 respectively.

The in-memory size of the total shuffle data is harder to determine. The closest heuristic is to find the ratio between Shuffle Spill (Memory) metric and the Shuffle Spill (Disk) for a stage that ran. Then multiply the total shuffle write by this number. However, this can be somewhat compounded if the stage is doing a reduction:
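The small formula that illustrated this in the original post did not survive extraction; the following is my hedged reconstruction of the arithmetic described above (all numbers are made-up examples read off the Spark UI and configuration):

  val shuffleWriteBytes = 20L * 1024 * 1024 * 1024   // total shuffle write of the stage
  val spillMemoryBytes  = 90L * 1024 * 1024 * 1024   // "Shuffle Spill (Memory)"
  val spillDiskBytes    = 30L * 1024 * 1024 * 1024   // "Shuffle Spill (Disk)"
  val memoryPerTask     = 19L * 1024 * 1024 * 1024 * 0.2 * 0.8 / 5  // heap * memoryFraction * safetyFraction / cores

  val inMemoryShuffleSize = shuffleWriteBytes * (spillMemoryBytes.toDouble / spillDiskBytes)
  val X = math.ceil(inMemoryShuffleSize / memoryPerTask)   // then round up a bit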

Then round up a bit because too many partitions is usually better than too few partitions.

In fact, when in doubt, it’s almost always better to err on the side of a larger number of tasks (and thus partitions). This advice is in contrast to recommendations for MapReduce, which requires you to be more conservative with the number of tasks. The difference stems from the fact that MapReduce has a high startup overhead for tasks, while Spark does not.

Slimming Down Your Data Structures

Data flows through Spark in the form of records. A record has two representations: a deserialized Java object representation and a serialized binary representation. In general, Spark uses the deserialized representation for records in memory and the serialized representation for records stored on disk or being transferred over the network. There is work planned to store some in-memory shuffle data in serialized form.

The spark.serializer property controls the serializer that’s used to convert between these two representations. The Kryo serializer, org.apache.spark.serializer.KryoSerializer, is the preferred option. It is unfortunately not the default, because of some instabilities in Kryo during earlier versions of Spark and a desire not to break compatibility, but the Kryo serializer should always be used.

The footprint of your records in these two representations has a massive impact on Spark performance. It’s worthwhile to review the data types that get passed around and look for places to trim some fat.

Bloated deserialized objects will result in Spark spilling data to disk more often and reduce the number of deserialized records Spark can cache (e.g. at the MEMORY storage level). The Spark tuning guide has a great section on slimming these down.

Bloated serialized objects will result in greater disk and network I/O, as well as reduce the number of serialized records Spark can cache (e.g. at the MEMORY_SER storage level.)  The main action item here is to make sure to register any custom classes you define and pass around using the SparkConf#registerKryoClasses API.
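A minimal sketch of the two settings just mentioned (the record class is a placeholder for your own types):

  import org.apache.spark.SparkConf

  case class MyRecord(id: Long, tags: Seq[String])   // placeholder record type

  val conf = new SparkConf()
    .setAppName("kryo-example")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .registerKryoClasses(Array(classOf[MyRecord]))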

Data Formats

Whenever you have the power to make the decision about how data is stored on disk, use an extensible binary format like Avro, Parquet, Thrift, or Protobuf. Pick one of these formats and stick to it. To be clear, when one talks about using Avro, Thrift, or Protobuf on Hadoop, they mean that each record is an Avro/Thrift/Protobuf struct stored in a sequence file. JSON is just not worth it.

Every time you consider storing lots of data in JSON, think about the conflicts that will be started in the Middle East, the beautiful rivers that will be dammed in Canada, or the radioactive fallout from the nuclear plants that will be built in the American heartland to power the CPU cycles spent parsing your files over and over and over again. Also, try to learn people skills so that you can convince your peers and superiors to do this, too.

Sandy Ryza is a Data Scientist at Cloudera, an Apache Spark committer, and an Apache Hadoop PMC member. He is a co-author of the O’Reilly Media book, Advanced Analytics with Spark.

 

Reposted from:

  • http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
  • http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

Business context and problem

This is an analytics project in which upstream users define many kinds of analyses, each potentially touching GBs or even TBs of data. The source data is user behavior logs annotated with many tags; if every tag key became its own column there would be thousands of columns, most of them sparse. Because the tags are produced by mining strategies they change frequently, which affects newly ingested data.

This requires:

  1. high read performance from the storage layer, so it does not slow down computation, plus a high compression ratio to save space
  2. a schema that can grow dynamically, with good backward compatibility

The compute platform is Spark, so a columnar layout is the natural choice, and papers and benchmarks suggest Parquet basically meets the requirements, so we lean towards it. But should the tag data be stored as tags map<string, array>, or as separate tag_xxx array columns? Can both benefit from Spark's pushdown, so that unneeded columns are skipped?

The analysis below looks at both source code and logs. The conclusion: both layouts can use pushdown, and in theory their performance is roughly equal (not yet validated against data). The map layout obviously extends better, since adding and removing map keys is entirely under the business layer's control.
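For illustration, the two candidate layouts sketched as Spark SQL schemas; the array element type and the tag column names are assumptions for the example, not the project's real schema:

  import org.apache.spark.sql.types._

  // Option A: one map column; new tag keys need no schema change.
  val mapLayout = StructType(Seq(
    StructField("userid", StringType),
    StructField("tags", MapType(StringType, ArrayType(StringType)))))

  // Option B: one column per tag key; sparse, and every new tag changes the schema.
  val columnLayout = StructType(Seq(
    StructField("userid", StringType),
    StructField("tag_age", ArrayType(StringType)),
    StructField("tag_interest", ArrayType(StringType))))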

SparkSQL source analysis

There are plenty of write-ups on SparkSQL's resolution pipeline, for example:

  • http://www.csdn.net/article/2014-07-15/2820658/2
  • http://blog.csdn.net/oopsoom/article/details/37658021
  • http://mmicky.blog.163.com/blog/static/150290154201487102349800/ (共十篇,比较全面)

SparkSQL对SQL语句的处理和关系型数据库对SQL语句的处理采用了类似的方法,首先会将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。
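下面用一段极简的玩具代码示意"把 SQL 解析成 Tree、再用 Rule 通过模式匹配改写 Tree"的思路(类型与规则均为示意用的假设,并非 Catalyst 的真实实现):

Scala:
  // 玩具版逻辑计划树
  sealed trait Plan
  case class Scan(table: String) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan

  // 一条"规则":把相邻的两个 Filter 合并为一个,自顶向下递归应用
  def combineFilters(plan: Plan): Plan = plan match {
    case Filter(c1, Filter(c2, child)) => combineFilters(Filter(s"($c1) AND ($c2)", child))
    case Filter(c, child)              => Filter(c, combineFilters(child))
    case Project(cols, child)          => Project(cols, combineFilters(child))
    case s: Scan                       => s
  }

  // combineFilters(Filter("a > 1", Filter("b = 2", Scan("t"))))
  //   == Filter("(a > 1) AND (b = 2)", Scan("t"))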

sparkSQL1.1总体上由四个模块组成:

  • core 处理数据的输入输出,从不同的数据源获取数据(RDD、Parquet、json等),将查询结果输出成schemaRDD;
  • catalyst 处理查询语句的整个处理过程,包括解析、绑定、优化、物理计划等,说其是优化器,还不如说是查询引擎;
  • hive 对hive数据的处理
  • hive-ThriftServer 提供CLI和JDBC/ODBC接口

saveAsParquetFile

以下直接深入与Parquet文件读写相关代码,假设我们调用了schemaRDD.saveAsParquetFile(…),调用链上核心的类如下:

sparksql-uml-class

关键的调用关系如图:

sparksql-saveAsParquetFile

结合代码来看。

Scala:
  def saveAsParquetFile(path: String): Unit = {
    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
  }

可以看到saveAsParquetFile生成了WriteToFile这个LogicalPlan子类,Lazy调度后,在ParquetOperations.apply()方法里,会触发真正的执行过程:

Scala:
  object ParquetOperations extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      // TODO: need to support writing to other types of files.  Unify the below code paths.
      case logical.WriteToFile(path, child) =>
        val relation =
          ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
        // Note: overwrite=false because otherwise the metadata we just created will be deleted
        InsertIntoParquetTable(relation, planLater(child), overwrite = false) :: Nil

在InsertIntoParquetTable.execute()方法里会调用self的saveAsHadoopFile(),这个方法针对RDD每一行,调用writeShard执行写入:

Scala:
    def writeShard(context: TaskContext, iter: Iterator[Row]): Int = {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
      /* "reduce task" <split #> <attempt # = spark task #> */
      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
        attemptNumber)
      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
      val format = new AppendingParquetOutputFormat(taskIdOffset)
      val committer = format.getOutputCommitter(hadoopContext)
      committer.setupTask(hadoopContext)
      val writer = format.getRecordWriter(hadoopContext)
      try {
        while (iter.hasNext) {
          val row = iter.next()
          writer.write(null, row)
        }
      } finally {
        writer.close(hadoopContext)
      }
      committer.commitTask(hadoopContext)
      1
    }

这里有几个重要的变量,都标注在核心类那张图上了。writer.write依次触发了ParquetRecordWriter.write()、InternalParquetRecordWriter.write()、RowWriteSupport.write()将数据写入内存,并“定期”调用InternalParquetRecordWriter.checkBlockSizeReached()检查是否需要格式化并flush到磁盘(一般是HDFS)。

schema生成

在说write和flush前,得先确定schema在哪里生成的。writeShard里调用val writer = format.getRecordWriter(hadoopContext),触发:

Java:
    WriteContext init = writeSupport.init(conf);
    ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);

其中很关键的方法是ParquetTypes.fromDataType()

public static parquet.schema.Type fromDataType(org.apache.spark.sql.catalyst.types.DataType ctype,
                               String name,
                               boolean nullable,
                               boolean inArray)

Converts a given Catalyst DataType into the corresponding Parquet Type. The conversion follows the rules below:

  • Primitive types are converted into Parquet’s primitive types.
  • StructTypes are converted into Parquet’s GroupType with the corresponding field types.
  • ArrayTypes are converted into a 2-level nested group, where the outer group has the inner group as sole field. The inner group has name values and repetition level REPEATED and has the element type of the array as schema. We use Parquet’s ConversionPatterns for this purpose.
  • MapTypes are converted into a nested (2-level) Parquet GroupType with two fields: a key type and a value type. The nested group has repetition level REPEATED and name map. We use Parquet’s ConversionPatterns for this purpose.

Parquet’s repetition level is generally set according to the following rule:

  • If the call to fromDataType is recursive inside an enclosing ArrayType or MapType, then the repetition level is set to REPEATED.
  • Otherwise, if the attribute whose type is converted is nullable, the Parquet type gets repetition level OPTIONAL and otherwise REQUIRED.

write过程

回到write这条线,会递归调用writeValue()、writePrimitive()、writeStruct()、writeMap()、writeArray()将基本类型和struct、map、array保存起来。其中根据schema,又会触发MessageColumnIORecordConsumer内部类的startMessage()、startField()、startGroup()等方法。这3个概念很重要,应该说就是基于这些才实现了dremel论文里的数据结构:

  • Message: a new record,对应一行
  • Field: a field in a group or message. if the field is repeated the field is started only once and all values added in between start and end,对应一个kv对,key是field name,value可以是group,从而嵌套其他数据结构
  • Group: a group in a field

由于采用了抽象嵌套的方式,map<string, array<string>>首先是一个field整体,而其中每一对 string -> array<string> 也构成一个子field。在底层存储和建索引的时候,应该是nested最深的field对应一个column,针对这些column计算r(repetition level)和d(definition level)的值。
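下面用一段玩具代码示意这三层调用的嵌套顺序(接口名模仿 RecordConsumer 风格,但签名做了简化,并非 parquet-mr 的真实 API;嵌套方式按照上文的描述,把每个 map key 当作一个子 field 来演示):

Scala:
  object ConsumerSketch {
    private var indent = 0
    private def log(s: String): Unit = println(("  " * indent) + s)

    def startMessage(): Unit = { log("startMessage"); indent += 1 }
    def endMessage(): Unit   = { indent -= 1; log("endMessage") }
    def startField(name: String): Unit = { log(s"startField($name)"); indent += 1 }
    def endField(name: String): Unit   = { indent -= 1; log(s"endField($name)") }
    def startGroup(): Unit = { log("startGroup"); indent += 1 }
    def endGroup(): Unit   = { indent -= 1; log("endGroup") }
    def addValue(v: String): Unit = log(s"addValue($v)")
  }

  // 写一行 { query: "q", properties: { "brand": ["bmw"] } } 的调用顺序示意:
  import ConsumerSketch._
  startMessage()                       // Message:一行记录
  startField("query"); addValue("q"); endField("query")
  startField("properties")             // map 整体是一个 field
  startGroup()
  startField("brand"); addValue("bmw"); endField("brand")   // 其中每个 key 又是一个子 field
  endGroup()
  endField("properties")
  endMessage()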

flush过程

再来看flush这条线:

Java:
  // InternalParquetRecordWriter
  public void write(T value) throws IOException, InterruptedException {
    writeSupport.write(value);
    ++ recordCount;
    checkBlockSizeReached();
  }

如果需要flush,则checkBlockSizeReached会再调用flushRowGroupToStore()落地磁盘,并调用initStore()重新初始化内存存储区。
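下面用一段极简的 Scala 示意代码说明这个检查/落盘的控制流(仅为示意,并非 parquet-mr 源码;实际实现还会按平均行大小估算下一次检查的时机,避免每行都做完整检查):

Scala:
  // rowGroupSizeBytes 即 parquet.block.size 所控制的 row group 大小(示意)
  class RecordWriterSketch(rowGroupSizeBytes: Long) {
    private var bufferedBytes = 0L
    private var recordCount = 0L

    def write(recordSizeBytes: Long): Unit = {
      bufferedBytes += recordSizeBytes      // 对应 writeSupport.write(value) 写入内存缓冲
      recordCount += 1
      checkBlockSizeReached()
    }

    private def checkBlockSizeReached(): Unit =
      if (bufferedBytes >= rowGroupSizeBytes) {
        flushRowGroupToStore()              // 缓冲达到 row group 大小:编码、压缩并写出
        initStore()                         // 重新初始化内存存储区
      }

    private def flushRowGroupToStore(): Unit =
      println(s"flush row group: $recordCount records, $bufferedBytes bytes")

    private def initStore(): Unit = { bufferedBytes = 0L; recordCount = 0L }
  }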

SparkSQL日志分析

以上源码分析的推论是,nested结构的每一个field都会被存储为一列,以下通过写入和查询日志再予以证明。

数据结构类似:

SQL:
CREATE TABLE IF NOT EXISTS orc_qiche (
    query string,
    uid string,
    country string,
    province string,
    city string,
    sttime string,
    properties map<string, array<string>>,
    intension array<string>
    )

其中properties含有brand、effects等key。

写入日志:

15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 88B for [properties, brand, array] BINARY: 2 values, 34B raw, 50B comp, 1 pages, encodings: [RLE, PLAIN]
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 85B for [properties, effects, array] BINARY: 2 values, 33B raw, 48B comp, 1 pages, encodings: [RLE, PLAIN]
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 69B for [properties, brand, array] BINARY: 1 values, 21B raw, 36B comp, 1 pages, encodings: [RLE, PLAIN]
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 72B for [query] BINARY: 1 values, 16B raw, 35B comp, 1 pages, encodings: [RLE, BIT_PACKED, PLAIN]

查询日志:

(查询日志截图,图略)

from: http://blog.thislongrun.com/2015/07/Forfeit-Partition-Tolerance-Distributed-System-CAP-Theorem.html?m=1
The CA–consistent, available, but not network partition tolerant–category in CAP has a very specific history. Not only can forfeiting "network partition tolerance" be understood as impossible in theory and crazy in practice (P as an illusion of a choice), but there is also an overlap between the CA and CP categories. As a result, many consider that it's impossible to build a production CA system. But you can actually build a system without network partition tolerance, and sometimes you should.

A brief history of the CA category

Let’s look at the academic history of the CA category in CAP:
  • In 2000, Eric Brewer presents the CAP conjecture. In his presentation, CA exists, for example for systems using the two-phase commit protocol. He considers that "the whole space is useful."
  • In 2002, Seth Gilbert and Nancy Lynch publish the CAP proof. CA exists: "Systems that run on intranets and LANs are an example of these types of algorithms."
  • In 2010, Daniel Abadi raises the point that there is an overlap between CA and CP: "What does 'not tolerant' mean? In practice, it means that they lose availability if there is a partition. Hence CP and CA are essentially identical."
  • Still in 2010, Michael Stonebraker publishes multiple documents around the limited importance of partitions, with the tagline “Myth #6: In CAP, choose AP over CA”, considering that with the capacity of modern hardware, small distributed systems can solve most real-life issues, and that “it doesn’t much matter what you do when confronted with network partitions.”
  • And again in 2010, Coda Hale publishes a blog post: “You cannot sacrifice partition tolerance”, explaining that only AP and CP are possible.
  • This triggers a feedback from Stonebraker, who restates all his points.
  • 2 years later, in 2012, referring to these works, Eric Brewer states that “exactly what it means to forfeit P is unclear” and then clarifies: “choosing CA should mean that the probability of a partition is far less than that of other systemic failures, such as disasters or multiple simultaneous faults.”


So we need to sort out the following issues:
  • There is an overlap between CP and CA.
  • There is a theoretical impossibility: network partitions are a given, you can choose between ‘A’ and ‘C’ when a partition happens but not if partitions happen.
  • There is a practical impossibility: network partitions are too likely to happen on a real life system to be ignored, so CA is impossible in practice.
What does CA mean?
CA is about "forfeiting network partition tolerance", i.e. being "network partition intolerant". Partition intolerance does not mean that network partitions cannot happen, it means network partitions are a critical issue. It's a bit like gluten: being "gluten intolerant" does not mean that you cannot eat any, it means that you should not. Like for gluten, a CA system should also have a means of recovery should a network partition actually happen. The two-phase commit is a perfect example: it comes with a repair tool to fix the transactions broken by a heuristic resolution.

The fact that CA does not mean “I have a network that cannot be partitioned” is important, because it implies a partition can actually happen. This is stressed by Brewer: “choosing CA should mean that the probability of a partition is far less than that of other systemic failures.” To estimate this probability you must be quite clear about what a partition actually is. This whole post is only about network partitions.

Let’s summarize: CA describes the specification of an operating range, and not a behavior. CP, AP describe the behavior when a partition occurs. This obviously leaves room for an overlap between CP and CA. Let’s look at this overlap now.
The overlap between CP and CA
It’s the point identified by Abadi: “What does “not tolerant” mean? In practice, it means that they lose availability if there is a partition. Hence CP and CA are essentially identical.” A system that does not do anything once partitioned is trivially CP: it does not present a non-consistent history. Such a system could also be considered as CA: it stops working when there is a partition–hence the overlap. This overlap is minimal however:
  • Many CA systems are not CP: for example, the two-phase commit protocol is not consistent (nor available, nor ACID-atomic) when there is a partition.
  • Many CP systems are not CA: for example, a consensus server like ZooKeeper is totally tolerant to partitions.
Systems that belong to these two categories are only systems that stop working during the partition, but are consistent once the partition is fixed (trivially a webserver connected to a database). I personally prefer to call these systems ‘CA’ rather than ‘CP’, even if the CAP theorem allows for both: this expresses that a partition is a severe issue for the system. Ultimately, it’s your choice.
Partitions are a given in the CAP theorem
That’s exactly CAP: if there is a partition, you have to choose between ‘A’ and ‘C’. We have a model that allows partitions, and a theorem that says we have to choose ‘A’ or ‘C’ when there is a partition, so we cannot “refuse to see partitions”.  But actually “forfeiting partitions” is exactly that: it’s removing partitions from the model and building our application on a brand new model without partitions.


From a theoretical point of view, forfeiting partitions means removing them from the model. They will never happen in our theoretical model.
From a practical point of view, forfeiting partitions means removing them from the operating range. They may happen in reality.


By definition a model differs from reality. The question is always: is this model a good representation of reality?

Partitions happen too often in real life to be ignored

Well, here ended the debate between Coda Hale and Michael Stonebraker: Hale saying that there are a lot of partitions in his datacenters, Stonebraker saying that there are problems more probable than partitions that are not fixed anyway, and that surviving partitions will not “move the needle” on availability.
Without data agreed upon, there is no real way out from this debate. The good news is we don’t have to revive it to say that CA can be used to describe a distributed system: a CA system is a system built by someone who thinks he can forfeit partitions.
But the key point of the discussion is the difficulty to reason about failures without describing the system. In the debate above, Hale was speaking about systems of “any interesting scale”, while Stonebraker was considering small systems of high range servers on a LAN (“if you need 200 nodes to support a specific SQL application, then VoltDB can probably do the same application on 4 nodes”). But these two types of distributed systems are totally different animals. When discussing a design remember the old programming rule–“fancy algorithms are slow when n is small, and n is usually small”, and check the value of n.

When to use CA

The branch can be partitioned from the tree, but it may not be the monkey's main concern.


Let’s recall what Brewer wrote in 2012: “choosing CA should mean that the probability of a partition is far less than that of other systemic failures, such as disasters or multiple simultaneous faults.”


Eric Brewer detailed in a mail he sent me (quoted here with his permission):
I tend to explain it a few different ways:
1) it is trivial to get CA in a non-distributed system, such as a single node
2) it is also fine to assume CA on a LAN, especially if it is (over) engineered for multiple paths or even for fail stop.  The CM-5 had an over-engineered network that would halt if it detected any errors, but it almost never did (in fact I don’t know of case where it actually stopped, but there probably were some).  The CM-5 case thus really was an operating range argument.
3) If the probability of a partition is lower than other major system failures that would take out an application, then you can claim CA.  For example, you might lose a quorum due to correlated failures (such as power or a disaster), which would also lose availability even though not a partition.  If your network is 5 9s, you can probably ignore the partition case in terms of the code you write (but you should at least detect it!).


"CA should mean that the probability of a partition is far less than that of other systemic failures" says we can call a system CA if the "probability of a partition" is minimal–the non-distributed or over-engineered network case. These systems are often not of "any interesting scale" but that doesn't mean they don't have any business value.


There is a more complex case: the probability of “multiple simultaneous faults” depends on many things, including the software itself. Many non-critical software are more likely to get a data corruption from a software bug than from a network partition, just because simple error scenarios like wrong user-inputs are not tested enough. A complicated administration interface is also a common source of downtime. In other words, choosing CA depends on the network quality and the software quality itself.


Network partition tolerance is a feature like any other. It has to be planned, implemented and tested. And, as any feature, the decision to implement it or not must take into account the benefits of the feature compared to its implementation cost. For such a feature it is:


expected number of partitions * cost per partition (unavailability, reputation, repair …)
vs.
cost of supporting partitions (testing effort included).


Even if the ratio is positive, i.e. the system should be partition tolerant, there could be other features that have a better ratio and they will be prioritized. That’s a well known engineering drama: it’s not because a feature is useful and brings value that it’s implemented in the end.


An example of such CA systems would be those GPU-based machine learning systems. The one built by Baidu was “comprised of 36 server nodes, each with 2 six-core Intel Xeon E5-2620 processors. Each server contains 4 Nvidia Tesla K40m GPUs and one FDR InfiniBand (56Gb/s) which is a high-performance low-latency interconnection and supports RDMA. The peak single precision floating point performance of each GPU is 4.29TFlops and each GPU has 12GB of memory.” For such a system, partition tolerance is not an imperious necessity: if a partition occurs the calculation can be restarted from scratch once the partition is fixed. As already stated, this does not mean partition tolerance is not useful. Partition tolerance would be typically useful should the calculation take weeks. But such systems can also exist without partition tolerance.

Conclusion: “the whole space is useful”

Being partition tolerant is comfortable. You have to be Stonebraker to claim partition intolerance. On the other hand, Kyle 'Aphyr' Kingsbury proves regularly with smart but simple tests that many systems used in production are not network partition tolerant.
It’s not really that network partition tolerance can be easily forfeited, especially if the system is of “any interesting scale.” But first it is worth checking the system’s size: is it of “any interesting scale?” Exactly like a system that does not need to be distributed should not be distributed, a distributed system that can be kept small should be kept small.
There is also a catch in how CAP is sometimes (mis)understood: “node failures, processes crashes and network partitions are partitions so you have to be partition tolerant”. This is not only false but also dangerous: it hides the fact that each of these faults could be tackled independently with a specific priority. Before trying to be available during network partition, you should first validate that you don’t lose data with a single process crash. With fault tolerance like with any other problem, decomposing it makes it easier to fix. Network partition is just one type of fault out of many.
So, sometimes using CA just makes sense. As already stated by Eric Brewer: "the whole space is useful."
原文comments:https://aphyr.com/posts/325-comments-on-you-do-it-too

zz from: http://www.cloudera.com/content/cloudera/zh-CN/documentation/core/v5-3-x/topics/impala_parquet.html

Impala 可帮助您创建、管理和查询 Parquet 表。Parquet 是一种面向列的二进制文件格式,其设计目标是为 Impala 最擅长的大规模查询类型提供高效率支持。Parquet 对于在表中扫描特定列的查询特别有效,例如查询一个包含许多列的表中的少数几列,或执行 SUM()、AVG() 等需要处理某列中绝大部分或全部值的聚合操作。每个数据文件包含一系列行(行组)的值。在数据文件里,每列的值被重新组织以便它们相邻,从而对这些列的值进行良好的压缩。针对 Parquet 表的查询可以以最小的 I/O 从任意列快速获取并分析这些数据。

Table 1. Impala 支持 Parquet 文件
  • 文件类型:Parquet
  • 格式化:结构化
  • 压缩编码解码器:Snappy、gzip;当前默认为 Snappy
  • Impala 能否支持 CREATE?:是
  • Impala 能否支持 INSERT?:是,支持 CREATE TABLE、INSERT、LOAD DATA 和查询


在 Impala 中创建 Parquet 表

要创建一个名为 PARQUET_TABLE 的 Parquet 格式的表,请使用类似下方的命令,并替换为你自己的表名、列名和数据类型:

[impala-host:21000] > create table parquet_table_name (x INT, y STRING) STORED AS PARQUET;

或者,要克隆一个现有表的列名和数据类型:

[impala-host:21000] > create table parquet_table_name LIKE other_table_name STORED AS PARQUET;

在 Impala 1.4.0 及更高版本中,您可以从原始 Parquet 数据文件中导出列定义,即使没有一个现存的 Impala 表。例如,您可以基于该目录中某文件的列定义,创建一个指向 HDFS 目录的外部表:

CREATE EXTERNAL TABLE ingest_existing_files LIKE PARQUET '/user/etl/destination/datafile1.dat'
  STORED AS PARQUET
  LOCATION '/user/etl/destination';

或者,您可以参考现有的数据文件,创建带有合适列名的新的空表。然后使用 INSERT 创建新的数据文件,或者使用 LOAD DATA 将现有数据文件导入至新表。

CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
  STORED AS PARQUET;

新创建的表的默认属性与其它任何 CREATE TABLE 语句相同。例如,默认的文件格式是文本文件;如果您希望新建的表使用 Parquet 文件格式,请再加上 STORED AS PARQUET。

本例中,新建的表根据年、月、日进行分区。这些分区关键列不是数据文件的一部分,因此您需要在 CREATE TABLE 语句中指定:

CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
  PARTITIONED BY (year INT, month TINYINT, day TINYINT)
  STORED AS PARQUET;

参见 创建表语句 了解 CREATE TABLE LIKE PARQUET 语法的详细信息。

当创建了表后,请使用类似下面的命令向表中插入数据,请再次使用您自己的表名:

[impala-host:21000] > insert overwrite table parquet_table_name select * from other_table_name;

如果 Parquet 表具有与其它表不同数量的列或不同的列名,请在对其它表的 SELECT 语句中指定列名,以代替 *

加载数据到 Parquet 表

根据原始数据是存在于 Impala 表中,还是存在于 Impala 表之外的数据文件中,选择不同技术将数据加载到 Parquet 表里。

如果您的数据已经在 Impala 或 Hive 表里,可能是在不同的文件格式或分区模式下,您可以使用 Impala 语法 INSERT…SELECT 将这些数据导入 Parquet 表。可以在同一个 INSERT 语句中,对数据执行转换、过滤、重新分区,以及其它类似操作。参见 Parquet 数据文件的 Snappy 和 GZip 压缩 查看一些示例,了解如何在 Parquet 表中插入数据。

插入分区表时,尤其是使用 Parquet 文件格式时,可以在 INSERT 语句中包含提示来微调操作的整体性能及其资源使用率:

  • 这些提示在 Impala 1.2.2 及更高版本中可用。
  • 只有当由于容量限制而导致 INSERT 插入分区 Parquet 表的操作失败时,或者 INSERT 虽然成功却低于最佳性能时,才可以使用这些提示。
  • 要使用这些提示,请将提示关键字 [SHUFFLE][NOSHUFFLE](包括方括号)放置在 PARTITION 子句之后,SELECT 关键字之前。
  • [SHUFFLE] 选择的执行计划应能将同时写入到 HDFS 的文件数量以及保留分区数据的内存缓冲区的数量降到最少。它允许某些原本有可能失败的INSERT 操作成功执行,从而降低了 INSERT 操作的整体资源使用率。它涉及节点之间的一些数据传输,以便在同一个节点上构建某个特定分区的数据文件。
  • [NOSHUFFLE] 选择的执行计划应整体速度更快,但也能够生成大量的小数据文件或超出容量限制而导致 INSERT 操作失败。当由于所有节点尝试构建所有分区的数据而导致 INSERT 语句失败或运行无效时,使用 [SHUFFLE]
  • 如果 INSERT … SELECT 查询中提及的来源表的任何分区键列不显示列统计数据,则 Impala 会自动使用 [SHUFFLE] 方法。在这种情况下,仅使用[NOSHUFFLE] 提示是不会带来什么影响的。
  • 如果 INSERT … SELECT 查询中提及的来源表的所有分区键列均显示列统计数据,Impala 会根据这些列中预计的独特值数量以及 INSERT 操作中涉及的节点数量,选择是使用 [SHUFFLE] 还是 [NOSHUFFLE]。在这种情况下,您可能需要 [SHUFFLE][NOSHUFFLE] 提示来替代 Impala 选择的执行计划。

Parquet 表的任何 INSERT 语句都需要在 HDFS 文件系统中有足够的空间才能写入一个块。由于默认情况下 Parquet 数据文件使用的块大小为 1 GB,因此如果 HDFS 的运行空间不足,INSERT 就有可能失败(即使是极少量的数据)。

避免对 Parquet 表使用 INSERT…VALUES 语法,因为 INSERT…VALUES 会为每一个 INSERT…VALUES 语句产生一个极小的单独数据文件,而 Parquet 的强项在于以大数据块的方式处理数据(压缩、并行等操作)。

假如您有一个或多个 Impala 之外生成的 Parquet 数据文件,可通过以下方法之一,快速地让这些数据可以在 Impala 中查询:

  • 使用 LOAD DATA 语句,将一个单独文件或某个目录下所有数据文件移动到 Impala 表对应的数据目录中。这不会验证或转换数据。原始数据文件必须位于 HDFS 中,而不能是本地文件系统中。
  • 使用包含 LOCATION 子句的 CREATE TABLE 语句创建一个表,将数据继续存放在 Impala 数据目录之外。原始数据文件必须位于 HDFS 中,而不能是本地文件系统中。为加强安全性,假如这些数据长时间存在并被其他应用重用,您可以使用 CREATE EXTERNAL TABLE 语法,使得这些数据文件不会被 Impala 语句 DROP TABLE 删除。
  • 假如 Parquet 表已经存在,您可以直接复制 Parquet 数据文件到表的目录中,然后使用 REFRESH 语句使得 Impala 得以识别新添加的数据。请记住使用 hdfs distcp -pb 命令而不是 -put-cp 操作,以保留 Parquet 数据文件的块大小。参见 复制 Parquet 数据文件示例 了解以上操作的示例。

如果数据存在于 Impala 之外,并且是其它格式,请结合使用之前提到的两种技术。首先,使用 LOAD DATACREATE EXTERNAL TABLE … LOCATION语句把数据导入使用对应文件格式的 Impala 表中。然后,使用 INSERT…SELECT 语句将数据复制到 Parquet 表中,并转换为 Parquet 格式。

加载数据到 Parquet 表是内存密集型操作,因为输入数据会在内存中被缓存,直到其大小达到一个数据块,然后这些块的数据会进行组织和压缩,最终写出。把数据插入到分区 Parquet 表时,内存消耗会更大,因为对分区关键列值的每种组合都要写入一个单独的数据文件,可能需要在内存中同时操作几个数据块。

当向分区 Parquet 表插入数据时,Impala 会在节点之间重新分布数据以减少内存消耗。但是在插入操作时,您可能仍然需要临时增加 Impala 专用的内存量,或者把加载操作拆分到几个 INSERT 语句中,或者两种方式都采用。

  Note: 之前所有的技术都假定你所加载的数据与你的目标表的结构相匹配,包括列的顺序、列的名称,以及分区布局 (layout)。要转换或重组数据,请先将数据加载到与其底层结构相匹配的 Parquet 表中,然后使用 CREATE TABLE AS SELECT 或 INSERT … SELECT 等表复制技术,对列进行重新排序或重命名、将数据拆分到多个分区等等。举例来说,要加载单个包含所有数据的 Parquet 文件到分区表中,您应当使用包含动态分区的 INSERT … SELECT 语句,让 Impala 创建具有合适分区数值的单独的数据文件;示例参见 INSERT 语句。

Impala Parquet 表的查询性能

Parquet 表的查询性能,取决于处理 SELECT 列表和 WHERE 子句时所需的列的个数、数据被划分为块大小与文件大小相当的大数据文件的方式、读取压缩格式下每列数据时 I/O 的降低、可以跳过哪些数据文件(分区表),以及解压每列数据时的 CPU 负载。

例如,以下对 Parquet 表的查询是高效的:
select avg(income) from census_data where state = 'CA';

这个查询只处理大量列中的两个列。假如表是根据 STATE 分区的,它甚至更有效率,因为查询仅仅需要对每个数据文件读取和解码 1 列,并且它可以只读取 state ‘CA’分区目录下的数据文件,跳过所有其它 state 的、物理上的位于其它目录的所有数据文件。
以下对 Parquet 表的查询相对低效:
select * from census_data;

Impala 不得不读取每个 数据文件的全部内容,并解压每一个行组中每一列的内容,浪费了面向列格式的 I/O 优化。与其它文件格式的表相比,该查询对 Parquet 表可能还是会更快,但它没有利用 Parquet 数据文件格式的独特优势。

Impala 可以优化对 Parquet 表的查询,特别是在联接查询方面,当所有相关表都收集了统计信息时效果更加明显。当表中加载或附加了大量数据后,请对每个表执行 COMPUTE STATS 语句。详细信息,请参见 COMPUTE STATS 语句。

  Note: 目前,某个已知问题 (IMPALA-488) 会导致在 Parquet 表上执行 COMPUTE STATS 操作时使用的内存过多。解决办法是,发布 COMPUTE STATS语句前,在 impala-shell 中发布命令 SET NUM_SCANNER_THREADS=2。接着,发布 UNSET NUM_SCANNER_THREADS,然后继续查询。

Parquet 表的分区

正如 Impala 表分区所述,对 Impala 而言,分区是一项重要而通用的性能技术。本章节介绍一些关于分区 Parquet 表的性能考虑。

Parquet 文件格式非常适合包含许多列,并且绝大部分查询只涉及少数几列的表。正如 Parquet 数据文件如何组织所述,Parquet 数据文件的物理分布使得对于许多查询 Impala 只需读取数据的一小部分。当您结合使用 Parquet 表和分区时,这种方法的性能优势会进一步凸显。基于 WHERE 子句引用的分区关键列,Impala 可以完全跳过特定分区的数据文件。例如,分区表上的查询通常基于 YEAR、MONTH 和/或 DAY 列进行时间趋势分析,或是地理区域分析。请记住,Parquet 数据文件使用了较大的块尺寸,所以在确定如何精细地分区数据时,请尝试找到一个粒度,使得每个分区包含 256 MB 或更多数据,而不是创建大量属于多个分区的小文件。

插入到分区 Parquet 表的操作可能是一个资源密集型操作,因为对于每个不同分区关键列的组合,每个 Impala 节点都可能会将一个单独的数据文件写入 HDFS。大量同时打开的文件数可能会达到 HDFS transceivers 限制。考虑采用以下技术,避免达到这一限制:

  • 使用单独的 INSERT 语句加载不同的数据子集,每个语句的 PARTITION 子句包含特定值,例如 PARTITION (year=2010)
  • 增加 HDFS 的 transceivers 值,有时候写作 xcievers (sic)。即配置文件 hdfs-site.xml 中的 dfs.datanode.max.transfer.threads属性。例如,如果您加载 12 年的数据,根据年、月、日进行分区,那么即使该值设为 4096 也不够。这篇 博文 使用 HBase 例子作为说明,探讨了增加或减小这一数值的考虑。
  • 使用 COMPUTE STATS 语句在数据被复制的源表上采集 列统计信息,这样 Impala 查询可以评估分区关键列不同数值的个数,从而均衡工作负载。
  Note: 目前,某个已知问题 (IMPALA-488) 会导致在 Parquet 表上执行 COMPUTE STATS 操作时使用的内存过多。解决办法是,发布 COMPUTE STATS语句前,在 impala-shell 中发布命令 SET NUM_SCANNER_THREADS=2。接着,发布 UNSET NUM_SCANNER_THREADS,然后继续查询。

Parquet 数据文件的 Snappy 和 GZip 压缩

当 Impala 使用 INSERT 语句写入 Parquet 数据文件时,底层的压缩受 COMPRESSION_CODEC 查询选项控制。(Impala 2.0 版本以前,该查询选项的名称为 PARQUET_COMPRESSION_CODEC。)这一查询选项允许的值包括 snappy(默认值)、gzip 和 none。选项值不区分大小写。如果该选项设为了其它无法识别的值,那么所有类型的查询都会因无效的选项设置而失败,不仅仅是涉及 Parquet 表的查询。

使用 Snappy 压缩的 Parquet 表示例

默认情况下,Parquet 表的底层数据文件采用 Snappy 压缩。快速压缩和解压,对于许多数据集来说是一个好选择。为确保使用了 Snappy 压缩,例如在试验了其它压缩编解码之后,请在插入数据之前设置 COMPRESSION_CODEC 查询选项为 snappy

[localhost:21000] > create database parquet_compression;
[localhost:21000] > use parquet_compression;
[localhost:21000] > create table parquet_snappy like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=snappy;
[localhost:21000] > insert into parquet_snappy select * from raw_text_data;
Inserted 1000000000 rows in 181.98s

使用 GZip 压缩的 Parquet 表示例

如果您需要更密集的压缩(代价是,查询时需要更多的 CPU 周期以进行解压),请在插入数据之前设置 COMPRESSION_CODEC 查询选项为 gzip

[localhost:21000] > create table parquet_gzip like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=gzip;
[localhost:21000] > insert into parquet_gzip select * from raw_text_data;
Inserted 1000000000 rows in 1418.24s

未压缩 Parquet 表示例

假如您的数据压缩效果非常有限,或者您想彻底避免压缩和解压缩带来的 CPU 负载,请在插入数据前设置 COMPRESSION_CODEC 查询选项为none

[localhost:21000] > create table parquet_none like raw_text_data;
[localhost:21000] > insert into parquet_none select * from raw_text_data;
Inserted 1000000000 rows in 146.90s

已压缩 Parquet 表的大小和速度示例

下面的例子演示了 10 亿条合成数据在数据大小和查询速度方面的差异,它们分别使用了不同的编解码器进行压缩。与往常一样,请使用你自己真实的数据集进行类似的测试。实际的压缩比、对应的插入和查询速度,会根据实际数据特征的不同而有所不同。

本例中,压缩方式从 Snappy 改为 GZip,数据大小减少了 40%,而从 Snappy 改为不压缩,大小增加了 40%:

$ hdfs dfs -du -h /user/hive/warehouse/parquet_compression.db
23.1 G  /user/hive/warehouse/parquet_compression.db/parquet_snappy
13.5 G  /user/hive/warehouse/parquet_compression.db/parquet_gzip
32.8 G  /user/hive/warehouse/parquet_compression.db/parquet_none

因为 Parquet 数据文件通常较大,每一个目录都会包含不同数量的数据文件,行组的安排也会有所不同。

同时,压缩比更小,解压速度就更快。在上面包含 10 亿行记录的表中,对于评估特定列全部数值的查询,不使用压缩的比使用 Snappy 压缩的快,使用 Snappy 压缩的比使用 Gzip 压缩的快。查询性能依赖于多个不同因素,因此请如往常一样,使用你自己的数据进行自己的基准测试,以获得数据大小、CPU 效率、以及插入和查询操作的速度等方面的理想均衡。

[localhost:21000] > desc parquet_snappy;
Query finished, fetching results ...
+-----------+---------+---------+
| name      | type    | comment |
+-----------+---------+---------+
| id        | int     |         |
| val       | int     |         |
| zfill     | string  |         |
| name      | string  |         |
| assertion | boolean |         |
+-----------+---------+---------+
Returned 5 row(s) in 0.14s
[localhost:21000] > select avg(val) from parquet_snappy;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 4.29s
[localhost:21000] > select avg(val) from parquet_gzip;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 6.97s
[localhost:21000] > select avg(val) from parquet_none;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 3.67s

复制 Parquet 数据文件示例

下面是最后一个示例,演示了使用不同压缩编解码器的数据文件在读操作上是如何相互兼容的。压缩格式的相关元数据会写入到每个数据文件中,无论当时 COMPRESSION_CODEC 设置为什么值,在查询过程中都可以正常解码。本例中,我们从之前示例中使用的 PARQUET_SNAPPY、PARQUET_GZIP 和 PARQUET_NONE 表中复制数据文件,这几个表每个都包含 10 亿行记录,全都复制到新表 PARQUET_EVERYTHING 的数据目录中。一对简单的查询表明,新表包含了 30 亿行记录,并且数据文件使用了不同的压缩编解码器。

首先,我们在 Impala 中创建表,以便在 HDFS 中有一个存放数据文件的目标目录:

[localhost:21000] > create table parquet_everything like parquet_snappy;
Query: create table parquet_everything like parquet_snappy

然后在 shell 中,我们将对应的数据文件复制到新表的数据目录中。 不采用 hdfs dfs -cp 这一常规文件的操作方式,我们采用 hdfs distcp -pb 命令以确保 Parquet 数据文件特有的 块大小 继续保留。

$ hdfs distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_snappy \
  /user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
$ hdfs distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_gzip  \
  /user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
$ hdfs distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_none  \
  /user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...

返回 impala-shell 解释器,我们采用 REFRESH 语句让 Impala 服务器识别表中新的数据文件,然后运行查询,结果表明数据文件包含 30 亿行记录,并且其中某个数值列的值与原来的小表相匹配:

[localhost:21000] > refresh parquet_everything;
Query finished, fetching results ...

Returned 0 row(s) in 0.32s
[localhost:21000] > select count(*) from parquet_everything;
Query finished, fetching results ...
+------------+
| _c0        |
+------------+
| 3000000000 |
+------------+
Returned 1 row(s) in 8.18s
[localhost:21000] > select avg(val) from parquet_everything;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 13.35s

与其他 Hadoop 组件交换 Parquet 数据文件

自 CDH 4.5 开始,您可以在 Hive、Pig、MapReduce 中读取和写入 Parquet 数据文件。参考 CDH 4 安装指南 了解详细信息。

之前,不支持在 Impala 中创建 Parquet 数据然后在 Hive 中重用这个表。现在 CDH 4.5 中,Hive 开始支持 Parquet,在 Hive 中重用已有的 Impala Parquet 数据文件需要更新表的元数据。假如您已经使用 Impala 1.1.1 或更高版本,请使用以下命令:

ALTER TABLE table_name SET FILEFORMAT PARQUET;

假如您使用比 Impala 1.1.1 老的版本,通过 Hive 执行元数据的更新:

ALTER TABLE table_name SET SERDE 'parquet.hive.serde.ParquetHiveSerDe';
ALTER TABLE table_name SET FILEFORMAT
  INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
  OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat";

Impala 1.1.1 及以上版本可以重用 Hive 中创建的 Parquet 数据文件,不需要执行任何操作。

Impala 支持标量数据类型,您可以在 Parquet 数据文件中进行编码,但不支持复合 (composite) 或嵌套 (nested) 类型,如 maps/arrays。假如表的任何一列采用了不受支持的类型,Impala 将无法访问该表。

假如你在不同节点、乃至在相同节点的不同目录复制 Parquet 数据文件,请使用hadoop distcp -pb命令以确保保留原有的块大小。发出命令hdfs fsck -blocks HDFS_path_of_impala_table_dir 并检查平均块大小是否接近 256 MB(或是由 PARQUET_FILE_SIZE 查询设置所定义的其它任何大小),以验证是否保留了块大小。( hadoop distcp 操作通常会生出一些子目录,名称为 _distcp_logs_*,您可以之后从目标目录中删除这些目录)。发出hadoop distcp 命令,了解 distcp 命令的语法详情。

Parquet 数据文件如何组织

尽管 Parquet 是一个面向列的文件格式,但不要期望每列一个数据文件。Parquet 在同一个数据文件中保存一行中的所有数据,以确保在处理过程中,某行的所有列在同一个节点上都可用。Parquet 所做的是设置 HDFS 块大小和与之相匹配的最大数据文件大小,以确保 I/O 和网络传输请求适用于大批量数据。

在数据文件中,多个行的数据会重新排列,以便第一列的所有值都会被重新组织到一个连续的块中,然后是第二列的所有值,依此类推。相同列的值彼此相邻,从而 Impala 可以对这些列的值使用高效的压缩技术。

  Note:

Impala INSERT 语句通过一个大小 与数据文件大小相匹配的 HDFS 块来写入 Parquet 数据文件,以确保每个数据文件对应一个 HDFS 块,并且整个文件可以在单个节点上处理,不需要任何远程读取。

如果您不是通过 Impala 创建的 Parquet 数据文件,例如 MapReduce 或 Pig job,请确保 HDFS 块大小比文件要大或相同,从而维持 每个块一个文件 关系。将 dfs.block.size 或者dfs.blocksize 属性设置为足够大,使得每个文件可与单独的 HDFS 块大小匹配,即使该大小比正常的 HDFS 块要大。

如果在复制文件时块大小被重设为较低的值,你会发现涉及这些文件的查询性能会更低,并且 PROFILE 语句可通过远程读取揭示哪些 I/O 不是最优的。参见 复制 Parquet 数据文件示例 示例,了解在复制 Parquet 数据文件时,如何保留块大小。

当 Impala 检索或测试特定列的数据时,它会打开所有数据文件,但只会读取每个文件中包含该列数据的部分。这些列的值是连续存放,使得处理同一列的数据所需的 I/O 最小化。如果其它的列在 SELECT 列表或 WHERE 子句中列出,在同一个数据文件中同一行的所有列的数据都可用。
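下面用一小段示意代码说明上文"按列重排"的概念(纯粹是概念演示,与 Parquet 实际的文件布局和编码无关):

Scala:
  // 把按行组织的记录重排为按列组织的块,使同一列的值彼此相邻
  case class Row3(query: String, uid: String, city: String)

  val rows = Seq(Row3("q1", "u1", "北京"), Row3("q2", "u2", "北京"), Row3("q3", "u3", "上海"))

  val columnar: Map[String, Seq[String]] = Map(
    "query" -> rows.map(_.query),   // 第一列的所有值连续存放
    "uid"   -> rows.map(_.uid),     // 然后是第二列,依此类推
    "city"  -> rows.map(_.city)     // 相邻的重复值(如"北京")更利于压缩
  )

  // 只读某一列时,仅需取出对应的块:
  val cities = columnar("city")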

假如一个 INSERT 语句导入的数据小于 一个 Parquet 数据块的大小,那么最终的数据文件将小于理想大小。因此,如果您把一个 ETL 作业拆分成多个INSERT 语句,请尽量确保每一个 INSERT 语句插入的数据量接近 256 MB,或 256 MB 的倍数

Parquet 数据文件的 RLE 编码和字典编码

Parquet 基于对实际数据值的分析,使用一些自动压缩技术,例如游程编码 (RLE) 和字典编码。当数据值被编码成紧凑的格式后,再使用压缩算法,编码后的数据可能会被进一步压缩。Impala 创建的 Parquet 数据文件可以使用 Snappy、GZip 或不进行压缩;Parquet 规格还支持 LZO 压缩,但是 Impala 目前不支持读取 LZO 压缩的 Parquet 文件。

除了应用到整个数据文件的 Snappy 或 GZip 压缩之外,RLE 和字典编码是 Impala 自动应用到 Parquet 数据值组的压缩技术。这些自动优化可以节省您的时间,并可省去传统数据仓库通常需要的规划。例如,字典编码降低了创建数值型 ID 作为长字符串缩写的需求。

游程编码压缩了一组重复的数据值。例如,如果多个连续行都包含相同的国家编码,那么可以用该值以及连续出现的次数来表示这些重复的值。

字典编码取出存在于列中的不同的值,并用紧凑的 2 字节形式替代原始值进行表示,而原始值可能有多个字节。(对编码后的值还可进行进一步压缩,以节省更多空间。)当列的不同值的个数少于 2**16 (65,536) 时,使用这一类型的编码。对于 BOOLEAN 数据类型的列不会应用该编码,因为原始值已经足够短。TIMESTAMP 列有时每行的值都不同,这时可能很快就超过 2**16 个不同值的限制。2**16 的列不同值限制是针对每个数据文件分别计算的,因此如果有多个不同的数据文件,每个文件都包含 10,000 个不同的城市名,每一个数据文件中的城市名一列依然可以使用字典编码来进行压缩。
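下面用两个很小的函数示意 RLE 与字典编码的基本思路(仅为概念演示,与 Parquet 的实际编码格式无关):

Scala:
  // 游程编码:把连续重复的值记为 (值, 连续出现次数)
  def runLengthEncode[A](values: Seq[A]): Seq[(A, Int)] =
    values.foldLeft(List.empty[(A, Int)]) {
      case ((v, n) :: rest, x) if v == x => (v, n + 1) :: rest
      case (acc, x)                      => (x, 1) :: acc
    }.reverse

  // 字典编码:每个不同的值映射为一个小整数 ID,数据本身只存 ID
  def dictionaryEncode(values: Seq[String]): (Map[String, Int], Seq[Int]) = {
    val dict = values.distinct.zipWithIndex.toMap
    (dict, values.map(dict))
  }

  // runLengthEncode(Seq("CN", "CN", "CN", "US"))   == Seq(("CN", 3), ("US", 1))
  // dictionaryEncode(Seq("北京", "上海", "北京"))._2 == Seq(0, 1, 0)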

为 Parquet 表合并数据文件

如果您对 Parquet 表重用了现有的表结构或 ETL 过程,您可能会遇到 很多小文件 的情况,这时查询效率不是最优的。例如,类似以下的语句可能会产生组织低效的数据文件:

-- In an N-node cluster, each node produces a data file
-- for the INSERT operation. If you have less than
-- N GB of data to copy, some files are likely to be
-- much smaller than the default Parquet block size.
insert into parquet_table select * from text_table;

-- Even if this operation involves an overall large amount of data,
-- when split up by year/month/day, each partition might only
-- receive a small amount of data. Then the data files for
-- the partition might be divided between the N nodes in the cluster.
-- A multi-gigabyte copy operation might produce files of only
-- a few MB each.
insert into partitioned_parquet_table partition (year, month, day)
  select year, month, day, url, referer, user_agent, http_code, response_time
  from web_stats;

以下技术可帮助你在 Parquet INSERT 操作中产生大的数据文件,并合并现有的过小的数据文件:

  • 向分区 Parquet 表中插入数据时,请使用静态分区INSERT 语句,这样分区关键值将被指定为常量。理想情况下,为每个分区使用一个单独的 INSERT语句。

  • 执行 INSERT 或 CREATE TABLE AS SELECT 语句期间,可以暂时将 NUM_NODES 选项设为 1。通常情况下,这些语句会针对每个数据节点生成一个或多个数据文件。如果写入操作涉及少量数据、Parquet 表和/或分区表,默认行为可能会生成多个小文件,而您直觉上可能只希望输出一个文件。SET NUM_NODES=1 会关闭写入操作的分布式执行,使其更有可能只生成一个或几个数据文件。

  • 与你习惯的传统分析数据库系统相比,做好减少分区关键列个数的准备。

  • 不要期望 Impala 写入的 Parquet 文件会填满整个 Parquet 区块大小。Impala 在计算为每个 Parquet 文件写入多少数据时,会采取保守策略。典型情况下,内存中的未压缩数据经 Parquet 文件格式的编码和压缩技术处理后,写到磁盘上的体积会显著减小。最终的数据文件大小取决于数据的可压缩程度。因此,如果一个 256 MB 的文本文件被分为 2 个 Parquet 数据文件,并且每个文件都小于 256 MB,这将是正常现象。

  • 如果最终一个表里存在很多小的数据文件,可以考虑使用前述技术中的一种或多种,通过 CREATE TABLE AS SELECT 或 INSERT … SELECT 语句,将所有数据复制到一个新的 Parquet 表中。

    为避免因表名变化而修改查询,你可以采用一个约定:始终在一个视图上运行重要的查询。更改视图定义后,所有后续查询立即使用新的基础表:

    create view production_table as select * from table_with_many_small_files;
    -- CTAS or INSERT...SELECT all the data into a more efficient layout...
    alter view production_table as select * from table_with_few_big_files;
    select * from production_table where c1 = 100 and c2 < 50 and ...;
    

Parquet 表的模式 (Schema) 演进

模式演进是指使用 ALTER TABLE … REPLACE COLUMNS 语句,更改表中列的名称、数据类型或列的个数。您可以按照以下要点,对 Parquet 表执行模式演进:

  • Impala ALTER TABLE 语句不会更改表中的任何数据文件。对 Impala 而言,模式演进涉及根据新的表定义,对相同的数据文件进行解释。某些类型的模式变化合乎情理,可以正确表示。其它类型的模式变化可能无法以合理的方式表示,会在查询过程中产生特殊的结果值或是出现转换错误。

  • INSERT 语句总是以最新的表定义创建数据。如果你依次执行了 INSERT 和 ALTER TABLE … REPLACE COLUMNS 语句,你可能得到列数不同或内部数据表示不同的数据文件。

  • 如果你在末尾使用 ALTER TABLE … REPLACE COLUMNS 来定义更多的列,当原始数据文件在查询中被使用时,最后添加的列将全部被认为是NULL 值。

  • 如果你使用 ALTER TABLE … REPLACE COLUMNS 定义了比过去更少的列,当原始数据文件在查询中被使用时,数据文件中仍然存在但未被使用的列将被忽略。

  • Parquet 视 TINYINT、SMALLINT 和 INT 为相同的内部类型,都以 32 位整数的方式存储。

    • 这意味着很容易将 TINYINT 列升级为 SMALLINT 或 INT,或是将 SMALLINT 列升级为 INT。在数据文件中数字将以完全相同的方式展示,升级后的列不会包含任何超出范围的值。
    • 如果你把其中任何一个列类型更改为更小的类型,那么超出新类型范围的值在返回时将不正确,通常表现为负数。

    • 你无法将 TINYINT、SMALLINT 或 INT 列更改为 BIGINT,或进行其它类似的更改。尽管 ALTER TABLE 执行成功,但是任何对这些列的查询都会导致转换错误。

    • 查询过程中,列的任何其它类型的转换都会产生转换错误。例如,INT 转换至 STRING、FLOAT 转换至 DOUBLE、TIMESTAMP 转换至 STRING、DECIMAL(9,0) 转换至 DECIMAL(5,2),等等。

Parquet 表的数据类型考虑

Parquet 格式定义了一系列数据类型,其名称与相应的 Impala 数据类型名称不同。如果你在准备 Parquet 文件的过程中使用的是其它 Hadoop 组件(如 Pig 或 MapReduce),你可能需要使用 Parquet 定义的类型名称。以下列出了 Parquet 定义的类型,以及对应的 Impala 类型。

基本类型:

BINARY -> STRING
BOOLEAN -> BOOLEAN
DOUBLE -> DOUBLE
FLOAT -> FLOAT
INT32 -> INT
INT64 -> BIGINT
INT96 -> TIMESTAMP

逻辑类型:

BINARY + OriginalType UTF8 -> STRING
BINARY + OriginalType DECIMAL -> DECIMAL