本文介绍: 先启动zookeeper,在启动kafka(这两个在bin下的windows目录下),都需要指定配置文件(配置文件在config目录下)。解压安装,记得放在D盘根目录下,要不然启动时会报命令行太长的错误。下载2的版本,3.的版本会报错。
SpringBoot整合Kafka
下载与安装
下载地址:
https://kafka.apache.org/downloads
下载2的版本,3.的版本会报错
解压安装,记得放在D盘根目录下,要不然启动时会报命令行太长的错误
zookeeper-server-start.bat ../../config/zookeeper.properties
kafka-server-start.bat ../../config/server.properties
先启动zookeeper,在启动kafka(这两个在bin下的windows目录下),都需要指定配置文件(配置文件在config目录下)。
创建topic,测试生产消费程序
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic itheima
kafka-topics.bat --delete --zookeeper localhost:2181 --topic itheima
kafka-console-producer.bat --broker-list localhost:9092 --topic itheima2022
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic itheima2022 --from-beginning
SpringBoot整合Kafka
导坐标
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
做配置
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: order
做客户端
@Service
public class MessageServiceKafkaImpl implements MessageService {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列 (kafka),id:" + id);
kafkaTemplate.send("itheima2022",id);
}
@Override
public String doMessage() {
return null;
}
}
@Component
public class MessageListener {
@KafkaListener(topics = "itheima2022")
public void onMessage(ConsumerRecord<String,String> record){
System.out.println("已完成短信发送业务(kafka):id:"+record.value());
}
}
原文地址:https://blog.csdn.net/shall_zhao/article/details/134761249
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_36500.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。