本文介绍: kafka中如何实现动态开启 暂停消费者
Kafka中如何动态开启、关闭消费者
使用背景:在开发业务中需要根据具体逻辑选择开启还是关闭消费者
实现逻辑:
1、创建consumer配置类,自定义工厂、自定义消费者配置(省略)
还需要注入consumerService bean(改类里主要是控制动态启停的具体实现方法)
2、接口实现
public class ConsumerKafkaService {
private final Kafka KafkaListenerEndpointRegistry registry;
//暂停消费topic
public void pauseTopic(String topic){
MessageListenerContainer container =getContainer();
if(Objects.nonNull(container) && container.isRunning() ){
//取对应topic
Collection<TopicPartition> partitions = getTopicPartitions(container);
partitions.stream.filter(p -> p.topic().contains(topic)).forEach(partition -> {
if(!container.isPartitionPaused(partition)){
container.pausePartition(partition);
log.info("partition:{} 停止消费",partition);
}
});
}
}
//获取对应topic和分区集合
private static Collection<TopicPartition> getTopicPartitions(MessageListenerContainer container){
return Optional.ofNullable(container.getAssignedPartitions()).orElseGet(Collections::emptyList);
}
//开启消费
public void resumeTopic(String topic){
MessageListenerContainer container =getContainer();
if(Objects.nonNull(container) && container.isRunning() ){
//取对应topic
Collection<TopicPartition> partitions = getTopicPartitions(container);
partitions.stream.filter(p -> p.topic().contains(topic)).forEach(partition -> {
if(!container.isPartitionPaused(partition)){
container.resumePartition(partition);
log.info("partition:{} 开启消费",partition);
}
});
}
}
//根据指定id获取容器
private MessageListenerContainer getContainer(){
return registry.getListenerContainer("XXX");
}
//首次执行,初始化
public void initPause(){
MessageListenerContainer container =getContainer();
if(Objects.nonNull(container) && container.isRunning() ){
//取对应topic
Collection<TopicPartition> partitions = getTopicPartitions(container);
partitions.forEach(partition -> {
if(PlatformCache.availableTopics.contains(partition.topic())){
if(container.isPartitionPaused(partition)){
container.resumePartition(partition);
log.info("partition:{} 开启消费",partition);
}
}else{
container.pausePartition(partition);
log.info("partition:{} 暂停消费",partition);
}
});
}
}
}
消费监听方法上,@KafkaListener(topicPattern=“${topicPattern}”,id=“XXX”,idIsGroup = false)即可监听开启消费的topic数据
原文地址:https://blog.csdn.net/panying941206/article/details/135543160
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_56912.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。