The Hadoop ecosystem builds its open source community mainly on GitHub and JIRA. I hope to start participating this year, so I am writing down the workflow below.

Every open source project may have its own requirements, so before starting you must read its Contribute page; the link can usually be found in the README.

JIRA steps

When you have an idea for a code change, whether a bugfix or a feature improvement, go to the project's JIRA and file an issue that briefly describes, in English, what you want to do. Two things to keep in mind:

  1. Before filing, search with English keywords to confirm the feature has not been implemented and nobody has filed a similar issue.
  2. Keep the issue fine-grained: one small, self-contained piece of work, much like a task card in agile development.

If you want to write the code yourself, assign the issue to yourself. When you actually start working, change the issue status to In Progress.

GitHub steps

Before writing any code, find the project on GitHub and click the Fork button to create your own fork. As long as a maintainer does not merge it back, anything you do in your fork has no effect on the upstream project.

git steps

Now set up a local repository:

git clone https://<path-to-your-repo> <your-local-prj-name>  # clone your fork to a local directory

cd <your-local-prj-name>

git checkout -b <new-branch-name>  # create a branch; all work for this issue happens on it (unlike svn, git branches are lightweight and serve as units of feature isolation)

git push origin <new-branch-name>  # publish the new branch; it is now visible under your project on GitHub, still empty

Now code on the new branch. Once the change passes your own tests, commit and push it:

git add <new-files>

git commit -m 'comments'

git push origin <new-branch-name>  # the code is now visible on your branch, and does not affect the community repository

If the work stretches over some time, you may need to pull in updates from the community repository:

git remote add upstream https://<path-to-public-repo>  # add the community repository as the upstream remote

git fetch upstream  # download upstream changes into the local repository's object store

git checkout master  # switch to the master branch

git merge upstream/master  # merge upstream changes into your local master

git push origin master  # push the merged result to the master branch of your fork

GitHub steps

Now let others see the code: on your fork's page on GitHub, click Pull Request to open one. When reviewers reply, discuss the feedback and repeat the edit/commit/push cycle as needed.

JIRA steps

The pull request also needs to be linked back to the JIRA issue: on the issue page choose More / Link, add a Web Link, paste the pull request URL, and use something like PR #xxx as the link text.

Finally, once the pull request reaches its end state, merged into the community repository or definitively rejected, delete the branch in git and close the issue.

As mentioned earlier, the parquet row group size controlled by parquet.block.size is a Spark parameter worth tuning. One important reason is that it affects task parallelism.

In Hadoop, task parallelism defaults to one task per HDFS block. Spark adds another option: scheduling at row group granularity. Below I compare Spark 1.4 (community) with our in-house build patched on top of 1.5, and analyze how each is implemented and their trade-offs.

When HiveContext.read.parquet(path) is called, the ParquetRelation2 object builds a SqlNewHadoopRDD and overrides its getPartitions() method, which is the entry point for this analysis.

Spark 1.4 (community version)

In this version, SqlNewHadoopRDD.getPartitions() uses the FilteringParquetRowInputFormat class. When spark.sql.parquet.cacheMetadata=true (the default), it overrides the parent class's listStatus() and getFooters() methods to return the cachedStatuses and cachedFooters caches; getFooters() is what triggers scanning the parquet footers.

// Overridden so we can inject our own cached files statuses.
override def getPartitions: Array[SparkPartition] = {
  val inputFormat = if (cacheMetadata) {
    new FilteringParquetRowInputFormat {
      override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
      override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
    }
  } else {
    new FilteringParquetRowInputFormat
  }

  val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
  val rawSplits = inputFormat.getSplits(jobContext)

  Array.tabulate[SparkPartition](rawSplits.size) { i =>
    new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
  }
}

As the figure below shows, with the default configuration tasks are scheduled per HDFS block. Setting parquet.task.side.metadata=false switches scheduling to row-group granularity (groups that are too small are merged so that minsize is still met). Note, however, that even with task-side metadata the footers are still read here by default, which is unnecessary.
The figure has three parts:
  • Middle flow: FilteringParquetRowInputFormat.getSplits(), called from getPartitions() of the SqlNewHadoopRDD built by ParquetRelation2.buildScan().
  • Left flow: parquet.task.side.metadata=true (the default), using getTaskSideSplits().
  • Right flow: parquet.task.side.metadata=false, using getClientSideSplits().

The default branch is highlighted in blue in the figure.

[Figure: getSplits flow]
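To make the two knobs above concrete, here is a minimal sketch of how they could be set from a Spark application. The property names are the ones referenced in this post; the path is hypothetical and exact behaviour may differ across Spark/Parquet versions.

sc.hadoopConfiguration.set("parquet.task.side.metadata", "false") // schedule by row group instead of HDFS block
sqlContext.setConf("spark.sql.parquet.cacheMetadata", "true")     // default: reuse cached file statuses/footers
val df = sqlContext.read.parquet("/path/to/parquet-table")        // hypothetical path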

This raises a question: in the default case, what happens when a row group spans multiple splits? Several executors may then each receive a request covering that row group, each logically holding part of its blocks.

As the figure below shows, when an executor receives the split for its task, it reads the footer metadata of the file and obtains all row groups. It then uses the block range of the split to decide which row groups it should process, namely:

  • row groups that fall entirely inside the split's block range, and
  • row groups whose midpoint falls inside that range.

[Figure: getSplits on the executor side]

Why the midpoint? When parquet.block.size and the HDFS block size are configured sensibly, the two values should be close, so the HDFS block that contains a row group's midpoint holds more than half of its data. This guarantees that exactly one task processes each row group.

The key code is in ParquetMetadataConverter.java in the parquet-format package:

static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
  List<RowGroup> rowGroups = metaData.getRow_groups();
  List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
  for (RowGroup rowGroup : rowGroups) {
    long totalSize = 0;
    long startIndex = getOffset(rowGroup.getColumns().get(0));
    for (ColumnChunk col : rowGroup.getColumns()) {
      totalSize += col.getMeta_data().getTotal_compressed_size();
    }
    long midPoint = startIndex + totalSize / 2;
    if (filter.contains(midPoint)) {
      newRowGroups.add(rowGroup);
    }
  }
  metaData.setRow_groups(newRowGroups);
  return metaData;
}

The (approximate) community 1.5 version

When building the SqlNewHadoopRDD, FilteringParquetRowInputFormat is no longer used; ParquetInputFormat is used directly, with listStatus overridden to use cachedStatuses when spark.sql.parquet.cacheMetadata=true (the default). SqlNewHadoopRDD.getPartitions() then triggers ParquetInputFormat.getSplits().

With the default parquet.task.side.metadata=true, Hadoop's FileInputFormat.getSplits() is called directly, i.e. splits are computed exactly the way Hadoop computes them.

The split size is decided by three things (a config sketch follows this list):

  • By default it is simply the HDFS block size of the file.
  • If maxSize is set smaller than the block size, one block can be handled by several tasks (on the executor side, the midpoint rule above still guarantees each row group is processed exactly once).
  • If minSize is set larger than the block size, several blocks can be merged into one task. As you can see, this is already flexible enough to control parallelism.
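As a hedged illustration of the knobs above (the key names follow the Hadoop mapreduce API; older mapred.* equivalents exist and the exact keys may vary by Hadoop version):

sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024)  // smaller than blockSize: one block, several tasks
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize", 512L * 1024 * 1024) // larger than blockSize: merge blocks into one task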

Here is FileInputFormat.computeSplitSize from hadoop-core:

protected long computeSplitSize(long goalSize, long minSize,
                                long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}

If parquet.task.side.metadata=false is set, the behaviour falls back to the community 1.4 approach: the driver scans the footers and computes splits from the row groups.

Parquet, the current default storage format of SparkSQL, claims to strike a good balance between performance and storage footprint. Getting the best performance still requires tuning, and parquet.block.size is one of the most frequently mentioned settings. Literally it controls the data size of a parquet block, but what benefits do different block sizes actually bring, and which suits your workload? And since parquet supports several encodings and compression schemes, is this block size the encoded and compressed size?

The first question is a big topic for another time, though in my experience, contrary to what the official documentation suggests, 1GB is not automatically better than 256MB; it depends on the workload. Here I focus on the second question, based on the parquet and spark source code.

The conclusion first

parquet.block.size controls the size of the data after parquet record shredding, after encoding, and after most pages have been compressed. It is only an approximate size, because at the time the memory check runs:

  • some data may still sit in encoder buffers and is not counted;
  • for dictionary-encoded columns the raw value size is used, while what actually gets written is the dictionary-encoded size;
  • the data of the last page has not yet been compressed.

Parquet file and log analysis

In the example below, parquet.block.size=256MB and parquet.page.size=1MB, both default values. In parquet the terms block and row group are used interchangeably; treat them as the same thing. Note, though, that this block has nothing to do with an HDFS block.

With parquet tools you can inspect the metadata:

row group 1: RC:9840100 TS:698748990 OFFSET:4

RC is the row count, TS is the total byte size before compression, and OFFSET is the starting position of the row group. TS reaches 666MB here, far beyond the configured 256MB.
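(For reference, output like the above can typically be produced with the parquet-tools CLI, e.g. something along the lines of "parquet-tools meta <path-to-file.parquet>"; the exact invocation depends on how parquet-tools is installed in your environment.)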

Looking at the write-side log:

Jan 7, 2016 2:03:58 AM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: mem size 268,650,005 > 268,435,456: flushing 9,418,847 records to disk.

Jan 7, 2016 2:03:58 AM INFO: org.apache.parquet.hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 267,733,756

Jan 7, 2016 2:03:59 AM INFO: org.apache.parquet.hadoop.ColumnChunkPageWriteStore: written 151,651,009B for [xxx] BINARY: 9,418,847 values, 336,901,990B raw, 151,629,488B comp, 324 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY, PLAIN], dic { 25,702 entries, 925,241B raw, 25,702B comp}

So the block flush is triggered once the in-memory usage slightly exceeds parquet.block.size, while the raw size is usually far larger than parquet.block.size.

From these observations one can already guess that the block size governs the compressed data size, but for a precise and certain answer we have to read the source.

Explanation from the source code

Writing data via df.write.parquet() involves the following key steps (the code below spans both spark and parquet-mr):

[Figure: Spark-parquet-write-2]

As the figure above shows, the memory checked is the sum of the buffered sizes of all columns. What do those terms mean? Again, the conclusion first:

  • The repetition level, definition level and data entries correspond to the last page (not yet actually formed); their sizes are the uncompressed in-memory footprint.
  • pageWriter.getMemSize is ColumnChunkPageWriter.getMemSize, i.e. buf.size(): the size of the compressed binary byte stream formed by the pages that have already been flushed.

@Override
public long getBufferedSizeInMemory() {
  return repetitionLevelColumn.getBufferedSize()
      + definitionLevelColumn.getBufferedSize()
      + dataColumn.getBufferedSize()
      + pageWriter.getMemSize();
}

Let us first look at the r-level, d-level and data sizes, the first three terms in the code above; these are the easy part.

Depending on the max level, repetitionLevelColumn and definitionLevelColumn are either RunLengthBitPackingHybridValuesWriter or DevNullValuesWriter objects. dataColumn is chosen by ParquetProperties.getValuesWriter according to the value type; for example the logical type string maps to the physical type binary and normally uses a PlainBinaryDictionaryValuesWriter. A few examples follow.

For RunLengthBitPackingHybridValuesWriter the memory is actually consumed inside its encoder, so it simply returns the encoder's size, i.e. RunLengthBitPackingHybridEncoder.getBufferedSize(). Looking at the encoder's writeInt(), not every value is written into the baos buffer immediately (that happens in writeOrAppendBitPackedRun); some values sit in extra buffers that are not counted, but the error is small.

public void writeInt(int value) throws IOException {
  if (value == previousValue) {
    // keep track of how many times we've seen this value
    // consecutively
    ++repeatCount;

    if (repeatCount >= 8) {
      // we've seen this at least 8 times, we're
      // certainly going to write an rle-run,
      // so just keep on counting repeats for now
      return;
    }
  } else {
    // This is a new value, check if it signals the end of
    // an rle-run
    if (repeatCount >= 8) {
      // it does! write an rle-run
      writeRleRun();
    }

    // this is a new value so we've only seen it once
    repeatCount = 1;
    // start tracking this value for repeats
    previousValue = value;
  }

  // We have not seen enough repeats to justify an rle-run yet,
  // so buffer this value in case we decide to write a bit-packed-run
  bufferedValues[numBufferedValues] = value;
  ++numBufferedValues;

  if (numBufferedValues == 8) {
    // we've encountered less than 8 repeated values, so
    // either start a new bit-packed-run or append to the
    // current bit-packed-run
    writeOrAppendBitPackedRun();
  }
}

Writers that build a dictionary, such as PlainBinaryDictionaryValuesWriter, report not the encoded memory footprint (which at that point is just dictionary indices) but the raw byte size of the values written. The reason is that when dictionary encoding turns out to be a poor fit (for example, the values are not repetitive enough) the writer falls back to the plain encoding, and using the raw size prevents the page from becoming too large when that fallback happens.

@Override
public long getBufferedSize() {
  // use raw data size to decide if we want to flush the page
  // so the actual size of the page written could be much smaller
  // due to dictionary encoding. This prevents the page being too big when fallback happens.
  return rawDataByteSize;
}

So the memory accounting done at check time is fairly rough. Still, the size being measured is data that has already been shredded into repetition and definition levels and has been encoded. Does compression then happen after this point? (The figure is large; open it separately to read it.)

[Figure: Spark-parquet-write]

The flow chart shows that once the buffered size exceeds parquet.block.size, a flush is triggered: the remaining, last chunk is written out via writePage, which concatenates the repetition levels, definition levels and data of all rows in the page into one binary stream and compresses that stream.

One more point to note: the dictionary objects are materialized at flush time, and their memory consumption is not included in the check either. The dictionary size is bounded by configuration, typically on the order of MBs, so it does not grow too large; a hedged config sketch follows.
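A minimal sketch of the dictionary-related writer settings, assuming the parquet-mr property names (check the exact names for your parquet version):

sc.hadoopConfiguration.set("parquet.enable.dictionary", "true")            // dictionary encoding on (the usual default)
sc.hadoopConfiguration.setInt("parquet.dictionary.page.size", 1024 * 1024) // cap on the per-column dictionary size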

Now back to ColumnChunkPageWriter.getMemSize: why do I say it is the size of the encoded, compressed byte stream of the already-written pages?

This is where parquet.page.size comes in. The value is used by the compression codec and by ColumnWriterImpl; here we care about the latter. Every writeXXX method of that class ends with a call to accountForValueWritten, which checks whether the column's memory consumption has reached page.size and, if so, also calls writePage. As the flow chart above shows, writePage produces the byte stream and compresses it. This also explains why the parquet documentation describes the page as the unit of encoding and compression.

private void accountForValueWritten() {
  ++ valueCount;
  if (valueCount > valueCountForNextSizeCheck) {
    // not checking the memory used for every value
    long memSize = repetitionLevelColumn.getBufferedSize()
        + definitionLevelColumn.getBufferedSize()
        + dataColumn.getBufferedSize();
    if (memSize > pageSizeThreshold) {
      // we will write the current page and check again the size at the predicted middle of next page
      valueCountForNextSizeCheck = valueCount / 2;
      writePage();
    } else {
      // not reached the threshold, will check again midway
      valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
    }
  }
}

The code above also shows that a complete block must fit entirely in one executor's memory while it is being written (the same holds for reads), so when choosing the value, make sure your executors have enough memory.

Summary

Roughly speaking, parquet.block.size controls the size of the data that finally lands on disk.

Since reads and writes both operate at block granularity, a larger block size means fewer disk seeks for a given amount of data and therefore higher I/O throughput. Keep in mind, though, that the decompressed size can be several times the on-disk size (we see close to 10x), so an overly large block also increases memory pressure in downstream computation. A hedged example of setting it follows.
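A minimal sketch of writing with a larger row group from Spark, using the property names discussed in this post (the output path is hypothetical):

sc.hadoopConfiguration.setInt("parquet.block.size", 512 * 1024 * 1024) // target row-group size (approximate, mostly-compressed)
sc.hadoopConfiguration.setInt("parquet.page.size", 1024 * 1024)        // page: the unit of encoding and compression
df.write.parquet("/path/to/output")                                    // hypothetical path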

References

  • https://parquet.apache.org/documentation/latest/
  • http://www.infoq.com/cn/articles/in-depth-analysis-of-parquet-column-storage-format
  • http://my.oschina.net/jhone/blog/517918
  • http://tajo.apache.org/docs/0.8.0/table_management/parquet.html

Problem description

We have a Spark-based service that takes user-defined analysis requests and turns them into spark jobs. One piece of logic is df.groupBy(cols).agg(…). Normally cols is non-empty and everything runs fine, but one day a user selected an empty cols and the job simply hung.

Say stage 1 is the shuffle stage and stage 2 the result stage. Stage 1 finished normally and produced the default 200 output splits. In stage 2, 199 tasks completed, but one task failed every single time it ran, with output like:

FetchFailed(BlockManagerId(42, host1, 15656), shuffleId=0, mapId=15, reduceId=169, message=org.apache.spark.shuffle.FetchFailedException: Failed to connect to host1

Problem analysis

The log above suggests a stage 2 executor died while fetching shuffle output, but what was the shuffle server doing at that moment? The Spark UI shows that the failing shuffle server was itself also running a stage 2 task, and that task failed too!

Logging into that executor's container, its GC log shows both the young and old generations essentially 100% full, around 15GB, meaning a very large amount of data that is all live and cannot be collected. Yet the total shuffle output of stage 1 was only about 8.9GB.

Looking closer at that executor's stderr log, there is a flood of shuffle fetch-request lines whose sizes add up to roughly that same 8.9GB!

15/12/15 13:17:30 DEBUG storage.ShuffleBlockFetcherIterator: Creating fetch request of 97451434 at BlockManagerId 

Why such extreme data skew?

Root cause

Looking at how agg is implemented in spark: in the physical plan it becomes execution/Aggregate, whose effect on the shuffle is as follows:

override def requiredChildDistribution: List[Distribution] = {
  if (partial) {
    UnspecifiedDistribution :: Nil
  } else {
    if (groupingExpressions == Nil) {
      AllTuples :: Nil
    } else {
      ClusteredDistribution(groupingExpressions) :: Nil
    }
  }
}

The cause is now obvious: groupingExpressions is empty, so the required distribution is AllTuples, i.e. all data must end up in a single partition!

Lessons learned

When the data volume exceeds what a single node can hold, do not aggregate the whole DataFrame directly; use DataFrame.groupBy(cols).agg(…) and make sure the groupBy columns are non-empty! Otherwise, no matter how many resources you allocate, every other executor just stands by and watches, wasting resources while the job has a high chance of dying. A hedged sketch of the guard we ended up with follows.
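A minimal sketch of the guard, assuming cols and aggExprs are the user-supplied grouping columns and aggregation expressions (hypothetical names):

import org.apache.spark.sql.functions.col

val result =
  if (cols.nonEmpty)
    df.groupBy(cols.map(col): _*).agg(aggExprs.head, aggExprs.tail: _*)
  else
    sys.error("empty grouping columns would force all rows into one partition") // or route to a pre-aggregated path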

This post walks through a concrete example of how tasks are resolved when SparkSQL and Spark Core are mixed in one job.

Example pseudocode

hc.read.parquet(paths).filter(…).select(…).groupBy(all fields).count().sort(…).toJavaRDD
     .mapPartitionsToPair(…).coalesce().reduceByKey(…).collectAsMap()

Spark UI screenshots

Looking at the UI below for the first time, a few questions come up:
  1. Why does "toJavaRDD" appear to produce an independent Job 6?
  2. Why does Job 7 skip only stage 8 (presumably reusing stage 6) but not stage 9?
  3. Why do stage 9 and stage 7 in Job 7 not match? Stage 7 contains three mapPartitionsRDD calls while stage 9 contains only one.

Answering these questions directly is not hard, but only by truly understanding how spark resolves dataframe and core operations into jobs can we answer them completely.

[Screenshot: job list]
Job 6: [screenshot]
Job 7: [screenshot]

Resolution overview

The diagram below traces the path from the code we write to the RDDs that get created, and roughly marks when each job is formed. It is large; click through for the original.

  1. White icons are the APIs called in code.
  2. Grey is the logical layer behind the code: DataFrame and LogicalPlan in SparkSQL, and RDD in Spark Core; these objects are created while the code is being composed.
  3. Blue is the physical plan produced after SparkSQL analyzes, optimizes and plans the logical plan.
  4. Yellow is the prepareForExecution phase, which builds the final physical plan on top of the previous step.
[Figure: resolution overview]

So what happens behind the scenes when we call the spark APIs?

It depends on which API.

In Spark Core things are simple: each API call builds a new RDD on top of the previous one, as shown in the lower half of the "main job RDDs" column of the overview.

SparkSQL is slightly different. Its data abstraction, DataFrame, sits above RDD, so each API call produces a new DataFrame from the previous one. The core of a DataFrame is its LogicalPlan, which describes plan dependencies, partitioning, Distribution and so on; see the "DataFrame" and "LogicalPlan" columns of the overview.

Either way, RDDs and DataFrames are lazy: nothing actually executes until an action such as collect or save is called.

What toJavaRDD does

Calling it triggers resolution of the dataframe (step 1 in the overview):

lazy val rdd: RDD[Row] = {
  // use a local variable to make sure the map closure doesn't capture the whole DataFrame
  val schema = this.schema
  queryExecution.executedPlan.execute().mapPartitions { rows =>
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
    rows.map(converter(_).asInstanceOf[Row])
  }
}

The queryExecution.executedPlan above triggers the following chain of steps (note: it does not include the execute() call), covering analysis, type coercion, optimization and, most importantly, actually translating the logical plan into a physical plan. After planner.plan() finishes we have the blue execution.SparkPlan part of the overview; after prepareForExecution we have the yellow part (steps 2 and 3 in the overview).

[Figure: queryExecution phases]

What plan.execute() does

At this point the driver recursively calls doExecute() on the physical plan nodes; these methods normally just return the corresponding RDDs. In this case, however, because sort was called, an Exchange plan with RangePartitioning was generated. To spread the sorted data evenly, spark creates a child job that samples the RDD the sort depends on. This produces the extra "sort sampling child job RDDs" column, and the job is triggered by the following code:

/* Partitioner.RangePartitioner */

def sketch[K : ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    val seed = byteswap32(idx ^ (shift << 16))
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2.toLong).sum
  (numItems, sketched)
}

That job is Job 6 in the UI, and since the child job runs ahead of the main one, its job id is smaller.

Triggering the child job is only a side effect; the real purpose of this step is to turn the main job's physical plan into RDDs. In the overview a large part of the RDD graph is actually shared between the main and child jobs. The reason is that in the Exchange that precedes ExternalSort, childRdd = child.execute(), and that rdd is used both by the RangePartitioner and by the returned ShuffledRDD, so the two are consistent.

Looking more closely at the RDD ids of Job 6 and Job 7:

[Figure: RDD ids of Job 6 and Job 7]

  1. RDDs up to and including #279 are shared between the main and child jobs.
  2. The child job's RDD ids are smaller than the main job's, so the child job was indeed scheduled first.
RDD.id is assigned at construction time, so the RDDs above were also created in numeric order.
protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
  child.execute().mapPartitions( { iterator =>
    // ...

Because execute() is called recursively, the child plan is guaranteed to run first and its RDDs are created first.

What rdd.collect() does

Finally the main event, which turns out to be the simple part: it creates the ResultStage and recursively triggers the two ShuffleStages it depends on.

Answers

  1. Why does "toJavaRDD" appear to produce an independent Job 6?
    1. Because sort triggers a child job that samples the data in order to achieve an even distribution after sorting.
  2. Why does Job 7 skip only stage 8 (reusing stage 6) but not stage 9?
    1. Stage 6 and stage 8 do the same work, but stage 7 and stage 9 are genuinely different things:
  • stage 6: hc.read.parquet(paths).filter(…).select(…) plus the first half of groupBy(all fields).count()
  • stage 7: the second half of groupBy(all fields).count(), plus the sampling pass that builds the RangePartitioner
  • stage 8: skipped, reusing stage 6
  • stage 9: the second half of groupBy(all fields).count() plus the first half of sort
  • stage 10: sort(…).toJavaRDD.mapPartitionsToPair(…).coalesce() plus the first half of reduceByKey(…)
  3. Why do stage 9 and stage 7 in Job 7 not match? Stage 7 contains three mapPartitionsRDD calls while stage 9 contains only one.
    1. Same answer as above.

Lessons learned

1. Consider what happens with hc.read.parquet().filter().select().sort().toJavaRDD.mapPartitions.

Two jobs are still generated, and both read from hdfs, because the second job's sort has no shuffle dependency in front of it and therefore cannot reuse any stage of the first job.

2. On choosing between df.sort and rdd.repartitionAndSort: I used to assume that sparksql's data-structure and plan optimizations would make sort faster, but the analysis shows it also runs a sampling pass, so it is no longer obvious which is faster. In our scenario the two perform about the same, and given the pitfall above I now lean towards the latter.

While validating a spark application recently I ran into something odd: data that was sampled and then sorted came out unsorted! The bug was eventually fixed by the colleagues who own spark internally, but chasing it gave me a chance to look at how SparkSQL processes a query, and at how DataFrame resolution, partitioners and Distribution relate to each other.

Problem description

The sort is only ineffective with code like the following:

df = hc.read.parquet(path).filter("trade1='Automobile'").filter("trade2='Default'") \
    .filter("pdate='2015-06-03'").select("userid").limit(count).repartition(1024) \
    .sort("userid")

If limit and repartition do not both appear before the sort, the result comes out sorted as expected.

Troubleshooting

Intuitively, if sort were completely broken it would never have made it past the spark community and into a release, so it had to be some specific combination that triggers the bug; some experimentation narrowed it down to the scenario described above.

So how does it differ from a working case, say hc.read.parquet().filter().select().sort()? Printing the execution plans shows:

Sort [baiduid#87 ASC], true
 Repartition 1024, true
  Limit 100
   Project [baiduid#87]
    Filter (((trade1#96 = Automobile) && (trade2#97 = Default)) && (pdate#98 = 2015-06-03))
     PhysicalRDD [baiduid#87,trade1#96,trade2#97,pdate#98], UnionRDD[63] at toString at NativeMethodAccessorImpl.java:-2

 

And in the normal case:

Sort [baiduid#267 ASC], true
 Exchange (RangePartitioning 200)
  Repartition 1024, true
   Project [baiduid#267]
    Filter (((trade1#276 = Automobile) && (trade2#277 = Default)) && (pdate#278 = 2015-06-03))
     PhysicalRDD [baiduid#267,trade1#276,trade2#277,pdate#278], UnionRDD[203] at toString at NativeMethodAccessorImpl.java:-2

The working plan contains an extra Exchange step, which our code certainly never asked for explicitly.

Fix and root cause

Add the following to Repartition in execution/basicOperators.scala:
override def outputPartitioning = {
  if (numPartitions == 1) SinglePartition
  else UnknownPartitioning(numPartitions)
}

Without this override, the value is inherited as child.outputPartitioning, which in this scenario is Limit.outputPartitioning = SinglePartition, and SinglePartition is deemed to already satisfy the distribution the sort requires (even though the data is in fact unsorted).

Understanding why this fixes the bug requires knowing how spark executes a query, so let us look at that first.

SparkSQL resolution walkthrough

When we initialize a dataframe with hc.read.parquet(), the thing to watch is the LogicalPlan:

sqlContext.baseRelationToDataFrame(
  new ParquetRelation2(
    globbedPaths.map(_.toString), None, None, Map.empty[String, String])(sqlContext))

So at this point logicalPlan = LogicalRelation(ParquetRelation2).

Operations such as filter, select and sort each take the previous dataframe's logicalPlan as a child and produce a new DataFrame wrapping a new logicalPlan. The problematic example above ends up with these LogicalPlans (in creation order):

  1. LogicalRelation(ParquetRelation2), produced by parquet
  2. Filter, three of them
  3. Project
  4. Limit
  5. Repartition
  6. Sort, the final LogicalPlan

Suppose we now trigger execution with dataframe.collect() on the Sort dataframe. This actually calls

queryExecution.executedPlan.executeCollect(), i.e. it evaluates the lazy val executedPlan of SQLContext's inner class QueryExecution, which indirectly triggers the following chain (details omitted for brevity):

  1. analyzed = analyzer.execute(logical)
  2. optimizedPlan = optimizer.execute(withCachedData)
  3. sparkPlan = planner.plan(optimizedPlan).next()
  4. executedPlan = prepareForExecution.execute(sparkPlan)

(If we were running SQL text instead of the DataFrame API there would also be a parseSQL step, ignored here.)

The Analyzer phase

It runs a chain of rule batches over the logicalPlan tree, as the following code shows:

/* Analyzer.scala */
  lazy val batches: Seq[Batch] = Seq(
    Batch("Substitution", fixedPoint,
      CTESubstitution ::
      WindowsSubstitution ::
      Nil : _*),
    Batch("Resolution", fixedPoint,
      ResolveRelations ::
      ResolveReferences ::
      ResolveGroupingAnalytics ::
      ResolveSortReferences ::
      ResolveGenerate ::
      ResolveFunctions ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      UnresolvedHavingClauseAttributes ::
      TrimGroupingAliases ::
      typeCoercionRules ++
      extendedResolutionRules : _*)
  )

/* HiveTypeCoercion */
  val typeCoercionRules =
    PropagateTypes ::
    ConvertNaNs ::
    InConversion ::
    WidenTypes ::
    PromoteStrings ::
    DecimalPrecision ::
    BooleanComparisons ::
    BooleanCasts ::
    StringToIntegralCasts ::
    FunctionArgumentConversion ::
    CaseWhenCoercion ::
    Division ::
    PropagateTypes ::
    ExpectedInputConversion ::
    Nil

/* SQLContext.scala */
    new Analyzer(catalog, functionRegistry, conf) {
      override val extendedResolutionRules =
        ExtractPythonUdfs ::
        sources.PreInsertCastAndRename ::
        Nil

      override val extendedCheckRules = Seq(
        sources.PreWriteCheck(catalog)
      )
    }

 

This phase mainly binds each table field to the real table schema, and also surfaces wrong fields, wrong types, and type coercion needs. Overall it only analyzes; it does not change the execution plan.

The Optimizer phase

As the name implies, this phase optimizes. If the SQL or API calls are badly written, the plan after this phase may look quite different from the original statement. Fundamentally, though, it only removes work that can be decided locally (for example, an is null test on a non-nullable field must be true), merges what can be merged (for example, several filters or selects), and slightly reorders filters, unions and the like.

val batches =
  // SubQueries are only needed for analysis and can be removed before execution.
  Batch("Remove SubQueries", FixedPoint(100),
    EliminateSubQueries) ::
  Batch("Operator Reordering", FixedPoint(100),
    UnionPushdown,
    CombineFilters,
    PushPredicateThroughProject,
    PushPredicateThroughJoin,
    PushPredicateThroughGenerate,
    ColumnPruning,
    ProjectCollapsing,
    CombineLimits) ::
  Batch("ConstantFolding", FixedPoint(100),
    NullPropagation,
    OptimizeIn,
    ConstantFolding,
    LikeSimplification,
    BooleanSimplification,
    SimplifyFilters,
    SimplifyCasts,
    SimplifyCaseConversionExpressions) ::
  Batch("Decimal Optimizations", FixedPoint(100),
    DecimalAggregates) ::
  Batch("LocalRelation", FixedPoint(100),
    ConvertToLocalRelation) :: Nil

The prepareForExecution phase

It does only one, crucial, thing:

val batches =
  Batch("Add exchange", Once, EnsureRequirements(self)) :: Nil

Inside this EnsureRequirements rule, each child's partitioner is checked against the required Distribution! Our bug happens precisely because Repartition reports the wrong outputPartitioning. Read on.

Partitioning versus Distribution

What EnsureRequirements is for, from its scaladoc:

 * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
 * of input data meets the
 * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
 * each operator by inserting [[Exchange]] Operators where required.  Also ensure that the
 * required input partition ordering requirements are met.

In other words, it makes sure each operator's input data satisfies the required Distribution (and ordering), inserting Exchange operators where needed.
First, what a Distribution requirement is:
  • Distribution, the base class. From its scaladoc:

 * Specifies how tuples that share common expressions will be distributed when a query is executed
 * in parallel on many machines.  Distribution can be used to refer to two distinct physical
 * properties:
 *  - Inter-node partitioning of data: In this case the distribution describes how tuples are
 *    partitioned across physical machines in a cluster.  Knowing this property allows some
 *    operators (e.g., Aggregate) to perform partition local operations instead of global ones.
 *  - Intra-partition ordering of data: In this case the distribution describes guarantees made
 *    about how tuples are distributed within a single partition.

    That is, it covers both how tuples are partitioned across the machines of a cluster (knowing this lets operators such as Aggregate work per-partition instead of globally) and how tuples are laid out within a single partition.
  • UnspecifiedDistribution extends Distribution directly and guarantees nothing about ordering or placement.
  • AllTuples means all data ends up in a single partition.
  • ClusteredDistribution means tuples with the same "key" land in the same partition, or are laid out contiguously if there is only a single partition.
  • OrderedDistribution means tuples with the same "key" are in the same partition and laid out contiguously.
Now the Partitioning side:
  • Partitioning, the base class
  • UnknownPartitioning
  • SinglePartition
  • BroadcastPartitioning
  • HashPartitioning
  • RangePartitioning
partitioning.scala spells out which Distribution requirements each Partitioning satisfies:
  • UnknownPartitioning.satisfies(UnspecifiedDistribution) == true
  • SinglePartition.satisfies(ALL) == true
  • BroadcastPartitioning.satisfies(ALL) == true
  • HashPartitioning.satisfies(UnspecifiedDistribution) == true
  • HashPartitioning.satisfies(ClusteredDistribution) == true when the hash expressions are a subset of the required ones
  • RangePartitioning.satisfies(UnspecifiedDistribution) == true
  • RangePartitioning.satisfies(OrderedDistribution) == true when one set of ordering fields contains the other (in either direction) and the order is consistent
  • RangePartitioning.satisfies(ClusteredDistribution) == true when the range expressions are a subset of the required ones
  • everything else is false

For each child, spark computes whether the distribution requirement is met: val valid = child.outputPartitioning.satisfies(required)

and logs something like the following (current is child.outputPartitioning):

15/11/12 15:10:03 DEBUG EnsureRequirements: Valid distribution,required: OrderedDistribution(List(userid#0 ASC)) current: SinglePartition

From the rules above, only SinglePartition and RangePartitioning satisfy a sorted distribution, and in our failing case the plan already claims to be SinglePartition!

org.apache.spark.sql.execution.Limit does declare override def outputPartitioning: Partitioning = SinglePartition: a limit collapses everything into one partition, which makes sense.

org.apache.spark.sql.execution.Repartition, however, does not override that method, so it inherits the parent's override def outputPartitioning: Partitioning = child.outputPartitioning, and its child is Limit... So even after the repartition the value is still SinglePartition, which nominally satisfies the sort's distribution requirement (while in reality it does not).

Changing Repartition's outputPartitioning to UnknownPartitioning is enough: the distribution requirement is then no longer satisfied, so EnsureRequirements inserts the Exchange plan.

The Exchange class

Its scaladoc:

 * Performs a shuffle that will result in the desired `newPartitioning`.  Optionally sorts each
 * resulting partition based on expressions from the partition key.  It is invalid to construct an
 * exchange operator with a `newOrdering` that cannot be calculated using the partitioning key.

So Exchange obtains the new partitioning through a shuffle and can optionally sort each resulting partition by the partition key; constructing one with a newOrdering that cannot be computed from the partitioning key is invalid.

 

Its doExecute() method handles HashPartitioning, RangePartitioning and SinglePartition separately. Where necessary in those cases, it defensively copies the records to be shuffled before handing them to the external sorter.

 

 

The scaladoc of the helper in Exchange that makes this copy decision explains the trade-off:

 * Determines whether records must be defensively copied before being sent to the shuffle.
 * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
 * shuffle code assumes that objects are immutable and hence does not perform its own defensive
 * copying. In Spark SQL, however, operators' iterators return the same mutable `Row` object. In
 * order to properly shuffle the output of these operators, we need to perform our own copying
 * prior to sending records to the shuffle. This copying is expensive, so we try to avoid it
 * whenever possible. This method encapsulates the logic for choosing when to copy.
 *
 * In the long run, we might want to push this logic into core's shuffle APIs so that we don't
 * have to rely on knowledge of core internals here in SQL.
 *
 * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.

 

Macro architecture: design at the system level.

《面向模式的软件体系结构》(Pattern-Oriented Software Architecture), five volumes, the definitive collection; very comprehensive even if somewhat dated, and excellent for anyone trying to build an architectural mindset. When I read it, it was already out of print and I bought a photocopy on Taobao; the translation is poor, but even reading between the lines I learned a lot. The first three volumes are currently available on Amazon.

《架构之美》(Beautiful Architecture): a set of architecture case studies ranging from enterprise systems to the internet, covering requirements analysis, technology selection, trade-offs and architecture design; both the ways of thinking and of presenting are worth borrowing.

《架构实战:软件架构设计的过程》: not heavy on technology; instead it walks through the whole architecture process with examples: how to start, which roles take part at each stage, what they produce, and how to validate it. For someone just starting to do architecture it at least provides a map so nothing gets dropped.

《思考软件,创新设计:A端架构师的思考技术》, by Gao Huantang. When I was struggling with what value a business architect adds and how to become good at it, this was a small but fairly bright lamp. An easy read, worth a try.

Micro architecture: design at the code level.

《重构:改善既有代码的设计》(Refactoring, by Martin Fowler): a must-read. This book made me love refactoring, taught me what clean code looks like, and let me improve through continuously reworking my code.

《设计模式》(Design Patterns): a classic you should have read two or three times. It gives names to the small techniques we use every day in code; once that abstraction is made you can weigh implementation alternatives in your head and choose a pattern, and it also eases communication: say "producer-consumer" and everyone knows what you mean.

《代码整洁之道》(Clean Code): another book about what good code is.

《代码大全》(Code Complete): widely praised and enormous; honestly I have never read it cover to cover.

Whether you are an architect or an engineer, code is your bread and butter, so the books above deserve careful, repeated reading, and their ideas are best absorbed while coding.

Other recommendations

Besides technology, an architect has to cover process, testability, people, resource coordination and more, hence a few additions.

《人月神话》(The Mythical Man-Month): a classic worth reading whether or not it all sinks in.

《JUnit实战》(JUnit in Action), 《渗透测试实践指南》(a practical guide to penetration testing) and the like: I have not finished them; the point is that an architect must understand and care about testing.

《UNIX编程艺术》(The Art of UNIX Programming): you may never write C, but the UNIX design philosophy, with its insistence on simplicity and elegance, is absolutely worth learning, and the book is also very funny.

《金字塔原理》(The Pyramid Principle): as an architect you will write a lot of slides and documents and talk to a lot of people; this book taught me how to speak plainly.

And one book of pure motivation, to keep us moving while remembering the good old hacker days:

《黑客》(Hackers): the first half is the golden age of hardware and free software; the second half I could not get through.

Finally, I believe a good architect, beyond talking, must also know some domain in real depth, so besides these architecture books, keep digging into the technology you are already good at.

 

How-to: Tune Your Apache Spark Jobs (Part 1)

Learn techniques for tuning your Apache Spark jobs for optimal efficiency.

When you write Apache Spark code and page through the public APIs, you come across words like transformation, action, and RDD. Understanding Spark at this level is vital for writing Spark programs. Similarly, when things start to fail, or when you venture into the web UI to try to understand why your application is taking so long, you’re confronted with a new vocabulary of words like job, stage, and task. Understanding Spark at this level is vital for writing good Spark programs, and of course by good, I mean fast. To write a Spark program that will execute efficiently, it is very, very helpful to understand Spark’s underlying execution model.

In this post, you’ll learn the basics of how Spark programs are actually executed on a cluster. Then, you’ll get some practical recommendations about what Spark’s execution model means for writing efficient programs.

How Spark Executes Your Program

A Spark application consists of a single driver process and a set of executor processes scattered across nodes on the cluster.

The driver is the process that is in charge of the high-level control flow of work that needs to be done. The executor processes are responsible for executing this work, in the form of tasks, as well as for storing any data that the user chooses to cache. Both the driver and the executors typically stick around for the entire time the application is running, although dynamic resource allocation changes that for the latter. A single executor has a number of slots for running tasks, and will run many concurrently throughout its lifetime. Deploying these processes on the cluster is up to the cluster manager in use (YARN, Mesos, or Spark Standalone), but the driver and executor themselves exist in every Spark application.

At the top of the execution hierarchy are jobs. Invoking an action inside a Spark application triggers the launch of a Spark job to fulfill it. To decide what this job looks like, Spark examines the graph of RDDs on which that action depends and formulates an execution plan. This plan starts with the farthest-back RDDs—that is, those that depend on no other RDDs or reference already-cached data–and culminates in the final RDD required to produce the action’s results.

The execution plan consists of assembling the job’s transformations into stages. A stage corresponds to a collection of tasks that all execute the same code, each on a different subset of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data.

What determines whether data needs to be shuffled? Recall that an RDD comprises a fixed number of partitions, each of which comprises a number of records. For the RDDs returned by so-called narrow transformations like map and filter, the records required to compute the records in a single partition reside in a single partition in the parent RDD. Each object is only dependent on a single object in the parent. Operations like coalesce can result in a task processing multiple input partitions, but the transformation is still considered narrow because the input records used to compute any single output record can still only reside in a limited subset of the partitions.

However, Spark also supports transformations with wide dependencies such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition may reside in many partitions of the parent RDD. All of the tuples with the same key must end up in the same partition, processed by the same task. To satisfy these operations, Spark must execute a shuffle, which transfers data around the cluster and results in a new stage with a new set of partitions.

For example, consider the following code:
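(The code image from the original post is not reproduced here; a hedged reconstruction consistent with the description, using placeholder function names, would be:)

sc.textFile("someFile.txt").
  map(mapFunc).
  flatMap(flatMapFunc).
  filter(filterFunc).
  count()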

It executes a single action, which depends on a sequence of transformations on an RDD derived from a text file. This code would execute in a single stage, because none of the outputs of these three operations depend on data that can come from different partitions than their inputs.

In contrast, this code finds how many times each character appears in all the words that appear more than 1,000 times in a text file.
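(Again the original code image is missing; a hedged reconstruction matching the description might look like:)

val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
charCounts.collect()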

This process would break down into three stages. The reduceByKey operations result in stage boundaries, because computing their outputs requires repartitioning the data by keys.

Here is a more complicated transformation graph including a join transformation with multiple dependencies.

The pink boxes show the resulting stage graph used to execute it.

At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by tasks in the child stage. Because they incur heavy disk and network I/O, stage boundaries can be expensive and should be avoided when possible. The number of data partitions in the parent stage may be different than the number of partitions in the child stage. Transformations that may trigger a stage boundary typically accept a numPartitions argument that determines how many partitions to split the data into in the child stage.

Just as the number of reducers is an important parameter in tuning MapReduce jobs, tuning the number of partitions at stage boundaries can often make or break an application’s performance. We’ll delve deeper into how to tune this number in a later section.

Picking the Right Operators

When trying to accomplish something with Spark, a developer can usually choose from many arrangements of actions and transformations that will produce the same results. However, not all these arrangements will result in the same performance: avoiding common pitfalls and picking the right arrangement can make a world of difference in an application’s performance. A few rules and insights will help you orient yourself when these choices come up.

Recent work in SPARK-5097 began stabilizing SchemaRDD, which will open up Spark’s Catalyst optimizer to programmers using Spark’s core APIs, allowing Spark to make some higher-level choices about which operators to use. When SchemaRDD becomes a stable component, users will be shielded from needing to make some of these decisions.

The primary goal when choosing an arrangement of operators is to reduce the number of shuffles and the amount of data shuffled. This is because shuffles are fairly expensive operations; all shuffle data must be written to disk and then transferred over the network. repartition, join, cogroup, and any of the *By or *ByKey transformations can result in shuffles. Not all these operations are equal, however, and a few of the most common performance pitfalls for novice Spark developers arise from picking the wrong one:

  • Avoid groupByKey when performing an associative reductive operation. For example, rdd.groupByKey().mapValues(_.sum) will produce the same results as rdd.reduceByKey(_ + _). However, the former will transfer the entire dataset across the network, while the latter will compute local sums for each key in each partition and combine those local sums into larger sums after shuffling.
  • Avoid reduceByKey when the input and output value types are different. For example, consider writing a transformation that finds all the unique strings corresponding to each key. One way would be to use map to transform each element into a Set and then combine the Sets with reduceByKey (first sketch after this list).

    This results in tons of unnecessary object creation because a new set must be allocated for each record. It’s better to use aggregateByKey, which performs the map-side aggregation more efficiently (second sketch after this list).

  • Avoid the flatMap-join-groupBy pattern. When two datasets are already grouped by key and you want to join them and keep them grouped, you can just use cogroup. That avoids all the overhead associated with unpacking and repacking the groups.
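(The two code images from the original post are not reproduced here; hedged reconstructions consistent with the text follow, assuming rdd is an RDD of (key, String) pairs.)

// reduceByKey over per-record Sets: allocates a new Set for every record
rdd.map(kv => (kv._1, Set[String](kv._2)))
   .reduceByKey(_ ++ _)

// aggregateByKey: builds a mutable set per partition, merging sets only across partitions
import scala.collection.mutable
val zero = mutable.Set[String]()
rdd.aggregateByKey(zero)(
  (set, v) => set += v,
  (set1, set2) => set1 ++= set2)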

When Shuffles Don’t Happen

It’s also useful to be aware of the cases in which the above transformations will not result in shuffles. Spark knows to avoid a shuffle when a previous transformation has already partitioned the data according to the same partitioner. Consider the following flow:
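(The flow from the original post, reconstructed here as a hedged sketch using the someRdd/someOtherRdd names mentioned below:)

val rdd1 = someRdd.reduceByKey(_ + _)       // hash-partitioned with the default partitioner
val rdd2 = someOtherRdd.reduceByKey(_ + _)  // likewise
val rdd3 = rdd1.join(rdd2)                  // no third shuffle if both sides have the same number of partitions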

Because no partitioner is passed to reduceByKey, the default partitioner will be used, resulting in rdd1 and rdd2 both hash-partitioned. These two reduceByKeys will result in two shuffles. If the RDDs have the same number of partitions, the join will require no additional shuffling. Because the RDDs are partitioned identically, the set of keys in any single partition of rdd1 can only show up in a single partition of rdd2. Therefore, the contents of any single output partition of rdd3 will depend only on the contents of a single partition in rdd1 and single partition in rdd2, and a third shuffle is not required.

For example, if someRdd has four partitions, someOtherRdd has two partitions, and both the reduceByKeys use three partitions, the set of tasks that execute would look like:

What if rdd1 and rdd2 use different partitioners or use the default (hash) partitioner with different numbers partitions?  In that case, only one of the rdds (the one with the fewer number of partitions) will need to be reshuffled for the join.

Same transformations, same inputs, different number of partitions:

One way to avoid shuffles when joining two datasets is to take advantage of broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor. A map transformation can then reference the hash table to do lookups.

When More Shuffles are Better

There is an occasional exception to the rule of minimizing the number of shuffles. An extra shuffle can be advantageous to performance when it increases parallelism. For example, if your data arrives in a few large unsplittable files, the partitioning dictated by the InputFormat might place large numbers of records in each partition, while not generating enough partitions to take advantage of all the available cores. In this case, invoking repartition with a high number of partitions (which will trigger a shuffle) after loading the data will allow the operations that come after it to leverage more of the cluster’s CPU.

Another instance of this exception can arise when using the reduce or aggregate action to aggregate data into the driver. When aggregating over a high number of partitions, the computation can quickly become bottlenecked on a single thread in the driver merging all the results together. To loosen the load on the driver, one can first use reduceByKey or aggregateByKey to carry out a round of distributed aggregation that divides the dataset into a smaller number of partitions. The values within each partition are merged with each other in parallel, before sending their results to the driver for a final round of aggregation. Take a look at treeReduce and treeAggregate for examples of how to do that. (Note that in 1.2, the most recent version at the time of this writing, these are marked as developer APIs, but SPARK-5430 seeks to add stable versions of them in core.)

This trick is especially useful when the aggregation is already grouped by a key. For example, consider an app that wants to count the occurrences of each word in a corpus and pull the results into the driver as a map.  One approach, which can be accomplished with the aggregate action, is to compute a local map at each partition and then merge the maps at the driver. The alternative approach, which can be accomplished with aggregateByKey, is to perform the count in a fully distributed way, and then simply collectAsMap the results to the driver.

Secondary Sort

Another important capability to be aware of is the repartitionAndSortWithinPartitions transformation. It’s a transformation that sounds arcane, but seems to come up in all sorts of strange situations. This transformation pushes sorting down into the shuffle machinery, where large amounts of data can be spilled efficiently and sorting can be combined with other operations.

For example, Apache Hive on Spark uses this transformation inside its join implementation. It also acts as a vital building block in the secondary sort pattern, in which you want to both group records by key and then, when iterating over the values that correspond to a key, have them show up in a particular order. This issue comes up in algorithms that need to group events by user and then analyze the events for each user based on the order they occurred in time. Taking advantage of repartitionAndSortWithinPartitions to do secondary sort currently requires a bit of legwork on the part of the user, but SPARK-3655 will simplify things vastly.

Conclusion

You should now have a good understanding of the basic factors involved in creating a performance-efficient Spark program! In Part 2, we’ll cover tuning resource requests, parallelism, and data structures.

Sandy Ryza is a Data Scientist at Cloudera, an Apache Spark committer, and an Apache Hadoop PMC member. He is a co-author of the O’Reilly Media book, Advanced Analytics with Spark.

How-to: Tune Your Apache Spark Jobs (Part 2)

In the conclusion to this series, learn how resource tuning, parallelism, and data representation affect Spark job performance.

In this post, we’ll finish what we started in “How to Tune Your Apache Spark Jobs (Part 1)”. I’ll try to cover pretty much everything you could care to know about making a Spark program run fast. In particular, you’ll learn about resource tuning, or configuring Spark to take advantage of everything the cluster has to offer. Then we’ll move to tuning parallelism, the most difficult as well as most important parameter in job performance. Finally, you’ll learn about representing the data itself, in the on-disk form which Spark will read (spoiler alert: use Apache Avro or Apache Parquet) as well as the in-memory format it takes as it’s cached or moves through the system.

Tuning Resource Allocation

The Spark user list is a litany of questions to the effect of “I have a 500-node cluster, but when I run my application, I see only two tasks executing at a time. HALP.” Given the number of parameters that control Spark’s resource utilization, these questions aren’t unfair, but in this section you’ll learn how to squeeze every last bit of juice out of your cluster. The recommendations and configurations here differ a little bit between Spark’s cluster managers (YARN, Mesos, and Spark Standalone), but we’re going to focus only on YARN, which Cloudera recommends to all users.

For some background on what it looks like to run Spark on YARN, check out my post on this topic.

The two main resources that Spark (and YARN) think about are CPU and memory. Disk and network I/O, of course, play a part in Spark performance as well, but neither Spark nor YARN currently do anything to actively manage them.

Every Spark executor in an application has the same fixed number of cores and same fixed heap size. The number of cores can be specified with the --executor-cores flag when invoking spark-submit, spark-shell, and pyspark from the command line, or by setting the spark.executor.cores property in the spark-defaults.conf file or on a SparkConf object. Similarly, the heap size can be controlled with the --executor-memory flag or the spark.executor.memory property. The cores property controls the number of concurrent tasks an executor can run. --executor-cores 5 means that each executor can run a maximum of five tasks at the same time. The memory property impacts the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations, and joins.

The --num-executors command-line flag or spark.executor.instances configuration property control the number of executors requested. Starting in CDH 5.4/Spark 1.3, you will be able to avoid setting this property by turning on dynamic allocation with the spark.dynamicAllocation.enabled property. Dynamic allocation enables a Spark application to request executors when there is a backlog of pending tasks and free up executors when idle.
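(As a hedged illustration only: the same settings expressed through the SparkConf properties named in this section; the values are arbitrary.)

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.instances", "17")           // equivalent to --num-executors 17
  .set("spark.executor.cores", "5")                // equivalent to --executor-cores 5
  .set("spark.executor.memory", "19g")             // equivalent to --executor-memory 19G
  .set("spark.dynamicAllocation.enabled", "false") // or drop executor.instances and enable dynamic allocation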

It’s also important to think about how the resources requested by Spark will fit into what YARN has available. The relevant YARN properties are:

  • yarn.nodemanager.resource.memory-mb controls the maximum sum of memory used by the containers on each node.
  • yarn.nodemanager.resource.cpu-vcores controls the maximum sum of cores used by the containers on each node.

Asking for five executor cores will result in a request to YARN for five virtual cores. The memory requested from YARN is a little more complex for a couple reasons:

  • --executor-memory/spark.executor.memory controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor. It defaults to max(384, .07 * spark.executor.memory).
  • YARN may round the requested memory up a little. YARN’s yarn.scheduler.minimum-allocation-mb and yarn.scheduler.increment-allocation-mb properties control the minimum and increment request values respectively.

The following (not to scale with defaults) shows the hierarchy of memory properties in Spark and YARN:

And if that weren’t enough to think about, a few final concerns when sizing Spark executors:

  • The application master, which is a non-executor container with the special capability of requesting containers from YARN, takes up resources of its own that must be budgeted in. In yarn-client mode, it defaults to a 1024MB and one vcore. In yarn-cluster mode, the application master runs the driver, so it’s often useful to bolster its resources with the --driver-memory and --driver-cores properties.
  • Running executors with too much memory often results in excessive garbage collection delays. 64GB is a rough guess at a good upper limit for a single executor.
  • I’ve noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor below that number.
  • Running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM. For example, broadcast variables need to be replicated once on each executor, so many small executors will result in many more copies of the data.

To hopefully make all of this a little more concrete, here’s a worked example of configuring a Spark app to use as much of the cluster as possible: Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb andyarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes. Cloudera Manager helps by accounting for these and configuring these YARN properties automatically.

The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:

  • 63GB + the executor memory overhead won’t fit within the 63GB capacity of the NodeManagers.
  • The application master will take up a core on one of the nodes, meaning that there won’t be room for a 15-core executor on that node.
  • 15 cores per executor can lead to bad HDFS I/O throughput.

A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?

  • This config results in three executors on all nodes except for the one with the AM, which will have two executors.
  • --executor-memory was derived as (63/3 executors per node) = 21.  21 * 0.07 = 1.47.  21 – 1.47 ~ 19.

Tuning Parallelism

Spark, as you have likely figured out by this point, is a parallel processing engine. What is maybe less obvious is that Spark is not a “magic” parallel processing engine, and is limited in its ability to figure out the optimal amount of parallelism. Every Spark stage has a number of tasks, each of which processes data sequentially. In tuning Spark jobs, this number is probably the single most important parameter in determining performance.

How is this number determined? The way Spark groups RDDs into stages is described in the previous post. (As a quick reminder, transformations like repartition and reduceByKey induce stage boundaries.) The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage. The number of partitions in an RDD is the same as the number of partitions in the RDD on which it depends, with a couple exceptions: the coalesce transformation allows creating an RDD with fewer partitions than its parent RDD, the union transformation creates an RDD with the sum of its parents’ number of partitions, and cartesian creates an RDD with their product.

What about RDDs with no parents? RDDs produced by textFile or hadoopFile have their partitions determined by the underlying MapReduce InputFormat that’s used. Typically there will be a partition for each HDFS block being read. Partitions for RDDs produced by parallelize come from the parameter given by the user, or spark.default.parallelism if none is given.

To determine the number of partitions in an RDD, you can always call rdd.partitions().size().

The primary concern is that the number of tasks will be too small. If there are fewer tasks than slots available to run them in, the stage won’t be taking advantage of all the CPU available.

A small number of tasks also mean that more memory pressure is placed on any aggregation operations that occur in each task. Any join, cogroup, or *ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort. join, cogroup, and groupByKey use these data structures in the tasks for the stages that are on the fetching side of the shuffles they trigger. reduceByKey and aggregateByKey use data structures in the tasks for the stages on both sides of the shuffles they trigger.

When the records destined for these aggregation operations do not easily fit in memory, some mayhem can ensue. First, holding many records in these data structures puts pressure on garbage collection, which can lead to pauses down the line. Second, when the records do not fit in memory, Spark will spill them to disk, which causes disk I/O and sorting. This overhead during large shuffles is probably the number one cause of job stalls I have seen at Cloudera customers.

So how do you increase the number of partitions? If the stage in question is reading from Hadoop, your options are:

  • Use the repartition transformation, which will trigger a shuffle.
  • Configure your InputFormat to create more splits.
  • Write the input data out to HDFS with a smaller block size.

If the stage is getting its input from another stage, the transformation that triggered the stage boundary will accept a numPartitions argument, such as the one in the sketch below.
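(A hedged reconstruction of the kind of call the original post showed here:)

val counts = someRdd.reduceByKey(_ + _, numPartitions = X) // X is the child-stage partition count discussed next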

What should “X” be? The most straightforward way to tune the number of partitions is experimentation: Look at the number of partitions in the parent RDD and then keep multiplying that by 1.5 until performance stops improving.

There is also a more principled way of calculating X, but it’s difficult to apply a priori because some of the quantities are difficult to calculate. I’m including it here not because it’s recommended for daily use, but because it helps with understanding what’s going on. The main goal is to run enough tasks so that the data destined for each task fits in the memory available to that task.

The memory available to each task is (spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores. Memory fraction and safety fraction default to 0.2 and 0.8 respectively.

The in-memory size of the total shuffle data is harder to determine. The closest heuristic is to find the ratio between Shuffle Spill (Memory) metric and the Shuffle Spill (Disk) for a stage that ran. Then multiply the total shuffle write by this number. However, this can be somewhat compounded if the stage is doing a reduction:

Then round up a bit because too many partitions is usually better than too few partitions.

In fact, when in doubt, it’s almost always better to err on the side of a larger number of tasks (and thus partitions). This advice is in contrast to recommendations for MapReduce, which requires you to be more conservative with the number of tasks. The difference stems from the fact that MapReduce has a high startup overhead for tasks, while Spark does not.

Slimming Down Your Data Structures

Data flows through Spark in the form of records. A record has two representations: a deserialized Java object representation and a serialized binary representation. In general, Spark uses the deserialized representation for records in memory and the serialized representation for records stored on disk or being transferred over the network. There is work planned to store some in-memory shuffle data in serialized form.

The spark.serializer property controls the serializer that’s used to convert between these two representations. The Kryo serializer, org.apache.spark.serializer.KryoSerializer, is the preferred option. It is unfortunately not the default, because of some instabilities in Kryo during earlier versions of Spark and a desire not to break compatibility, but the Kryo serializer should always be used.

The footprint of your records in these two representations has a massive impact on Spark performance. It’s worthwhile to review the data types that get passed around and look for places to trim some fat.

Bloated deserialized objects will result in Spark spilling data to disk more often and reduce the number of deserialized records Spark can cache (e.g. at the MEMORY storage level). The Spark tuning guide has a great section on slimming these down.

Bloated serialized objects will result in greater disk and network I/O, as well as reduce the number of serialized records Spark can cache (e.g. at the MEMORY_SER storage level.)  The main action item here is to make sure to register any custom classes you define and pass around using the SparkConf#registerKryoClasses API.

Data Formats

Whenever you have the power to make the decision about how data is stored on disk, use an extensible binary format like Avro, Parquet, Thrift, or Protobuf. Pick one of these formats and stick to it. To be clear, when one talks about using Avro, Thrift, or Protobuf on Hadoop, they mean that each record is a Avro/Thrift/Protobuf struct stored in a sequence file. JSON is just not worth it.

Every time you consider storing lots of data in JSON, think about the conflicts that will be started in the Middle East, the beautiful rivers that will be dammed in Canada, or the radioactive fallout from the nuclear plants that will be built in the American heartland to power the CPU cycles spent parsing your files over and over and over again. Also, try to learn people skills so that you can convince your peers and superiors to do this, too.

Sandy Ryza is a Data Scientist at Cloudera, an Apache Spark committer, and an Apache Hadoop PMC member. He is a co-author of the O’Reilly Media book, Advanced Analytics with Spark.

 

Reposted from:

  • http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
  • http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

Business Scenario and Problem Statement

This is an analytics project in which upper-layer users define many kinds of analysis requirements, each of which may touch GB or even TB of data. The source data consists of user behavior logs annotated with many kinds of tags. If every tag key got its own column, there could be thousands of columns, most of them sparse. Because the tags are produced by mining strategies, they change frequently, which affects newly ingested data.

This imposes two requirements:

  1. The underlying storage must read fast enough not to drag down computation, and compress well enough to save storage space.
  2. The data schema must support dynamic extension and have good backward compatibility.

The compute platform is Spark, so a columnar storage layout is the natural choice; various papers and benchmarks suggest Parquet basically meets the requirements, so we lean toward it. But should the tags be stored as a single tags map<string, array> column, or as separate tag_xxx array columns? Can both layouts make use of Spark's pushdown to skip the columns a query does not need?

The analysis below looks at both the source code and the logs. The conclusion is that both layouts can use pushdown, and their performance should in theory be roughly the same (not yet verified with data). The map layout is clearly more extensible, though: adding or removing map keys can be handled entirely by the business layer.
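To make the two candidate layouts concrete, here is a rough sketch of the kind of query each one leads to; the hiveContext instance and the table/column names are assumptions for illustration, not taken from the project:

    // Option 1: a single map column carries all tags; new tags need no schema change.
    //   properties map<string, array<string>>
    val byMap = hiveContext.sql(
      "SELECT properties['brand'] FROM user_tags_map")

    // Option 2: one dedicated column per tag; adding a tag means altering the table schema.
    //   tag_brand array<string>, tag_effects array<string>, ...
    val byColumn = hiveContext.sql(
      "SELECT tag_brand FROM user_tags_wide")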

SparkSQL Source Code Analysis

There are plenty of write-ups online analyzing how SparkSQL parses and processes queries, for example:

  • http://www.csdn.net/article/2014-07-15/2820658/2
  • http://blog.csdn.net/oopsoom/article/details/37658021
  • http://mmicky.blog.163.com/blog/static/150290154201487102349800/ (a ten-part series, fairly comprehensive)

SparkSQL processes SQL statements much like a relational database does: the SQL text is first parsed into a tree, and subsequent steps such as binding and optimization are all operations on that tree. Those operations are expressed as rules, which use pattern matching to apply different transformations to different kinds of nodes.
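As a toy illustration of the rule-plus-pattern-matching idea (this is not Catalyst's actual API, just a self-contained sketch):

    // A tiny expression tree and one rewrite "rule" applied via pattern matching.
    sealed trait Expr
    case class Lit(v: Int) extends Expr
    case class Add(l: Expr, r: Expr) extends Expr

    // The rule folds additions of two literals; anything else is rewritten recursively.
    def constantFold(e: Expr): Expr = e match {
      case Add(Lit(a), Lit(b)) => Lit(a + b)
      case Add(l, r)           => Add(constantFold(l), constantFold(r))
      case other               => other
    }

    // Catalyst applies batches of such rules repeatedly until the tree stops changing
    // (a fixed point); one pass here turns Add(Lit(1), Add(Lit(2), Lit(3)))
    // into Add(Lit(1), Lit(5)).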

Overall, SparkSQL 1.1 consists of four modules:

  • core: handles data input and output, reading data from different sources (RDD, Parquet, JSON, etc.) and exposing query results as SchemaRDDs;
  • catalyst: handles the entire query-processing pipeline, including parsing, binding, optimization, and physical planning; calling it an optimizer undersells it, it is really the query engine;
  • hive: support for Hive data;
  • hive-ThriftServer: provides the CLI and JDBC/ODBC interfaces.

saveAsParquetFile

The following dives straight into the code for reading and writing Parquet files. Suppose we call schemaRDD.saveAsParquetFile(…); the core classes on the call chain are shown below:

[Figure: sparksql-uml-class — core classes on the call chain]

The key call relationships are shown in this diagram:

[Figure: sparksql-saveAsParquetFile — call sequence]

Let's walk through it alongside the code.

  def saveAsParquetFile(path: String): Unit = {
    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
  }

As shown, saveAsParquetFile produces a WriteToFile node, a LogicalPlan subclass. Once the lazily built plan is scheduled, ParquetOperations.apply() triggers the actual execution:

  object ParquetOperations extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      // TODO: need to support writing to other types of files.  Unify the below code paths.
      case logical.WriteToFile(path, child) =>
        val relation =
          ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
        // Note: overwrite=false because otherwise the metadata we just created will be deleted
        InsertIntoParquetTable(relation, planLater(child), overwrite = false) :: Nil

InsertIntoParquetTable.execute() then calls saveAsHadoopFile() on self, which runs writeShard on each RDD partition to write out its rows:

    def writeShard(context: TaskContext, iter: Iterator[Row]): Int = {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
      /* "reduce task" <split #> <attempt # = spark task #> */
      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
        attemptNumber)
      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
      val format = new AppendingParquetOutputFormat(taskIdOffset)
      val committer = format.getOutputCommitter(hadoopContext)
      committer.setupTask(hadoopContext)
      val writer = format.getRecordWriter(hadoopContext)
      try {
        while (iter.hasNext) {
          val row = iter.next()
          writer.write(null, row)
        }
      } finally {
        writer.close(hadoopContext)
      }
      committer.commitTask(hadoopContext)
      1
    }

A few important variables here are annotated in the core-class diagram above. writer.write() triggers ParquetRecordWriter.write(), InternalParquetRecordWriter.write(), and RowWriteSupport.write() in turn to buffer the data in memory, and "periodically" calls InternalParquetRecordWriter.checkBlockSizeReached() to decide whether the buffer should be encoded and flushed to disk (usually HDFS).

Schema generation

Before discussing write and flush, we need to pin down where the schema is generated. Inside writeShard, the call val writer = format.getRecordWriter(hadoopContext) triggers:

    WriteContext init = writeSupport.init(conf);
    ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);

The key method here is ParquetTypes.fromDataType():

public static parquet.schema.Type fromDataType(org.apache.spark.sql.catalyst.types.DataType ctype,
                               String name,
                               boolean nullable,
                               boolean inArray)

Converts a given Catalyst DataType into the corresponding Parquet Type. The conversion follows the rules below:

  • Primitive types are converted into Parquet’s primitive types.
  • StructTypes are converted into Parquet’s GroupType with the corresponding field types.
  • ArrayTypes are converted into a 2-level nested group, where the outer group has the inner group as sole field. The inner group has name values and repetition level REPEATED and has the element type of the array as schema. We use Parquet’s ConversionPatterns for this purpose.
  • MapTypes are converted into a nested (2-level) Parquet GroupType with two fields: a key type and a value type. The nested group has repetition level REPEATED and name map. We use Parquet’s ConversionPatterns for this purpose.

Parquet’s repetition level is generally set according to the following rule:

  • If the call to fromDataType is recursive inside an enclosing ArrayType or MapType, then the repetition level is set to REPEATED.
  • Otherwise, if the attribute whose type is converted is nullable, the Parquet type gets repetition level OPTIONAL and otherwise REQUIRED.

The write path

Back on the write path, writeValue(), writePrimitive(), writeStruct(), writeMap(), and writeArray() are called recursively to persist primitive types as well as struct, map, and array values. Driven by the schema, they in turn call startMessage(), startField(), startGroup(), and related methods of the MessageColumnIORecordConsumer inner class. These three concepts matter: they are essentially what implements the data structures from the Dremel paper:

  • Message: a new record; corresponds to one row.
  • Field: a field in a group or message. If the field is repeated, the field is started only once and all values are added between start and end. It corresponds to a key-value pair: the key is the field name, and the value may itself be a group, which is how other structures get nested.
  • Group: a group inside a field.

Because nesting is handled through this abstraction, a map<string, array> is first of all a field as a whole, and each string → array entry inside it forms a sub-field of its own. At the storage and indexing layer, each most deeply nested field should correspond to one column, and the repetition (r) and definition (d) levels are computed for those columns, as sketched below.
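To make the Message/Field/Group calls concrete, here is a hedged sketch of the call sequence a write support might emit for one row carrying a map<string, array<string>> field. The field names, indices, and nesting depth are illustrative only; they depend on the Parquet schema that was actually generated (and this uses the old parquet.* package namespace from the parquet-mr versions shipped with Spark 1.x):

    import parquet.io.api.{Binary, RecordConsumer}

    // Sketch only: write one record with a single map entry "brand" -> ["bmw", "audi"].
    def writeOneRow(consumer: RecordConsumer): Unit = {
      consumer.startMessage()                         // Message: one record / one row

      consumer.startField("properties", 0)            // Field: the map as a whole
      consumer.startGroup()                           // Group: nesting for one map entry
      consumer.startField("key", 0)
      consumer.addBinary(Binary.fromString("brand"))
      consumer.endField("key", 0)
      consumer.startField("value", 1)                 // the array value, itself a nested field
      consumer.startGroup()
      consumer.startField("array", 0)
      consumer.addBinary(Binary.fromString("bmw"))
      consumer.addBinary(Binary.fromString("audi"))   // repeated field: all values between start/end
      consumer.endField("array", 0)
      consumer.endGroup()
      consumer.endField("value", 1)
      consumer.endGroup()
      consumer.endField("properties", 0)

      consumer.endMessage()
    }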

The flush path

Now for the flush side:

  // InternalParquetRecordWriter
  public void write(T value) throws IOException, InterruptedException {
    writeSupport.write(value);
    ++ recordCount;
    checkBlockSizeReached();
  }

If a flush is needed, checkBlockSizeReached() calls flushRowGroupToStore() to write the row group out to disk, and then initStore() to re-initialize the in-memory store.
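A simplified sketch of that logic (not the actual parquet-mr source; the check interval and threshold handling are condensed, and the names here are placeholders):

    // The writer only measures the buffered size every so many records ("periodically"),
    // and once the buffer crosses the configured row-group size (parquet.block.size)
    // it flushes that row group and starts a fresh in-memory column store.
    class RowGroupSizeCheck(rowGroupSizeBytes: Long, checkInterval: Long = 100L) {
      private var recordCount = 0L
      private var nextMemCheck = checkInterval

      def onRecordWritten(bufferedSize: => Long)
                         (flushRowGroupToStore: () => Unit, initStore: () => Unit): Unit = {
        recordCount += 1
        if (recordCount >= nextMemCheck) {
          if (bufferedSize >= rowGroupSizeBytes) {
            flushRowGroupToStore()   // encode pages and append the finished row group to the file
            initStore()              // reset the in-memory column store for the next row group
            recordCount = 0
          }
          nextMemCheck = recordCount + checkInterval   // re-check later, not on every record
        }
      }
    }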

SparkSQL Log Analysis

The inference from the source analysis above is that every field of a nested structure is stored as its own column; the write and query logs below back this up.

The table schema looks roughly like this:

CREATE TABLE IF NOT EXISTS orc_qiche (
    query string,
    uid string,
    country string,
    province string,
    city string,
    sttime string,
    properties map<string, array<string>>,
    intension array<string>
)

Here properties contains keys such as brand and effects.

Write log:

15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 88B for [properties, brand, array] BINARY: 2 values, 34B raw, 50B comp, 1 pages, encodings: [RLE, PLAIN]
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 85B for [properties, effects, array] BINARY: 2 values, 33B raw, 48B comp, 1 pages, encodings: [RLE, PLAIN]
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 69B for [properties, brand, array] BINARY: 1 values, 21B raw, 36B comp, 1 pages, encodings: [RLE, PLAIN]
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 72B for [query] BINARY: 1 values, 16B raw, 35B comp, 1 pages, encodings: [RLE, BIT_PACKED, PLAIN]

Query log:

[Figure: query log output (original image not preserved)]

from: http://blog.thislongrun.com/2015/07/Forfeit-Partition-Tolerance-Distributed-System-CAP-Theorem.html?m=1
The CA–consistent, available, but not network partition tolerant–category in CAP has a very specific history. Not only can forfeiting “network partition tolerance” be understood as impossible in theory and crazy in practice (P as an illusion of a choice), but there is also an overlap between the CA and CP categories. As a result, many consider that it’s impossible to build a production CA system. But you can actually build a system without network partition tolerance, and sometimes you should.

A brief history of the CA category

Let’s look at the academic history of the CA category in CAP:
  • In 2000, Eric Brewer presents the CAP conjecture. In his presentation, CA exists, for example for systems using the two-phase commit protocol. He considers that “the whole space is useful.”
  • In 2002, Seth Gilbert and Nancy Lynch publish the CAP proof. CA exists: “Systems that run on intranets and LANs are an example of these types of algorithms.”
  • In 2010, Daniel Abadi raises the point that there is an overlap between CA and CP: “What does ‘not tolerant’ mean? In practice, it means that they lose availability if there is a partition. Hence CP and CA are essentially identical.”
  • Still in 2010, Michael Stonebraker publishes multiple documents around the limited importance of partitions, with the tagline “Myth #6: In CAP, choose AP over CA”, considering that with the capacity of modern hardware, small distributed systems can solve most real-life issues, and that “it doesn’t much matter what you do when confronted with network partitions.”
  • And again in 2010, Coda Hale publishes a blog post: “You cannot sacrifice partition tolerance”, explaining that only AP and CP are possible.
  • This triggers a feedback from Stonebraker, who restates all his points.
  • 2 years later, in 2012, referring to these works, Eric Brewer states that “exactly what it means to forfeit P is unclear” and then clarifies: “choosing CA should mean that the probability of a partition is far less than that of other systemic failures, such as disasters or multiple simultaneous faults.”


So we need to sort out the following issues:
  • There is an overlap between CP and CA.
  • There is a theoretical impossibility: network partitions are a given, you can choose between ‘A’ and ‘C’ when a partition happens but not if partitions happen.
  • There is a practical impossibility: network partitions are too likely to happen on a real life system to be ignored, so CA is impossible in practice.
What does CA mean?
CA is about “forfeiting network partition tolerance”, i.e. being “network partition intolerant”. Partition intolerance does not mean that network partitions cannot happen, it means network partitions are a critical issue. It’s a bit like gluten: being “gluten intolerant” does not mean that you cannot eat any, it means that you should not. Like for gluten, a CA system should also have a means of recovery should a network partition actually happen. The two-phase commit is a perfect example: it comes with a repair tool to fix the transactions broken by a heuristic resolution.

The fact that CA does not mean “I have a network that cannot be partitioned” is important, because it implies a partition can actually happen. This is stressed by Brewer: “choosing CA should mean that the probability of a partition is far less than that of other systemic failures.” To estimate this probability you must be quite clear about what a partition actually is. This whole post is only about network partitions.

Let’s summarize: CA describes the specification of an operating range, and not a behavior. CP, AP describe the behavior when a partition occurs. This obviously leaves room for an overlap between CP and CA. Let’s look at this overlap now.
The overlap between CP and CA
It’s the point identified by Abadi: “What does “not tolerant” mean? In practice, it means that they lose availability if there is a partition. Hence CP and CA are essentially identical.” A system that does not do anything once partitioned is trivially CP: it does not present a non-consistent history. Such a system could also be considered as CA: it stops working when there is a partition–hence the overlap. This overlap is minimal however:
  • Many CA systems are not CP: for example, the two-phase commit protocol is not consistent (nor available, nor ACID-atomic) when there is a partition.
  • Many CP systems are not CA: for example, a consensus server like ZooKeeper is totally tolerant to partitions.
Systems that belong to these two categories are only systems that stop working during the partition, but are consistent once the partition is fixed (trivially a webserver connected to a database). I personally prefer to call these systems ‘CA’ rather than ‘CP’, even if the CAP theorem allows for both: this expresses that a partition is a severe issue for the system. Ultimately, it’s your choice.
Partitions are a given in the CAP theorem
That’s exactly CAP: if there is a partition, you have to choose between ‘A’ and ‘C’. We have a model that allows partitions, and a theorem that says we have to choose ‘A’ or ‘C’ when there is a partition, so we cannot “refuse to see partitions”.  But actually “forfeiting partitions” is exactly that: it’s removing partitions from the model and building our application on a brand new model without partitions.


From a theoretical point of view, forfeiting partitions means removing them from the model. They will never happen in our theoretical model.
From a practical point of view, forfeiting partitions means removing them from the operating range. They may happen in reality.


By definition a model differs from reality. The question is always: is this model a good representation of reality?

Partitions happen too often in real life to be ignored

Well, here ended the debate between Coda Hale and Michael Stonebraker: Hale saying that there are a lot of partitions in his datacenters, Stonebraker saying that there are problems more probable than partitions that are not fixed anyway, and that surviving partitions will not “move the needle” on availability.
Without agreed-upon data, there is no real way out of this debate. The good news is we don’t have to revive it to say that CA can be used to describe a distributed system: a CA system is a system built by someone who thinks he can forfeit partitions.
But the key point of the discussion is the difficulty to reason about failures without describing the system. In the debate above, Hale was speaking about systems of “any interesting scale”, while Stonebraker was considering small systems of high range servers on a LAN (“if you need 200 nodes to support a specific SQL application, then VoltDB can probably do the same application on 4 nodes”). But these two types of distributed systems are totally different animals. When discussing a design remember the old programming rule–“fancy algorithms are slow when n is small, and n is usually small”, and check the value of n.

When to use CA

The branch can be partitioned from the tree, but it may not be the monkey’s main concern.


Let’s recall what Brewer wrote in 2012: “choosing CA should mean that the probability of a partition is far less than that of other systemic failures, such as disasters or multiple simultaneous faults.”


Eric Brewer detailed in a mail he sent me (quoted here with his permission):
I tend to explain it a few different ways:
1) it is trivial to get CA in a non-distributed system, such as a single node
2) it is also fine to assume CA on a LAN, especially if it is (over) engineered for multiple paths or even for fail stop.  The CM-5 had an over-engineered network that would halt if it detected any errors, but it almost never did (in fact I don’t know of case where it actually stopped, but there probably were some).  The CM-5 case thus really was an operating range argument.
3) If the probability of a partition is lower than other major system failures that would take out an application, then you can claim CA.  For example, you might lose a quorum due to correlated failures (such as power or a disaster), which would also lose availability even though not a partition.  If your network is 5 9s, you can probably ignore the partition case in terms of the code you write (but you should at least detect it!).


“CA should mean that the probability of a partition is far less than that of other systemic failures” says we can call a system CA if the “probability of a partition” is minimal–the non-distributed or over-engineered network case. These systems are often not of “any interesting scale” but that doesn’t mean they don’t have any business value.


There is a more complex case: the probability of “multiple simultaneous faults” depends on many things, including the software itself. Much non-critical software is more likely to get data corruption from a software bug than from a network partition, just because simple error scenarios like wrong user inputs are not tested enough. A complicated administration interface is also a common source of downtime. In other words, choosing CA depends on the network quality and the software quality itself.


Network partition tolerance is a feature like any other. It has to be planned, implemented and tested. And, as any feature, the decision to implement it or not must take into account the benefits of the feature compared to its implementation cost. For such a feature it is:


expected number of partitions * cost per partition (unavailability, reputation, repair …)
vs.
cost of supporting partitions (testing effort included).


Even if the ratio is positive, i.e. the system should be partition tolerant, there could be other features that have a better ratio and they will be prioritized. That’s a well known engineering drama: it’s not because a feature is useful and brings value that it’s implemented in the end.


An example of such CA systems would be those GPU-based machine learning systems. The one built by Baidu was “comprised of 36 server nodes, each with 2 six-core Intel Xeon E5-2620 processors. Each server contains 4 Nvidia Tesla K40m GPUs and one FDR InfiniBand (56Gb/s) which is a high-performance low-latency interconnection and supports RDMA. The peak single precision floating point performance of each GPU is 4.29TFlops and each GPU has 12GB of memory.” For such a system, partition tolerance is not an absolute necessity: if a partition occurs the calculation can be restarted from scratch once the partition is fixed. As already stated, this does not mean partition tolerance is not useful. Partition tolerance would be typically useful should the calculation take weeks. But such systems can also exist without partition tolerance.

Conclusion: “the whole space is useful”

Being partition tolerant is comfortable. You have to be Stonebraker to claim partition intolerance. On the other hand, Kyle ‘Aphyr’ Kingsbury proves regularly with smart but simple tests that many systems used in production are not network partition tolerant.
It’s not really that network partition tolerance can be easily forfeited, especially if the system is of “any interesting scale.” But first it is worth checking the system’s size: is it of “any interesting scale?” Exactly like a system that does not need to be distributed should not be distributed, a distributed system that can be kept small should be kept small.
There is also a catch in how CAP is sometimes (mis)understood: “node failures, processes crashes and network partitions are partitions so you have to be partition tolerant”. This is not only false but also dangerous: it hides the fact that each of these faults could be tackled independently with a specific priority. Before trying to be available during network partition, you should first validate that you don’t lose data with a single process crash. With fault tolerance like with any other problem, decomposing it makes it easier to fix. Network partition is just one type of fault out of many.
So, sometimes using CA just makes sense. As already stated by Eric Brewer: “the whole space is useful.”
Comments on the original post: https://aphyr.com/posts/325-comments-on-you-do-it-too