本文介紹了如何將@Transaction與@KafkaListener一起使用?的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!
問(wèn)題描述
有沒(méi)有可能將聲明性TX管理(通過(guò)@Transaction)與@KafkaListener注釋方法一起使用?
例如,我想使用它來(lái)為每個(gè)監(jiān)聽(tīng)器定義單獨(dú)的發(fā)送超時(shí)。
我的設(shè)置如下:
TransactionManager:
@Bean
@ConditionalOnBean(value = {HibernateTransactionManager.class})
public ChainedKafkaTransactionManager<Object, Object> chainedHibernateTm(KafkaTransactionManager<String, String> kafkaTransactionManager,
org.springframework.orm.hibernate5.HibernateTransactionManager hibernateTransactionManager) {
return new ChainedKafkaTransactionManager<>(
kafkaTransactionManager,
hibernateTransactionManager);
}
KafkaListener:
@KafkaListener(topic = "my_topic")
@Transactional(timeout = 5)
public void handleMessage(SomeMessage message){
}
問(wèn)題是-KafkaMessageListenerContainer在調(diào)用此類(lèi)方法之前創(chuàng)建自己的事務(wù)-它使用自己的TransactionTemplate:
@Nullable
private TransactionTemplate determineTransactionTemplate() {
return this.transactionManager != null
? new TransactionTemplate(this.transactionManager)
: null;
}
未使用TransactionInterceptor。那么具體的@KafkaListener方法如何設(shè)置具體的TX超時(shí)時(shí)間呢?
推薦答案
可以這樣做,但有點(diǎn)復(fù)雜,因?yàn)槟仨殞⑾牡钠屏堪l(fā)送到Kafka交易。
不使用ChainedKafkaTransactionManager
,您可以為容器使用KafkaTransactionManager
,為HibernateTransactionManager
使用@Transactional
。
這將產(chǎn)生類(lèi)似的結(jié)果,因?yàn)镠ibernate Tx將在Kafka事務(wù)之前提交(如果Hibernate提交失敗,則Kafka Tx將回滾)。
編輯
若要將不同的鏈?zhǔn)絋M配置到每個(gè)偵聽(tīng)器容器中,可以執(zhí)行以下操作。
@組件
類(lèi)ContainerFactoryCustomizer{
ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory,
ChainedKafkaTransactionManager<?, ?> chainedOne,
ChainedKafkaTransactionManager<?, ?> chainedTwo) {
factory.setContainerCustomizer(
container -> {
String groupId = container.getContainerProperties().getGroupId();
if (groupId.equals("foo")) {
container.getContainerProperties().setTransactionManager(chainedOne);
}
else {
container.getContainerProperties().setTransactionManager(chainedTwo);
}
});
}
}
Where each chained TM has a Hibernate TM with a different default timeout.
The `groupid` is populated from the `@KafkaListener` `id` or `groupId` property.
這篇關(guān)于如何將@Transaction與@KafkaListener一起使用?的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,