本文介绍: 【代码】Rust消费kafka。
use futures::stream::StreamExt; // 引入 StreamExt 以使用 next() 方法
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::error::KafkaResult;
use rdkafka::message::{Message};
async fn run_consumer() -> KafkaResult<()> {
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", "test_group")
.set("bootstrap.servers", "localhost:9092")
.set("enable.auto.commit", "true")
.set("session.timeout.ms", "6000")
.set("auto.offset.reset", "earliest")
.create()
.expect("Consumer creation failed");
consumer.subscribe(&["test_topic"]).expect("Can't subscribe to specified topics");
let mut message_stream = consumer.stream();
while let Some(message) = message_stream.next().await {
match message {
Ok(m) => {
match m.payload_view::<str>() {
Some(Ok(payload)) => {
println!("Key: '{:?}', Payload: '{}'", m.key(), payload);
}
Some(Err(e)) => {
eprintln!("Error while deserializing message payload: {:?}", e);
}
None => {
println!("Key: '{:?}', Payload: <empty>", m.key());
}
}
consumer.commit_message(&m, CommitMode::Async)?;
}
Err(e) => eprintln!("Kafka error: {}", e),
}
}
Ok(())
}
fn main() {
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(run_consumer()).unwrap();
}
[dependencies]
rdkafka = “0.36.2”
tokio = { version = “1.36.0”, features = [“full”] }
futures = “0.3.30”
[target.x86_64-unknown-linux-musl]
linker = “x86_64-linux-musl-gcc”
原文地址:https://blog.csdn.net/t1g2q3/article/details/136038797
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_68037.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。