日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

本文介紹了如何使用Spring Kafka實(shí)現(xiàn)有狀態(tài)消息監(jiān)聽(tīng)器?的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!

問(wèn)題描述

我希望使用Spring Kafka API實(shí)現(xiàn)有狀態(tài)監(jiān)聽(tīng)器。

提供以下信息:

ConCurrentKafkaListenerContainerFactory,并發(fā)設(shè)置為”n”
Spring@Service類上的@KafkaListener批注方法

然后創(chuàng)建”n”個(gè)KafkaMessageListenerContainers。它們中的每一個(gè)都將有自己的KafkaConsumer,因此將有”n”個(gè)使用者線程-每個(gè)使用者一個(gè)線程。

消費(fèi)消息時(shí),將使用輪詢底層KafkaConsumer的同一線程調(diào)用@KafkaListener方法。由于只有一個(gè)監(jiān)聽(tīng)程序?qū)嵗虼舜吮O(jiān)聽(tīng)程序需要是線程安全的,因?yàn)閷⒂衼?lái)自”n”個(gè)線程的并發(fā)訪問(wèn)。

我不想考慮并發(fā)訪問(wèn),并在我知道只能由一個(gè)線程訪問(wèn)的偵聽(tīng)器中保留狀態(tài)。

如何使用Spring Kafka API為每個(gè)Kafka消費(fèi)者創(chuàng)建單獨(dú)的監(jiān)聽(tīng)器?

推薦答案

您說(shuō)得對(duì);每個(gè)容器都有一個(gè)監(jiān)聽(tīng)器實(shí)例(無(wú)論是配置為@KafkaListener還是MessageListener)。

一種解決方法是將作用域?yàn)?code>MessageListener的原型與n個(gè)KafkaMessageListenerContainerBean(每個(gè)Bean有一個(gè)線程)一起使用。

然后,每個(gè)容器將獲得其自己的偵聽(tīng)器實(shí)例。

@KafkaListenerPOJO抽象不可能做到這一點(diǎn)。

不過(guò),使用無(wú)狀態(tài)Bean通常更好。

編輯

我找到了另一個(gè)解決方法,使用SimpleThreadScope

@SpringBootApplication
public class So51658210Application {

    public static void main(String[] args) {
        SpringApplication.run(So51658210Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, ApplicationContext context,
            KafkaListenerEndpointRegistry registry) {
        return args -> {
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
        };
    }

    @Bean
    public ActualListener actualListener() {
        return new ActualListener();
    }

    @Bean
    @Scope("threadScope")
    public ThreadScopedListener listener() {
        return new ThreadScopedListener();
    }

    @Bean
    public static CustomScopeConfigurer scoper() {
        CustomScopeConfigurer configurer = new CustomScopeConfigurer();
        configurer.addScope("threadScope", new SimpleThreadScope());
        return configurer;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so51658210", 3, (short) 1);
    }

    public static class ActualListener {

        @Autowired
        private ObjectFactory<ThreadScopedListener> listener;

        @KafkaListener(id = "foo", topics = "so51658210")
        public void listen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            this.listener.getObject().doListen(in, partition);
        }

    }

    public static class ThreadScopedListener {

        private void doListen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            System.out.println(in + ":"
                    + Thread.currentThread().getName() + ":"
                    + this.hashCode() + ":"
                    + partition);
        }

    }

}

(容器并發(fā)數(shù)為3)。

工作正常:

bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2

唯一的問(wèn)題是作用域不會(huì)自動(dòng)清理(例如,當(dāng)容器停止并且線程離開(kāi)時(shí))。這可能并不重要,具體取決于您的用例。

要解決這個(gè)問(wèn)題,我們需要來(lái)自容器的一些幫助(例如,在偵聽(tīng)器線程停止時(shí)在其上發(fā)布一個(gè)事件)。GH-762。

這篇關(guān)于如何使用Spring Kafka實(shí)現(xiàn)有狀態(tài)消息監(jiān)聽(tīng)器?的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,

分享到:
標(biāo)簽:Kafka Spring 如何使用 消息 狀態(tài) 監(jiān)聽(tīng)器
用戶無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定