从2013年的millwheel,到2015年的dataflow,再到2016年strata hadoop上的《Watermarks:

Measuring Time and Progress in Streaming Pipelines》,Google连续发表了3篇low watermark相关论文,致力于解决实时计算中的时序性问题。

但何为low watermark,为什么需要它?与其他方式相比有什么优劣?实时计算已出现多年,为什么此时才需要解决时序性?Paper里最关键的地方是什么?本文尝试解答这些问题。

时序性问题

实时计算的数据源一般来自于数据传输系统,从业务系统里收集日志消息,违反时序性的常见场景有:

  1. 从多个机房/地域采集数据,导致数据乱序(局部有序、整体乱序)
  2. 某机器或进程故障,重启后数据大大延迟
  3. 某机房或交换机故障,恢复后数据大大延迟

前一种是常态,后两者可视为故障恢复后的数据回追,个人认为low watermark主要解决第一类问题,但也为第二类问题提供一种可能的解决方案。

那么乱序会带来什么问题呢?Millwheel论文里举了一个典型的例子,即波峰波谷的计算:

In the case of Zeitgeist, shown in Figure 1, our input would be a continuously arriving set of search queries, and our output would be the set of queries that are spiking or dipping.

it is important to be able to distinguish whether a flurry of expected Arabic queries at t = 1296167641 is simply delayed on the wire, or actually not there.

计算搜索词在一个时间窗口(e.g. 1分钟)内的搜索次数,将其波动率与阈值比较甚至策略,以发现异常事件。波峰时由于数据量大,少量数据延迟影响可以忽略。但波谷本身数据量就小,无法判断是真的到达波谷,还仅仅是由于延迟导致数据还在传输途中。

之前的做法是再设置一个安全时间,延迟几分钟再计算,但安全时间如何设置?过长则影响时效性,过短无法奏效。或者采取右端开放窗口的补偿方法,先产出数据,当延迟数据到来时再予以修改,但需要业务场景适用、下游系统支持修改。

除了aggregation计算对时序性有要求外,一些策略计算可能也会要求数据完整、有序。

low watermark原理

The low watermark for a computation provides a bound on the timestamps of future records arriving at that computation.

low watermark是一个时间戳,它承诺未来不会有早于该时间戳的数据到达该计算节点。这里的时间计算一般基于eventtime,即事件发生时间,例如业务系统的log时间,而较少使用数据到达计算节点的时间(processing time,某些场景也可以用)。

Definition: We provide a recursive definition of low watermarks based on a pipeline’s data flow. Given a computation, A, let the oldest work of A be a timestamp corresponding to the oldest unfinished (in-flight, stored, or pending-delivery) record in A. Given this, we define the low watermark of A to be

  • min(oldest work of A, low watermark of C : C outputs to A)
  • If there are no input streams, the low watermark and oldest work values are equivalent.

对于复杂问题,我们往往将其拆分为子问题,这里也一样,将其转化为递归。对于一个最简单的拓扑片段C->A,即A订阅C产生的数据(C和A都是多并发、分布式的),其计算公式如上。需要注意,如果C没有input streams,即importer节点(storm里的spout节点),那么the low watermark and oldest work values are equivalent。

进一步又区分了input和output low watermark:

  • Input Low Watermark: Oldest work not yet sent to this streaming stage.
  • InputLowWatermark(Stage) = min { OutputLowWatermark(Stage’) | Stage’ is upstream of Stage}
  • Output Low Watermark: Oldest work not yet completed by this streaming stage.
  • OutputLowWatermark(Stage) = min { InputLowWatermark(Stage), OldestWork(Stage) }

Input low watermark是所有上游节点output low watermark的最小值,对于importer而言就是最小订阅进度。而Output low watermark是对下游的承诺,由自己的input递归传递上游承诺,由自己产出数据的oldest work代表自己的承诺,两者取min。

这里有两个重要的隐藏结论:

  1. output low watermark <= input low watermark,即下游永远不可能比上游快
  2. 当前节点使用input low watermark驱动窗口计算,即input是计算中直接使用的,而output只是递归的需要

关键技术点

递归问题的一个关键是能否找到初始条件,low watermark也一样,If there are no input streams, the low watermark and oldest work values are equivalent. 即importer的订阅进度如何计算。Google论文中笼统指出file和pub-sub两种数据源data time的计算方法,而在我们的场景下,数据一般来自于业务系统的前端机,通过类似kafka的消息传输系统进入实时计算,可以保证来源于一个前端机的数据是有序的,但传输过程中多个前端机数据乱序。

low-watermark-importer

从上图示例可以看出,importer的计算过程包含以下3步:

  1. progress of single web server = max(input data time)
  2. input low watermark of single worker = min(progress of single web server)
  3. input low watermark of importer = min(input low watermark of single worker)

在实际中,由于各importer节点数据互不相干,故若importer中存在窗口,可直接使用step 2的结果驱动。另,示例中假设importer无窗口计算,故output与input low watermark相等。

low watermark的工程实现

Millwheel论文中采取了集中式,而Flink据说是采用inject的方式。而我们的小批量实时系统,本身就是基于query and own的task管理模型,故也采取了集中计算的方式。

抽象了两类包含data time的task,即data task和state task,前者代表该task批次里的oldest work value,后者存储当前计算节点的input low watermark。TaskManager中心节点维持了上下游订阅关系 -> datatime -> count的计数器,从而将多并发计算节点的时间戳汇聚为层级算子的时间戳。

下游节点向TaskManager请求上游算子的两类时间戳,根据公式计算input low watermark并驱动窗口滑动。

计算的时机是:

  1. 在本轮数据处理前,计算input low watermark
  2. 在本轮数据处理结束时,计算oldest work存入data task,并保存input low watermark至state task

这里需要注意的是,虽然OutputLowWatermark(Stage) = min { InputLowWatermark(Stage), OldestWork(Stage) },但我们没有直接存储上游节点的output low watermark,而是将input和oldest值分别存储,原因是:

  1. 当前节点的output low watermark在下游节点计算input low watermark时才会被用到,即其产生和使用间有一个时间差
  2. 而当前节点的input low watermark在output low watermark产生后还可以独立前移,分开存储能够尽早推动下游 input low watermark的前移(这里比较绕,需要细细琢磨,在分布式场景下考虑就是理所应当的了)

综上所述,记录了我对论文的学习以及实践的一些心得,但由于我们的low watermark尚未经历大规模应用的检验,可能存在疏漏,还望指正。

最后猜测下为什么Google选择在这个时间段发表low watermark。早年的实时计算多追求低延迟、大吞吐、不重不丢,而近些年这方面的表现已较为优秀,所以才进一步追求更细致的功能。另外,早年的实时场景多为辅助型的,近些年已逐渐走入核心领域,故也会提升对业务准确性的要求。

不懂硬件,今天经历了raid卡对性能的影响,特记录如下。

一台需要做journal的中控服务器,在压力相当的情况下,比其他服务器的写延迟大很多。

top查看每个cpu的数据,发现部分cpu的wa时间非常大(90%)。iostat -dx 3,发现write rqm虽然也持续较大(经常5、6k,甚至上万),写入流量几十MB/s,但对比相似机型,iowait也不会达到如此高。虽然我们的场景是每条journal(KB-百KB)都需要fsync到磁盘,但也不应该性能这么差。

继续安装iotop命令(需翻墙),sudo执行,发现写入操作都集中在journal threads上(TID是thread id,可以gstack 查看具体是什么线程)。B519E22D-58CD-4E92-8227-89C0BD429C44

 Bash |  copy code |? 
1
Thread 5 (Thread 140711690905952 (LWP 15782)):
2
#0  0x0000003f0b90b8fb in __fsync_nocancel () from /lib64/tls/libpthread.so.0
3
#1  0x0000000000573d5f in sp::tm::FileJournalWriter::write ()
4
……
5
#8  0x0000000000789bd8 in thread_proxy ()
6
#9  0x0000003f0b90610a in start_thread () from /lib64/tls/libpthread.so.0
7
#10 0x0000003f0b0c5ee3 in clone () from /lib64/tls/libc.so.6
8
#11 0x0000000000000000 in ?? ()

至此怀疑并不是软件问题。

联系op同学检查硬件,最终发现raid卡的write cache出现问题:

31a628a85965f2359a37ca8cc

作为缓存,raid cache的作用具体体现在读与写两个不同的方面:

  • 作为写,一般存储阵列只要求数据写到cache就算完成了写操作,当写cache的数据积累到一定程度,阵列才把数据刷到磁盘,可以实现批量的写入。所以,阵列的写是非常快速的。至于cache数据的保护,一般都依赖于镜相与电池(或者是UPS)。
  • cache在读数据方面的作用一样不可忽视,因为如果所需要读取的数据能在cache中命中的话,将大大减少磁盘寻道所需要的时间

而我们的服务器在raid故障时,还需要穿透raid做大量write和fsync,将压力真正压到磁盘上,就必然导致io性能的急剧下降了。这种情况只有更换raid卡。

我们靠谱op的解释:

RAID卡里本身有一个cache 数据写到RAID卡的cache里,就返回了,表现出来的性能要比数据直接落盘好。但是为了保证数据掉电不丢失,RAID卡需要有个电池,在掉电后通过电池把数据刷到磁盘里,保证数据可靠性。由于故障机器的RAID卡电池坏了,掉电后无法回刷数据,为了保证数据可靠性,不能通过cache缓存数据,所有的写操作必须直接落到磁盘上,写性能就会变差。

参考资料:
http://xxrenzhe.blog.51cto.com/4036116/1312510

zz from:https://blog.twitter.com/2016/open-sourcing-twitter-heron

Last year we announced the introduction of our new distributed stream computation system, Heron. Today we are excited to announce that we are open sourcing Heron under the permissive Apache v2.0 license. Heron is a proven, production-ready, real-time stream processing engine, which has been powering all of Twitter’s real-time analytics for over two years. Prior to Heron, we used Apache Storm, which we open sourced in 2011. Heron features a wide array of architectural improvements and is backward compatible with the Storm ecosystem for seamless adoption.

Everything that happens in the world happens on Twitter. That generates a huge volume of information in the form of billions of live Tweets and engagements. We need to process this constant stream of data in real-time to capture trending topics and conversations and provide better relevance to our users. This requires a streaming system that continuously examines the data in motion and computes analytics in real-time.

Heron is a streaming system that was born out of the challenges we faced due to increases in volume and diversity of data being processed, as well as the number of use cases for real-time analytics. We needed a system that scaled better, was easier to debug, had better performance, was easier to deploy and manage, and worked in a shared multi-tenant cluster environment.

To address these requirements, we weighed the options of whether to extend Storm, switch to another platform, or to develop a new system. Extending Storm would have required extensive redesign and rewrite of its core components. The next option we considered was using an existing open-source solution. However, there are a number of issues with respect to making several open systems work in their current form at our scale. In addition, these systems are not compatible with Storm’s API. Rewriting the existing topologies with a different API would have been time consuming, requiring our internal customers to go through a very long migration process. Furthermore, there are different libraries that have been developed on top of the Storm API, such as Summingbird. If we changed the underlying API of the streaming platform, we would have had to rewrite other higher-level components of our stack.

We concluded that our best option was to rewrite the system from the ground-up, reusing and building upon some of the existing components within Twitter.

Enter Heron.

Heron represents a fundamental change in streaming architecture from a thread-based system to a process-based system. It is written in industry-standard languages (Java/C++/Python) for efficiency, maintainability, and easier community adoption. Heron is also designed for deployment in modern cluster environments by integrating with powerful open source schedulers, such as Apache Mesos, Apache Aurora, Apache REEF, Slurm.

One of our primary requirements for Heron was ease of debugging and profiling. Heron addresses this by running each task in a process of its own, resulting in increased developer productivity as developers are able to quickly identify errors, profile tasks, and isolate performance issues.

To process large amounts of data in real-time, we designed Heron for high scale, as topologies can run on several hundred machines. At such a scale, optimal resource utilization is critical. We’ve seen 2-5x better efficiency with Heron, which has saved us significant OPEX and CAPEX costs. This level of efficiency was made possible by both the custom IPC layer and the simplification of the computational components’ architecture.

Running at Twitter-scale is not just about speed, it’s also about ease of deployment and management. Heron is designed as a library to simplify deployment. Furthermore, by integrating with off-the-shelf schedulers, Heron topologies safely run alongside critical services in a shared cluster, thereby simplifying management. Heron has proved to be reliable and easy to support, resulting in an order of magnitude reduction of incidents.

We built Heron on the basis of valuable knowledge garnered from our years of experience running Storm at Twitter. We are open sourcing Heron because we would like to share our insights and knowledge and continue to learn from and collaborate with the real-time streaming community.

Our early partners include both Fortune 500 companies, including Microsoft, and startups who are already using Heron for an expanding set of real-time use cases, including ETL, model enhancement, anomaly/fraud detection, IoT/IoE applications, embedded systems, VR/AR, advertisement bidding, financial, security, and social media.

“Heron enables organizations to deploy a unique real-time solution proven for the scale and reach of Twitter,” says Raghu Ramakrishnan, Chief Technology Officer (CTO) for the Data Group at Microsoft. “In working with Twitter, we are contributing an implementation of Heron that could be deployed on Apache Hadoop clusters running YARN and thereby opening up this technology to the entire big data ecosystem.”

We are currently considering moving Heron to an independent open source foundation. If you want to join this discussion, see this issue on GitHub. To join the Heron community, we recommend getting started at heronstreaming.io, joining the discussion on Twitter at @heronstreaming and viewing the source on GitHub.

Acknowledgements

Large projects like Heron would not have been possible without the help of many people.

Thanks to: Maosong Fu, Vikas R. Kedigehalli, Sailesh Mittal,Bill Graham, Neng Lu, Jingwei Wu, Christopher Kellogg, Andrew Jorgensen, Brian Hatfield, Michael Barry, Zhilan Zweiger, Luc Perkins, Sanjeev Kulkarni, Siddharth Taneja, Nikunj Bhagat, Mengdie Hu, Lawrence Yuan, Zuyu Zhang, and Jignesh Patel who worked on architecting, developing, and productionizing Heron.

Thanks to the open source and legal teams: Sasa Gargenta, Douglas Hudson, Chris Aniszczyk.

Thanks to early testers who gave us valuable feedback on deployment and documentation.

References

[1] Twitter Heron: Streaming at Scale, Proceedings of ACM SIGMOD Conference, Melbourne, Australia, June 2015.

[2] Storm@Twitter, Proceedings of ACM SIGMOD Conference, Snowbird, Utah, June 2014.

Tags
analytics, announcements, and open source

本文尝试用spark自带example streaming.NetworkWordCount为例,解释spark streaming 1.6.0的执行流程。

例子代码如下:

 Scala |  copy code |? 
01
    // Create the context with a 1 second batch size
02
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
03
    val ssc = new StreamingContext(sparkConf, Minutes(1))
04
 
05
    // Create a socket stream on target ip:port and count the
06
    // words in input stream of \n delimited text (eg. generated by 'nc')
07
    // Note that no duplication in storage level only for running locally.
08
    // Replication necessary in distributed scenario for fault tolerance.
09
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
10
    val words = lines.flatMap(_.split(" "))
11
    val wordCounts = words.map(=&gt; (x, 1)).reduceByKey(_ + _)
12
    wordCounts.print()
13
    ssc.start()
14
    ssc.awaitTermination()

Streaming的核心是DStream类,即discretized streams。以上的socketTextStream、flatMap、map、reduceByKey、print方法分别生成了5个子类实例:SocketInputDStream<-FlatMappedDStream<-MappedDStream<-ShuffledDStream<-ForEachDStream,前面是后面的parent,可以类比RDD的依赖关系。另外,首尾分别是input和output stream。

但截至到wordCounts.print(),spark集群并没有任何实质性的操作呢,直到ssc.start(),它会触发一系列的初始化,其中包括input stream的数据输入,并使用类似RDD的方式,以outputStream为入口依次触发parent的执行。将分如下几个部分详细说明:

  1. start触发的初始化过程,即input stream任务如何提交至executors?
  2. input stream blocks的产生过程,即数据是如何被读入和缓存的?
  3. input stream如何触发后续的transform、output操作的?

Start触发的初始化过程

SparkStreaming

 

当运行在集群模式下时,driver仅仅初始化job,input的真正读取者是worker节点。如上所示,StreamingContext.start()启动streaming-start子线程,触发JobScheduler.start(),进而触发两个关键实例的start方法:ReceiverTracker.start()和JobGenerator.start()。前者会生成并异步提交input Receiver job,后者以定时器监听duration超时、生成的后续处理任务。先仅关注前者。

核心方法是ReceiverTrackerEndpoint.startReceiver(receiver, scheduledLocations)方法,它调用ssc.sparkContext.submitJob提交Job,传入的RDD是仅包含receiver信息的单元素RDD,processPartition回调方法如下,在worker端取出receiver信息,并初始化ReceiverSupervisorImpl对象,开始读取数据。

 Scala |  copy code |? 
01
      // Function to start the receiver on the worker node
02
      val startReceiverFunc: Iterator[Receiver[_]] =&gt; Unit =
03
        (iterator: Iterator[Receiver[_]]) =&gt; {
04
          if (!iterator.hasNext) {
05
            throw new SparkException(
06
              "Could not start receiver as object not found.")
07
          }
08
          // flykobe: worker拿到RDD,里面仅包含一个元素,就是receiver
09
          // 包装并且启动数据接收
10
          if (TaskContext.get().attemptNumber() == 0) {
11
            val receiver = iterator.next()
12
            assert(iterator.hasNext == false)
13
            val supervisor = new ReceiverSupervisorImpl(
14
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
15
            supervisor.start()
16
            supervisor.awaitTermination()
17
          } else {
18
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
19
          }
20
        }

注意,一个app可以有多个input streams,所以可以在该阶段触发生成多个Jobs(如何调度这些Jobs,就是后面JobScheduler等的事情了)。

Input stream blocks的产生过程

Spark Streaming的处理模型是小批量、伪实时,所以需要对读入的数据进行缓存,每隔一段时间生成一个blocks,再每隔duration时间,由这些blocks组成RDDs。

SparkStreaming-1

 

有两种方式blocks,一种是Receiver单条写入,框架处理底层细节,例如SocketInputDStream、KafkaInputDStream;另一种是Receiver直接形成block,自行控制缓存等。

以SocketInputDStream为例,它的receiver阻塞读取socket数据,每接收到一行,就调用Receiver.store()方法进行存储,进而触发上图左边的逻辑。

首先将数据暂存在BlockGenerator.currentBuffer里,BlockGenerator定时器子线程每隔spark.streaming.blockInterval时间,就调用updateCurrentBuffer()将buffer里的数据形成block,并put进blocksForPushing队列。以上可视为queue的生产过程。

BlockGenerator还有一个blockPushingThread子线程,使用keepPushingBlocks()方法阻塞监听该queue,一旦有block产生,就调用ReceiverSupervisorImpl.pushAndReportBlock()方法,按照storage level交由BlockManager.doPut存储在内存 and/or 磁盘上,在此期间可能会写WAL日志。随后通过AKKA向master的ReceiverTracker发送AddBlock消息,由ReceivedBlockTracker.addBlock()方法建立block与input streaming的关联关系。

这里需要注意的是,block的产生间隔是由spark.streaming.blockInterval控制的,默认是200ms,而非duration参数。

完成以上步骤后,worker节点里保存了blocks的数据,而master节点存储了blocks列表。

Input stream如何触发后续操作

示例里没有window操作,处理间隔仅有duration控制,相对简单。

SparkStreaming-2

 

前面提到JobGenerator的定时器线程,它每隔ssc.graph.batchDuration.milliseconds执行一次,简单生成GenerateJobs消息,由JobGenerator EventLoop子线程处理,触发JobGenerator.generateJobs()。

它首先调用ReceivedBlockTracker.allocateBlocksToBatch()方法,按需写入master端的WAL日志,并组装起(time => (streamID=>blocks, …) ),保存到实例属性timeToAllocatedBlocks HashMap里。

随后进入关键步骤,由于一个app可以有多个output streams,故调用DStreamGraph.generateJobs()依次触发。以这儿的ForEachDStream为例,它的generateJob方法触发生成parent RDD,继续触发parent.compute()方法,最终导致SocketInputDStream按需将duration里的blocks形成RDDs:

 Scala |  copy code |? 
01
 /**
02
   * Generates RDDs with blocks received by the receiver of this stream. */
03
  override def compute(validTime: Time): Option[RDD[T]] = {
04
    val blockRDD = {
05
 
06
      if (validTime &lt; graph.startTime) {
07
        // If this is called for any time before the start time of the context,
08
        // then this returns an empty RDD. This may happen when recovering from a
09
        // driver failure without any write ahead log to recover pre−failure data.
10
        new BlockRDD[T](ssc.sc, Array.empty)
11
      } else {
12
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
13
        // for this batch
14
        val receiverTracker = ssc.scheduler.receiverTracker
15
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
16
 
17
        // Register the input blocks information into InputInfoTracker
18
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
19
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
20
 
21
        // Create the BlockRDD
22
        createBlockRDD(validTime, blockInfos)
23
      }
24
    }
25
    Some(blockRDD)
26
  }

截止到这儿,已经把blocks关联到RDD了。随后调用JobScheduler.submitJobSet()异步提交Job即可。

以上大体描述了spark streaming的job触发过程,但如何对input replication、怎么触发checkpoint、WAL日志如何使用等尚未涉及。

上一篇文章从pushdown filter的角度分析了spark与parquet的结合方式,由于其中涉及不少类和数据结构,补充如下。而Dremel论文里已详细解释了repetitionLevel、definitionLevel和value SerDe协议,不再赘述。下面主要从代码实现角度。

ParquetRecordPushDownFilter

在parquet的加载过程中,维持两颗树,一颗是与requested schema对应的fields tree,另一颗是与filters对应的predicate tree。其加载过程就是不断从解压后的二级制流里获取数据,将其对应到上面两颗树的过程。

predicate tree比较简单,如上图中predicate示例,在逻辑上由filterPredicate树和其叶子节点组成的valueInspector数组构成。每读入一个primitive value(可以看到pushdown filter不支持array、map、struct操作),就获取其对应的inspectors,依次计算boolean值存储起来,如下图所示。

在一个record读取完成后,由FilteringRecordMaterializer.getCurrentRecord()触发,递归计算predicate tree的最终结果,若未通过则抛弃缓存中的record值。

在叶子节点的读取过程中,同时也会更新fields tree对应的数据缓存,如下图,针对每个record维持了一个SpecificMutableRow 。这时有两种叶子节点,一种tree level = 1是root下直接的field,如下图中的short节点;另一种tree level > 1,是nested的子节点,如下图array里下的primitive值。对于前者直接在SpecificMutableRow下有对应槽位,而后者又有一个临时的ArrayBuf缓存,仅当该field递归读完后,由end()触发,才会set回SpecificMutableRow的槽位里。

读取完record并且通过filters之后,由CatalystRecordMaterializer.getCurrentRecord()触发将缓存中的数据,装配为spark所需要的row对象,这里涉及codegen过程,虽然有code缓存,但应该仍然是挺耗时的操作。

ParquetRecordPushDownFilter-datastruct

Pushdown在传统关系型数据库里,是一种常用的查询优化手段,在分布式计算领域,由于数据跨节点传输的代价更大,故pushdown也变得尤为重要。可能由于该手段极为常见,反而找不到对其的准确定义。如果将计算过程视为一颗树,则可以简单理解为尽量将计算下沉至叶子节点,尽早过滤掉不需要的数据,从而减少上层计算的数据量。

本文尝试解读基于Parquet存储的SparkSQL pushdown实现,spark版本1.6.0,parquet版本1.7.0。非分布式计算科班出身,更多是个人思考,谬误处请指正。

面对一个查询请求,如何实现pushdown优化呢?可粗略分为两个阶段,首先找出可以pushdown的操作组合,随后在数据加载阶段予以执行。前者属于执行计划范畴,后者可纳入底层数据结构的实现。

Pushdown之执行计划

SparkSQL的输入可以是DataFrame的算子组合,也可以是sql语句,本质上其解析过程是一样的,即语法、语义的解析(Parser),优化(Optimizer),生成逻辑和物理执行计划等,串联过程在sparkexecution.QueryExecution类里。

考虑df.select(a, b, c).filter(by a).filter(by b).select(c).filter(by c)这样的查询,在optimizer阶段,需要合并多个filters(CombineFilters),并调整算子间的顺序,例如将部分filter移到select等前面(PushPredicateThroughAggregate/Generate/Join/Project)。

以较为简单的PushPredicateThroughProjection为例
优化前,sparkPlan chain是:Filter(condition, … ) ->Projection(fields, …) -> … (注:右侧plan更早,是左侧的child)。
实现为:
  1. 将projection的fields转化为map(attribute => child), e.g., ‘SELECT a + b AS c, d …’ produces Map(c -> a + b).
  2. 将filter的condition分为deterministic和nondeterministic
    1. 如果nondeterministic为空,代表所有filter condition都是确定的,可以全部pushdown,转化后为:Projection(fields, …) -> Filter(将condition里出现的alias转化为实际child, …) -> …
    2. 如果deterministic为空,代表所有都无法pushdown,直接返回,chain不变
    3. 如果两者都非空,则仅pushdown deterministic condition,转化后为:Filter(nondeterministic, … ) -> Project(fields, …) -> Filter(pushedCondition, …) -> …

对于更为复杂的查询,可参看下面带pushdown字眼的rule实现。但总而言之,在optimizer完成后,可能会产生与输入不一样的filters(可能是多个,不一定连续),并且尽量前置

 Scala |  copy code |? 
01
object DefaultOptimizer extends Optimizer {
02
  val batches =
03
    // SubQueries are only needed for analysis and can be removed before execution.
04
    Batch("Remove SubQueries", FixedPoint(100),
05
      EliminateSubQueries) ::
06
    Batch("Aggregate", FixedPoint(100),
07
      ReplaceDistinctWithAggregate,
08
      RemoveLiteralFromGroupExpressions) ::
09
    Batch("Operator Optimizations", FixedPoint(100),
10
      // Operator push down
11
      SetOperationPushDown,
12
      SamplePushDown,
13
      PushPredicateThroughJoin,
14
      PushPredicateThroughProject,
15
      PushPredicateThroughGenerate,
16
      PushPredicateThroughAggregate,
17
      ColumnPruning,
18
      // Operator combine
19
      ProjectCollapsing,
20
      CombineFilters,
21
      CombineLimits,
22
      // Constant folding
23
      NullPropagation,
24
      OptimizeIn,
25
      ConstantFolding,
26
      LikeSimplification,
27
      BooleanSimplification,
28
      RemoveDispensableExpressions,
29
      SimplifyFilters,
30
      SimplifyCasts,
31
      SimplifyCaseConversionExpressions) ::
32
    Batch("Decimal Optimizations", FixedPoint(100),
33
      DecimalAggregates) ::
34
    Batch("LocalRelation", FixedPoint(100),
35
      ConvertToLocalRelation) :: Nil
36
}

由于pushdown最终需要在数据源上执行,所以在SparkPlanner的rule DataSourceStrategy里需要找到plan chain中所有可以deterministic的filters,并传递给Parquet使用,如下代码所示。

需要说明的是PhysicalOperation,它利用scala的unapply解包方法,调用collectProjectsAndFilters()将plan chain解析为(Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression])四元组,这里我们主要关注第二个元素Seq[Expression],它是所有filters的sequence。

从代码可以见,由于涉及partition keys的filters可以通过忽略不相关分片更高效的过滤,所以pushedFilters仅包括不涉及partition keys的filters。

 Scala |  copy code |? 
01
02
    // Scanning partitioned HadoopFsRelation
03
    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
04
        if t.partitionSpec.partitionColumns.nonEmpty =&amp;amp;gt;
05
      // We divide the filter expressions into 3 parts
06
      val partitionColumns = AttributeSet(
07
        t.partitionColumns.map(=&amp;amp;gt; l.output.find(_.name == c.name).get))
08
 
09
      // Only pruning the partition keys
10
      val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
11
 
12
      // Only pushes down predicates that do not reference partition keys.
13
      val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
14
 
15
      // Predicates with both partition keys and attributes
16
      val combineFilters = filters.toSet −− partitionFilters.toSet −− pushedFilters.toSet
17
 
18
      val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
19
 
20
      logInfo {
21
        val total = t.partitionSpec.partitions.length
22
        val selected = selectedPartitions.length
23
        val percentPruned = (1 − selected.toDouble / total.toDouble) * 100
24
        s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
25
      }
26
 
27
      val scan = buildPartitionedTableScan(
28
        l,
29
        projects,
30
        pushedFilters,
31
        t.partitionSpec.partitionColumns,
32
        selectedPartitions)
33
 
34
      combineFilters
35
        .reduceLeftOption(expressions.And)
36
        .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
37

需要注意,在scan执行后,还需要执行combineFilters,它由既包含partition keys、又包含非partition keys的filters组成。

上面仅是找到所有filters的序列,但尚未判断哪些可以被pushdown,这是通过buildPartitionedTableScan触发判断的,调用链如下:

  1. 以上获取到备选pushedFilters变量
  2. 调用buildPartitionedTableScan方法,传递pushedFilters作为filters参数
    1. 生成scanBuild(requiredColumns: Seq[Attribute], filters: Array[Filter])方法,注意此filters非彼filters
    2. 调用pruneFilterProject方法,继续传递pushedFilters作为filterPredicates参数
      1. 调用pruneFilterProjectRaw方法,对scanBuild进行wrap,并继续传递pushedFilters作为filterPredicates参数
        1. 调用selectFilters方法,传入翻译为底层field names的pushedFilters作为predicates参数,返回值也叫pushedFilters,为了区分,记为realPushedFilters
          1. 遍历pushedFilters,依次调用translateFilter方法,将支持pushdown的catalyst predicate操作转变为data source的filter操作(支持=,>,<,>=,<=,and,or,not,字符串startsWith、endsWith、contain等)
        2. realPushedFilters传递给scanBuild方法,作为filter参数

以上的realPushedFilters才是spark认为可以被pushdown的filters组合。在scanBuild方法里,调用ParquetRelation.buildInternalScan方法,将realPushedFilters用FilterApi.And方法连接起来、组成一个Filter对象,再调用ParquetInputFormat.setFilterPredicate()设置为parquet.private.read.filter.predicate,从而使后面的parquet解析过程可以真正执行push down filter操作(注:Parquet pushDown不支持字符串操作,所以相关filter在这一步会被忽略)。

注意,传递给parquet的pushdown filters都是org.apache.parquet.filter2.predicate下的FilterPredicate子类对象。

Pushdown之底层执行

push down filters在Parquet里有两个应用场景:基于statistic数据的row group filter,以及record filter。前者可跳过大量数据,效率高;后者实际是在解压缩、反序列化之后进行的,仅能降低数据加载阶段产出的数据量。

RowGroupFilter

数据加载阶段task的执行由SqlNewHadoopRDD.compute()触发,针对Parquet文件,生成ParquetRecordReader对象(spark 1.6里针对flat schema有其他优化),并调用其initialize方法。在开启taskSideMeta时,在execution端执行row group filter操作(taskSideMeta=false的情况参考BLOG)。

 Scala |  copy code |? 
1
// if task.side.metadata is set, rowGroupOffsets is null
2
    if (rowGroupOffsets == null) {
3
      // then we need to apply the predicate push down filter
4
      footer = readFooter(configuration, path, range(split.getStart(), split.getEnd()));
5
      MessageType fileSchema = footer.getFileMetaData().getSchema();
6
      Filter filter = getFilter(configuration);
7
      filteredBlocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
8
   }

通过getFilter方法可将parquet.private.read.filter.predicate配置的pushDownFilters解析出来,并包装为FilterPredicateCompat对象,供filterRowGroups使用。后者使用Visitor模式,触发RowGroupFilter.visit方法,调用StatisticsFilter.canDrop()针对每个parquet block(即rowGroup)依次执行predicate操作。

如下是RowGroupFilter.visit代码,仅通过rowGroupFilters的blocks才有机会被解压、反序列化、继续操作:

 Scala |  copy code |? 
01
 
02
  @Override
03
  public List&amp;amp;lt;BlockMetaData&amp;amp;gt; visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) {
04
    FilterPredicate filterPredicate = filterPredicateCompat.getFilterPredicate();
05
 
06
    // check that the schema of the filter matches the schema of the file
07
    SchemaCompatibilityValidator.validate(filterPredicate, schema);
08
 
09
    List&amp;amp;lt;BlockMetaData&amp;amp;gt; filteredBlocks = new ArrayList&amp;amp;lt;BlockMetaData&amp;amp;gt;();
10
 
11
    for (BlockMetaData block : blocks) {
12
      if (!StatisticsFilter.canDrop(filterPredicate, block.getColumns())) {
13
        filteredBlocks.add(block);
14
      }
15
    }
16
 
17
    return filteredBlocks;
18
  }
19

StatisticsFilter.canDrop也是用的Visitor模式,触发StatisticsFilter.visit()重载方法,以最简单的equal过滤为例,可见仅当Parquet文件包含statistic信息时才有效。

 Scala |  copy code |? 
01
02
@Override
03
  public &amp;amp;lt;extends Comparable&amp;amp;lt;T&amp;amp;gt;&amp;amp;gt; Boolean visit(Eq&amp;amp;lt;T&amp;amp;gt; eq) {
04
    Column&amp;amp;lt;T&amp;amp;gt; filterColumn = eq.getColumn();
05
    T value = eq.getValue();
06
    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
07
    Statistics&amp;amp;lt;T&amp;amp;gt; stats = columnChunk.getStatistics();
08
 
09
    if (stats.isEmpty()) {
10
      // we have no statistics available, we cannot drop any chunks
11
      return false;
12
    }
13
 
14
    if (value == null) {
15
      // we are looking for records where v eq(null)
16
      // so drop if there are no nulls in this chunk
17
      return !hasNulls(columnChunk);
18
    }
19
 
20
    if (isAllNulls(columnChunk)) {
21
      // we are looking for records where v eq(someNonNull)
22
      // and this is a column of all nulls, so drop it
23
      return true;
24
    }
25
 
26
    // drop if value &amp;amp;lt; min || value &amp;amp;gt; max
27
    return value.compareTo(stats.genericGetMin()) &amp;amp;lt; 0 || value.compareTo(stats.genericGetMax()) &amp;amp;gt; 0;
28
  }
29

Record Filter

通过RowGroupFilter的parquet blocks,通过InternalParquetRecordReader进行读取,涉及到的核心类如下:

ParquetRecordFilter

Parquet代码里,大量使用Visitor模式,下面需要注意。

在MessageColumnIO.getRecordReader()里,构造了三个关键对象:

  1. FilteringRecordMaterializer,在一个record读取完毕后,会调用它的getCurrentRecord方法,检查predicate是否通过。该类有一个关键属性FilteringGroupConverter对象
  2. ColumnReadStoreImpl,实际读取column的入口类,并且将FilteringGroupConverter对象传递给它
  3. RecordReaderImplementation,它的read方法触发底层读取、解压、反序列化等操作,并在一个record完成后,触发FilteringRecordMaterializer.getCurrentRecord方法

另外,RecordReaderImplementation在初始化时,会针对每一个叶子节点(即存储primitive值的地方)生成对应的ColumnReaderImpl对象,后者包含FilteringPrimitiveConverter属性,是对读入的每个value执行predicate的对象,基于ColumnReadStoreImpl.FilteringGroupConverter获得。

在RecordReaderImplementation.read()里,每读入一个r、d、value tuple,就会调用ColumnReaderImpl.writeCurrentValueToConverter(),进而触发调用对属性binding的writeValue方法的调用,根据value的类型,触发FilteringPrimitiveConverter对应addXXX方法。以boolean类型为例,会执行所有valueInspectors即predicate操作:

 Scala |  copy code |? 
1
  @Override
2
  public void addBoolean(boolean value) {
3
    for (ValueInspector valueInspector : valueInspectors) {
4
      valueInspector.update(value);
5
    }
6
    delegate.addBoolean(value);
7
  }

valueInspectors是由IncrementallyUpdatedFilterPredicateBuilder生成的,其update方法会父类ValueInspector的setResult方法,设置当前predicate对象的result和isKnown两个boolean标记位。

 Scala |  copy code |? 
01
    if (clazz.equals(Boolean.class)) {                                                                      
02
      if (pred.getValue() == null) {
03
        valueInspector = new ValueInspector() {                                                             
04
          @Override
05
          public void updateNull() {                                                                        
06
            setResult(true);                                                                                
07
          }                                                                                                 
08
          
09
          @Override
10
          public void update(boolean value) {                                                               
11
            setResult(false);                                                                               
12
          }                                                                                                 
13
        };
14
      } else {
15
        final boolean target = (Boolean) (Object) pred.getValue();                                          
16
        
17
        valueInspector = new ValueInspector() {                                                             
18
          @Override
19
          public void updateNull() {                                                                        
20
            setResult(false);                                                                               
21
          }                                                                                                 
22
          
23
          @Override
24
          public void update(boolean value) {                                                               
25
            setResult(value == target);                                                                     
26
          }                                                                                                 
27
        };                                                                                                  
28
      }                                                                                                     
29
    }

RecordReaderImplementation.read()里,当完整读完一个record后,调用FilteringRecordMaterializer.getCurrentRecord方法,判断是否通过了所有的pushdown filters,若未通过则read方法返回null。具体而言,最终判断predicate是否通过,是要把inspection数组里所有result结果,通过and、or重新串联起来的,流程如下:

  1. RecordReaderImplementation.read()在完整读取一个record后,调用FilteringRecordMaterializer.getCurrentRecord()
    1. 调用IncrementallyUpdatedFilterPredicateEvaluator.evaluate(predicate根节点)
      1. Visitor模式调用predicate.accept(instance),instance完全是为了使用visitor模式构造出来的空IncrementallyUpdatedFilterPredicateEvaluator对象
        1. 调用IncrementallyUpdatedFilterPredicateEvaluator.visit()重载方法,可接受IncrementallyUpdatedFilterPredicate.And/Or或ValueInspector对象作为参数
        2. 如果是And、Or,则利用left、right IncrementallyUpdatedFilterPredicateEvaluator对象生成结果
        3. 如果是ValueInspector对象,则返回之前生成的result bool值

至此,Parquet完成了record级别的pushdown。

虽然Parquet等列式存储号称可提升性能N倍,但在实际中,正如新近发布的Apache Arrow指出的,其本身的序列化、反序列化耗时占比可达70%。Spark 1.6中,针对flat schema进行了一些优化,本文尝试源码级别的解读。

在SqlNewHadoopRDD类中,若开启了spark.sql.parquet.enableUnsafeRowRecordReader、数据源是parquet格式,且tryInitialize成功的话,就会采用UnsafeRowParquetRecordReader替代之前的parquet-hadoop jar提供的ParquetRecordReader类,作为parquet文件解析的入口。

在tryInitialize中,首先调用parent方法,处理了task.side.metadata不同设置时,对footer meta的处理。随后才是检查当前schema是否满足特殊处理的要求:

  • 不可包含非primitive类型 或 repeated类型(例如 struct、array、map)
  • 由于parquet里有type和original type的概念,需要再对original检查,original仅为null、decimal、utf8或date
  • 不支持original type为decimal,且精度超过18的数字
  • 不支持int96 type
  • 不支持schema演进

接下来loadBatch方法就很关键了。

首先调用ParquetFileReader.readNextRowGroup()读入未解压、未序列化的数据,会一次性读入一个row group里涉及到的所有columns,这也是压缩和序列化的单位。

随后,loadBatch进行批量加载。与ParquetRecordReader相比,它最大的区别之一应该在于批量反序列化。可以想象,此时内存中有一个row group的n个columns数据,针对每个column一次性解压缩后,每轮最多反序列化64个values存放到缓存里。

 Scala |  copy code |? 
01
  private boolean loadBatch() throws IOException {
02
    // no more records left
03
    if (rowsReturned &gt;= totalRowCount) { return false; }
04
    checkEndOfRowGroup();
05
 
06
    int num = (int)Math.min(rows.length, totalCountLoadedSoFar − rowsReturned);
07
    rowsReturned += num;
08
 
09
    if (containsVarLenFields) {
10
      for (int i = 0; i &lt; rowWriters.length; ++i) {
11
        rowWriters[i].holder().resetTo(fixedSizeBytes);
12
      }
13
    }
14
 
15
    for (int i = 0; i &lt; columnReaders.length; ++i) {
16
      switch (columnReaders[i].descriptor.getType()) {
17
        case BOOLEAN:
18
          decodeBooleanBatch(i, num);
19
          break;
20
        case INT32:
21
          if (originalTypes[i] == OriginalType.DECIMAL) {
22
            decodeIntAsDecimalBatch(i, num);
23
          } else {
24
            decodeIntBatch(i, num);
25
          }
26
          break;
27
        case INT64:
28
          Preconditions.checkState(originalTypes[i] == null
29
              || originalTypes[i] == OriginalType.DECIMAL,
30
              "Unexpected original type: " + originalTypes[i]);
31
          decodeLongBatch(i, num);
32
          break;
33
        case FLOAT:
34
          decodeFloatBatch(i, num);
35
          break;
36
        case DOUBLE:
37
          decodeDoubleBatch(i, num);
38
          break;
39
        case BINARY:
40
          decodeBinaryBatch(i, num);
41
          break;
42
        case FIXED_LEN_BYTE_ARRAY:
43
          Preconditions.checkState(originalTypes[i] == OriginalType.DECIMAL,
44
              "Unexpected original type: " + originalTypes[i]);
45
          decodeFixedLenArrayAsDecimalBatch(i, num);
46
          break;
47
        case INT96:
48
          throw new IOException("Unsupported " + columnReaders[i].descriptor.getType());
49
      }
50
      numBatched = num;
51
      batchIdx = 0;
52
    }
53
    return true;
54
  }

另一个重要区别是,这里使用unsafeRow,backed by raw memory instead of Java objects。每行包含numFields个tuples,一个tuple对应parquet的一列,每个tuple又由3部分组成:

[null bit set] [values] [variable length portion]

The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores one bit per field.以bit位记录列的null情况,8字节对齐。

In the `values` region, we store one 8-byte word per field. For fields that hold fixed-length primitive types, such as long, double, or int, we store the value directly in the word. For fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the base address of the row) that points to the beginning of the variable-length field, and length (they are combined into a long).

values部分则采用了经典的变长存储方法。对于基本数字类型,直接存储在value字段里;其他变长类型,则在value里存储地址偏移量,而在第三部分variable length portion里存储真正的数据。

通过这种unsafe的方式,一方面可以减少GC的消耗。另一方面,也减少了java对象wrap的冗余存储,由于parquet flat格式里,有很多small size数据,这方面的优化对应内存用量应该也有比较大的效果。

最后结合UnsafeRow的get方法,看下内存复用程度。由于我不是Java出身,措辞可能不专业,如果有更好的解释方式,请留言。

针对基本数据类型,例如int直接返回变量本身(因为基本类型本身的内存是不可复用的):

 Scala |  copy code |? 
1
  @Override
2
  public int getInt(int ordinal) {
3
    assertIndexIsValid(ordinal);
4
    return Platform.getInt(baseObject, getFieldOffset(ordinal));
5
  }

而针对可修改的binary,则返回内存copy,以免row被后续mapPartitions等处理时修改了:

 Scala |  copy code |? 
01
02
  @Override
03
  public byte[] getBinary(int ordinal) {
04
    if (isNullAt(ordinal)) {
05
      return null;
06
    } else {
07
      final long offsetAndSize = getLong(ordinal);
08
      final int offset = (int) (offsetAndSize &gt;&gt; 32);
09
      final int size = (int) offsetAndSize;
10
      final byte[] bytes = new byte[size];
11
      Platform.copyMemory(
12
        baseObject,
13
        baseOffset + offset,
14
        bytes,
15
        Platform.BYTE_ARRAY_OFFSET,
16
        size
17
      );
18
      return bytes;
19
    }
20
  }

另外,没有看到unsaferow的内存何时释放。

Spark1.6在2016年1月发布,其中包含内存模型的优化,将之前shuffle、storage、other内存静态配置的方式,修改为动态抢占式。其对应设计稿中,包含了多种折衷和思考,对于理解新老版本内存模型都很有好处,故翻译如下。其中部分专有名词维持英文。

英文原文地址:https://issues.apache.org/jira/secure/attachment/12765646/unified-memory-management-spark-10000.pdf

Unified Memory Management in Spark 1.6

本文提出一种新的Spark内存管理模型,打破了老版本中execution和storage内存空间之间不可逾越的边界。Execution可以借用空闲的storage内存,反之亦然。而当内存压力上升时,被借用的内存会被回收。但考虑到实现复杂度,不会回收被借用于exection的内存(即仅会将execution借给storage的内存,回收还给execution)。

概览

Spark的内存可以大体归为两类:execution和storage。前者包括shuffles、joins、sorts和aggregations所需内存,后者包括cache和节点间数据传输所需内存。

在Spark 1.5和之前版本里,两者是静态配置的,不支持借用。这种方式有如下限制:

  • 不存在普适的默认配置
  • 内存配置的调优,需要使用者了解spark内部原理
  • (默认配置下)即使application无需cache,也只能使用很小一部分内存

我们的目标是,通过内存空间的融合,消除以上限制。

最终结果是提供更好的性能,且无须使用者调优就可以获得较优的内存使用。而且,由于无须静态配置内存分配方式,可以使用一个application支撑多种不同负载,而不会导致过多的spill。

老版本的内存管理

老版本Spark采用静态分配的内存空间,即将全部内存划分为3个独立区块,通过Spark配置设定每个区块相对JVM heap的百分比。

Execution:用于缓存shuffle、join、sort和aggregation的临时数据,通过spark.shuffle.memoryFraction(默认0.2)配置。

Storage:主要用于缓存未来可能会访问的数据,但也用于broadcast和发送较大的task结果(缓存和部分网络应用),通过spark.storage.memoryFraction(默认0.6)配置。

Other:剩余内存主要用于用户代码和spark内部meta数据。由于该类型内存不可控(默认0.2),故本文不再予以讨论。

在可控内存区块,一旦达到配置上限,内存中的数据会被spill至磁盘。而storage区块的数据(在ONLY_MEM配置时)甚至会被抛弃。不论哪种情况,都会导致IO或重算,造成性能下降。

内存配置详解

为了降低OOM风险,例如应对超大数据单元,Spark内存管理过于小心了 ,引入了safety配置解决数据倾斜。内存相关配置如下:

spark.shuffle.memoryFraction(default0.2)

spark.shuffle.safetyFraction(default0.8)

spark.storage.memoryFraction(default0.6)

spark.storage.safetyFraction(default0.9)

spark.storage.unrollFraction(default0.2)

即execution内存最多仅占JVM heap的0.2*0.8=16%!对于无需cache数据的应用,大部分heap内存都被浪费了,而(shuffle等)中间数据却被频繁spill到磁盘并读取。unroll相关配置将在后面介绍。

Execution内存管理

execution内存被分配给JVM里的多个task线程。与上面不同的是,task间的execution内存分配是动态的,如果没有其他tasks存在,Spark允许一个task占用所有可用execution内存。

有以下几个内存相关类:

ShuffleMemoryManager负责全局计数和内存调度(policy enforcement)。它是核心仲裁者,根据task当前内存用量决定如何进行分配。一个JVM里仅有一个实例。

TaskMemoryManager负责每个task的内存分配和跟踪。它通过page table追踪on-heap内存块,task退出时它若检查到有page未释放则会抛出异常。它使用ExecutorMemoryManager真正执行分配和释放。每个task一个实例。

ExecutorMemoryManager负责具体实现,它管理on-heap和off-heap内存,并实现了一个weak reference pool支持跨tasks的空闲页复用。一个JVM里仅有一个实例。

这几个类如下进行交互。一旦task需要分配较大内存,它首先向ShuffleMemoryManager申请X bytes。如果申请可以被满足,task会向TaskMemoryManager要求分配X bytes内存。后者更新自己的page table,并且要求ExecutorMemoryManager真正分配内存。

内存分配策略

每个task可以向ShuffleMemoryManager申请1/N的execution内存,N是当前存在的tasks数量(通过cores配置)。如果内存申请无法满足,可能会导致数据spill从而释放一些内存。依赖于不同的操作类型,该task随后可能继续尝试,或要求分配较小一些的内存。

为了避免过多spill,task仅当已分配到1/(2N)内存后才会spill。如果还没达到1/(2N)就内存不足了,那么该task会阻塞等待其他tasks释放内存。否则,如果先启动的task占用过多内存,会导致后启动的tasks持续spill。

例子:某executor先启动一个task A,并在task B启动前快速占用了所有可用内存。(B启动后)N变成2,task B会阻塞直到task A spill,自己可获得1/(2N)=1/4的execution内存。而一旦task B获取到了1/4的内存,A和B就都有可能spill了。

注意:直到A无法获得更多内存,它才会spill。与此同时,由于内存被占用,其他新tasks可能被饿死。可用通过一些强制spill机制予以规避,但不在本文讨论范围。

Storage内存管理

Storage内存通过BlockManager管理,虽然主要用于缓存RDD分片,它也被用于torrent broadcast和将较大的结果数据发送给driver。

Storage level

每个block都设置了一个storage level,用于指定是存储在内存、磁盘还是off-heap。Block也可以设置为存储在内存和磁盘,当内存不足的时候就可以转移至磁盘了。

Storage level也指定是否以序列化的方式进行存储。需要注意MEMORY_AND_DISK_SER,即内存数据也是以序列化方式存储的,故当需要转移至磁盘时无需额外序列化了,这种配置时,内存回收代价较低。

回收(evict)策略

老版本的回收策略基本就是LRU,且仅用于内存存储数据。有两个例外需要注意。首先,永远不会为了保存当前RDD的新blocks而回收其已存在的blocks。其次,如果unrolling失败,那所涉及的新block直接被回收。

Unrolling

如果BlockManager收到需存储在内存中的iterator,需要将其unroll为数组。但由于迭代数据可能较大、无法全部放入内存,BlockManager需要逐步unroll以避免OOM,并间歇性的检查内存是否足够。

Unroll所需内存消耗storage空间。如果当前没有缓存数据,unrolling可占用全部storage空间。否则,unrolling内存上限由spark.storage.unrollFraction(默认0.2)配置。注意,这些sub-region(unrolling、cache等)都不是静态保留,而是随着抛弃现存blocks而冬天变化的。

如果在抛弃现存blocks后unrolling还是失败了,BlockManager会直接把unrolling对应的block存储到磁盘里。

设计方案

下面给出设计方案,并讨论折衷点。这里仅涉及high level内存管理策略。

建议

Execution和storage的内存空间可交叉。如果execution内存不足,可以借用storage的空闲空间,反之亦然。被借用的内存可以随时被回收,但考虑到实现复杂度,第一版设计里,借给execution的内存用于不会被强制回收。将引入以下新配置:

  • spark.memory.fraction(默认0.75):用于execution和storage的堆内存比例。该值越低,越容易发生spill和缓存数据回收。该配置实际上也限定了OTHER内存大小,以及偶发超大record的内存消耗。
  • spark.memory.storageFraction(默认0.5):spark.memory.fraction中storage内存的比例。缓存数据仅在该内存超限时回收。
  • spark.memory.useLegacyMode(默认false):是否使用老版本的内存管理模型。老版本将堆内存分为固定大小的区块,在未调优时可能导致过度spill。以下deprecated内存配置仅该配置打开时生效:

    • spark.storage.memoryFraction
    • spark.storage.safetyFraction
    • spark.storage.unrollFraction
    • spark.shuffle.memoryFraction
    • spark.shuffle.safetyFraction

以下将讨论折衷。

如何处理内存压力

由于execution和storage共享同一块内存空间,在内存压力上升导致竞争时,我们必须明确内存回收规则。有以下三种可能的选择:

  • 优先回收execution内存
  • 优先回收storage内存
  • 不回收内存

内存回收代价

Storage内存的回收依赖于storage level。MEMORY_ONLY的数据回收代价最大,因为后续的访问将导致重算。MEMORY_AND_DISK_SER代价最小,因为内存回收过程不涉及序列化,仅增加了I/O。

Execution内存的回收场景比较单一,所有被回收的数据将被spill到磁盘,不涉及重算。而且随着最近新增的unsafe功能,execution数据大多以紧凑格式存储,序列化成本也较低。

但有一点需要注意的是,被spill到磁盘的数据必须被回读,所以虽然没有重算,但execution内存回收仍有一定成本。

实现复杂度

Storage回收比较简单,基于现有机制就可将数据落地磁盘。但execution回收较为复杂,有以下方式:

  • 为所有execution内存块注册一个spill callback
  • 修改回读(poll)和spill以实现联动

每种方式,我们都需要关注是否有一个页存储了cache数据(我的理解是,都需要跟踪存储位置)。而且,一些操作还假设至少有一个page内存可用,如果我们强制spill execution内存,就需要小心、以免回收掉保留内存,而把这些操作饿死。

另外,当回收execution内存时,还需要解决如何处理即将被cache的blocks的问题。最简单的方式是等待直到有足够的内存被释放了,但这可能会导致死锁,尤其是待缓存块还依赖内存中exection数据的时候(例如,persist shuffled数据的时候)。一种替代方案是,将待缓存块直接落地磁盘,随后一旦有空闲内存再读取出来。为了避免落地所有待缓存块,可预留少量(例如5%的堆)内存用于存储一部分待缓存块。

但考虑到复杂度,第一版中暂不实现。如果今后证明该功能确实重要,再考虑升级。

选型

最终选择回收缓存数据。由于设计目的是期望提供更多的execution内存,而强制回收execution内存于事无补。另外该选择也更容易实现。

但如果允许所有回收缓存数据,那对于依赖缓存的applications也会有显著影响。所以,我们也需要为缓存块保留一部分内存。

最小保留(Minimum reservations)

不论是回收缓存块还是spill execution内存,都需要允许使用者保留一部分内存以免任一方被饿死(注:因为回收和spill都会释放出空闲的、可被借用的内存)。有动态和静态两者设置方法。老版本为execution和storage都提供了静态保留方式,不足之处仍然是割裂。

这次新的设计里提供动态最小预留内存的方式。即storage可以借用execution内存,但一旦后者需要就会立即被回收并归还。反之亦然,但有一个例外,如果application尝试缓存数据块,但所有内存都用于execution了,那我们不会回收execution内存,而是直接将待缓存数据块落地磁盘。

另外,我们还需为OTHER用途设置内存,老版本默认是20%的堆空间,且在application执行期不变。本次设计不予以改变,但配置方式改为通过spark.memory.fraction(默认0.75),即剩余空间用于OTHER。

其他设计方式

以上分析了相关技术折衷点,这里再对比其他一些方面。

在所有新设计里(A-C),execution和storage内存之和都由spark.memory.fraction限制。另外,为了后向兼容性,使用者可开启spark.memory.useLegacyMode以使用老版本。

(X)Existing behavior。将execution和storage内存分别限制在独立空间,不允许交叉。各自内存空间大小是静态配置的。用于Spark1.5及之前版本。

(A)Evict cached blocks,full fluidity。Execution和storage共享统一的内存空间。当内存压力大时,回收缓存数据。仅当抢占storage后依然内存不足时,execution才会spill。

(B)Evict cached blocks,static storage reservation。与A类似,但为storage保留一定内存不可被借用,缓存块仅当超出该空间时才会被回收和借用。通过spark.memory.storageFraction(默认0.0)进行配置,执行期不可变。

(C)Evict cached blocks,dynamic storage reservation。与B类似,但storage的保留内存也是动态可变的。区别是当内存空闲时,execution可以最大限量的借用。这也是我们选择的方案。

未选择A的原因是,它没有解决多租赁(multi-tenancy)和严重依赖缓存的application问题。所以提出B,添加了保留内存。

B的问题是,保留内存在某些场景下仍然需要使用者配置。在共享环境里(注:一个application支撑多处理请求时)默认值0并不奏效;通过较大的shuffle,一个使用者可能回收另一个使用者的缓存数据。另外,不论设置什么非0的配置,都会导致内存块割裂。例如,为了避免共享环境下潜在的性能倒退,可能设置该值为0.6(旧版本默认storage比例),从而导致execution内存仅占0.4*0.75 = 0.3的堆空间,并不比之前好,尤其是根本无需缓存数据时。

C没有设立强制的内存隔离。当storage空间有空闲时,execution可以借用。唯一的问题是,当storage需要缓存数据,但内存都被execution占用时怎么办?初始版本直接将待缓存数据落地磁盘。

C也是唯一满足如下要求的:

  • Storage内存没有上限(X不满足)
  • Execution内存没有上限(X和B不满足)
  • 可保证最小storage空间(A不满足)

所以我们选择了方案C

(注,后面是实现层面,看代码更合适,故不翻译了)

========================

最后总结下,在新版本且legacy=false时,有如下内存借用规则:

  • execution可借用所有空闲的storage内存
  • execution也可以收回storage向自己借用的内存
  • 但storage已使用的、属于storage的内存,不可被借用
  • storage可借用execution空闲内存
  • 但storage无法收回已经被execution借用的内存

故可以看到规则整体倾向于execution,尽量保证shuffle内存充沛。

另外,该模型不涉及内存实际的分配与释放,仍是由JVM保证的。它做的是内存计数器。

Dynamo使用一致性hash来实现partition and replication,从而达到高扩展和高可用。在实现上,对经典一致性hash进行了一些优化,本文尝试予以解释。

partitioning algorithm

One of the key design requirements for Dynamo is that it must scale incrementally. This requires a mechanism to dynamically partition the data over the set of nodes (i.e., storage hosts) in the system. Dynamo’s partitioning scheme relies on consistent hashing to distribute the load across multiple storage hosts. In consistent hashing [10], the output range of a hash function is treated as a fixed circular space or “ring” (i.e. the largest hash value wraps around to the smallest hash value). Each node in the system is assigned a random value within this space which represents its “position” on the ring. Each data item identified by a key is assigned to a node by hashing the data item’s key to yield its position on the ring, and then walking the ring clockwise to find the first node with a position larger than the item’s position.

假设我们认为hash ring的取值范围是pow(2, 32),那么经典一致性hash算法采用随机的方式计算position,例如直接random(0, pow(2,32)-1),将存储节点映射到ring上,两个存储节点的position就圈出了一个region。在读写item时,也采用任意hash函数计算范围是pow(2,32)的position,看它落在哪个region里以决定由哪个存储节点负责。

Thus, each node becomes responsible for the region in the ring between it and its predecessor node on the ring. The principle advantage of consistent hashing is that departure or arrival of a node only affects its immediate neighbors and other nodes remain unaffected.

采用经典一致性hash的好处是,当有存储节点变化时,影响较为平滑,仅邻居节点需要向新加入的节点迁移数据。

The basic consistent hashing algorithm presents some challenges. First, the random position assignment of each node on the ring leads to non-uniform data and load distribution. Second, the basic algorithm is oblivious to the heterogeneity in the performance of nodes. To address these issues, Dynamo uses a variant of consistent hashing (similar to the one used in [10, 20]): instead of mapping a node to a single point in the circle, each node gets assigned to multiple points in the ring. To this end, Dynamo uses the concept of “virtual nodes”. A virtual node looks like a single node in the system, but each node can be responsible for more than one virtual node. Effectively, when a new node is added to the system, it is assigned multiple positions (henceforth, “tokens”) in the ring. The process of fine-tuning Dynamo’s partitioning scheme is discussed in Section 6.

但是经典方法也有一些弊端。随机计算的节点position会导致不均衡,而且也没有考虑节点间的异构性。Dynamo引入virtual nodes的概念予以解决。

Replication

To achieve high availability and durability, Dynamo replicates its data on multiple hosts. Each data item is replicated at N hosts, where N is a parameter configured “per-instance”. Each key, k, is assigned to a coordinator node (described in the previous section). The coordinator is in charge of the replication of the data items that fall within its range. In addition to locally storing each key within its range, the coordinator replicates these keys at the N-1 clockwise successor nodes in the ring. This results in a system where each node is responsible for the region of the ring between it and its Nth predecessor. In Figure 2, node B replicates the key k at nodes C and D in addition to storing it locally. Node D will store the keys that fall in the ranges (A, B], (B, C], and (C, D].

为了获得高可用,Dynamo采用多副本的方式,将数据copy到ring上连续的N个节点。

The list of nodes that is responsible for storing a particular key is called the preference list. The system is designed, as will be explained in Section 4.8, so that every node in the system can determine which nodes should be in this list for any particular key. To account for node failures, preference list contains more than N nodes. Note that with the use of virtual nodes, it is possible that the first N successor positions for a particular key may be owned by less than N distinct physical nodes (i.e. a node may hold more than one of the first N positions). To address this, the preference list for a key is constructed by skipping positions in the ring to ensure that the list contains only distinct physical nodes.

regions与存储节点间的一对多映射关系被称为preference list,这个list需要在Dynamo节点间传播、共享。并保证仅存储distinct物理节点。

更进一步的优化

Dynamo uses consistent hashing to partition its key space across its replicas and to ensure uniform load distribution. A uniform key distribution can help us achieve uniform load distribution assuming the access distribution of keys is not highly skewed. In particular, Dynamo’s design assumes that even where there is a significant skew in the access distribution there are enough keys in the popular end of the distribution so that the load of handling popular keys can be spread across the nodes uniformly through partitioning. This section discusses the load imbalance seen in Dynamo and the impact of different partitioning strategies on load distribution.

关于数据倾斜,这里有几个假设的前提。首先,在keys的流量分布不是严重倾斜时,均匀的keys分布可以获得较为均衡的负载。其次,Dynamo认为当popular keys足够多时,即使流量不均衡,只要把popular keys分散开,也可以获得均衡的负载。但这反过来也就意味着,当流量不太大、且仅集中于少量popular keys的时候,可能有些节点会过载。Dynamo的分片策略也在演进如下。(另外,这里有一个隐含的概念,即负载均衡,需要数据分布均衡、流量分布均衡)

Strategy 1: T random tokens per node and partition by token value: This was the initial strategy deployed in production (and described in Section 4.2). In this scheme, each node is assigned T tokens (chosen uniformly at random from the hash space). The tokens of all nodes are ordered according to their values in the hash space. Every two consecutive tokens define a range. The last token and the first token form a range that “wraps” around from the highest value to the lowest value in the hash space. Because the tokens are chosen randomly, the ranges vary in size. As nodes join and leave the system, the token set changes and consequently the ranges change. Note that the space needed to maintain the membership at each node increases linearly with the number of nodes in the system.

策略1是最原始的方式,每个物理节点虚出T个virtual nodes,每个virtual node随机分配一个token。tokens圈出的range决定其存储数据的范围。由于采用随机的方式,故需要一个完整的映射表,才可以获悉整个Dynamo集群的存储映射关系,而该表又随着节点数的线性增长。

While using this strategy, the following problems were encountered. First, when a new node joins the system, it needs to “steal” its key ranges from other nodes. However, the nodes handing the key ranges off to the new node have to scan their local persistence store to retrieve the appropriate set of data items. Note that performing such a scan operation on a production node is tricky as scans are highly resource intensive operations and they need to be executed in the background without affecting the customer performance. This requires us to run the bootstrapping task at the lowest priority. However, this significantly slows the bootstrapping process and during busy shopping season, when the nodes are handling millions of requests a day, the bootstrapping has taken almost a day to complete. Second, when a node joins/leaves the system, the key ranges handled by many nodes change and the Merkle trees for the new ranges need to be recalculated, which is a non-trivial operation to perform on a production system. Finally, there was no easy way to take a snapshot of the entire key space due to the randomness in key ranges, and this made the process of archival complicated. In this scheme, archiving the entire key space requires us to retrieve the keys from each node separately, which is highly inefficient.

这种方式仍存在弊端。首先,新节点的加入,需要从一些old节点迁移数据,由于数据不是连续存储的,扫描较为耗时,且需后台执行。其次,由于range变化导致数据变化了,所涉及的merkle trees需要重算。最后,snapshot也不好做。(这里由于对Dynamo底层存储细节不太了解,无法细致推导)

The fundamental issue with this strategy is that the schemes for data partitioning and data placement are intertwined. For instance, in some cases, it is preferred to add more nodes to the system in order to handle an increase in request load. However, in this scenario, it is not possible to add nodes without affecting data partitioning. Ideally, it is desirable to use independent schemes for partitioning and placement. To this end, following strategies were evaluated:

根本原因是数据分片与数据分布耦合在一起了!

Strategy 2: T random tokens per node and equal sized partitions: In this strategy, the hash space is divided into Q equally sized partitions/ranges and each node is assigned T random tokens. Q is usually set such that Q >> N and Q >> S*T, where S is the number of nodes in the system. In this strategy, the tokens are only used to build the function that maps values in the hash space to the ordered lists of nodes and not to decide the partitioning. A partition is placed on the first N unique nodes that are encountered while walking the consistent hashing ring clockwise from the end of the partition. Figure 7 illustrates this strategy for N=3. In this example, nodes A, B, C are encountered while walking the ring from the end of the partition that contains key k1. The primary advantages of this strategy are: (i) decoupling of partitioning and partition placement, and (ii) enabling the possibility of changing the placement scheme at runtime.

策略2将data partition和location的概念分开,partition是数据分片,而location是由token圈定的范围决定的。另外partition是等分的,一般取值非常大。在读写item时,hash出posittion,看它落在哪几个tokens圈定的范围里。这时partition的概念感觉意义还不大,从论文里也可以看到,策略2仅是一个过渡方案。真正重要的是策略3。

Strategy 3: Q/S tokens per node, equal-sized partitions: Similar to strategy 2, this strategy divides the hash space into Q equally sized partitions and the placement of partition is decoupled from the partitioning scheme. Moreover, each node is assigned Q/S tokens where S is the number of nodes in the system. When a node leaves the system, its tokens are randomly distributed to the remaining nodes such that these properties are preserved. Similarly, when a node joins the system it “steals” tokens from nodes in the system in a way that preserves these properties.

策略3也是将hash space等分为取值较大的Q个partitions,而且partition与存储位置解耦(映射关系,而不是相等关系)。同时,每个存储节点分配Q/S个tokens(即形成Q/S个virtual nodes)。所以virtual node的存储范围其实与partition大小是一致的了,但由于物理节点与virtual节点间是1对多关系,故可以灵活的以virtual node为单位进行数据迁移。当有存储节点离开时,它的tokens随机分布到其他节点上;有节点加入时,不是随机分配新tokens了,而是从现有tokens里随机获取所需。

另外,由于tokens总量都不变,故映射表也恒定。猜测:在put/get item时,需要通过item hash出的position算出partition,再由partition映射到token(由于两者都是等分且数量一致,可以直接=或算出)、token映射到node。有节点增删时,需要all token list,从中随机获取tokens后,根据token到node的映射关系,迁移数据,并修改token到node的映射关系,这时partition到token的关系是不需要变化的。

Hadoop生态主要是基于GitHub和jira构建开源社区,今年希望可以参与进去,记录使用方法如下。

由于每个开源项目的要求都可能不同,所以在开始之前,必须先阅读其Contribute页面,一般从README里就可以找到链接。

JIRA操作

当有了代码修改想法时,不论是bugfix,还是功能改进,都可以到项目JIRA页面上,提交一个issue,用英文大致描述想要做的事情。这里需要注意两点:

  1. 提交issue前,先用英文关键词search一下,确认该功能没有实现、且没有其他人提交类似issue
  2. issue粒度最好足够细,一个独立的小功能就好,类似我们敏捷中的task卡片。

如果想自己动手提交代码,可以将该issue assign给自己。真正动手时,可以将issue状态修改为 in progress。

GitHub 操作

在正式coding前,得在github上找到该项目,点击fork按钮fork出一个自己的分支,这样后续在这个分支上的工作只要不被管理员merge回去,就不会有任何影响。

git操作

这时可以建立本地代码仓库了:

git clone https://<path-to-your-repo> <your-local-prj-name> 将远端项目拉取到本地

cd <your-local-prj-name>

git checkout -b <new-branch-name> 建立一个分支,该issue相关的功能都会在这个分支里进行(git与svn不同,git的分支很轻量级,可以认为是功能隔离的单位)

git push origin <new-branch-name>提交新分支,这时github上自己的project下这个分支可见、但为空。

这时可以在新分支上编码了,完成并通过自测后,可以先提交到分支上

git add <new-files>

git commit -m ‘comments’

git push origin <new-branch-name> 这时代码在新分支上可见,但放心,不会影响到社区版本

如果编码持续了一段时间,可能需要从社区版本更新代码下来

git remote add upstream https://<path-to-public-repo> 添加社区版本为upstream源

git fetch upstream 下载社区版本的更新到本地隐藏目录

git checkout master 切换到master分支

git merge upstream/master 合并代码到本地master分支

git push origin master 将合并的结果提交到自己远端的master分支

GitHub操作

这时需要让别人看到自己的代码了,在github自己的project页面上,点击 pull request发起请求。在收到别人回复时,可以进行交流、重复代码修改过程。

JIRA操作

这时还需要将pull request与JIRA关联起来,点击issue页面More/Link,添加一个Web Link,将pull reqeust的uri填进去,link text可以写PR #xxx。

最后,当pull request完成,即被merge回社区或被彻底拒绝后,可以在git里删除分支,并且关闭issue。