今天我們花了一些時間討論 Kafka 提供的冷門功能:攔截器。如之前所說,攔截器的出場率極低,以至于我從未看到過國內大廠實際應用 Kafka 攔截器的報道。但冷門不代表沒用。事實上,我們可以利用攔截器滿足實際的需求,比如端到端系統性能檢測、消息審計等。?
?既然是不常見,那就說明在實際場景中并沒有太高的出場率,但它們依然是很高級很實用的。下面就有請今天的主角登場:Kafka 攔截器。
什么是攔截器?
如果你用過 Spring Interceptor 或是 Apache Flume,那么應該不會對攔截器這個概念感到陌生,其基本思想就是允許應用程序在不修改邏輯的情況下,動態地實現一組可插拔的事件處理邏輯鏈。它能夠在主業務操作的前后多個時間點上插入對應的“攔截”邏輯。下面這張圖展示了 Spring MVC 攔截器的工作原理:
圖片來源:https://o7planning.org/en/11229/spring-mvc-interceptors-tutorial
攔截器 1 和攔截器 2 分別在請求發送之前、發送之后以及完成之后三個地方插入了對應的處理邏輯。而 Flume 中的攔截器也是同理,它們插入的邏輯可以是修改待發送的消息,也可以是創建新的消息,甚至是丟棄消息。這些功能都是以配置攔截器類的方式動態插入到應用程序中的,故可以快速地切換不同的攔截器而不影響主程序邏輯。
Kafka 攔截器借鑒了這樣的設計思路。你可以在消息處理的前后多個時點動態植入不同的處理邏輯,比如在消息發送前或者在消息被消費后。
作為一個非常小眾的功能,Kafka 攔截器自 0.10.0.0 版本被引入后并未得到太多的實際應用,我也從未在任何 Kafka 技術峰會上看到有公司分享其使用攔截器的成功案例。但即便如此,在自己的 Kafka 工具箱中放入這么一個有用的東西依然是值得的。今天我們就讓它來發揮威力,展示一些非常酷炫的功能。
Kafka 攔截器
Kafka 攔截器分為生產者攔截器和消費者攔截器。生產者攔截器允許你在發送消息前以及消息提交成功后植入你的攔截器邏輯;而消費者攔截器支持在消費消息前以及提交位移后編寫特定邏輯。值得一提的是,這兩種攔截器都支持鏈的方式,即你可以將一組攔截器串連成一個大的攔截器,Kafka 會按照添加順序依次執行攔截器邏輯。
舉個例子,假設你想在生產消息前執行兩個“前置動作”:第一個是為消息增加一個頭信息,封裝發送該消息的時間,第二個是更新發送消息數字段,那么當你將這兩個攔截器串聯在一起統一指定給 Producer 后,Producer 會按順序執行上面的動作,然后再發送消息。
當前 Kafka 攔截器的設置方法是通過參數配置完成的。生產者和消費者兩端有一個相同的參數,名字叫 interceptor.classes,它指定的是一組類的列表,每個類就是特定邏輯的攔截器實現類。拿上面的例子來說,假設第一個攔截器的完整類路徑是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二個類是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在 Producer 端指定攔截器:
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器 1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……
現在問題來了,我們應該怎么編寫 AddTimeStampInterceptor 和 UpdateCounterInterceptor 類呢?其實很簡單,這兩個類以及你自己編寫的所有 Producer 端攔截器實現類都要繼承org.apache.kafka.clients.producer.ProducerInterceptor 接口。該接口是 Kafka 提供的,里面有兩個核心的方法。
- onSend:該方法會在消息發送之前被調用。如果你想在發送之前對消息“美美容”,這個方法是你唯一的機會。
- onAcknowledgement:該方法會在消息成功提交或發送失敗之后被調用。還記得我在上一期中提到的發送回調通知 callback 嗎?onAcknowledgement 的調用要早于 callback 的調用。值得注意的是,這個方法和 onSend 不是在同一個線程中被調用的,因此如果你在這兩個方法中調用了某個共享可變對象,一定要保證線程安全哦。還有一點很重要,這個方法處在 Producer 發送的主路徑中,所以最好別放一些太重的邏輯進去,否則你會發現你的 Producer TPS 直線下降。
同理,指定消費者攔截器也是同樣的方法,只是具體的實現類要實現org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,這里面也有兩個核心方法。
- onConsume:該方法在消息返回給 Consumer 程序之前調用。也就是說在開始正式處理消息之前,攔截器會先攔一道,搞一些事情,之后再返回給你。
- onCommit:Consumer 在提交位移之后調用該方法。通常你可以在該方法中做一些記賬類的動作,比如打日志等。
一定要注意的是,指定攔截器類時要指定它們的全限定名,即 full qualified name。通俗點說就是要把完整包名也加上,不要只有一個類名在那里,并且還要保證你的 Producer 程序能夠正確加載你的攔截器類。
典型使用場景
Kafka 攔截器都能用在哪些地方呢?其實,跟很多攔截器的用法相同,Kafka 攔截器可以應用于包括客戶端監控、端到端系統性能檢測、消息審計等多種功能在內的場景。
我以端到端系統性能檢測和消息審計為例來展開介紹下。
今天 Kafka 默認提供的監控指標都是針對單個客戶端或 Broker 的,你很難從具體的消息維度去追蹤集群間消息的流轉路徑。同時,如何監控一條消息從生產到最后消費的端到端延時也是很多 Kafka 用戶迫切需要解決的問題。
從技術上來說,我們可以在客戶端程序中增加這樣的統計邏輯,但是對于那些將 Kafka 作為企業級基礎架構的公司來說,在應用代碼中編寫統一的監控邏輯其實是很難的,畢竟這東西非常靈活,不太可能提前確定好所有的計算邏輯。另外,將監控邏輯與主業務邏輯耦合也是軟件工程中不提倡的做法。
現在,通過實現攔截器的邏輯以及可插拔的機制,我們能夠快速地觀測、驗證以及監控集群間的客戶端性能指標,特別是能夠從具體的消息層面上去收集這些數據。這就是 Kafka 攔截器的一個非常典型的使用場景。
我們再來看看消息審計(message audit)的場景。設想你的公司把 Kafka 作為一個私有云消息引擎平臺向全公司提供服務,這必然要涉及多租戶以及消息審計的功能。
作為私有云的 PaaS 提供方,你肯定要能夠隨時查看每條消息是哪個業務方在什么時間發布的,之后又被哪些業務方在什么時刻消費。一個可行的做法就是你編寫一個攔截器類,實現相應的消息審計邏輯,然后強行規定所有接入你的 Kafka 服務的客戶端程序必須設置該攔截器。
案例分享
下面我以一個具體的案例來說明一下攔截器的使用。在這個案例中,我們通過編寫攔截器類來統計消息端到端處理的延時,非常實用,我建議你可以直接移植到你自己的生產環境中。
我曾經給一個公司做 Kafka 培訓,在培訓過程中,那個公司的人提出了一個訴求。他們的場景很簡單,某個業務只有一個 Producer 和一個 Consumer,他們想知道該業務消息從被生產出來到最后被消費的平均總時長是多少,但是目前 Kafka 并沒有提供這種端到端的延時統計。
學習了攔截器之后,我們現在知道可以用攔截器來滿足這個需求。既然是要計算總延時,那么一定要有個公共的地方來保存它,并且這個公共的地方還是要讓生產者和消費者程序都能訪問的。在這個例子中,我們假設數據被保存在 redis 中。
Okay,這個需求顯然要實現生產者攔截器,也要實現消費者攔截器。我們先來實現前者:
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
private Jedis jedis; // 省略 Jedis 初始化
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
jedis.incr("totalSentMessage");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<JAVA.lang.String, ?> configs) {
}
上面的代碼比較關鍵的是在發送消息前更新總的已發送消息數。為了節省時間,我沒有考慮發送失敗的情況,因為發送失敗可能導致總發送數不準確。不過好在處理思路是相同的,你可以有針對性地調整下代碼邏輯。
下面是消費者端的攔截器實現,代碼如下:
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Jedis jedis; // 省略 Jedis 初始化
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
在上面的消費者攔截器中,我們在真正消費一批消息前首先更新了它們的總延時,方法就是用當前的時鐘時間減去封裝在消息中的創建時間,然后累計得到這批消息總的端到端處理延時并更新到 Redis 中。之后的邏輯就很簡單了,我們分別從 Redis 中讀取更新過的總延時和總消息數,兩者相除即得到端到端消息的平均處理延時。
創建好生產者和消費者攔截器后,我們按照上面指定的方法分別將它們配置到各自的 Producer 和 Consumer 程序中,這樣就能計算消息從 Producer 端到 Consumer 端平均的處理延時了。這種端到端的指標監控能夠從全局角度俯察和審視業務運行情況,及時查看業務是否滿足端到端的 SLA 目標。
小結
今天我們花了一些時間討論 Kafka 提供的冷門功能:攔截器。如之前所說,攔截器的出場率極低,以至于我從未看到過國內大廠實際應用 Kafka 攔截器的報道。但冷門不代表沒用。事實上,我們可以利用攔截器滿足實際的需求,比如端到端系統性能檢測、消息審計等。?