Apache Kafka 中用于錯誤處理的死信隊列:來自 Uber 和 Crowdstrike 的替代方案、最佳實踐和案例研究。
識別和處理錯誤對于任何可靠的數據流管道都是必不可少的。這篇博文探討了 在 Apache Kafka 基礎架構中使用死信隊列實現錯誤處理的最佳實踐。這些選項包括自定義實現、Kafka Streams、Kafka Connect、Spring 框架和并行消費者。真實案例研究展示了 Uber、CrowdStrike 和桑坦德銀行如何以極端規模構建可靠的實時錯誤處理。
Apache Kafka 成為許多企業架構最喜歡的集成中間件。即使對于云優先戰略,企業也可以利用 Kafka 的數據流作為云原生集成平臺即服務 (iPaaS)。
Apache Kafka 數據流中的消息隊列模式
在我開始這篇文章之前,我想讓你知道這個內容是關于“JMS、消息隊列和 Apache Kafka”的博客系列的一部分:
- JMS 消息代理與 Apache Kafka 數據流的10 個比較標準
- 這篇文章– 通過Apache Kafka 中的死信隊列 (DQL)進行錯誤處理的替代方法
- 使用 Apache Kafka實現請求-回復模式
- 即將推出——用于選擇正確消息系統的決策樹(JMS 與 Apache Kafka)
- 即將推出——從 JMS 消息代理到 Apache Kafka:集成、遷移和/或替換
死信隊列 (DLQ)是消息系統或數據流平臺內的一種服務實現,用于存儲未成功處理的消息。系統不是被動地轉儲消息,而是將其移動到死信隊列。
企業集成模式 (EIP)改為調用設計模式死信通道。我們可以將兩者用作同義詞。
本文重點介紹數據流平臺 Apache Kafka。在 Kafka 中將消息放入 DLQ 的主要原因通常是消息格式錯誤或消息內容無效/缺失。例如,如果預期值是整數,但生產者發送了字符串,則會發生應用程序錯誤。在更動態的環境中,“主題不存在”異常可能是無法傳遞消息的另一個錯誤。
因此,通常不要使用現有中間件經驗中的知識。Message Queue 中間件(如符合 JMS 的 IBM MQ、TIBCO EMS 或 RabbitMQ)與分布式提交日志(如 Kafka)的工作方式不同。由于許多其他原因,消息隊列中的 DLQ 用于消息隊列系統,這些原因不能一對一地映射到 Kafka。例如,MQ 系統中的消息由于每條消息的 TTL(生存時間)而過期。
因此,在 Kafka 中將消息放入 DLQ 的主要原因是消息格式錯誤或消息內容無效/缺失。
Apache Kafka 中死信隊列的替代方案
Kafka 中的死信隊列是一個或多個 Kafka 主題,它們接收和存儲由于錯誤而無法在另一個流管道中處理的消息。此概念允許使用以下傳入消息繼續消息流,而不會由于無效消息的錯誤而停止工作流。
Kafka Broker 很笨——智能端點提供錯誤處理
Kafka 架構不支持 broker r 中的DLQ。有意地,Kafka 建立在與現代微服務相同的原則上,使用“啞管道和智能端點”原則。這就是為什么與傳統消息代理相比,Kafka 的擴展性如此之好。過濾和錯誤處理發生在客戶端應用程序中。
數據流平臺的真正解耦可以實現更干凈的領域驅動設計。每個微服務或應用程序都通過自己選擇的技術、通信范式和錯誤處理來實現其邏輯。
在傳統的中間件和消息隊列中,代理提供了這種邏輯。結果是域中的可擴展性和靈活性較差,因為只有中間件團隊才能實現集成邏輯。
用任何編程語言自定義實現 Kafka 死信隊列
Kafka 中的死信隊列獨立于您使用的框架。一些組件為錯誤處理和死信隊列提供了開箱即用的功能。但是,使用JAVA、Go、C++、Python/ target=_blank class=infotextkey>Python 等任何編程語言為 Kafka 應用程序編寫死信隊列邏輯也很容易。
死信隊列實現的源代碼包含一個 try-catch 塊來處理預期或意外異常。如果沒有發生錯誤,則處理該消息。如果發生任何異常,請將消息發送到專用的 DLQ Kafka 主題。
失敗原因應添加到 Kafka 消息的標頭中。不應更改鍵和值,以便將來對歷史事件進行重新處理和故障分析。
死信隊列的開箱即用 Kafka 實現
你并不總是需要實現你的死信隊列。許多組件和框架已經提供了它們的 DLQ 實現。
使用您自己的應用程序,您通常可以控制錯誤或在出現錯誤時修復代碼。但是,與 3rd 方應用程序的集成并不一定允許您處理可能跨集成障礙引入的錯誤。因此,DLQ 變得更加重要,并被包含在某些框架中。
Kafka Connect 內置死信隊列
Kafka Connect 是 Kafka 的集成框架。它包含在開源 Kafka 下載中。不需要其他依賴項(除了您部署到 Connect 集群中的連接器本身)。
默認情況下,如果由于使用無效消息而發生錯誤(例如使用錯誤的 JSON 轉換器而不是正確的 AVRO 轉換器時),Kafka Connect 任務將停止。刪除無效消息是另一種選擇。后者容忍錯誤。
Kafka Connect 中 DLQ 的配置很簡單。只需將兩個配置選項 ' errors.tolerance' 和 ' errors.deadletterqueue.topic.name' 的值設置為正確的值:
博客文章“ Kafka Connect Deep Dive – 錯誤處理和死信隊列”顯示了使用 DLQ 的詳細動手代碼示例。
Kafka Connect 甚至可以用于處理 DLQ 中的錯誤消息。只需部署另一個使用 te DLQ 主題的連接器。例如,如果您的應用程序處理 Avro 消息并且傳入消息是 JSON 格式。然后連接器使用 JSON 消息并將其轉換為 AVRO 消息以成功重新處理:
請注意,Kafka Connect 沒有用于源連接器的死信隊列。
Kafka Streams 應用程序中的錯誤處理
Kafka Streams 是 Kafka 的流處理庫。它可與其他流式傳輸框架相媲美,例如 Apache Flink、Storm、Beam 和類似工具。但是,它是 Kafka 原生的。這意味著您可以在單個可擴展且可靠的基礎架構中構建完整的端到端數據流。
如果您分別使用 Java(JVM 生態系統)來構建 Kafka 應用程序,建議幾乎總是使用 Kafka Streams 而不是 Kafka 的標準 Java 客戶端。為什么?
- Kafka Streams“只是”一個圍繞常規 Java 生產者和消費者 API 的包裝器,以及許多內置的附加功能。
- 兩者都只是嵌入到 Java 應用程序中的庫(JAR 文件)。
- 兩者都是開源 Kafka 下載的一部分 - 沒有額外的依賴項或許可證更改。
- 許多問題已經開箱即用地解決,以構建成熟的流處理服務(流功能、有狀態的嵌入式存儲、滑動窗口、交互式查詢、錯誤處理等等)。
Kafka Streams的內置功能之一是默認的反序列化異常處理程序。它允許您管理無法反序列化的記錄異常。損壞的數據、不正確的序列化邏輯或未處理的記錄類型都可能導致錯誤。該功能不稱為死信隊列,但開箱即用地解決了相同的問題。
Spring Kafka 和 Spring Cloud Stream 的錯誤處理
Spring 框架對 Apache Kafka 有很好的支持。它提供了許多模板以避免自己編寫樣板代碼。Spring-Kafka 和 Spring Cloud Stream Kafka 支持各種重試和錯誤處理選項,包括基于時間/計數的重試、死信隊列等。
盡管 Spring 框架功能非常豐富,但它有點重,并且有一個學習曲線。因此,它非常適合新建項目,或者如果您已經將 Spring 用于其他場景的項目。
有很多很棒的博客文章展示了不同的示例和配置選項。還有用于死信隊列的官方 Spring Cloud Stream 示例。Spring 允許使用簡單的注釋構建邏輯,例如 DLQ。這種編程方法是一些開發人員鐘愛的范例,而另一些則不喜歡它。只需了解選項并為自己選擇合適的選項即可。
Apache Kafka 并行消費者的可擴展處理和錯誤處理
在許多客戶對話中,事實證明,請求死信隊列的主要原因通常是處理連接到外部 Web 服務或數據庫的失敗。超時或 Kafka 無法并行發送各種請求會導致某些應用程序癱瘓。這個問題有一個很好的解決方案:
Apache Kafka的并行消費者是Apache 2.0 許可下的開源項目。它提供了一個帶有客戶端隊列的并行 Apache Kafka 客戶端包裝器、一個具有關鍵并發性的更簡單的消費者/生產者 API,以及可擴展的非阻塞 IO處理。
該庫允許您通過單個 Kafka Consumer 并行處理消息,這意味著您可以在不增加要處理的主題中的分區數量的情況下增加 Kafka Consumer 并行度。對于許多用例,這通過減少 Kafka 代理的負載來提高吞吐量和延遲。它還開辟了新的用例,例如極端并行性、外部數據豐富和排隊。
一個關鍵特性是在單個 Kafka 消費者應用程序中處理/重復 Web 服務和數據庫調用。并行化避免了一次發送單個 Web 請求的需要:
Parallel Consumer 客戶端具有強大的重試邏輯。這包括可配置的延遲和動態錯誤或處理。錯誤也可以發送到死信隊列。
使用死信隊列中的消息
將錯誤發送到死信隊列后,您還沒有完成!壞消息需要被處理或至少被監控!
死信隊列是從事件處理中帶外處理數據錯誤處理的絕佳方式,這意味著錯誤處理程序可以與事件處理代碼分開創建或演變。
存在大量使用死信隊列的錯誤處理策略。DO 和 DONT 探索最佳實踐和經驗教訓。
錯誤處理策略
有幾個選項可用于處理存儲在死信隊列中的消息:
- 重新處理:DLQ中的一些消息需要重新處理。但是,首先,需要解決這個問題。解決方案可以是自動腳本、編輯消息的人工交互,或向生產者返回錯誤,要求重新發送(更正的)消息。
- 刪除錯誤消息(經過進一步分析):根據您的設置,可能會出現錯誤消息。但是,在刪除它們之前,業務流程應該檢查它們。例如,儀表板應用程序可以使用錯誤消息并將它們可視化。
- 高級分析:另一種選擇是分析傳入數據以獲取實時洞察或問題,而不是處理 DLQ 中的每條消息。例如,一個簡單的 ksqlDB 應用程序可以應用流處理進行計算,例如每小時錯誤消息的平均數量或任何其他有助于確定 Kafka 應用程序中的錯誤的見解。
- 停止工作流:如果很少會出現壞消息,結果可能是停止整個業務流程。該動作可以是自動的,也可以由人決定。當然,停止工作流也可以在拋出錯誤的 Kafka 應用程序中完成。如果需要,DLQ 將問題和決策外部化。
- 忽略:這聽起來可能是最糟糕的選擇。只是讓死信隊列填滿,什么都不做。然而,即使這樣在某些用例中也很好,比如監控 Kafka 應用程序的整體行為。請記住,Kafka 主題具有保留時間,并且在該時間之后從主題中刪除消息。只需為您設置正確的方式即可。并監控 DQL 主題是否存在意外行為(例如填充太快)。
以下是在 Kafka 應用程序中使用死信隊列進行錯誤處理的一些最佳實踐和經驗教訓:
- 定義處理無效消息的業務流程(自動與人工)
- 現實:通常,根本沒有人處理 DLQ 消息
- 備選方案 1:數據所有者需要接收警報,而不僅僅是基礎架構團隊
- 備選方案 2:警報應通知記錄團隊系統數據錯誤,他們將需要從記錄系統重新發送/修復數據。
- 如果沒有人關心或抱怨,請考慮質疑和審查 DLQ 存在的必要性。相反,這些消息也可以在初始 Kafka 應用程序中被忽略。這節省了大量的網絡負載、基礎設施和資金。
- 構建帶有適當警報的儀表板并整合相關團隊(例如,通過電子郵件或 Slack 警報)
- 定義每個 Kafka 主題的錯誤處理優先級(停止、刪除和重新處理)
- 僅將不可重試的錯誤消息推送到 DLQ - 連接問題是消費者應用程序的責任。
- 保留原始消息并將它們存儲在 DLQ 中(帶有額外的標頭,例如錯誤消息、錯誤時間、發生錯誤的應用程序名稱等)——這使得重新處理和故障排除變得更加容易。
- 想想你需要多少 Dead Letter Queue Kafka 主題。總是有取舍。但是將所有錯誤存儲在單個 DLQ 中可能對進一步分析和重新處理沒有意義。
請記住,DLQ 會以有保證的順序終止處理,并使任何類型的離線處理變得更加困難。因此,Kafka DQL 并不適合每個用例。
何時不在 Kafka 中使用死信隊列?
讓我們探討一下不應該將哪些類型的消息放入 Kafka 的死信隊列中:
- DLQ 用于背壓處理?由于大量消息的峰值而使用 DLQ 進行節流并不是一個好主意。Kafka 日志背后的存儲會自動處理背壓。消費者以它可以按自己的速度獲取數據的方式提取數據(或者配置錯誤)。如果可能的話,彈性地擴展消費者。即使您的存儲空間已滿,DLQ 也無濟于事。這是它的問題,與是否使用 DLQ 無關。
- 連接失敗的DLQ?由于連接失敗而將消息放入 DQL 無濟于事(即使在多次重試之后)。以下消息也無法連接到該系統。您需要解決連接問題。消息可以根據需要存儲在常規主題中(取決于保留時間)。
最后但同樣重要的是,讓我們探討在某些情況下減少甚至消除對死信隊列的需求的可能性。
卡夫卡的Schema Registry是一種確保數據清理以防止生產者在負載中出錯的方法。它在 Kafka 生產者中強制執行正確的消息結構:
模式注冊表是模式的客戶端檢查。Confluent Server 等一些實現在代理端提供了額外的模式檢查,以拒絕來自未使用模式注冊表的生產者的無效或惡意消息。
Kafka 死信隊列的案例研究
讓我們看看Uber、CrowdStrike 和 Santander Bank 的三個案例研究,它們在 Kafka 基礎設施中實際部署死信隊列。請記住,這些都是非常成熟的例子。不是每個項目都需要那么復雜。
Uber - 構建可靠的再處理和死信隊列
在分布式系統中,重試是不可避免的。從網絡錯誤到復制問題,甚至下游依賴關系的中斷,大規模運行的服務必須準備好盡可能優雅地遇到、識別和處理故障。
鑒于 Uber 的運營范圍和速度,它的系統必須具有容錯能力,并且在智能失敗時毫不妥協。Uber 將 Apache Kafka 用于各種極端規模的用例以實現這一目標。
利用這些特性,Uber 保險工程團隊擴展了 Kafka 在其現有事件驅動架構中的作用,通過使用 n個阻塞請求重新處理和死信隊列來實現解耦、可觀察的錯誤處理,而不會中斷實時流量。該策略有助于他們選擇加入的駕駛員傷害保護計劃在 200 多個城市可靠運行,并為注冊駕駛員扣除每次行程的每英里保費。
這是 Uber 錯誤處理的示例。錯誤會降低重試主題的級別,直到登陸 DLQ:
有關更多信息,請閱讀 Uber 非常詳細的技術文章:“使用 Apache Kafka 構建可靠的再處理和死信隊列”。
CrowdStrike - 處理數萬億事件的錯誤
CrowdStrike 是一家位于德克薩斯州奧斯汀的網絡安全技術公司。它提供云工作負載和端點安全、威脅情報和網絡攻擊響應服務。
CrowdStrike 的基礎設施 每天使用 Apache Kafka 處理數萬億個事件。在我的“ Apache Kaka 網絡安全博客系列”中,我介紹了以任何規模實時創建態勢感知和威脅情報的相關用例。
CrowdStrike 定義了三個最佳實踐 來成功實現死信隊列和錯誤處理:
- 在正確的系統中存儲錯誤消息:定義基礎設施和代碼以捕獲和檢索死信。CrowdStrike 使用 S3 對象存儲來存儲潛在的大量錯誤消息。請注意,Kafka 的分層存儲開箱即用地解決了這個問題,無需其他存儲接口(例如,利用 Confluent Cloud 中的無限存儲)。
- 使用自動化:放置工具以使修復萬無一失,因為手動完成錯誤處理可能非常容易出錯。
- 記錄業務流程并聘請相關團隊:標準化和記錄流程以確保易于使用。并非所有工程師都熟悉組織處理死信消息的策略。
在像 CrowdStrike 這樣的網絡安全平臺中,大規模實時數據處理至關重要。此要求也適用于錯誤處理。下一次網絡攻擊可能是故意包含不適當或無效內容的惡意消息(如 JavaScript 漏洞利用)。因此,必須通過死信隊列實時處理錯誤。
桑坦德銀行 - 用于重試和 DLQ 組合的郵箱 2.0
桑坦德銀行在郵箱應用程序中處理海量數據的同步數據處理面臨巨大挑戰。他們重新架構了他們的基礎架構并構建了一個解耦且可擴展的架構,稱為“Santander Mailbox 2.0”。
Santander 的工作負載并轉移到由 Apache Kafka 提供支持的事件溯源:
新的基于異步事件的架構中的一個關鍵挑戰是錯誤處理。 Santander 使用重試和 DQL Kafka 主題構建的錯誤處理解決了這些問題:
查看來自 Santander 的集成合作伙伴 Consdata的 Kafka 峰會演講“基于重試策略和死信主題的 Apache Kafka 中的可靠事件傳遞”中的詳細信息。
Apache Kafka 中可靠且可擴展的錯誤處理
錯誤處理對于構建可靠的數據流管道和平臺至關重要。存在不同的替代方案來解決這個問題。該解決方案包括死信隊列的自定義實現或利用正在使用的框架,例如 Kafka Streams、Kafka Connect、Spring 框架或 Kafka 的并行消費者。
優步、CrowdStrike 和桑坦德銀行的案例研究表明,錯誤處理并不總是很容易實現。當您設計新的應用程序或架構時,需要從一開始就考慮到這一點。使用 Apache Kafka 進行實時數據流傳輸很有吸引力,但只有在您能夠處理意外行為時才能成功。死信隊列是許多場景的絕佳選擇。