Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序
Kafka 官网:Apache Kafka
关于ZooKeeper的弃用
根据 Kafka官网信息,随着Apache Kafka 3.5版本的发布,Zookeeper现已被标记为已弃用。未来计划在Apache Kafka(4.0版)的下一个主要版本中删除ZooKeeper,该版本最快将于2024年4月发布。在弃用阶段,ZooKeeper仍然支持用于Kafka集群元数据的管理,但不建议用于新的部署。新的部署方式使用 KRaft 模式,KRaft 模式部署可以看笔者的文章《kafka 集群 KRaft 模式搭建》,考虑到一些公司仍然在使用老版本的 Kafka,故笔者写这篇文章记录 Kafka 集群Zookeeper 模式搭建
笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11
1、官网下载 Kafka
3.6.0 版本需要至少 java8 及以上版本,笔者使用的是 java8 版本
关于 linux 安装 java,没安装过的朋友可以参考《linux 系统安装 jdk》
下载完成
mkdir /usr/local/kafka
在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录
tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka
2、配置 Kafka
cd /usr/local/kafka/kafka_2.13-3.6.0/config
vi server.properties
配置 broker.id,advertised.listeners,zookeeper.connect
advertised.listeners 本机的外网访问地址
zookeeper.connect zookeeper 地址
advertised.listeners 笔者配置为本机地址
192.168.2.90 节点
192.168.2.11 节点
笔者zookeeper 地址是 192.168.2.130:2181
zookeeper 版本是3.8.3
关于zookeeper单机安装和集群安装可以参考:《Linux环境 安装 zookeeper》《windows环境 安装 zookeeper》《linux 使用 nginx 搭建 zookeeper 集群》
3、启动 Kafka 集群
cd /usr/local/kafka/kafka_2.13-3.6.0
下面2个命令皆可
bin/kafka-server-start.sh config/server.properties
或
bin/kafka-server-start.sh -daemon config/server.properties
4、关闭 Kafka 集群
bin/kafka-server-stop.sh
5、使用Kafka 可视化工具查看
下载地址:https://www.kafkatool.com/download.html
6、测试Kafka集群
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wsjzzcbq</groupId>
<artifactId>kafka-learn</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
package com.wsjzzcbq;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* Demo
*
* @author wsjz
* @date 2023/11/24
*/
public class ProducerDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
//配置集群节点信息
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");
//配置序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(properties);
//topic 名称是demo_topic
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");
RecordMetadata recordMetadata = producer.send(producerRecord).get();
System.out.println(recordMetadata.topic());
System.out.println(recordMetadata.partition());
System.out.println(recordMetadata.offset());
}
}
package com.wsjzzcbq;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* ConsumerDemo
*
* @author wsjz
* @date 2023/11/24
*/
public class ConsumerDemo {
public static void main(String[] args) {
Properties properties = new Properties();
// 配置集群节点信息
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");
// 消费分组名
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");
// 序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
// 消费者订阅主题
consumer.subscribe(Arrays.asList("demo_topic"));
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String,String> record:records) {
System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),
record.offset(),record.key(),record.value());
}
}
}
}
至此完
原文地址:https://blog.csdn.net/wsjzzcbq/article/details/134731513
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_27032.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!