日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線(xiàn)咨詢(xún)客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

本文介紹了春季的卡夫卡消費(fèi)者我可以通過(guò)編程重新分配分區(qū)嗎?的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!

問(wèn)題描述

我剛接觸Kafka,并且使用@KafkaListener(Spring)來(lái)定義Kafka消費(fèi)者。
我想檢查是否可以在運(yùn)行時(shí)手動(dòng)將分區(qū)分配給使用者。

例如,當(dāng)應(yīng)用程序啟動(dòng)時(shí),我不想使用任何數(shù)據(jù)。我目前正在使用@KafkaListener(autoStartup=false ... )用于該目的。

在某個(gè)時(shí)刻,我應(yīng)該(從應(yīng)用程序的另一部分)收到包含要處理的分區(qū)ID的通知,因此我希望跳過(guò)該分區(qū)的最新可用偏移量,因?yàn)槲也恍枰褂们『靡呀?jīng)存在的數(shù)據(jù),并將KafkaConsumer與該通知中的分區(qū)ID關(guān)聯(lián)(&q;)。

稍后,我可能會(huì)收到停止偵聽(tīng)此分區(qū)的通知,盡管存在于其他位置的生產(chǎn)者一直在向該主題和該分區(qū)寫(xiě)入內(nèi)容,因此我應(yīng)該"取消使用者與該分區(qū)的鏈接,并停止接收消息。

我看到有org.springframework.kafka.annotation.TopicPartition,但它提供了一種指定靜態(tài)關(guān)聯(lián)的方法,因此我正在尋找一種指定關(guān)聯(lián)的動(dòng)態(tài)方法。

我想我可以求助于低級(jí)的Kafka客戶(hù)端API,但在這里我真的更喜歡使用Spring。

更新

我使用主題cnp_multi_partition_test_topic和3個(gè)分區(qū)。

我當(dāng)前嘗試從使用者動(dòng)態(tài)管理分區(qū)的代碼如下所示:

@Slf4j
public class SampleKafkaConsumer {   
    @KafkaListener(id = Constants.CONSUMER_ID, topics = Constants.TEST_TOPIC, autoStartup = "false")
    public void consumePartition(@Payload String data, @Headers MessageHeaders messageHeaders) {
        Object partitionId = messageHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID);
        Object sessionId    = messageHeaders.get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
        log.info("Consuming from partition: [ {} ] message: Key = [ {} ], content = [ {} ]",partitionId, sessionId,  data);
    }
}
@RequiredArgsConstructor
public class MultiPartitionKafkaConsumerManager {

    private final KafkaListenerEndpointRegistry registry;
    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
    private final UUIDProvider uuidProvider;
    private ConcurrentMessageListenerContainer<String, String> container;

    public void assignPartitions(List<Integer> partitions) {
        if(container != null) {
            container.stop();
            container = null;
        }
        if(partitions.isEmpty()) {
            return;
        }
        var newTopicPartitionOffsets = prepareTopicPartitionOffsets(partitions);
        container =
                factory.createContainer(newTopicPartitionOffsets);
        container.getContainerProperties().setMessageListener(
                registry.getListenerContainer(Constants.CONSUMER_ID).getContainerProperties().getMessageListener());
        // random group
        container.getContainerProperties().setGroupId("sampleGroup-" + uuidProvider.getUUID().toString());
        container.setConcurrency(1);
        container.start();
    }

    private TopicPartitionOffset[] prepareTopicPartitionOffsets(List<Integer> partitions) {
        return partitions.stream()
                .map(p -> new TopicPartitionOffset(TEST_TOPIC, p, 0L, TopicPartitionOffset.SeekPosition.END))
                .collect(Collectors.toList())
                .toArray(new TopicPartitionOffset[] {});
    }
}

兩者都是通過(guò)Java配置管理的Spring Bean(單例)。

生產(chǎn)者每秒生成3條消息,并將其發(fā)送到測(cè)試主題的3個(gè)分區(qū)中。我已經(jīng)使用了Kafka UI工具來(lái)確保所有消息確實(shí)都按預(yù)期到達(dá),我使用了@EventListener@Async來(lái)使其同時(shí)發(fā)生。

下面是我如何嘗試模擬工作:


@SpringBootTest // kafka is available, omitted for brevity
public class MyTest {
    @Autowired
    MultiPartitionKafkaConsumerManager manager;
    
    @Test
    public void test_create_kafka_consumer_with_manual_partition_management() throws InterruptedException {
        log.info("Starting the test");
        sleep(5_000);
        log.info("Start listening on partition 0");
        manager.assignPartitions(List.of(0));
        sleep(10_000);
        log.info("Start listening on partition 0,2");
        manager.assignPartitions(List.of(0,2));
        sleep(10_000);
        log.info("Do not listen on partition 0 anymore");
        manager.assignPartitions(List.of(2));
        sleep(10_000);
        log.info("Do not listen on partition 2 anymore - 0 partitions to listen");
        manager.assignPartitions(Collections.emptyList());
        sleep(10_000);

日志顯示以下內(nèi)容:

06:34:20.164 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Starting the test
06:34:25.169 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Start listening on partition 0
06:34:25.360 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:25.360 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:25.361 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664065360
06:34:25.405 [main] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Subscribed to partition(s): cnp_multi_partition_test_topic-0
06:34:25.422 [main] INFO  o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:25.429 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-0
06:34:35.438 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Start listening on partition 0,2
06:34:35.445 [consumer-0-C-1] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9-1, groupId=sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9] Unsubscribed all topics or patterns and assigned partitions
06:34:35.445 [consumer-0-C-1] INFO  o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:35.453 [consumer-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-96640bc4-e34f-4ade-9ff9-7a2d0bdf38c9: Consumer stopped
06:34:35.467 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:35.467 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:35.467 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664075467
06:34:35.486 [main] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Subscribed to partition(s): cnp_multi_partition_test_topic-0, cnp_multi_partition_test_topic-2
06:34:35.487 [main] INFO  o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:35.489 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-0
06:34:35.489 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-2
06:34:45.502 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Do not listen on partition 0 anymore
06:34:45.503 [consumer-0-C-1] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb-2, groupId=sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb] Unsubscribed all topics or patterns and assigned partitions
06:34:45.503 [consumer-0-C-1] INFO  o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:45.510 [consumer-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-05fb12f3-aba1-4918-bcf6-a1f840de13eb: Consumer stopped
06:34:45.527 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.5.1
06:34:45.527 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId: 0efa8fb0f4c73d92
06:34:45.527 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1633664085527
06:34:45.551 [main] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Subscribed to partition(s): cnp_multi_partition_test_topic-2
06:34:45.551 [main] INFO  o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
06:34:45.554 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Seeking to LATEST offset of partition cnp_multi_partition_test_topic-2
06:34:55.560 [main] INFO  c.h.c.p.g.m.SamplePartitioningTest - Do not listen on partition 2 anymore - 0 partitions to listen
06:34:55.561 [consumer-0-C-1] INFO  o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698-3, groupId=sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698] Unsubscribed all topics or patterns and assigned partitions
06:34:55.562 [consumer-0-C-1] INFO  o.s.s.c.ThreadPoolTaskScheduler - Shutting down ExecutorService
06:34:55.576 [consumer-0-C-1] INFO  o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - sampleGroup-5e12d8c7-5900-434a-959f-98b14adda698: Consumer stopped

所以我確實(shí)看到使用者已經(jīng)啟動(dòng),它甚至嘗試在內(nèi)部輪詢(xún)記錄,但我認(rèn)為我看到代理拋出并吞噬了WakeupException異常。我不確定我是否理解為什么會(huì)發(fā)生這種情況?

推薦答案

您不能在運(yùn)行時(shí)更改手動(dòng)分配。有幾種方法可以實(shí)現(xiàn)您想要的結(jié)果。

您可以在原型Bean中聲明偵聽(tīng)器;請(qǐng)參見(jiàn)Can i add topics to my @kafkalistener at runtime

您可以使用偵聽(tīng)器容器工廠(chǎng)創(chuàng)建具有適當(dāng)主題配置的新容器,并從靜態(tài)聲明的容器中復(fù)制偵聽(tīng)器。

如果需要,我可以提供后者的示例。

編輯

下面是第二種技術(shù)的示例…

@SpringBootApplication
public class So69465733Application {

    public static void main(String[] args) {
        SpringApplication.run(So69465733Application.class, args);
    }

    @KafkaListener(id = "dummy", topics = "dummy", autoStartup = "false")
    void listen(String in) {
        System.out.println(in);
    }

    @Bean
    ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {

        return args -> {
            System.out.println("Hit Enter to create a container for topic1, partition0");
            System.in.read();
            ConcurrentMessageListenerContainer<String, String> container1 =
                    factory.createContainer(new TopicPartitionOffset("topic1", 0, SeekPosition.END));
            container1.getContainerProperties().setMessageListener(
                    registry.getListenerContainer("dummy").getContainerProperties().getMessageListener());
            container1.getContainerProperties().setGroupId("topic1-0-group2");
            container1.start();

            System.out.println("Hit Enter to create a container for topic2, partition0");
            System.in.read();
            ConcurrentMessageListenerContainer<String, String> container2 =
                    factory.createContainer(new TopicPartitionOffset("topic2", 0, SeekPosition.END));
            container2.getContainerProperties().setMessageListener(
                    registry.getListenerContainer("dummy").getContainerProperties().getMessageListener());
            container2.getContainerProperties().setGroupId("topic2-0-group2");
            container2.start();

            System.in.read();
            container1.stop();
            container2.stop();
        };
    }

}

編輯

從命令行生成器向topic1、topic2發(fā)送記錄后進(jìn)行日志。

Hit Enter to create a container for topic1, partition0

ConsumerConfig values: 
...

Kafka version: 2.7.1
Kafka commitId: 61dbce85d0d41457
Kafka startTimeMs: 1633622966736
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Subscribed to partition(s): topic1-0

Hit Enter to create a container for topic2, partition0
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Seeking to LATEST offset of partition topic1-0
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Cluster ID: ppGfIGsZTUWRTNmRXByfZg
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Resetting offset for partition topic1-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.

ConsumerConfig values: 
...

Kafka version: 2.7.1
Kafka commitId: 61dbce85d0d41457
Kafka startTimeMs: 1633622969071
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Subscribed to partition(s): topic2-0

Hit Enter to stop containers
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Seeking to LATEST offset of partition topic2-0
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Cluster ID: ppGfIGsZTUWRTNmRXByfZg
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Resetting offset for partition topic2-0 to position FetchPosition{offset=2, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
record from topic1
[Consumer clientId=consumer-topic1-0-group2-1, groupId=topic1-0-group2] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
record from topic2
[Consumer clientId=consumer-topic2-0-group2-2, groupId=topic2-0-group2] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
Application shutdown requested.

這篇關(guān)于春季的卡夫卡消費(fèi)者我可以通過(guò)編程重新分配分區(qū)嗎?的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,

分享到:
標(biāo)簽:分區(qū) 分配 卡夫卡 可以通過(guò) 春季 消費(fèi)者 編程
用戶(hù)無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定