1、生产者
从编程的角度而言,生产者是一个消息的生产者,它负责创建消息并发送到Kafka集群中的一个或多个topic中。
1.1、客户端开发
- 配置生产者客户端参数及创建相应的生产者实例
- 构建待发送的消息
- public ProducerRecord(String topic, V value)
- public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable
headers)
- public ProducerRecord(String topic, Integer partition, K key, V value)
- public ProducerRecord(String topic, K key, V value)
- public ProducerRecord(String topic, V value, Iterable
headers)
- public ProducerRecord(String topic, K key, V value, Iterable
headers)
- public ProducerRecord(String topic, Integer partition, K key, V value, Iterable
headers)
- public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
- 发送消息
- 关闭生产者实例
/**
* @author supanpan
* @date 2023/11/20
*/
public class KafkaProducerAnalysis {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
/**
* bootstrap.servers 该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,格式为host:port,host2:port2
* serializer 该参数指定了用来对消息key进行序列化的序列化器类,key.serializer和value.serializer两个参数需要设置,必须填写序列化器的全限定类名
* client.id 该参数用来设置生产者客户端的ID,是一个字符串,如果不设置,KafkaProducer会自动生成一个非空字符串,格式为"producer-1"、"producer-2"等
*
*/
public static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "producer.client.id.demo");
return props;
}
/**
* 防止配置书写错误,使用ProducerConfig类中的常量来设置参数
* @return
*/
public static Properties initNewConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
return props;
}
/**
* 通过反射的方式来设置参数,获取序列化器的全限定类名
*
*/
public static Properties initPerferConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
return props;
}
public static void main(String[] args) throws InterruptedException {
Properties props = initConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// KafkaProducer<String, String> producer = new KafkaProducer<>(props,
// new StringSerializer(), new StringSerializer());
// 创建ProducerRecord对象,其中topic、value是必填项,其余属性都是可选项,partition、timestamp、key、headers
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
try {
producer.send(record);
// 异步发送,获取回调对象获取发送结果
// producer.send(record, new Callback() {
// @Override
// public void onCompletion(RecordMetadata metadata, Exception exception) {
// if (exception == null) {
// System.out.println(metadata.partition() + ":" + metadata.offset());
// }
// }
// });
} catch (Exception e) {
e.printStackTrace();
}finally {
// 关闭生产者实例
producer.close();
}
// TimeUnit.SECONDS.sleep(5);
}
}
1.2、序列化
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网路发送给Kafka。
消费者需要用反序列化器(Deserializer)把字节数组转换成相应的对象才能使用。
生产者使用的序列化器和消费者使用的反序列化器必须是一致的,否则消费者无法正常消费生产者发送的消息。
常见序列化器:
- ByteArray
- ByteBuffer
- Bytes
- Double
- Integer
- Long
- String
上面列举的序列化器都是Kafka提供的,如果需要自定义序列化器,需要实现Serializer接口
org.apache.kafka.common.serialization.Serializer,此接口有三个方法
1.3、分区器
分区器(Partitioner)是生产者在将消息发送到Kafka集群时,根据分区策略选择消息发送的分区。
Kafka提供了默认的分区策略,即DefaultPartitioner,该分区器会根据ProducerRecord对象中的key来计算分区号。
- 如果key为null,则使用轮询的方式选择分区,如果key不为null,则使用key的hash值来计算分区号。
- 如果需要自定义分区器,需要实现Partitioner接口org.apache.kafka.clients.producer.Partitioner,该接口有两个方法:
Partitioner接口的方法
- configure(Map<String, ?> configs)
- partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
自定义分区器
/**
* 自定义分区器
*
*/
public class DemoPartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (null == keyBytes) {
return counter.getAndIncrement() % numPartitions;
} else
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
实现自定义的DemoPartitioner分区器后,需要在配置文件中指定分区器的全限定类名,即partitioner.class属性。
-
props.put(“partitioner.class”, “com.supanpan.kafka.demo.partitioner.DemoPartitioner”);
-
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DemoPartitioner.class.getName());
1.4、生产者拦截器
拦截器(Interceptor)是在消息在序列化和反序列化过程中对消息进行处理的组件,它是在消息生产者和消费者与Kafka集群之间的一个拦截点,可以在消息发送前和消费之后对消息进行一些定制化的操作。
Kafka拦截器有两种类型:
- 生产者拦截器
- 消费者拦截器
拦截器是Producer和Consumer的一个公共接口,分别对应两个子接口:ProducerInterceptor和ConsumerInterceptor。
- ProducerInterceptor
- ConsumerInterceptor
- onSend方法会在消息被序列化以前和封装成ProducerRecord对象之后调用,可以利用该方法对消息进行定制化操作,比如修改消息的某些内容,或者增加消息的头部信息等。
生产者拦截器示例
public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord<String, String> onSend(
ProducerRecord<String, String> record) {
String modifiedValue = "prefix1-" + record.value();
return new ProducerRecord<>(record.topic(),
record.partition(), record.timestamp(),
record.key(), modifiedValue, record.headers());
// if (record.value().length() < 5) {
// throw new RuntimeException();
// }
// return record;
}
@Override
public void onAcknowledgement(
RecordMetadata recordMetadata,
Exception e) {
if (e == null) {
sendSuccess++;
} else {
sendFailure++;
}
}
@Override
public void close() {
double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);
System.out.println("[INFO] 发送成功率="
+ String.format("%f", successRatio * 100) + "%");
}
@Override
public void configure(Map<String, ?> map) {
}
}
在KafkaProducer的配置参数中指定拦截器的全限定类名,即interceptor.classes属性。
配置方式:
-
props.put(“interceptor.classes”, “com.supanpan.kafka.demo.interceptor.ProducerInterceptorPrefix”);
-
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName())
KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器形成拦截链,
多个拦截器的执行顺序与它们在配置文件中的顺序一致,即先配置的拦截器先执行,后配置的拦截器后执行,
配置的时候,各个拦截器之间使用逗号隔开
1.5、原理分析
整个生产者客户端由两个线程协调运行,这两个线程分别是main线程(主线程)和Sender线程(发送线程)。
- main线程负责接收客户端的请求,将请求转发给Sender线程,然后等待Sender线程的响应结果。
- Sender线程负责从RecordAccumulator中拉取消息批次(Batch),并将消息批次发送给Kafka集群。
RecordAccumulator
-
RecordAccumulator内部维护了一个消息缓冲区,该缓冲区由多个消息批次组成,每个消息批次中可以存放多条消息。
-
RecordAccumulator内部的消息缓冲区中的消息批次是按照消息的topic和partition进行组织的,即每个topic-partition对应一个消息批次。
消息发送流程
- KafkaProducer.send()方法将消息发送给KafkaProducer内部的RecordAccumulator(消息累加器)。
- KafkaProducer内部的Sender线程不断从RecordAccumulator中拉取消息批次(Batch),并将消息批次发送给Kafka集群。
- KafkaProducer内部的Sender线程将消息批次发送给Kafka集群后,会根据Kafka集群的响应结果,对消息批次中的消息进行分类,分为发送成功的消息和发送失败的消息。
- KafkaProducer内部的Sender线程会将发送失败的消息重新放入RecordAccumulator中,等待下次发送。
- KafkaProducer内部的Sender线程会将发送成功的消息提交给RecordAccumulator,RecordAccumulator会将消息从消息缓冲区中移除。
消息发送失败的情况
- 消息发送失败的情况
- 消息发送失败的情况主要有两种:
- 对于第一种情况,KafkaProducer内部的Sender线程会将发送失败的消息重新放入RecordAccumulator中,等待下次发送。
- 对于第二种情况,KafkaProducer内部的Sender线程会将发送失败的消息放入RecordAccumulator中,但是不会重试发送,因为这种情况下消息是不可恢复的。
- 消息发送失败的处理
1.6、重要的生产者参数
- acks
- max.request.size
- retries & retry.backoff.ms
- compression.type
- connections.max.idle.ms
- linger.ms
- 该参数用来指定生产者在发送消息前等待一段时间,希望可以等到更多的消息一起发送,以减少网络请求的次数,从而提升性能,默认值为0,即立即发送
- 参数配置方式
- properties.put(“linger.ms”, “1000”);
- properties.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
- send.buffer.bytes
- 该参数用来指定Socket发送消息缓冲区(SO_SNDBUF)大小,默认值为131072字节,即128KB
- 参数配置方式
- properties.put(“send.buffer.bytes”, “131072”);
- properties.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072);
- 该参数用来指定生产者发送消息到Kafka集群时等待响应的最大时间,默认值为30000,即30秒
- 参数配置方式
原文地址:https://blog.csdn.net/weixin_45688141/article/details/134528036
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_8463.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!