本文介绍: 11qwe
1. 安装pykafka
pip install pykafka
2. 生产者
from pykafka import KafkaClient
def get_kafka_producer(hosts, topics):
client = KafkaClient(hosts=hosts)
print(client.topics)
topic = client.topics[topics]
producer = topic.get_producer()
return producer
测试
hosts = '192.168.20.203:9092,192.168.20.204:9092,192.168.20.205:9092'
topics = "test_kafka_topic"
producer = get_kafka_producer(hosts, topics)
for i in range(10):
msg = "test message " + str(i)
# msg = bytes(msg, encoding='utf-8')
# producer.produce(msg)
producer.produce(msg.encode())
producer.stop()
3. 消费者
def get_kafka_consumer(hosts, topics):
client = KafkaClient(hosts=hosts)
topic=client.topics[topics]
consumer = topic.get_balanced_consumer(consumer_group='test_kafka_topic', auto_commit_enable=True,
zookeeper_connect='192.168.20.201:2181,192.168.20.202:2181,192.168.20.203:2181',
managed=True, consumer_timeout_ms=1000)
# managed=True,即使用新式reblance分区方法,不需zk;managed=False则需通过zk来实现reblance
return consumer
测试
hosts = '192.168.20.203:9092,192.168.20.204:9092,192.168.20.205:9092'
topics = "test_kafka_topic"
consumer = get_kafka_consumer(hosts, topics)
for msg in consumer:
print(msg)
if msg is not None:
print(msg.offset)
print(msg.value)
原文地址:https://blog.csdn.net/MusicDancing/article/details/135609740
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_58056.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。