日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線(xiàn)咨詢(xún)客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

本文介紹了如何避免使用Kafka流丟失消息的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!

問(wèn)題描述

我們有一個(gè)Streams應(yīng)用程序,它使用源主題中的消息,執(zhí)行一些處理并將結(jié)果轉(zhuǎn)發(fā)到目標(biāo)主題。

消息的結(jié)構(gòu)由某些Avro架構(gòu)控制。

當(dāng)開(kāi)始使用消息時(shí),如果架構(gòu)尚未緩存,應(yīng)用程序?qū)L試從架構(gòu)注冊(cè)表中檢索它。如果由于任何原因架構(gòu)注冊(cè)表不可用(例如網(wǎng)絡(luò)故障),則當(dāng)前正在處理的消息將丟失,因?yàn)槟J(rèn)處理程序是名為LogAndContinueExceptionHandler的處理程序。

o.a.k.s.e.LogAndContinueExceptionHandler : Exception caught during Deserialization, taskId: 1_5, topic: my.topic.v1, partition: 5, offset: 142768
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 62
Caused by: java.net.SocketTimeoutException: connect timed out
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:na]
...
o.a.k.s.p.internals.RecordDeserializer : stream-thread [my-app-StreamThread-3] task [1_5] Skipping record due to deserialization error. topic=[my.topic.v1] partition=[5] offset=[142768]
...

因此,我的問(wèn)題是,處理上述情況的正確方法是什么,并確保無(wú)論如何都不會(huì)丟失消息。是否有現(xiàn)成的LogAndRollbackExceptionHandler錯(cuò)誤處理程序或?qū)崿F(xiàn)您自己的方法?

提前感謝您的投入。

推薦答案

我在Kafka上的工作不是很多,但當(dāng)我工作時(shí),我記得遇到了您在我們系統(tǒng)中描述的問(wèn)題。

讓我告訴你我們是如何處理我們的場(chǎng)景的,也許這也會(huì)對(duì)你有所幫助:

場(chǎng)景1:如果您的消息在發(fā)布方(Publisher–&>Kafka)丟失,您可以根據(jù)需要配置Kafka確認(rèn)設(shè)置,如果您使用的是SpringCloud Stream和Kafka,則該屬性為spring.cloud.stream.kafka.binder.required-acks

可能的值:

    最多一次(Ack=0)

      出版商不在乎卡夫卡是否承認(rèn)。
      發(fā)送并忘記
      可能會(huì)丟失數(shù)據(jù)

    至少一次(Ack=1)

      如果Kafka不確認(rèn),出版商將重新發(fā)送消息。

      可能存在重復(fù)。

      在將郵件復(fù)制到副本之前發(fā)送確認(rèn)。

    正好一次(Ack=All)

      如果Kafka不確認(rèn),出版商將重新發(fā)送消息。

      但是,如果一條消息多次發(fā)送到Kafka,則不會(huì)有重復(fù)。

      內(nèi)部序列號(hào),用于判斷消息是否已經(jīng)寫(xiě)入主題。

      需要設(shè)置Min.insync.Replicas屬性,以確保在Kafka向生產(chǎn)者確認(rèn)之前,需要同步的最小復(fù)制數(shù)是多少。

場(chǎng)景2:如果您的數(shù)據(jù)在消費(fèi)者端丟失(Kafka–&>Consumer),您可以根據(jù)您的使用情況更改Kafka的自動(dòng)提交功能。如果您使用的是Spring Cloud Streamspring.cloud.stream.kafka.bindings.input.consumer.AutoCommitOffset.

,則為該屬性

默認(rèn)情況下,在Kafka中AutoCommittee Offset為T(mén)rue,并且發(fā)送給消費(fèi)者的每一條消息在Kafka的末尾都是&Quot;Submitted&Quot;,這意味著它不會(huì)被再次發(fā)送。但是,如果您將AutoCommittee Offset更改為False,您將有權(quán)在代碼中輪詢(xún)來(lái)自Kafka的消息,并且一旦您完成工作,將Commit顯式設(shè)置為T(mén)rue,以讓Kafka知道您現(xiàn)在已經(jīng)完成了消息。

如果消息未提交,Kafka將繼續(xù)重新發(fā)送,直到消息提交為止。

希望這對(duì)您有所幫助,或至少為您指明正確的方向。

這篇關(guān)于如何避免使用Kafka流丟失消息的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,

分享到:
標(biāo)簽:Kafka 丟失 消息
用戶(hù)無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定