Archive for 七月, 2016

从2013年的millwheel,到2015年的dataflow,再到2016年strata hadoop上的《Watermarks:

Measuring Time and Progress in Streaming Pipelines》,Google连续发表了3篇low watermark相关论文,致力于解决实时计算中的时序性问题。

但何为low watermark,为什么需要它?与其他方式相比有什么优劣?实时计算已出现多年,为什么此时才需要解决时序性?Paper里最关键的地方是什么?本文尝试解答这些问题。

时序性问题

实时计算的数据源一般来自于数据传输系统,从业务系统里收集日志消息,违反时序性的常见场景有:

  1. 从多个机房/地域采集数据,导致数据乱序(局部有序、整体乱序)
  2. 某机器或进程故障,重启后数据大大延迟
  3. 某机房或交换机故障,恢复后数据大大延迟

前一种是常态,后两者可视为故障恢复后的数据回追,个人认为low watermark主要解决第一类问题,但也为第二类问题提供一种可能的解决方案。

那么乱序会带来什么问题呢?Millwheel论文里举了一个典型的例子,即波峰波谷的计算:

In the case of Zeitgeist, shown in Figure 1, our input would be a continuously arriving set of search queries, and our output would be the set of queries that are spiking or dipping.

it is important to be able to distinguish whether a flurry of expected Arabic queries at t = 1296167641 is simply delayed on the wire, or actually not there.

计算搜索词在一个时间窗口(e.g. 1分钟)内的搜索次数,将其波动率与阈值比较甚至策略,以发现异常事件。波峰时由于数据量大,少量数据延迟影响可以忽略。但波谷本身数据量就小,无法判断是真的到达波谷,还仅仅是由于延迟导致数据还在传输途中。

之前的做法是再设置一个安全时间,延迟几分钟再计算,但安全时间如何设置?过长则影响时效性,过短无法奏效。或者采取右端开放窗口的补偿方法,先产出数据,当延迟数据到来时再予以修改,但需要业务场景适用、下游系统支持修改。

除了aggregation计算对时序性有要求外,一些策略计算可能也会要求数据完整、有序。

low watermark原理

The low watermark for a computation provides a bound on the timestamps of future records arriving at that computation.

low watermark是一个时间戳,它承诺未来不会有早于该时间戳的数据到达该计算节点。这里的时间计算一般基于eventtime,即事件发生时间,例如业务系统的log时间,而较少使用数据到达计算节点的时间(processing time,某些场景也可以用)。

Definition: We provide a recursive definition of low watermarks based on a pipeline’s data flow. Given a computation, A, let the oldest work of A be a timestamp corresponding to the oldest unfinished (in-flight, stored, or pending-delivery) record in A. Given this, we define the low watermark of A to be

  • min(oldest work of A, low watermark of C : C outputs to A)
  • If there are no input streams, the low watermark and oldest work values are equivalent.

对于复杂问题,我们往往将其拆分为子问题,这里也一样,将其转化为递归。对于一个最简单的拓扑片段C->A,即A订阅C产生的数据(C和A都是多并发、分布式的),其计算公式如上。需要注意,如果C没有input streams,即importer节点(storm里的spout节点),那么the low watermark and oldest work values are equivalent。

进一步又区分了input和output low watermark:

  • Input Low Watermark: Oldest work not yet sent to this streaming stage.
  • InputLowWatermark(Stage) = min { OutputLowWatermark(Stage’) | Stage’ is upstream of Stage}
  • Output Low Watermark: Oldest work not yet completed by this streaming stage.
  • OutputLowWatermark(Stage) = min { InputLowWatermark(Stage), OldestWork(Stage) }

Input low watermark是所有上游节点output low watermark的最小值,对于importer而言就是最小订阅进度。而Output low watermark是对下游的承诺,由自己的input递归传递上游承诺,由自己产出数据的oldest work代表自己的承诺,两者取min。

这里有两个重要的隐藏结论:

  1. output low watermark <= input low watermark,即下游永远不可能比上游快
  2. 当前节点使用input low watermark驱动窗口计算,即input是计算中直接使用的,而output只是递归的需要

关键技术点

递归问题的一个关键是能否找到初始条件,low watermark也一样,If there are no input streams, the low watermark and oldest work values are equivalent. 即importer的订阅进度如何计算。Google论文中笼统指出file和pub-sub两种数据源data time的计算方法,而在我们的场景下,数据一般来自于业务系统的前端机,通过类似kafka的消息传输系统进入实时计算,可以保证来源于一个前端机的数据是有序的,但传输过程中多个前端机数据乱序。

low-watermark-importer

从上图示例可以看出,importer的计算过程包含以下3步:

  1. progress of single web server = max(input data time)
  2. input low watermark of single worker = min(progress of single web server)
  3. input low watermark of importer = min(input low watermark of single worker)

在实际中,由于各importer节点数据互不相干,故若importer中存在窗口,可直接使用step 2的结果驱动。另,示例中假设importer无窗口计算,故output与input low watermark相等。

low watermark的工程实现

Millwheel论文中采取了集中式,而Flink据说是采用inject的方式。而我们的小批量实时系统,本身就是基于query and own的task管理模型,故也采取了集中计算的方式。

抽象了两类包含data time的task,即data task和state task,前者代表该task批次里的oldest work value,后者存储当前计算节点的input low watermark。TaskManager中心节点维持了上下游订阅关系 -> datatime -> count的计数器,从而将多并发计算节点的时间戳汇聚为层级算子的时间戳。

下游节点向TaskManager请求上游算子的两类时间戳,根据公式计算input low watermark并驱动窗口滑动。

计算的时机是:

  1. 在本轮数据处理前,计算input low watermark
  2. 在本轮数据处理结束时,计算oldest work存入data task,并保存input low watermark至state task

这里需要注意的是,虽然OutputLowWatermark(Stage) = min { InputLowWatermark(Stage), OldestWork(Stage) },但我们没有直接存储上游节点的output low watermark,而是将input和oldest值分别存储,原因是:

  1. 当前节点的output low watermark在下游节点计算input low watermark时才会被用到,即其产生和使用间有一个时间差
  2. 而当前节点的input low watermark在output low watermark产生后还可以独立前移,分开存储能够尽早推动下游 input low watermark的前移(这里比较绕,需要细细琢磨,在分布式场景下考虑就是理所应当的了)

综上所述,记录了我对论文的学习以及实践的一些心得,但由于我们的low watermark尚未经历大规模应用的检验,可能存在疏漏,还望指正。

最后猜测下为什么Google选择在这个时间段发表low watermark。早年的实时计算多追求低延迟、大吞吐、不重不丢,而近些年这方面的表现已较为优秀,所以才进一步追求更细致的功能。另外,早年的实时场景多为辅助型的,近些年已逐渐走入核心领域,故也会提升对业务准确性的要求。

不懂硬件,今天经历了raid卡对性能的影响,特记录如下。

一台需要做journal的中控服务器,在压力相当的情况下,比其他服务器的写延迟大很多。

top查看每个cpu的数据,发现部分cpu的wa时间非常大(90%)。iostat -dx 3,发现write rqm虽然也持续较大(经常5、6k,甚至上万),写入流量几十MB/s,但对比相似机型,iowait也不会达到如此高。虽然我们的场景是每条journal(KB-百KB)都需要fsync到磁盘,但也不应该性能这么差。

继续安装iotop命令(需翻墙),sudo执行,发现写入操作都集中在journal threads上(TID是thread id,可以gstack 查看具体是什么线程)。B519E22D-58CD-4E92-8227-89C0BD429C44

 Bash |  copy code |? 
1
Thread 5 (Thread 140711690905952 (LWP 15782)):
2
#0  0x0000003f0b90b8fb in __fsync_nocancel () from /lib64/tls/libpthread.so.0
3
#1  0x0000000000573d5f in sp::tm::FileJournalWriter::write ()
4
……
5
#8  0x0000000000789bd8 in thread_proxy ()
6
#9  0x0000003f0b90610a in start_thread () from /lib64/tls/libpthread.so.0
7
#10 0x0000003f0b0c5ee3 in clone () from /lib64/tls/libc.so.6
8
#11 0x0000000000000000 in ?? ()

至此怀疑并不是软件问题。

联系op同学检查硬件,最终发现raid卡的write cache出现问题:

31a628a85965f2359a37ca8cc

作为缓存,raid cache的作用具体体现在读与写两个不同的方面:

  • 作为写,一般存储阵列只要求数据写到cache就算完成了写操作,当写cache的数据积累到一定程度,阵列才把数据刷到磁盘,可以实现批量的写入。所以,阵列的写是非常快速的。至于cache数据的保护,一般都依赖于镜相与电池(或者是UPS)。
  • cache在读数据方面的作用一样不可忽视,因为如果所需要读取的数据能在cache中命中的话,将大大减少磁盘寻道所需要的时间

而我们的服务器在raid故障时,还需要穿透raid做大量write和fsync,将压力真正压到磁盘上,就必然导致io性能的急剧下降了。这种情况只有更换raid卡。

我们靠谱op的解释:

RAID卡里本身有一个cache 数据写到RAID卡的cache里,就返回了,表现出来的性能要比数据直接落盘好。但是为了保证数据掉电不丢失,RAID卡需要有个电池,在掉电后通过电池把数据刷到磁盘里,保证数据可靠性。由于故障机器的RAID卡电池坏了,掉电后无法回刷数据,为了保证数据可靠性,不能通过cache缓存数据,所有的写操作必须直接落到磁盘上,写性能就会变差。

参考资料:
http://xxrenzhe.blog.51cto.com/4036116/1312510