一個(gè)挺著啤酒肚,身穿格子衫,發(fā)際線嚴(yán)重后移的中年男子,手拿著保溫杯,胳膊夾著macBook向你走來,看樣子是架構(gòu)師級(jí)別。
面試開始,直入正題。
面試官: 我看到你的簡歷上寫著項(xiàng)目中用到了消息隊(duì)列,還用的是kafka,你有遇到過消息隊(duì)列丟失消息的情況嗎?
我: 消息隊(duì)列還能丟失消息?那誰還用消息隊(duì)列!你是不是搞錯(cuò)了?我沒遇到過丟失消息的情況,也沒考慮過這個(gè)問題。
面試官: 嗯...,小伙子,看來有些面試套路,你還是不太懂。今天面試就先到這里吧!給你的簡歷,我送你下樓。
我去!面試還有啥套路?
能不能少一點(diǎn)套路,多一點(diǎn)真誠!
難道都要去背一遍八股文才能參加面試?
好吧,我去瞅一眼一燈總結(jié)的面試八股文。
我: 消息隊(duì)列發(fā)送消息和消費(fèi)消息的過程,共分為三段,生產(chǎn)過程、服務(wù)端持久化過程、消費(fèi)過程,如下圖所示。
這三個(gè)過程都有可能弄丟消息。
面試官: 嗯,消息丟失的具體原因是什么?怎么防止丟失消息呢?
我: 我詳細(xì)說一下這種情況:
一、生產(chǎn)過程丟失消息
丟失原因:一般可能是網(wǎng)絡(luò)故障,導(dǎo)致消息沒有發(fā)送出去。
解決方案:重發(fā)就行了。
由于kafka為了提高性能,采用了異步發(fā)送消息。我們只有獲取到發(fā)送結(jié)果,才能確保消息發(fā)送成功。 有兩個(gè)方案可以獲取發(fā)送結(jié)果。
一種是kafka把發(fā)送結(jié)果封裝在Future對象中,我可以使用Future的get方法同步阻塞獲取結(jié)果。
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, message));
try {
RecordMetadata recordMetadata = future.get();
if (recordMetadata != null) {
System.out.println("發(fā)送成功");
}
} catch (Exception e) {
e.printStackTrace();
}
另一種是使用kafka的callback函數(shù)獲取返回結(jié)果。
producer.send(new ProducerRecord<>(topic, message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("發(fā)送成功");
} else {
System.out.println("發(fā)送失敗");
}
}
});
如果發(fā)送失敗了,有兩種重試方案:
- 手動(dòng)重試 在catch邏輯或else邏輯中,再調(diào)用一次send方法。如果還不成功怎么辦? 在數(shù)據(jù)庫中建一張異常消息表,把失敗消息存入表中,然后搞個(gè)異步任務(wù)重試,便于控制重試次數(shù)和間隔時(shí)間。
- 自動(dòng)重試 kafka支持自動(dòng)重試,設(shè)置參數(shù)如下,當(dāng)集群Leader選舉中或者Follower數(shù)量不足等原因返回失敗時(shí),就可以自動(dòng)重試。
- # 設(shè)置重試次數(shù)為3
retries = 3# 設(shè)置重試間隔為100msretry.backoff.ms = 100 - 一般我們不會(huì)用kafka自動(dòng)重試,因?yàn)槌^重試次數(shù),還是會(huì)返回失敗,還需要我們手動(dòng)重試。
二、服務(wù)端持久化過程丟失消息
為了保證性能,kafka采用的是異步刷盤,當(dāng)我們發(fā)送消息成功后,Broker節(jié)點(diǎn)在刷盤之前宕機(jī)了,就會(huì)導(dǎo)致消息丟失。
當(dāng)然我們也可以設(shè)置刷盤頻率:
# 設(shè)置每1000條消息刷一次盤
flush.messages = 1000
# 設(shè)置每秒刷一次盤
flush.ms = 1000
先普及一下kafka集群的架構(gòu)模型:
kafka集群由多個(gè)broker組成,一個(gè)broker就是一個(gè)節(jié)點(diǎn)(機(jī)器)。 一個(gè)topic有多個(gè)partition(分區(qū)),每個(gè)partition分布在不同的broker上面,可以充分利用分布式機(jī)器性能,擴(kuò)容時(shí)只需要加機(jī)器、加partition就行了。
一個(gè)partition又有多個(gè)replica(副本),有一個(gè)leader replica(主副本)和多個(gè)follower replica(從副本),這樣設(shè)計(jì)是為了保證數(shù)據(jù)的安全性。
發(fā)送消息和消費(fèi)消息都在leader上面,follower負(fù)責(zé)定時(shí)從leader上面拉取消息,只有follower從leader上面把這條消息拉取回來,才算生產(chǎn)者發(fā)送消息成功。
kafka為了加快持久化消息的性能,把性能較好的follower組成一個(gè)ISR列表(in-sync replica),把性能較差的follower組成一個(gè)OSR列表(out-of-sync replica),ISR+OSR=AR(assigned repllicas)。 如果某個(gè)follower一段時(shí)間沒有向leader拉取消息,落后leader太多,就把它移出ISR,放到OSR之中。 如果某個(gè)follower追上了leader,又會(huì)把它重新放到ISR之中。 如果leader掛掉,就會(huì)從ISR之中選一個(gè)follower做leader。
為了提升持久化消息性能,我們可以進(jìn)行一些設(shè)置:
# 如果follower超過一秒沒有向leader拉取消息,就把它移出ISR列表
rerplica.lag.time.max.ms = 1000
# 如果follower落后leader一千條消息,就把它移出ISR列表
rerplica.lag.max.messages = 1000
# 至少保證ISR中有3個(gè)follower
min.insync.replicas = 3
# 異步消息,不需要leader確認(rèn),立即給生產(chǎn)者返回發(fā)送成功,丟失消息概率較大
asks = 0
# leader把消息寫入本地日志中,不會(huì)等所有follower確認(rèn),就給生產(chǎn)者返回發(fā)送成功,小概率丟失消息
asks = 1
# leader需要所有ISR中follower確認(rèn),才給生產(chǎn)者返回發(fā)送成功,不會(huì)丟失消息
asks = -1 或者 asks = all
三、消費(fèi)過程丟失消息
kafka中有個(gè)offset的概念,consumer從partition中拉取消息,consumer本地處理完成后需要commit一下offset,表示消費(fèi)完成,下次就不會(huì)再拉取到這條消息。
所以我們需要關(guān)閉自動(dòng)commit offset的配置,防止consumer拉到消息后,服務(wù)宕機(jī),導(dǎo)致消息丟失。
enable.auto.commit = false
面試官: 還得是你,就你總結(jié)的全,我都想不那么全,明天來上班吧,薪資double。
本文知識(shí)點(diǎn)總結(jié):