Python中Kafka如何消费字符串消息
目录
Python中Kafka如何消费字符串消息
[
VibeCoding·九月创作之星挑战赛
10w+人浏览
1.6k人参与
](
)
核心代码
from confluent_kafka import Consumer, KafkaError
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'string-consumer-group',
'auto.offset.reset': 'earliest'
})
consumer.subscribe(['user-events'])
try:
for _ in range(5): # 消费5条消息
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
print(f"❌ 消费错误: {msg.error()}")
continue
print(f"📨 收到消息: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
代码解释
这段代码是一个最小可运行的 Kafka 消费者示例,用 Python 写的,依赖的是 confluent-kafka(C 语言封装的高性能客户端)。
逐行拆解如下:
- 导入
from confluent_kafka import Consumer, KafkaError
Consumer
:真正负责与 Kafka Broker 通信的类。KafkaError
:里面预定义了所有 Kafka 返回的错误码,方便我们做判断。
- 创建消费者实例
consumer = Consumer({
'bootstrap.servers': 'localhost:9092', # 集群地址
'group.id': 'string-consumer-group', # 消费组 id
'auto.offset.reset': 'earliest' # 无偏移量时从最早开始
})
参数解释
字段 | 含义 | 补充 |
---|---|---|
bootstrap.servers | 初始连哪台 broker | 可写多个,用逗号分隔 |
group.id | 消费组唯一标识 | 同一 group 内的消费者会负载均衡地消费分区 |
auto.offset.reset | 当该组在 Kafka 中没有已提交偏移量时的行为 | earliest(从头)、latest(从新)、none(抛异常) |
- 订阅主题
consumer.subscribe(['user-events'])
- 支持正则:
subscribe(pattern='user-.*')
- 支持手动分配分区:
consumer.assign([TopicPartition('user-events',0)])
这里采用自动订阅方式,Kafka 会自动把该主题的分区平均分配给同组内的各个消费者。
- 循环拉消息
for _ in range(5): # 只演示拉 5 条
msg = consumer.poll(1.0) # 超时 1 秒
poll
是非阻塞的,返回一条消息或 None。- 超时单位是秒,可以填 0 立即返回。
- 返回的对象是
Message
,具有一系列属性:
常用属性 | 说明 |
---|---|
msg.value() | 字节串形式的 payload |
msg.key() | 字节串形式的 key |
msg.partition() | 所在分区号 |
msg.offset() | 该分区内偏移量 |
msg.timestamp() | 时间戳 |
msg.error() | 如果 broker 返回错误,这里会拿到一个 KafkaError 实例 |
- 错误处理
if msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
print(f"❌ 消费错误: {msg.error()}")
continue
_PARTITION_EOF
不是致命错误,只是读到分区末尾,这里选择忽略。- 其他错误(如 LeaderNotAvailable、OffsetOutOfRange)会被打印出来。
- 业务处理
print(f"📨 收到消息: {msg.value().decode('utf-8')}")
- 把字节串按 UTF-8 解码成字符串。
- 实际业务里这里会写 JSON 反序列化、写数据库、调用 RPC 等。
- 优雅退出
except KeyboardInterrupt:
pass
finally:
consumer.close()
KeyboardInterrupt
对应 Ctrl-C。close()
会做:
– 自动触发一次同步提交(enable.auto.commit=true 时)。
– 释放 socket 连接、心跳线程等资源。
不写close()
的话,进程直接退出可能导致重复消费或rebalance 抖动。
- 你可能还关心的点
- 提交方式:
– 默认每 5 秒自动提交一次(enable.auto.commit=true, auto.commit.interval.ms=5000)。
– 也可手动consumer.commit(msg)
实现精确一次。 - 多线程:confluent-kafka 的 Consumer 不是线程安全的,一个实例只能给一条线程用。
- 性能调优:
– fetch.min.bytes + fetch.wait.max.ms 可批量拉取,提高吞吐。
– max.poll.records 可一次返回多条,减少网络往返。
一句话总结
这段脚本启动了一个消费组为 string-consumer-group 的消费者,从 user-events 主题的最旧位置开始,拉取 5 条消息并打印,然后无论成功或 Ctrl-C 都能安全退出。