近年來流計算技術發展迅猛,甚至有后來居上一統原本批處理主導的分布式計算之勢,其中 Watermark 機制作為流計算結果準確性和延遲的橋梁扮演著不可或缺的角色。然而由于缺乏高質量的學習資源加上計算 Watermark 確實不是一件容易的事情,不少有著批處理計算背景的用戶在流計算作業的開發中可能并不理解 Watermark 的重要意義,從而多走了很多彎路。為此,本文將基于筆者的學習積累和開發經驗,談談個人對 Watermark 的理解,希望起到拋磚引玉的作用。
本文將首先說明 Watermark 提出的背景,然后詳細解析 Watermark 的原理,最后結合工業案例說明 Watermark 在實踐中如何被應用。
Watermark 背景
自 google 的三篇論文和 Hadoop 出現后,工業界的分布式計算技術進入了百花齊放的時期,然而相比于離線批處理計算的蓬勃發展,作為后來者的流計算卻有點停滯不前。流計算和批處理在對于每條記錄的單獨處理上基本一致,不同之處在于聚合類的計算。批處理計算結果的輸出依賴于輸入數據集合的結束,而流計算的輸入數據集通常是無邊界的,不可能等待輸入結束再輸出結果。針對這個問題流處理引入了窗口的特性,簡單來說就是將無限的數據流按照時間范圍切分為一個個有限的數據集,所以我們依然能夠沿用批處理的計算模型。來到這時,業界在流計算和批處理的關系上出現了兩種截然不同的觀點,一個觀點認為流計算是批處理的特例,另一個觀點則認為批處理是流處理的特例。
實時計算與離線計算的分離
流計算是批處理的特例的觀點在早期占據了主導的地位,其中最為典型的便是以 Spark Streaming 為代表的 micro-batching 類型的實時處理框架的流行。Micro-batching 的主要思想是以分鐘甚至秒級別的執行間隔來將批處理應用到數據流上,但不久后人們意識到這種計算模型依然不能完全滿足低延遲高準確性的要求,主要問題除了批處理調度導致的延遲外,還有一點是窗口變小后,數據收集延遲對結果準確性的影響大大增強了。比如說計算一個游戲服務器每 5 min 的新登錄玩家數,但因為網絡或者客戶端設備故障等因素,12:00 的玩家登錄日志可能在 12:10 才被收集到服務器,如果實時計算在 12:05:00 就輸出結果,必然會漏掉這條遲到的數據。在離線計算中這樣的問題并不明顯,因為一個批次的時間跨度較大且對延遲要求不高,因此計算的時間可以設置一個安全的延遲,比如 1 個小時,確保數據都已經收集完成后再開始計算,即使有大量數據是在 1 個小時后才收集到,只需要重算結果即可。然而這樣的實踐經驗并不能應用于實時計算,一是引入額外的安全延遲對于很多對延遲敏感的場景不可接受,二是實時計算的重算要比批處理重算的成本高出很多。因此業界普遍是采用 結合離線和實時處理的 Lambda 架構來應對這個問題,其主要思想是同時運行實時和離線兩個數據處理管道,實時管道提供最近小時內的臨時結算結果,而離線管道提供小時以前的計算結果并覆蓋掉對應時間段的實時計算結果,查詢時將兩者的結果再進行合并產生最終的結果 [1] 。
實時計算與離線計算的融合
實時計算與離線計算的分離說明了用批處理模型不足以表達流計算,于是人們開始探索批處理是流計算特例的模型。2015 年 Google 發表名為 The Dataflow Model 的論文,這篇論文較為詳細地闡述了實時流計算和離線批計算的統一模型(出于篇幅原因不展開講,詳情請見 [2] ),而該模型基于批處理是流計算特例的觀點。The Dataflow Model 將計算分為四個要素,即 what、where、when 和 how:
- what 表示要計算什么結果,即對數據的一系列轉換操作;
- where 表示結果計算上下文,即窗口如何定義;
- when 表示何時輸出和物化計算結果;
- how 表示如何清理已經輸出的結果。
在 what 和 where 兩點上流計算和批處理是相似的,而主要不同之處在于 when 和 how 兩點,這兩點在批處理里基本不會涉及,但在流計算里卻影響著計算結果的準確性,實際上它們分別對應了上文所說的批處理經驗不能應用于實時計算的兩個問題。本文主要討論的 watermark 就是屬于 when 要素里的一種技術,因而下文將主要關注 when。
在批處理中 when 是輸入數據集結束的時候,how 是以覆蓋的形式來清理之前的輸出結果,處理模式都是固定的,因此用戶并不需要考慮。舉個例子,假設要計算一個游戲每天的玩家充值金額,用離線計算時我們會考慮如何將充值金額從日志中提取出來并累加到一起,此為 what;再考慮批處理的運行時間,比如每天 00:30,所以每次計算是處理 24 小時采集到的數據,此為 where;而批處理的 when 是和 where 綁定的,即 00:30 計算開始,結束后馬上輸出結果;至于 how,不同批次的批處理運行的結果是互不相干的,同一批次的運行結果會覆蓋前一次運行的結果。
然而如果游戲策劃急于知道某個活動是否有帶動玩家充值,希望看到每分鐘更新的實時數據,那么上述題目改為用實時流計算去實現,此時要考慮的東西會復雜一點。首先,我們可以依舊可以復用批處理的 what 和 where,即定義一個時間范圍為 24 小時的窗口,計算邏輯和之前一樣;在 when 方面,為了可以實時地得到最新的計算結果,我們需要定義每分鐘輸出一次最新的計算結果,直到達到 24 小時后輸出最終結果;而在 how 方面,我們每次的輸出結果只需要覆蓋之前的結果即可。然而 when 的問題并沒有這么簡單。還記得我們之前說過數據采集延遲嗎?可能一個用戶充值的時間在 16:00,但中間采集的延遲可能有 1 min,導致到達服務器卻是 16:01 分,如果基于充值記錄被處理的時間(即 processing time)來進行窗口劃分,用戶充值記錄可能會被計入錯誤的窗口,所以我們應該以用戶充值這個時間(即 event time)發生的時間為準。這里的難點在于我們計算時并不能判斷所有 event time 窗口內的數據被收集完,因為數據的延遲是不可預知的,這被稱為窗口完整性問題。針對窗口完整性問題,The Dataflow Model 提出了 Watermark 的解決方案。
Watermark 原理解析
Watermark 并沒有很正式的官方定義,最接近定義的是 Streaming 102 [3] 里的一段描述。
A watermark is a notion of input completeness with respect to event times. A watermark with a value of time X makes the statement: “all input data with event times less than X have been observed.” As such, watermarks act as a metric of progress when observing an unbounded data source with no known end.
簡單來說 Watermark 是一個時間戳,表示已經收集完畢的數據的最大 event time,換句話說 event time 小于 Watermark 的數據不應該再出現,基于這個前提我們才有可能將 event time 窗口視為完整并輸出結果。Watermark 設計的初衷是處理 event time 和 processing time 之間的延遲問題,三者的關系可以用下圖展示:

理想的情況下數據沒有延遲,因此 processing time 是等于 event time 的,理想的 Watermark 應該是斜率為 45 度的直線。然而在真實環境下,processing time 和 event time 之間總有不確定的延遲,表現出來的 Watermark 會類似圖 1 中的紅色的曲線。其中紅色曲線與理想 Watermark 的縱坐標差值稱為 processing-time lag,表示在真實世界中的數據延遲,而橫坐標的差值表示 event-time skew,表示該延遲帶來的 event-time 落后量。
Watermark 通常是基于已經觀察到的數據的 event time 來判斷(當然也可以引入 processing time 或者其他外部參數),具體需要用戶根據數據流的 event time 特征來決定,比如最簡單的算法就是取目前為止觀察到的最大 event time。在數據流真實 event time 曲線是單調非減的情況下,比如 event time 是 Kafka producer timestamp 時,我們是可以計算出完美符合實際的 Watermark 的,然而絕大多數情況下數據流的 event time 都是亂序的,因此計算完美的 Watermark 是不現實的(實際上也是沒有必要的),通常我們會以啟發性的 Watermark 算法來代替。
啟發性的 Watermark 算法目的在于在計算結果的延遲和準確性之間找到平衡點。如果采用激進的 Watermark 算法,那么 Watermark 會快于真實的 event time,導致在窗口數據還不完整的情況下過早輸地出計算結果,影響數據的準確性;如果采用保守的 Watermark 算法,那么 Watermark 會落后于真實的 event time,導致窗口數據收集完整后不能及時輸出計算結果,造成數據的延遲。實際上上文所說的 Watermark 取觀察到的最大 event time 和批處理使用的設置一個足夠大的安全延遲的辦法分別就屬于 Watermark 算法的兩個極端。很多情況下用戶偏向于犧牲一定的延時來換取準確性,不過在像金融行業的欺詐檢測場景中,低延遲是首要的,否則準確性再高也沒有意義。針對這種情況 The Dataflow Model 提供了 allow lateness 的機制,工作的原理是用戶可以設置一個時間閾值,如果在計算結果輸出后的這個閾值時間內發現遲到的數據,計算結果會被重新計算和輸出,但如果超出這個閾值的遲到數據就會被丟棄。
這時你們可以看到要開發一個高質量的實時作業是多么不易了,這也是很多實時應用開發者最為頭疼的地方,或許以后利用機器學習去計算 Watermark 是個不錯的主意(然后我們的工作就可以愉快地從調 Watermark 算法參數變為調機器學習模型參數了 :) )。
Watermark 實踐
接下來我們將結合工業生產的案例來說明實戰中 Watermark 是如何影響流計算的。Watermark 在不同計算引擎的實現并不相同,本文將以筆者使用最多的 Apache Flink (下文簡稱 Flink)作為例子來說明。
對于游戲行業來說,游戲的日活躍玩家數是個很常見的指標,游戲策劃或者運營通常可以根據日活躍玩家數的變動來實時地監控某個活動是否收到玩家歡迎的程度,但是游戲可能有海外服務器,數據收集的延遲可能差別較大,造成數據流 event time 亂序比較嚴重,在這種情況下設計 Watermark 算法是個比較大的挑戰。
假設我們有 A、B、C 共 3 臺服務器,其中 A、B 為國內服務器,延遲較低且穩定,而 C 為海外服務器,延遲較高且不穩定,而我們需要計算每分鐘內的登錄玩家數。

我們現在面臨兩種可能帶來 event time 亂序的因素:一是不同服務器間的延遲不同,比如可能先收到服務器 A 在 t2 的數據,再收到服務 C 在 t1 的數據;二是同一服務器的不同數據的延遲不同,比如可能先收到服務器 C t2 的數據再收到 t1 的數據。針對第二種因素,我們可以對不同服務器的數據分別計算 Watermark,再取其中的最小值作為 Watermark,而針對第一種因素,我們則需要設計出針對單個服務器數據流的合理 Watermark 算法。
在算法實現上,Flink 提供兩種觸發 Watermark 更新的方法,即在收到特殊的消息時觸發或者定時觸發,我們這里將選用定時觸發的方法。因為窗口是一分鐘比較小,我們這里將定時的間隔設為 5 秒,也就是說 Watermark 大約落后真實 Watermark 5 秒,然后這 5 秒內 Watermark 是不會提升的,所以可以容忍局部的 processing lag。
我們試著取目前為止觀察到的最大時間戳作為 Watermark,那么 Watermark 的效果如下(為了在消費端更加直觀,我們將坐標系調轉,現在 x 軸表示 processing time)。

其中 t0-t3 分別表示 Watermark 提升的時間點,黃虛線表示在一個 Watermark 周期內的最大 event time,紅線表示 Watermark。可以看到在 t0-t1 的 Watermark 周期內出現了輕微的 event time 亂序,但是并不影響計算的準確性。接下來在 t1-t2 和 t2-t3 兩個周期間也發生了相似的亂序,但是這個亂序并不在同一個 Watermark 周期,因此導致正常延遲的數據被誤認為是遲到數據。解決方法是引入一定可容忍的 event time skew,比如說最簡單的設置一個 skew 閾值,即每次計算 Watermark 的結果都減去這個值。根據數據流延遲的不同,我們還可以給不同服務器設置不同的 skew 閾值。
上述 Watermark 算法代碼如下:
public class WatermarkProcessor implements AssignerWithPeriodicWatermarks<UserLogin> {
private static final long ALLOWED_EVENT_TIME_SKEW = 1000L;
private static final Map<String, Long> maxTimestampPerServer = new HashMap<>(3);
@Nullable
public Watermark getCurrentWatermark() {
Optional<Long> maxTimestamp = maxTimestampPerServer.values().stream()
.min(Comparator.comparingLong(Long::valueOf));
if (maxTimestamp.isPresent()) {
return new Watermark(maxTimestamp.get() - ALLOWED_EVENT_TIME_SKEW);
} else{
return null;
}
}
public long extractTimestamp(UserLogin userLogin, long previousElementTimestamp) {
String server = userLogin.getServer();
long eventTime = userLogin.getEventTime();
if (!maxTimestampPerServer.containsKey(server) ||
userLogin.getEventTime() > maxTimestampPerServer.get(server)) {
maxTimestampPerServer.put(server, eventTime);
}
return eventTime;
}
}
總結
流計算和批處理誰是表達能力更強的計算模式,這個問題或許還將繼續被爭論下去,不過根據 The Dataflow Model 我們已經有足夠的理論支撐來開發低延遲高準確并且可容錯的流計算應用。其中流計算的準確性很大程度上決定于數據流時間的亂序程度,因此我們在開發實時流計算應用時,比起開發離線批處理應用,很大的一個不同是要考慮數據是以什么順序到達,并針對性地設計 Watermark 算法來處理數據流時間的亂序。Watermark 算法需要平衡低延遲和高準確性兩者,在引入最低延遲成本的情況下準確判斷窗口的計算和輸出結果的時機,通常可以從 processing lag 和 event time skew 兩者的容忍閾值入手。