Archive for the ‘实时计算’ Category

从2013年的millwheel,到2015年的dataflow,再到2016年strata hadoop上的《Watermarks:

Measuring Time and Progress in Streaming Pipelines》,Google连续发表了3篇low watermark相关论文,致力于解决实时计算中的时序性问题。

但何为low watermark,为什么需要它?与其他方式相比有什么优劣?实时计算已出现多年,为什么此时才需要解决时序性?Paper里最关键的地方是什么?本文尝试解答这些问题。

时序性问题

实时计算的数据源一般来自于数据传输系统,从业务系统里收集日志消息,违反时序性的常见场景有:

  1. 从多个机房/地域采集数据,导致数据乱序(局部有序、整体乱序)
  2. 某机器或进程故障,重启后数据大大延迟
  3. 某机房或交换机故障,恢复后数据大大延迟

前一种是常态,后两者可视为故障恢复后的数据回追,个人认为low watermark主要解决第一类问题,但也为第二类问题提供一种可能的解决方案。

那么乱序会带来什么问题呢?Millwheel论文里举了一个典型的例子,即波峰波谷的计算:

In the case of Zeitgeist, shown in Figure 1, our input would be a continuously arriving set of search queries, and our output would be the set of queries that are spiking or dipping.

it is important to be able to distinguish whether a flurry of expected Arabic queries at t = 1296167641 is simply delayed on the wire, or actually not there.

计算搜索词在一个时间窗口(e.g. 1分钟)内的搜索次数,将其波动率与阈值比较甚至策略,以发现异常事件。波峰时由于数据量大,少量数据延迟影响可以忽略。但波谷本身数据量就小,无法判断是真的到达波谷,还仅仅是由于延迟导致数据还在传输途中。

之前的做法是再设置一个安全时间,延迟几分钟再计算,但安全时间如何设置?过长则影响时效性,过短无法奏效。或者采取右端开放窗口的补偿方法,先产出数据,当延迟数据到来时再予以修改,但需要业务场景适用、下游系统支持修改。

除了aggregation计算对时序性有要求外,一些策略计算可能也会要求数据完整、有序。

low watermark原理

The low watermark for a computation provides a bound on the timestamps of future records arriving at that computation.

low watermark是一个时间戳,它承诺未来不会有早于该时间戳的数据到达该计算节点。这里的时间计算一般基于eventtime,即事件发生时间,例如业务系统的log时间,而较少使用数据到达计算节点的时间(processing time,某些场景也可以用)。

Definition: We provide a recursive definition of low watermarks based on a pipeline’s data flow. Given a computation, A, let the oldest work of A be a timestamp corresponding to the oldest unfinished (in-flight, stored, or pending-delivery) record in A. Given this, we define the low watermark of A to be

  • min(oldest work of A, low watermark of C : C outputs to A)
  • If there are no input streams, the low watermark and oldest work values are equivalent.

对于复杂问题,我们往往将其拆分为子问题,这里也一样,将其转化为递归。对于一个最简单的拓扑片段C->A,即A订阅C产生的数据(C和A都是多并发、分布式的),其计算公式如上。需要注意,如果C没有input streams,即importer节点(storm里的spout节点),那么the low watermark and oldest work values are equivalent。

进一步又区分了input和output low watermark:

  • Input Low Watermark: Oldest work not yet sent to this streaming stage.
  • InputLowWatermark(Stage) = min { OutputLowWatermark(Stage’) | Stage’ is upstream of Stage}
  • Output Low Watermark: Oldest work not yet completed by this streaming stage.
  • OutputLowWatermark(Stage) = min { InputLowWatermark(Stage), OldestWork(Stage) }

Input low watermark是所有上游节点output low watermark的最小值,对于importer而言就是最小订阅进度。而Output low watermark是对下游的承诺,由自己的input递归传递上游承诺,由自己产出数据的oldest work代表自己的承诺,两者取min。

这里有两个重要的隐藏结论:

  1. output low watermark <= input low watermark,即下游永远不可能比上游快
  2. 当前节点使用input low watermark驱动窗口计算,即input是计算中直接使用的,而output只是递归的需要

关键技术点

递归问题的一个关键是能否找到初始条件,low watermark也一样,If there are no input streams, the low watermark and oldest work values are equivalent. 即importer的订阅进度如何计算。Google论文中笼统指出file和pub-sub两种数据源data time的计算方法,而在我们的场景下,数据一般来自于业务系统的前端机,通过类似kafka的消息传输系统进入实时计算,可以保证来源于一个前端机的数据是有序的,但传输过程中多个前端机数据乱序。

low-watermark-importer

从上图示例可以看出,importer的计算过程包含以下3步:

  1. progress of single web server = max(input data time)
  2. input low watermark of single worker = min(progress of single web server)
  3. input low watermark of importer = min(input low watermark of single worker)

在实际中,由于各importer节点数据互不相干,故若importer中存在窗口,可直接使用step 2的结果驱动。另,示例中假设importer无窗口计算,故output与input low watermark相等。

low watermark的工程实现

Millwheel论文中采取了集中式,而Flink据说是采用inject的方式。而我们的小批量实时系统,本身就是基于query and own的task管理模型,故也采取了集中计算的方式。

抽象了两类包含data time的task,即data task和state task,前者代表该task批次里的oldest work value,后者存储当前计算节点的input low watermark。TaskManager中心节点维持了上下游订阅关系 -> datatime -> count的计数器,从而将多并发计算节点的时间戳汇聚为层级算子的时间戳。

下游节点向TaskManager请求上游算子的两类时间戳,根据公式计算input low watermark并驱动窗口滑动。

计算的时机是:

  1. 在本轮数据处理前,计算input low watermark
  2. 在本轮数据处理结束时,计算oldest work存入data task,并保存input low watermark至state task

这里需要注意的是,虽然OutputLowWatermark(Stage) = min { InputLowWatermark(Stage), OldestWork(Stage) },但我们没有直接存储上游节点的output low watermark,而是将input和oldest值分别存储,原因是:

  1. 当前节点的output low watermark在下游节点计算input low watermark时才会被用到,即其产生和使用间有一个时间差
  2. 而当前节点的input low watermark在output low watermark产生后还可以独立前移,分开存储能够尽早推动下游 input low watermark的前移(这里比较绕,需要细细琢磨,在分布式场景下考虑就是理所应当的了)

综上所述,记录了我对论文的学习以及实践的一些心得,但由于我们的low watermark尚未经历大规模应用的检验,可能存在疏漏,还望指正。

最后猜测下为什么Google选择在这个时间段发表low watermark。早年的实时计算多追求低延迟、大吞吐、不重不丢,而近些年这方面的表现已较为优秀,所以才进一步追求更细致的功能。另外,早年的实时场景多为辅助型的,近些年已逐渐走入核心领域,故也会提升对业务准确性的要求。