本文介绍: 通过这样的配置,你的 Spring Boot 应用程序启用 Kafka 支持,你可以使用 KafkaTemplate 进行消息发送使用 @KafkaListener 进行消息消费。但是在这种模式下,如果处理消息时发生异常,Kafka 服务器会重新发送相同消息可能会导致消息重复消费。当你在 Spring Boot 应用程序配置类上添加 @EnableKafka 注解时,它会激活 Kafka 基础设施,使你能够应用程序中使用 Kafka 相关组件。这样可以确保消息的准确处理,避免重复消费

1、properties 配置

control.command.kafka.enabled=true
control.command.kafka.bootstrap-servers=172.0.0.1:9092
control.command.kafka.command-topics=lastTopic
control.command.kafka.consumer.group-id=consumer-eslink-iwater-control-command
control.command.kafka.consumer.properties.session.timeout.ms=30000
control.command.kafka.consumer.properties.request.timeout.ms=90000
control.command.kafka.consumer.fetch-min-size=10KB
control.command.kafka.consumer.fetch-max-wait=500
control.command.kafka.consumer.max-poll-records=1000
control.command.kafka.consumer.auto-offset-reset=earliest
control.command.kafka.listener.ack-mode=MANUAL_IMMEDIATE
control.command.kafka.listener.concurrency=1
control.command.kafka.listener.type=SINGLE
control.command.kafka.producer.acks=all
control.command.kafka.producer.batchSize=4096
control.command.kafka.producer.bufferMemory=40960
control.command.kafka.producer.linger=10
control.command.kafka.producer.retries=3

2、Config 配置

@Configuration
@ConditionalOnExpression("${control.command.kafka.enabled:false}")
@EnableKafka
public class ControlCommandKafkaConfig {

    @Bean("controlCommandKafkaProperties")
    @ConfigurationProperties("control.command.kafka")
    @Primary
    public KafkaProperties kafkaProperties() {
        return new KafkaProperties();
    }

    @Bean("controlCommandKafkaConsumerFactory")
    public ConsumerFactory<Object, Object> kafkaConsumerFactory(
            @Qualifier("controlCommandKafkaProperties") KafkaProperties inProps) {
        DefaultKafkaConsumerFactory<Object, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(inProps.buildConsumerProperties());
        return consumerFactory;
    }

    @Bean("controlCommandBatchFactory")
    @DependsOn("controlCommandKafkaProperties")
    public KafkaListenerContainerFactory<?> egBatchFactory(@Qualifier("controlCommandKafkaProperties") KafkaProperties inProps,
                                                           @Qualifier("controlCommandKafkaConsumerFactory") ConsumerFactory consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> listenerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        listenerFactory.setConsumerFactory(consumerFactory);
        configureListenerFactory(listenerFactory, inProps);
        configureContainer(listenerFactory.getContainerProperties(), inProps);
        return listenerFactory;
    }

    @Bean("controlCommandKafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate(@Qualifier("controlCommandKafkaProperties") KafkaProperties inProps) {
        return new KafkaTemplate(new DefaultKafkaProducerFactory(inProps.buildProducerProperties()));
    }

    private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Object, Object> factory, KafkaProperties inProps) {
        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
        KafkaProperties.Listener properties = inProps.getListener();
        // 设置监听线程并发数 control.command.kafka.listener.concurrency
        map.from(properties::getConcurrency).to(factory::setConcurrency);
        // control.kafka.listener.type=batch批量监听
        if (properties.getType().equals(KafkaProperties.Listener.Type.BATCH)) {
            factory.setBatchListener(true);
        }
    }

    private void configureContainer(ContainerProperties container, KafkaProperties inProps) {
        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
        KafkaProperties.Listener properties = inProps.getListener();
        map.from(properties::getAckMode).to(container::setAckMode);
        map.from(properties::getClientId).to(container::setClientId);
        map.from(properties::getAckCount).to(container::setAckCount);
        map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime);
        map.from(properties::getPollTimeout).as(Duration::toMillis).to(container::setPollTimeout);
        map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold);
        map.from(properties::getIdleEventInterval).as(Duration::toMillis).to(container::setIdleEventInterval);
        map.from(properties::getMonitorInterval).as(Duration::getSeconds).as(Number::intValue)
                .to(container::setMonitorInterval);
        map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
        map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
    }
}

2.1 @EnableKafka 注解

‘@EnableKafka’ 是用于在 Spring Boot 应用程序启用 Apache Kafka 的注解。当你在 Spring Boot 应用程序的配置类上添加 @EnableKafka 注解时,它会激活 Kafka 基础设施,使你能够应用程序中使用 Kafka 相关组件

以下是对这个注解的简要解释

基础设施激活:@EnableKafka 注解告诉 Spring 在应用程序设置所需的 Kafka 基础设施。这包括创建必要的 Kafka bean 和配置。

Kafka Template:在启用 Kafka 后,你可以使用 Spring Kafka 提供的 KafkaTemplate 类轻松地向 Kafka 主题发送消息。KafkaTemplate 抽象底层的 Kafka 生产者 API,简化消息发送过程

消息监听器通过 @EnableKafka,你还可以使用 Spring 的 @KafkaListener 注解设置 Kafka 消息消费者。@KafkaListener 注解应用于 Spring 组件中的方法,使该方法可以作为 Kafka 消息监听器。当 Kafka 主题中有可用的消息时,带有 @KafkaListener 注解的方法将自动调用,并处理消息内容

下面是在 Spring Boot 应用程序中使用 @EnableKafka 的示例代码

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
@EnableKafka
public class MyKafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyKafkaApplication.class, args);
    }
}

通过这样的配置,你的 Spring Boot 应用程序将启用 Kafka 支持,你可以使用 KafkaTemplate 进行消息发送,使用 @KafkaListener 进行消息消费。请确保在 application.properties 或 application.yml 文件中配置 Kafka 相关属性例如 Kafka 代理地址和其他必要的配置。

注意,为了使 @EnableKafka 正常工作,你需要项目构建配置中包含所需的 Kafka 依赖项。对于 Maven,你可以添加以下依赖项:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

一旦 Kafka 正在运行,并且应用程序正确配置了 @EnableKafka,你就可以在 Spring Boot 应用程序构建基于 Kafka 的消息传递功能

2.2、@ConditionalOnExpression 注解

@ConditionalOnExpression 是 Spring Boot 中的一个条件注解之一。它允许你根据给定的 SpEL 表达式来决定是否启用或禁用某个 Bean 或配置。
条件注解可以用于在 Spring Boot 应用程序中根据特定条件动态创建 Bean 或配置,从而根据不同的配置或环境来灵活地管理应用程序的行为
@ConditionalOnExpression 的工作方式是:它在配置类或 Bean 上进行标记然后在应用程序启动过程解析 SpEL 表达式。如果 SpEL 表达式结果为 true,则相关的 Bean 或配置将被启用,否则将被禁用
以下是 @ConditionalOnExpression 的示例使用方式

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyConfiguration {

    @Bean
    @ConditionalOnExpression("${myapp.feature.enabled:true}")
    public MyBean myBean() {
        // 返回需要创建的 Bean 实例
        return new MyBean();
    }
}

2.3、 @RefreshScope 注解

@RefreshScope 是 Spring Cloud 中的一个注解,用于实现动态刷新 Spring Bean 的配置信息。它通常与 Spring Cloud Config 配合使用,能够运行更新应用程序配置,而无需重启应用。
在微服务架构中,使用 Spring Cloud Config 可以将配置信息集中管理然后通过 @RefreshScope 注解实现配置的动态刷新。这使得应用程序在运行时可以获取最新的配置信息,而不需要停止和启动应用。

2.4、@DependsOn 注解

@DependsOn 是 Spring Framework 中的一个注解,用于指定 Spring Bean 之间依赖关系通过在 Bean 上添加 @DependsOn 注解,你可以确保指定的 Bean 会在其所依赖的其他 Bean 初始化之后再进行初始化
当一个 Bean 希望在另一个 Bean 初始化完成后再初始化时,可以使用 @DependsOn 注解来定义这种依赖关系。这对于确保 Bean 之间的正确顺序初始化非常有用,特别是当某些 Bean 需要依赖其他 Bean 才能正确地进行初始化或工作时。

2.5、 listener.ackmode 消息的确认模式

Spring Kafka 中,AckMode用于配置消息消费者的消息确认模式(Acknowledgment Mode)。这个枚举类型用于决定在消费者处理完 Kafka 消息后如何向 Kafka 服务器发送确认,告知服务器消息是否已经被成功消费。

Spring Kafka 支持以下几种 AckMode

你可以通过在 @KafkaListener 注解中设置 AckMode 来配置消息消费者的确认模式。例如

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

@KafkaListener(topics = "my_topic", groupId = "my_group", ackMode = "MANUAL")
public void processMessage(String message, Acknowledgment acknowledgment) {
    // 处理消息的逻辑
    // 手动确认消息
    acknowledgment.acknowledge();
}

在上面的例子中,我们ackMode 设置为 MANUAL,表明消息的确认模式为手动确认。在处理消息后,我们手动调用 acknowledgment.acknowledge() 来确认消息的消费。

选择合适的 AckMode 取决于你的应用程序需求和消费者的可靠性要求。如果应用程序对消息的重复消费有一定的容忍度,并且希望简化消费者的处理逻辑,可以选择 AUTO 模式。如果应用程序对消息的准确性要求较高,并且愿意手动确认消息的处理,可以选择 MANUAL 或 MANUAL_IMMEDIATE 模式。

2.6、kafka.consumer.autooffsetreset 偏移量

kafka.consumer.autooffsetreset 是 Kafka 消费者的一个重要配置属性,它决定了当一个新的消费者加入消费者组或者消费者在某个分区没有有效偏移量时,消费者应该从何处开始消费消息。

这个属性有以下几个可能的值:

这个属性通常在 Kafka 消费者的配置中使用,用来控制消费者在特定情况下的起始消费位置。你可以根据你的业务需求选择适合的值。如果你希望消费者能够从最早的消息开始消费,以确保不错过任何消息,可以将它设置为 earliest。如果你只关心新产生的消息,可以将它设置为 latest

3、KafkaProducer 生产者

@Component
@RefreshScope
@ConditionalOnExpression("${control.command.kafka.enabled:false}")
@DependsOn(value = {"controlCommandKafkaTemplate"})
public class ControlCommandKafkaProducer extends BaseLogable {

    @Resource(name = "controlCommandKafkaTemplate")
    private KafkaTemplate<String, String> controlCommandKafkaTemplate;

    public void send(String key, String topic, String msg) {
        bizLogger.info("sent kafka msg, topic:{}, key: {}, msg: {}", topic, key, msg);
        ListenableFuture<SendResult<String, String>> listenableFuture = controlCommandKafkaTemplate.send(topic, key, msg);
        listenableFuture.addCallback(result -> bizLogger.info("sent msg success:{}", msg),
                e -> {
                    bizLogger.error("sent msg failure, msg: {}", msg, e);
                });
    }

}

3.1 org.springframework.kafka.core.KafkaTemplate 使用

3.1.1 发送消息
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {
    
    private final KafkaTemplate<String, String> kafkaTemplate;
    
    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

3.1.2 发送消息并指定分区
public void sendMessageToPartition(String topic, int partition, String message) {
    kafkaTemplate.send(topic, partition, null, message);
}

3.1.3 发送消息并指定键
public void sendMessageWithKey(String topic, String key, String message) {
    kafkaTemplate.send(topic, key, message);
}

3.1.4 发送消息并等待确认
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.future.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public void sendMessageAndWaitForConfirmation(String topic, String message) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
    
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            // 消息发送成功的处理逻辑
        }

        @Override
        public void onFailure(Throwable ex) {
            // 消息发送失败的处理逻辑
        }
    });
}

4、KafkaConsumer 消费者

4.1 单条消费

单条消费时配置

control.command.kafka.listener.type=SINGLE

@Component
@RefreshScope
@ConditionalOnExpression("${control.command.kafka.enabled:false}")
@DependsOn(value = {"controlCommandBatchFactory"})
public class ControlCommandKafkaConsumer extends BaseLogable {


    /**
     * 监听主题,单条消费
     */
    @KafkaListener(id = "${control.command.kafka.consumer.groupId}", topics = "${kafka.command.topic.ecgs-command-up}", containerFactory = "controlCommandBatchFactory")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        bizLogger.info("kafka msg::: " + record.value() + ", key: " + record.key());
        //接收Kafka消息
        String data = record.value();

        //接收消息确认
        ack.acknowledge();
    }

}

4.2 多条消费

多条消费时配置

control.command.kafka.listener.type=BATCH

package cc.eslink.yq.iwater.kafka;

import cc.eslink.common.base.BaseLogable;
import cc.eslink.yq.iwater.dto.schedule.AlarmInfoDTO;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;


@Component
@RefreshScope
@ConditionalOnExpression("${control.command.kafka.enabled:false}")
@DependsOn(value = {"controlCommandBatchFactory"})
public class ControlCommandKafkaConsumer extends BaseLogable {


    @KafkaListener(id = "${control.command.kafka.consumer.groupId}", topics = "${kafka.command.topic.ecgs-command-up}", containerFactory = "controlCommandBatchFactory")
    public void listen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        bizLogger.info("== 接收到 kafka msg 条数:::{} ==", records.size());
        //接收消息确认
        ack.acknowledge();
        //接收Kafka消息
        for (ConsumerRecord<String, String> record : records) {
            try {
                final AlarmInfoDTO alarmInfoDTO = JSON.parseObject(record.value(), new TypeReference<AlarmInfoDTO>() {});
            } catch (Exception e) {
                expLogger.error("alarm.dispose error", record, e);
            }
        }


    }

}

原文地址:https://blog.csdn.net/weixin_41827053/article/details/132339253

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_46966.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注