本文介绍: 在现代的微服务架构中,消息队列已经成为一个不可或缺的组件。它能够帮助我们不同服务之间传递消息,并且能够确保这些消息不会丢失。在众多的消息列中,Kafka一个非常出色的选择。它能够处理大量的实时数据,并且提供了强大的持久能力。在本文中,我们将会探讨如何在 SpringBoot整合 Kafka。Apache Kafka一个开源的流处理平台,由 LinkedIn 团队开发并于 2011 年贡献给 Apache 基金会。Kafka 以其高吞吐量可扩展性容错性而闻名。

579a429daf314744b995f37351b46548

前言

在现代的微服务架构中,消息队列已经成为一个不可或缺的组件

能够帮助我们不同服务之间传递消息,并且能够确保这些消息不会丢失

在众多的消息队列中,Kafka 是一个非常出色的选择

它能够处理大量的实时数据,并且提供了强大的持久能力

本文中,我们将会探讨如何在 SpringBoot整合 Kafka。


什么是Kafka?

Apache Kafka 是一个开源的流处理平台,由 LinkedIn 团队开发并于 2011 年贡献给 Apache 基金会。Kafka 以其高吞吐量可扩展性容错性而闻名。它是一个基于发布/订阅模式的消息系统,通常用于大型实时数据流处理应用

Kafka 的主要组件包括:

Kafka 可以分布式系统用于构建实时数据管道,它可以系统或应用之间可靠获取数据。此外,Kafka 可以和 Apache Storm、Apache Hadoop、Apache Spark 等进行集成用于数据处理分析


Kafka的应用场景?

日志收集

一个公司可能很多服务器每个服务器运行很多服务,Kafka 可以用来实现这些服务日志收集功能。各服务的日志分别发送到 Kafka 的不同 Topic 中。

消息系统:

Kafka 能够作为一个大规模的消息处理系统,各生产者将消息发送到 Kafka,消费者从 Kafka 中读取消息进行处理

用户活动跟踪

Kafka 也常用用户活动跟踪实时分析例如用户点击搜索行为可以实时写入到 Kafka,然后进行实时或者离线分析

在 Kafka 上可以进行实时的流处理例如使用 Apache Storm 集成 Kafka 来进行实时数据处理

指标和日志聚合

计数据和监控数据也是 Kafka 的一个重要应用场景。例如通过 Kafka 可以收集各种分布式应用数据然后进行统一处理分析

事件源:

Kafka 可以作为大规模事件处理的源头,例如用户的行为、系统的状态等都可以作为事件通过 Kafka 进行分发处理


示例

版本依赖
模块 版本
SpringBoot 3.1.0
JDK 17
代码
KafkaConfig
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public KafkaReceiver listener() {
        return new KafkaReceiver();
    }

}
KafkaSender
@Component
@Slf4j
public class KafkaSender {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic, String key, String data) {
        //发送消息
        CompletableFuture<SendResult<String, Object>> completable = kafkaTemplate.send(topic, key, data);
        completable.whenCompleteAsync((result, ex) -> {
            if (null == ex) {
                log.info(topic + "生产者发送消息成功:" + result.toString());
            } else {
                log.info(topic + "生产者发送消息失败:" + ex.getMessage());
            }
        });
    }
}
KafkaReceiver
@Component
@Slf4j
public class KafkaReceiver {
    /**
     * 下面的主题是一个数组,可以同时订阅主题,只需按数组格式即可,也就是用","隔开
     */
    @KafkaListener(topics = {"testTopic"})
    public void receive(ConsumerRecord<?, ?> record){
        log.info("消费者收到的消息key: " + record.key());
        log.info("消费者收到的消息value: " + record.value().toString());
    }
}
KafkaController
/**
 * kafka 测试接口
 */
@RestController
public class KafkaController {
    @Autowired
    private KafkaSender kafkaSender;

    @GetMapping("/sendMessageToKafka")
    public String sendMessageToKafka() {
        Map<String, String> messageMap = new HashMap();
        messageMap.put("message", "hello world!");
        ObjectMapper objectMapper = new ObjectMapper();
        String data = null;
        try {
            data = objectMapper.writeValueAsString(messageMap);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        String key = String.valueOf(UUID.randomUUID());
        //kakfa推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认null)
        kafkaSender.send("testTopic", key, data);
        return "ok";
    }
}
测试

http://127.0.0.1:8080/sendMessageToKafka

image-20231203195342080


image-20231203195407585


遇见问题

Error connecting to node xxxxxx:9092 (id: 0 rack: null)

Error connecting to node iZbp127a9vpra4v3kmkkmzZ:9092 (id: 0 rack: null)

解决方案

修改地物理机hosts文件文件目录:C:WindowsSystem32driversetc

image-20231203183127501

新增 xx.xx.xx.xx iZbp127a9vpra4v3kmkkmzZ

如果没生效,则需要重启系统


总结

通过上述步骤我们已经成功地在 SpringBoot 中整合了 Kafka。

这使得我们应用程序能够在不同服务之间传递消息,而不需要担心消息的丢失

我们看到通过使用 SpringBoot,我们可以非常轻松地完成这个过程

希望这篇文章能够帮助你在自己项目更好使用 Kafka。


源码获取

如果需要完整源码关注公众号”架构殿堂” ,回复 “SpringBoot+Kafka”即可获得


写在最后

感谢您的支持和鼓励! 😊🙏

如果大家相关文章感兴趣,可以关注公众号”架构殿堂”,会持续更新AIGC,java基础面试题, netty, spring boot, spring cloud系列文章,一系列干货随时送达!

csdn-end

原文地址:https://blog.csdn.net/jinxinxin1314/article/details/134770887

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_49465.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注