日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線(xiàn)咨詢(xún)客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

一、開(kāi)篇

經(jīng)過(guò)上次文章的鋪墊,相信大家對(duì) JAVA 的 NIO 有了一些感性的認(rèn)識(shí),也初步了解了它的 API 了,可以開(kāi)始去閱讀 Kafka Producer 端的發(fā)送消息的部分了。

突然想感嘆一下,閱讀 Kafka 這個(gè)全世界著名的開(kāi)源項(xiàng)目,多多少少會(huì)讓人賞心悅目

二、發(fā)送消息的八個(gè)主流程

先大致掃一眼,發(fā)送消息的八個(gè)主流程,然后再逐個(gè)擊破。

發(fā)送消息的主流程主要是在 Sender 方法里的,Sender 是一個(gè)后臺(tái)線(xiàn)程,在構(gòu)造 Producer 的時(shí)候,就已經(jīng)被啟動(dòng)在后臺(tái)運(yùn)行了。所以我們主要看它的 run 方法。

run 方法是一個(gè) while 循環(huán),我們看里面的 run 方法。(當(dāng)前位置:Sender 類(lèi))

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

步驟一:獲取集群的元數(shù)據(jù)。(當(dāng)前位置:Sender 類(lèi))

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

在上一篇文章可以知道,我們已經(jīng)在 KafkaProducer 類(lèi)的 doSend 方法中,完成了元數(shù)據(jù)的拉取,所以這里是可以獲取到元數(shù)據(jù)的了。

步驟二:判斷哪些 partition 有消息可以發(fā)送。(當(dāng)前位置:Sender 類(lèi))

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

步驟三:標(biāo)識(shí)還沒(méi)有拉取到元數(shù)據(jù)的 topic,這些 topic 需要再次拉取一次元數(shù)據(jù)。(當(dāng)前位置:Sender 類(lèi))

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

這個(gè)是一些容錯(cuò)

步驟四:檢查與要發(fā)送消息的主機(jī)的網(wǎng)絡(luò)連接是否建立好了(當(dāng)前類(lèi):Sender 類(lèi))

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

步驟五:把發(fā)往同一臺(tái)機(jī)器的不同批次的消息合并成一個(gè)請(qǐng)求

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

步驟六:處理超時(shí)的批次

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

步驟七:創(chuàng)建請(qǐng)求

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

步驟八:真正的發(fā)送消息出去的網(wǎng)絡(luò)請(qǐng)求,包括:發(fā)送請(qǐng)求,接收和處理響應(yīng),拉取元數(shù)據(jù)等

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

三、消息可以發(fā)送出去的條件

(1)首先我們來(lái)到這個(gè) ready 方法里面(當(dāng)前位置:RecordAccumulator)

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

(2)來(lái)看這一行:

boolean exhausted = this.free.queued() > 0;

free 是指 BufferPool,queued 方法:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

waiters 里面是 Condition,表示是否有等待釋放內(nèi)存的線(xiàn)程,如果有,那么就是內(nèi)存不足的意思。

也就是說(shuō),內(nèi)存不足,exhausted 為 true,否則 為 false。

(3)遍歷所有的分區(qū)和批次

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

拿出一個(gè)批次出來(lái),下面開(kāi)始判斷是否可發(fā)送的條件:

(4)第一次發(fā)送為 false;下次重試時(shí)間到了,false;重試時(shí)間沒(méi)到,true。

boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;

batch.attempts :表示是否嘗試過(guò)了

batch.lastAttemptMs :表示分區(qū)的上次嘗試時(shí)間,初始值為當(dāng)前時(shí)間

retryBackOffMs :表示重試的時(shí)間間隔,默認(rèn)為 100 ms

nowMs:表示當(dāng)前時(shí)間

那么這句是什么意思?

  • 如果消息是第一次發(fā)送,那么這個(gè) backingOff 就是 false;
  • 如果消息第一次發(fā)送失敗,進(jìn)入重試,并且還沒(méi)到下次重試的時(shí)間,這個(gè) backingOff 就是 true,如果到了重試的時(shí)間,那么 backingOff 就是 false。

這句話(huà)可能不好理解,可以假設(shè),上次重試時(shí)間點(diǎn)是 10:00:00.000,重試的時(shí)間間隔是 100ms,下次重試時(shí)間是 10:00:00.100,而當(dāng)前時(shí)間是 10:00:00.020,即還沒(méi)到下次重試的時(shí)間。

那么 batch.lastAttemptMs + retryBackoffMs > nowMs 為 true,即還沒(méi)到下次重試時(shí)間。

(5)計(jì)算出已經(jīng)等待的時(shí)間

long waitedTimeMs = nowMs - batch.lastAttemptMs;

nowMs:表示當(dāng)前時(shí)間

batch.lastAttemptMs:上次重試時(shí)間

waitedTimeMs:已經(jīng)等待的時(shí)間

(6)等待的時(shí)間

long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;

retryBackoffMs :表示重試的時(shí)間間隔,默認(rèn)是 100 ms

lingerMs:這個(gè)值默認(rèn)是 0,即來(lái)一條發(fā)送一條。所以在生產(chǎn)上,一定要配置這個(gè)值,充分利用 batch 來(lái)緩存批次,避免過(guò)多和服務(wù)器的通信。

如果是第一次發(fā)送,backingOff 為 false,那么 timeToWaitMs 為 lingerMs。

(7)還需要等待多久

long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);

timeToWaitMs:一共需要等待的時(shí)間

waitedTimeMs:已經(jīng)等待的時(shí)間

timeLeftMs:還需要等待的時(shí)間

(8)是否有批次滿(mǎn)了

boolean full = deque.size() > 1 || batch.records.isFull();

如果隊(duì)列里的批次數(shù)量大于 1,則表示已經(jīng)有批次已經(jīng)滿(mǎn)了。

如果批次數(shù)量為 1,但是這個(gè)批次的消息已經(jīng)滿(mǎn)了

(9)是否超時(shí),即已經(jīng)等待的時(shí)長(zhǎng),是否大于一共需要等待的時(shí)長(zhǎng)

boolean expired = waitedTimeMs >= timeToWaitMs;

(10)最后是發(fā)送條件,下面的五個(gè)條件是或的關(guān)系,任意一個(gè)滿(mǎn)足,都可以發(fā)送

boolean sendable = full || expired || exhausted || closed || flushInProgress();
  • 如果批次已經(jīng)滿(mǎn)了
  • 等待的時(shí)間到了
  • 內(nèi)存滿(mǎn)了
  • 客戶(hù)端關(guān)閉,但仍然有消息沒(méi)發(fā)送

(11)如果達(dá)到了發(fā)送消息的條件,并且重試的時(shí)間到了(或者是第一次發(fā)送)

則把當(dāng)前消息所在的分區(qū)的 Leader Partition 對(duì)應(yīng)的主機(jī),加到 readyNodes 數(shù)據(jù)結(jié)構(gòu)中來(lái)

if (sendable && !backingOff) {
    readyNodes.add(leader);
}

至此,已經(jīng)找到了需要發(fā)送消息的主機(jī),那么接下來(lái)就是建立到這臺(tái)主機(jī)的連接。

四、Kafka Producer 對(duì)于 Java NIO 的封裝

到建立網(wǎng)絡(luò)連接的時(shí)候,看到這段代碼:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

可以看到具體的實(shí)現(xiàn)是在 NetwordClient 里面

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 


Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

第一個(gè)條件就是發(fā)送消息不能是在更新元數(shù)據(jù)的時(shí)候;

第二個(gè)條件點(diǎn)進(jìn)去:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

發(fā)現(xiàn)這邊有個(gè)核心的對(duì)象,selector,它是 NetworkClient 里的一個(gè)屬性。(NetworkClient 是 Kafka 網(wǎng)絡(luò)連接的一個(gè)很重要的對(duì)象!):

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

我們?cè)冱c(diǎn)進(jìn)去,找它的實(shí)現(xiàn)類(lèi),Selector:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

可以看到有兩個(gè)核心屬性,第一個(gè) nIOSelector 就是對(duì)于 Java 的 Nio 的封裝。

第二個(gè)是一個(gè) Map,Map 的 key 是 broker 的編號(hào),value 是 KafkaChannel,KafkaChannel 可以理解為是 SocketChannel。

好,然后再繼續(xù)看一下 KafkaChannel:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 


Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

最終,如下圖所示:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

五、檢查并建立網(wǎng)絡(luò)連接

我們從第四步的代碼開(kāi)始看:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

第一個(gè)條件,表示是否建立好了連接,如果建立好了,會(huì)在 nodeState 的結(jié)構(gòu)中緩存起來(lái)的。

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

第二個(gè)條件:通道是否準(zhǔn)備好了:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

第三個(gè)條件:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

max.in.flight.requests.per.connection

這個(gè)參數(shù),是在初始化 NetworkClient 對(duì)象的時(shí)候,傳遞進(jìn)來(lái)的,默認(rèn)值是 5.

表示最多默認(rèn)有多少次請(qǐng)求沒(méi)有得到服務(wù)端的響應(yīng)。

這里第三個(gè)條件,就是說(shuō),是否小于 5 個(gè)請(qǐng)求發(fā)送出去了,沒(méi)有得到響應(yīng)。

但現(xiàn)在我們是第一次判斷與主機(jī)的網(wǎng)絡(luò)是否連接好,網(wǎng)絡(luò)肯定是沒(méi)有建立好的,所以這個(gè)方法會(huì)返回 false。

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 


Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

然后就開(kāi)始初始化網(wǎng)絡(luò)連接了:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 


Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

這里連接的代碼和平時(shí)寫(xiě)的 Java NIO 的代碼是一樣的

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

socket.setTcpNoDelay(true);

注意,他這里有一句這個(gè)代碼,這個(gè)默認(rèn)值是 false,意思是它會(huì)把網(wǎng)絡(luò)中的一些小的數(shù)據(jù)包收集起來(lái),組合成一個(gè)大的數(shù)據(jù)包然后再發(fā)送出去。

它認(rèn)為如果網(wǎng)絡(luò)中有大量小的數(shù)據(jù)包,會(huì)影響網(wǎng)絡(luò)擁塞。

所以這里,一定是要把它設(shè)置為 true 的。因?yàn)橛袝r(shí)候,數(shù)據(jù)包就是比較小,這里不幫我們發(fā)送,明細(xì)是不合適的。

這里,建立網(wǎng)絡(luò)連接,最終往 selector 上綁定了一個(gè) OP_CONNECT 事件,和我們平時(shí)寫(xiě)的代碼是一樣的。

最終這個(gè)方法返回了 false:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

那么回到主流程上,返回 false 之后,這些主機(jī)都會(huì)被移除。

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

然后是步驟七,創(chuàng)建一個(gè)請(qǐng)求。

最后執(zhí)行到這里:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

點(diǎn)進(jìn)去看,核心代碼在這里:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

繼續(xù)往里面看,核心代碼在這里:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

點(diǎn)進(jìn)去:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

再點(diǎn)進(jìn)去,(當(dāng)前位置:PlaintextTransportLayer)

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

這里,如果已經(jīng)連接網(wǎng)絡(luò)了,則移除 OP_CONNECT 事件,并且增加 OP_READ 事件,這樣的話(huà),就可以讀取到 服務(wù)端發(fā)送回來(lái)的響應(yīng)了。

到這里位置,第一遍就建立好了網(wǎng)絡(luò)連接。

六、準(zhǔn)備發(fā)送消息

剛剛我們第一遍執(zhí)行,建立好了網(wǎng)絡(luò)連接,現(xiàn)在開(kāi)始第二次執(zhí)行

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

這里網(wǎng)絡(luò)已經(jīng)準(zhǔn)備好了,所以 if 的方法不執(zhí)行,節(jié)點(diǎn)也不會(huì)被移除了

這個(gè)時(shí)候是可以合并批次的,因?yàn)檫@個(gè) nodes 不為空

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

然后創(chuàng)建一個(gè)請(qǐng)求,并且發(fā)送這個(gè)請(qǐng)求:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

點(diǎn)進(jìn)去:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

在點(diǎn)進(jìn)去 send 方法里,這里有一個(gè)很重要的操作,綁定了 OP_WRITE 事件

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 


Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

綁定了 OP_WRITE 事件,才能把數(shù)據(jù)發(fā)送出去!!

現(xiàn)在我們?cè)偻嘶氐?這個(gè)方法:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

點(diǎn)到 poll 方法里來(lái):

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

然后這里會(huì)從 selector 上拿到 SelectionKey,如果是寫(xiě)事件:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 


Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

點(diǎn)到 send 方法里來(lái):

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

把消息寫(xiě)出去,并且移除 OP_WRITE 事件。

到此為止,消息終于發(fā)送出去了。

七、獲取服務(wù)端的響應(yīng),拆包和粘包處理

我們可以想到,客戶(hù)端發(fā)送出去的肯定是多個(gè)請(qǐng)求,那么服務(wù)端返回的也是多個(gè)請(qǐng)求,那客戶(hù)端如何從響應(yīng)中解析出這多個(gè)請(qǐng)求呢?這就是拆包處理。

比如,服務(wù)端返回的響應(yīng)是這樣的:

響應(yīng)成功響應(yīng)失敗

我們要拆分成:

響應(yīng)成功

響應(yīng)失敗

但是,由于網(wǎng)絡(luò)原因,返回的可能是這樣的

響應(yīng)成

功響應(yīng)失敗

也就是分兩次發(fā)回給客戶(hù)端

客戶(hù)端該如何處理?

Kafka 是在響應(yīng)消息的前面加上了每個(gè)響應(yīng)的長(zhǎng)度編碼

40響應(yīng)成功30響應(yīng)失敗

那這個(gè)長(zhǎng)度會(huì)發(fā)生拆包嗎?也很簡(jiǎn)單,申請(qǐng)一定長(zhǎng)度的字節(jié),比如2個(gè)字節(jié)來(lái)存長(zhǎng)度,把這個(gè)2字節(jié)的長(zhǎng)度滿(mǎn)了,就是長(zhǎng)度了。

等到讀滿(mǎn)了2字節(jié),就轉(zhuǎn)換成 int 類(lèi)型,再申請(qǐng)這個(gè) int 類(lèi)型長(zhǎng)度的內(nèi)存,再去接收這么多長(zhǎng)度的字節(jié),一直到讀滿(mǎn)為止。

然后來(lái)看看 Kafka 的代碼如何處理的,看到 poll 方法里處理 OP_READ 的方法的部分

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 


Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 


Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

最終,拆包和粘包的代碼:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

size.hasRemaining, size 是一個(gè) 4 字節(jié)的 ByteBuffer

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

然后開(kāi)始讀4個(gè)字節(jié)的數(shù)據(jù)

int bytesRead = channel.read(size);

讀取完了之后,再看有沒(méi)有剩余空間了,如果讀滿(mǎn)了,那么把這個(gè)4字節(jié)的數(shù)變成一個(gè) int 值,并且繼續(xù)分配這個(gè) int 值大小的 ByteBuffer

if (!size.hasRemaining()) {
    size.rewind();
    int receiveSize = size.getInt();
    if (receiveSize < 0)
        throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
    if (maxSize != UNLIMITED && receiveSize > maxSize)
        throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");

    this.buffer = ByteBuffer.allocate(receiveSize);
}

然后一直讀取內(nèi)容:

if (buffer != null) {
    int bytesRead = channel.read(buffer);
    if (bytesRead < 0)
        throw new EOFException();
    read += bytesRead;
}

然后再來(lái)看:

Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 


Kafka 的網(wǎng)絡(luò)通信設(shè)計(jì),竟然只用 20 行就實(shí)現(xiàn)了粘包拆包邏輯

 

這個(gè) complete 方法,是判斷 size 已經(jīng)讀滿(mǎn)了,并且 內(nèi)容也已經(jīng)讀滿(mǎn)了,那么就表示讀取到了一個(gè)完整的響應(yīng)了。

那么這就是完整的拆包和粘包的處理了,大概也就是20行代碼,也是很精彩的。

八、總結(jié)

本次我們完整的看了 Sender 線(xiàn)程發(fā)送消息的完整過(guò)程,里面包括了 Kafka 如何封裝 Java NIO 代碼,并且合理的建立連接,綁定 OP_READ,OP_WRITE 事件,并且讀取服務(wù)端的響應(yīng),代碼質(zhì)量還是非常高的,看起來(lái)也是賞心悅目。

希望大家對(duì)著源碼再好好看一遍,一定會(huì)有收貨的。

分享到:
標(biāo)簽:Kafka
用戶(hù)無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定