本文介紹了如何避免使用Kafka流丟失消息的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!
問(wèn)題描述
我們有一個(gè)Streams應(yīng)用程序,它使用源主題中的消息,執(zhí)行一些處理并將結(jié)果轉(zhuǎn)發(fā)到目標(biāo)主題。
消息的結(jié)構(gòu)由某些Avro架構(gòu)控制。
當(dāng)開(kāi)始使用消息時(shí),如果架構(gòu)尚未緩存,應(yīng)用程序?qū)L試從架構(gòu)注冊(cè)表中檢索它。如果由于任何原因架構(gòu)注冊(cè)表不可用(例如網(wǎng)絡(luò)故障),則當(dāng)前正在處理的消息將丟失,因?yàn)槟J(rèn)處理程序是名為LogAndContinueExceptionHandler
的處理程序。
o.a.k.s.e.LogAndContinueExceptionHandler : Exception caught during Deserialization, taskId: 1_5, topic: my.topic.v1, partition: 5, offset: 142768
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 62
Caused by: java.net.SocketTimeoutException: connect timed out
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:na]
...
o.a.k.s.p.internals.RecordDeserializer : stream-thread [my-app-StreamThread-3] task [1_5] Skipping record due to deserialization error. topic=[my.topic.v1] partition=[5] offset=[142768]
...
因此,我的問(wèn)題是,處理上述情況的正確方法是什么,并確保無(wú)論如何都不會(huì)丟失消息。是否有現(xiàn)成的LogAndRollbackExceptionHandler
錯(cuò)誤處理程序或?qū)崿F(xiàn)您自己的方法?
提前感謝您的投入。
推薦答案
我在Kafka上的工作不是很多,但當(dāng)我工作時(shí),我記得遇到了您在我們系統(tǒng)中描述的問(wèn)題。
讓我告訴你我們是如何處理我們的場(chǎng)景的,也許這也會(huì)對(duì)你有所幫助:
場(chǎng)景1:如果您的消息在發(fā)布方(Publisher–&>Kafka)丟失,您可以根據(jù)需要配置Kafka確認(rèn)設(shè)置,如果您使用的是SpringCloud Stream和Kafka,則該屬性為spring.cloud.stream.kafka.binder.required-acks
可能的值:
最多一次(Ack=0)
-
出版商不在乎卡夫卡是否承認(rèn)。
發(fā)送并忘記
可能會(huì)丟失數(shù)據(jù)
至少一次(Ack=1)
如果Kafka不確認(rèn),出版商將重新發(fā)送消息。
可能存在重復(fù)。
在將郵件復(fù)制到副本之前發(fā)送確認(rèn)。
正好一次(Ack=All)
如果Kafka不確認(rèn),出版商將重新發(fā)送消息。
但是,如果一條消息多次發(fā)送到Kafka,則不會(huì)有重復(fù)。
內(nèi)部序列號(hào),用于判斷消息是否已經(jīng)寫(xiě)入主題。
需要設(shè)置Min.insync.Replicas屬性,以確保在Kafka向生產(chǎn)者確認(rèn)之前,需要同步的最小復(fù)制數(shù)是多少。
場(chǎng)景2:如果您的數(shù)據(jù)在消費(fèi)者端丟失(Kafka–&>Consumer),您可以根據(jù)您的使用情況更改Kafka的自動(dòng)提交功能。如果您使用的是Spring Cloud Streamspring.cloud.stream.kafka.bindings.input.consumer.AutoCommitOffset.
,則為該屬性
默認(rèn)情況下,在Kafka中AutoCommittee Offset為T(mén)rue,并且發(fā)送給消費(fèi)者的每一條消息在Kafka的末尾都是&Quot;Submitted&Quot;,這意味著它不會(huì)被再次發(fā)送。但是,如果您將AutoCommittee Offset更改為False,您將有權(quán)在代碼中輪詢(xún)來(lái)自Kafka的消息,并且一旦您完成工作,將Commit顯式設(shè)置為T(mén)rue,以讓Kafka知道您現(xiàn)在已經(jīng)完成了消息。
如果消息未提交,Kafka將繼續(xù)重新發(fā)送,直到消息提交為止。
希望這對(duì)您有所幫助,或至少為您指明正確的方向。
這篇關(guān)于如何避免使用Kafka流丟失消息的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,