【51CTO.com原創(chuàng)稿件】本文主要講解 Kafka 是什么、Kafka 的架構(gòu)包括工作流程和存儲(chǔ)機(jī)制,以及生產(chǎn)者和消費(fèi)者。

圖片來自 Pexels
最終大家會(huì)掌握 Kafka 中最重要的概念,分別是 Broker、Producer、Consumer、Consumer Group、Topic、Partition、Replica、Leader、Follower,這是學(xué)會(huì)和理解 Kafka 的基礎(chǔ)和必備內(nèi)容。
定義
Kafka 是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(Message Queue),主要應(yīng)用與大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。
消息隊(duì)列
Kafka 本質(zhì)上是一個(gè) MQ(Message Queue),使用消息隊(duì)列的好處?(面試會(huì)問)
- 解耦:允許我們獨(dú)立的擴(kuò)展或修改隊(duì)列兩邊的處理過程。
- 可恢復(fù)性:即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。
- 緩沖:有助于解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。
- 靈活性&峰值處理能力:不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰,消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力。
- 異步通信:消息隊(duì)列允許用戶把消息放入隊(duì)列但不立即處理它。
發(fā)布/訂閱模式

一對(duì)多,生產(chǎn)者將消息發(fā)布到 Topic 中,有多個(gè)消費(fèi)者訂閱該主題,發(fā)布到 Topic 的消息會(huì)被所有訂閱者消費(fèi),被消費(fèi)的數(shù)據(jù)不會(huì)立即從 Topic 清除。
架構(gòu)

Kafka 存儲(chǔ)的消息來自任意多被稱為 Producer 生產(chǎn)者的進(jìn)程。數(shù)據(jù)從而可以被發(fā)布到不同的 Topic 主題下的不同 Partition 分區(qū)。
在一個(gè)分區(qū)內(nèi),這些消息被索引并連同時(shí)間戳存儲(chǔ)在一起。其它被稱為 Consumer 消費(fèi)者的進(jìn)程可以從分區(qū)訂閱消息。
Kafka 運(yùn)行在一個(gè)由一臺(tái)或多臺(tái)服務(wù)器組成的集群上,并且分區(qū)可以跨集群結(jié)點(diǎn)分布。
下面給出 Kafka 一些重要概念,讓大家對(duì) Kafka 有個(gè)整體的認(rèn)識(shí)和感知,后面還會(huì)詳細(xì)的解析每一個(gè)概念的作用以及更深入的原理:
- Producer: 消息生產(chǎn)者,向 Kafka Broker 發(fā)消息的客戶端。
- Consumer:消息消費(fèi)者,從 Kafka Broker 取消息的客戶端。
- Consumer Group:消費(fèi)者組(CG),消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),提高消費(fèi)能力。一個(gè)分區(qū)只能由組內(nèi)一個(gè)消費(fèi)者消費(fèi),消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。
- Broker:一臺(tái) Kafka 機(jī)器就是一個(gè) Broker。一個(gè)集群由多個(gè) Broker 組成。一個(gè) Broker 可以容納多個(gè) Topic。
- Topic:可以理解為一個(gè)隊(duì)列,Topic 將消息分類,生產(chǎn)者和消費(fèi)者面向的是同一個(gè) Topic。
- Partition:為了實(shí)現(xiàn)擴(kuò)展性,提高并發(fā)能力,一個(gè)非常大的 Topic 可以分布到多個(gè) Broker (即服務(wù)器)上,一個(gè) Topic 可以分為多個(gè) Partition,每個(gè) Partition 是一個(gè) 有序的隊(duì)列。
- Replica:副本,為實(shí)現(xiàn)備份的功能,保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的 Partition 數(shù)據(jù)不丟失,且 Kafka 仍然能夠繼續(xù)工作,Kafka 提供了副本機(jī)制,一個(gè) Topic 的每個(gè)分區(qū)都有若干個(gè)副本,一個(gè) Leader 和若干個(gè) Follower。
- Leader:每個(gè)分區(qū)多個(gè)副本的“主”副本,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象,都是 Leader。
- Follower:每個(gè)分區(qū)多個(gè)副本的“從”副本,實(shí)時(shí)從 Leader 中同步數(shù)據(jù),保持和 Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時(shí),某個(gè) Follower 還會(huì)成為新的 Leader。
- Offset:消費(fèi)者消費(fèi)的位置信息,監(jiān)控?cái)?shù)據(jù)消費(fèi)到什么位置,當(dāng)消費(fèi)者掛掉再重新恢復(fù)的時(shí)候,可以從消費(fèi)位置繼續(xù)消費(fèi)。
- Zookeeper:Kafka 集群能夠正常工作,需要依賴于 Zookeeper,Zookeeper 幫助 Kafka 存儲(chǔ)和管理集群信息。
工作流程
Kafka集群將 Record 流存儲(chǔ)在稱為 Topic 的類別中,每個(gè)記錄由一個(gè)鍵、一個(gè)值和一個(gè)時(shí)間戳組成。

Kafka 是一個(gè)分布式流平臺(tái),這到底是什么意思?
- 發(fā)布和訂閱記錄流,類似于消息隊(duì)列或企業(yè)消息傳遞系統(tǒng)。
- 以容錯(cuò)的持久方式存儲(chǔ)記錄流。
- 處理記錄流。
Kafka 中消息是以 Topic 進(jìn)行分類的,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息,面向的都是同一個(gè) Topic。
Topic 是邏輯上的概念,而 Partition 是物理上的概念,每個(gè) Partition 對(duì)應(yīng)于一個(gè) log 文件,該 log 文件中存儲(chǔ)的就是 Producer 生產(chǎn)的數(shù)據(jù)。
Producer 生產(chǎn)的數(shù)據(jù)會(huì)不斷追加到該 log 文件末端,且每條數(shù)據(jù)都有自己的 Offset。
消費(fèi)者組中的每個(gè)消費(fèi)者,都會(huì)實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè) Offset,以便出錯(cuò)恢復(fù)時(shí),從上次的位置繼續(xù)消費(fèi)。
存儲(chǔ)機(jī)制

由于生產(chǎn)者生產(chǎn)的消息會(huì)不斷追加到 log 文件末尾,為防止 log 文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka 采取了分片和索引機(jī)制。
它將每個(gè) Partition 分為多個(gè) Segment,每個(gè) Segment 對(duì)應(yīng)兩個(gè)文件:“.index” 索引文件和 “.log” 數(shù)據(jù)文件。
這些文件位于同一文件下,該文件夾的命名規(guī)則為:topic 名-分區(qū)號(hào)。例如,first 這個(gè) topic 有三分分區(qū),則其對(duì)應(yīng)的文件夾為 first-0,first-1,first-2。
# ls /root/data/kafka/first-0 00000000000000009014.index 00000000000000009014.log 00000000000000009014.timeindex 00000000000000009014.snapshot leader-epoch-checkpoint
index 和 log 文件以當(dāng)前 Segment 的第一條消息的 Offset 命名。下圖為 index 文件和 log 文件的結(jié)構(gòu)示意圖:

“.index” 文件存儲(chǔ)大量的索引信息,“.log” 文件存儲(chǔ)大量的數(shù)據(jù),索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中 Message 的物理偏移量。
生產(chǎn)者
分區(qū)策略
分區(qū)原因:
- 方便在集群中擴(kuò)展,每個(gè) Partition 可以通過調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè) Topic 又可以有多個(gè) Partition 組成,因此可以以 Partition 為單位讀寫了。
- 可以提高并發(fā),因此可以以 Partition 為單位讀寫了。
分區(qū)原則:我們需要將 Producer 發(fā)送的數(shù)據(jù)封裝成一個(gè) ProducerRecord 對(duì)象。
該對(duì)象需要指定一些參數(shù):
- topic:string 類型,NotNull。
- partition:int 類型,可選。
- timestamp:long 類型,可選。
- key:string 類型,可選。
- value:string 類型,可選。
- headers:array 類型,Nullable。
①指明 Partition 的情況下,直接將給定的 Value 作為 Partition 的值。
②沒有指明 Partition 但有 Key 的情況下,將 Key 的 Hash 值與分區(qū)數(shù)取余得到 Partition 值。
③既沒有 Partition 有沒有 Key 的情況下,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用都在這個(gè)整數(shù)上自增),將這個(gè)值與可用的分區(qū)數(shù)取余,得到 Partition 值,也就是常說的 Round-Robin 輪詢算法。
數(shù)據(jù)可靠性保證
為保證 Producer 發(fā)送的數(shù)據(jù),能可靠地發(fā)送到指定的 Topic,Topic 的每個(gè) Partition 收到 Producer 發(fā)送的數(shù)據(jù)后,都需要向 Producer 發(fā)送 ACK(ACKnowledge 確認(rèn)收到)。
如果 Producer 收到 ACK,就會(huì)進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)。

①副本數(shù)據(jù)同步策略
何時(shí)發(fā)送 ACK?確保有 Follower 與 Leader 同步完成,Leader 再發(fā)送 ACK,這樣才能保證 Leader 掛掉之后,能在 Follower 中選舉出新的 Leader 而不丟數(shù)據(jù)。
多少個(gè) Follower 同步完成后發(fā)送 ACK?全部 Follower 同步完成,再發(fā)送 ACK。

②ISR
采用第二種方案,所有 Follower 完成同步,Producer 才能繼續(xù)發(fā)送數(shù)據(jù),設(shè)想有一個(gè) Follower 因?yàn)槟撤N原因出現(xiàn)故障,那 Leader 就要一直等到它完成同步。
這個(gè)問題怎么解決?Leader維護(hù)了一個(gè)動(dòng)態(tài)的 in-sync replica set(ISR):和 Leader 保持同步的 Follower 集合。
當(dāng) ISR 集合中的 Follower 完成數(shù)據(jù)的同步之后,Leader 就會(huì)給 Follower 發(fā)送 ACK。
如果 Follower 長(zhǎng)時(shí)間未向 Leader 同步數(shù)據(jù),則該 Follower 將被踢出 ISR 集合,該時(shí)間閾值由 replica.lag.time.max.ms 參數(shù)設(shè)定。Leader 發(fā)生故障后,就會(huì)從 ISR 中選舉出新的 Leader。
③ACK 應(yīng)答機(jī)制
對(duì)于某些不太重要的數(shù)據(jù),對(duì)數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等 ISR 中的 Follower 全部接受成功。
所以 Kafka 為用戶提供了三種可靠性級(jí)別,用戶根據(jù)可靠性和延遲的要求進(jìn)行權(quán)衡,選擇以下的配置。

Ack 參數(shù)配置:
- 0:Producer 不等待 Broker 的 ACK,這提供了最低延遲,Broker 一收到數(shù)據(jù)還沒有寫入磁盤就已經(jīng)返回,當(dāng) Broker 故障時(shí)有可能丟失數(shù)據(jù)。
- 1:Producer 等待 Broker 的 ACK,Partition 的 Leader 落盤成功后返回 ACK,如果在 Follower 同步成功之前 Leader 故障,那么將會(huì)丟失數(shù)據(jù)。
- -1(all):Producer 等待 Broker 的 ACK,Partition 的 Leader 和 Follower 全部落盤成功后才返回 ACK。但是在 Broker 發(fā)送 ACK 時(shí),Leader 發(fā)生故障,則會(huì)造成數(shù)據(jù)重復(fù)。
④故障處理細(xì)節(jié)

LEO:每個(gè)副本最大的 Offset。HW:消費(fèi)者能見到的最大的 Offset,ISR 隊(duì)列中最小的 LEO。
Follower 故障:Follower 發(fā)生故障后會(huì)被臨時(shí)踢出 ISR 集合,待該 Follower 恢復(fù)后,F(xiàn)ollower 會(huì) 讀取本地磁盤記錄的上次的 HW,并將 log 文件高于 HW 的部分截取掉,從 HW 開始向 Leader 進(jìn)行同步數(shù)據(jù)操作。
等該 Follower 的 LEO 大于等于該 Partition 的 HW,即 Follower 追上 Leader 后,就可以重新加入 ISR 了。
Leader 故障:Leader 發(fā)生故障后,會(huì)從 ISR 中選出一個(gè)新的 Leader,之后,為保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的 Follower 會(huì)先將各自的 log 文件高于 HW 的部分截掉,然后從新的 Leader 同步數(shù)據(jù)。
注意:這只能保證副本之間的數(shù)據(jù)一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。
Exactly Once 語義
將服務(wù)器的 ACK 級(jí)別設(shè)置為 -1,可以保證 Producer 到 Server 之間不會(huì)丟失數(shù)據(jù),即 At Least Once 語義。
相對(duì)的,將服務(wù)器 ACK 級(jí)別設(shè)置為 0,可以保證生產(chǎn)者每條消息只會(huì)被發(fā)送一次,即 At Most Once 語義。
At Least Once 可以保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù);相對(duì)的,At Most Once 可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失。
但是,對(duì)于一些非常重要的信息,比如交易數(shù)據(jù),下游數(shù)據(jù)消費(fèi)者要求數(shù)據(jù)既不重復(fù)也不丟失,即 Exactly Once 語義。
0.11 版本的 Kafka,引入了冪等性:Producer 不論向 Server 發(fā)送多少重復(fù)數(shù)據(jù),Server 端都只會(huì)持久化一條。
即:
At Least Once + 冪等性 = Exactly Once
要啟用冪等性,只需要將 Producer 的參數(shù)中 enable.idompotence 設(shè)置為 true 即可。
開啟冪等性的 Producer 在初始化時(shí)會(huì)被分配一個(gè) PID,發(fā)往同一 Partition 的消息會(huì)附帶 Sequence Number。
而 Borker 端會(huì)對(duì)
但是 PID 重啟后就會(huì)變化,同時(shí)不同的 Partition 也具有不同主鍵,所以冪等性無法保證跨分區(qū)會(huì)話的 Exactly Once。
消費(fèi)者
消費(fèi)方式
Consumer 采用 Pull(拉取)模式從 Broker 中讀取數(shù)據(jù)。
Consumer 采用 Push(推送)模式,Broker 給 Consumer 推送消息的速率是由 Broker 決定的,很難適應(yīng)消費(fèi)速率不同的消費(fèi)者。
它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成 Consumer 來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。
而 Pull 模式則可以根據(jù) Consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。Pull 模式不足之處是,如果 Kafka 沒有數(shù)據(jù),消費(fèi)者可能會(huì)陷入循環(huán)中,一直返回空數(shù)據(jù)。
因?yàn)橄M(fèi)者從 Broker 主動(dòng)拉取數(shù)據(jù),需要維護(hù)一個(gè)長(zhǎng)輪詢,針對(duì)這一點(diǎn), Kafka 的消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)會(huì)傳入一個(gè)時(shí)長(zhǎng)參數(shù) timeout。
如果當(dāng)前沒有數(shù)據(jù)可供消費(fèi),Consumer 會(huì)等待一段時(shí)間之后再返回,這段時(shí)長(zhǎng)即為 timeout。
分區(qū)分配策略
一個(gè) Consumer Group 中有多個(gè) Consumer,一個(gè) Topic 有多個(gè) Partition,所以必然會(huì)涉及到 Partition 的分配問題,即確定哪個(gè) Partition 由哪個(gè) Consumer 來消費(fèi)。
Kafka 有兩種分配策略,一個(gè)是 RoundRobin,一個(gè)是 Range,默認(rèn)為Range,當(dāng)消費(fèi)者組內(nèi)消費(fèi)者發(fā)生變化時(shí),會(huì)觸發(fā)分區(qū)分配策略(方法重新分配)。
①RoundRobin

RoundRobin 輪詢方式將分區(qū)所有作為一個(gè)整體進(jìn)行 Hash 排序,消費(fèi)者組內(nèi)分配分區(qū)個(gè)數(shù)最大差別為 1,是按照組來分的,可以解決多個(gè)消費(fèi)者消費(fèi)數(shù)據(jù)不均衡的問題。
但是,當(dāng)消費(fèi)者組內(nèi)訂閱不同主題時(shí),可能造成消費(fèi)混亂,如下圖所示,Consumer0 訂閱主題 A,Consumer1 訂閱主題 B。

將 A、B 主題的分區(qū)排序后分配給消費(fèi)者組,TopicB 分區(qū)中的數(shù)據(jù)可能分配到 Consumer0 中。
②Range

Range 方式是按照主題來分的,不會(huì)產(chǎn)生輪詢方式的消費(fèi)混亂問題。
但是,如下圖所示,Consumer0、Consumer1 同時(shí)訂閱了主題 A 和 B,可能造成消息分配不對(duì)等問題,當(dāng)消費(fèi)者組內(nèi)訂閱的主題越多,分區(qū)分配可能越不均衡。

Offset 的維護(hù)
由于 Consumer 在消費(fèi)過程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障,Consumer 恢復(fù)后,需要從故障前的位置繼續(xù)消費(fèi)。
所以 Consumer 需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè) Offset,以便故障恢復(fù)后繼續(xù)消費(fèi)。
Kafka 0.9 版本之前,Consumer 默認(rèn)將 Offset 保存在 Zookeeper 中,從 0.9 版本開始,Consumer 默認(rèn)將 Offset 保存在 Kafka 一個(gè)內(nèi)置的 Topic 中,該 Topic 為 __consumer_offsets。
總結(jié)
上面和大家一起深入探討了 Kafka 的架構(gòu),比較偏重理論和基礎(chǔ),這是掌握 Kafka 的必要內(nèi)容,接下來我會(huì)以代碼和實(shí)例的方式,更新 Kafka 有關(guān) API 以及事務(wù)、攔截器、監(jiān)控等高級(jí)篇,讓大家徹底理解并且會(huì)用 Kafka。
作者:臧遠(yuǎn)慧
簡(jiǎn)介:就職于中科星圖股份有限公司(北京),研發(fā)部后端技術(shù)組。個(gè)人擅長(zhǎng) Python/JAVA 開發(fā),了解前端基礎(chǔ);熟練掌握 MySQL,MongoDB,了解 redis;熟悉 linux 開發(fā)環(huán)境,掌握 Shell 編程,有良好的 Git 源碼管理習(xí)慣;精通 Nginx ,F(xiàn)lask、Swagger 開發(fā)框架;有 Docker+Kubernetes 云服務(wù)開發(fā)經(jīng)驗(yàn)。對(duì)人工智能、云原生技術(shù)有較大的興趣。
【51CTO原創(chuàng)稿件,合作站點(diǎn)轉(zhuǎn)載請(qǐng)注明原文作者和出處為51CTO.com】