目录

Python中Kafka如何消费字符串消息

Python中Kafka如何消费字符串消息

[https://csdnimg.cn/release/blogv2/dist/pc/img/activeVector.png VibeCoding·九月创作之星挑战赛 10w+人浏览 1.6k人参与

https://csdnimg.cn/release/blogv2/dist/pc/img/arrowright-line-White.png]( )

核心代码

from confluent_kafka import Consumer, KafkaError

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'string-consumer-group',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['user-events'])

try:
    for _ in range(5):  # 消费5条消息
        msg = consumer.poll(1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print(f"❌ 消费错误: {msg.error()}")
            continue

        print(f"📨 收到消息: {msg.value().decode('utf-8')}")

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

代码解释

这段代码是一个最小可运行的 Kafka 消费者示例,用 Python 写的,依赖的是 confluent-kafka(C 语言封装的高性能客户端)。
逐行拆解如下:


  1. 导入
from confluent_kafka import Consumer, KafkaError
  • Consumer:真正负责与 Kafka Broker 通信的类。
  • KafkaError:里面预定义了所有 Kafka 返回的错误码,方便我们做判断。

  1. 创建消费者实例
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',   # 集群地址
    'group.id': 'string-consumer-group',     # 消费组 id
    'auto.offset.reset': 'earliest'          # 无偏移量时从最早开始
})

参数解释

字段含义补充
bootstrap.servers初始连哪台 broker可写多个,用逗号分隔
group.id消费组唯一标识同一 group 内的消费者会负载均衡地消费分区
auto.offset.reset当该组在 Kafka 中没有已提交偏移量时的行为earliest(从头)、latest(从新)、none(抛异常)

  1. 订阅主题
consumer.subscribe(['user-events'])
  • 支持正则:subscribe(pattern='user-.*')
  • 支持手动分配分区:consumer.assign([TopicPartition('user-events',0)])
    这里采用自动订阅方式,Kafka 会自动把该主题的分区平均分配给同组内的各个消费者。

  1. 循环拉消息
for _ in range(5):          # 只演示拉 5 条
    msg = consumer.poll(1.0)  # 超时 1 秒
  • poll非阻塞的,返回一条消息或 None。
  • 超时单位是,可以填 0 立即返回。
  • 返回的对象是 Message,具有一系列属性:
常用属性说明
msg.value()字节串形式的 payload
msg.key()字节串形式的 key
msg.partition()所在分区号
msg.offset()该分区内偏移量
msg.timestamp()时间戳
msg.error()如果 broker 返回错误,这里会拿到一个 KafkaError 实例

  1. 错误处理
if msg.error():
    if msg.error().code() != KafkaError._PARTITION_EOF:
        print(f"❌ 消费错误: {msg.error()}")
    continue
  • _PARTITION_EOF 不是致命错误,只是读到分区末尾,这里选择忽略。
  • 其他错误(如 LeaderNotAvailable、OffsetOutOfRange)会被打印出来。

  1. 业务处理
print(f"📨 收到消息: {msg.value().decode('utf-8')}")
  • 把字节串按 UTF-8 解码成字符串。
  • 实际业务里这里会写 JSON 反序列化、写数据库、调用 RPC 等。

  1. 优雅退出
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
  • KeyboardInterrupt 对应 Ctrl-C。
  • close() 会做:
    – 自动触发一次同步提交(enable.auto.commit=true 时)。
    – 释放 socket 连接、心跳线程等资源。
    不写 close() 的话,进程直接退出可能导致重复消费rebalance 抖动

  1. 你可能还关心的点
  • 提交方式
    – 默认每 5 秒自动提交一次(enable.auto.commit=true, auto.commit.interval.ms=5000)。
    – 也可手动 consumer.commit(msg) 实现精确一次
  • 多线程:confluent-kafka 的 Consumer 不是线程安全的,一个实例只能给一条线程用。
  • 性能调优
    – fetch.min.bytes + fetch.wait.max.ms 可批量拉取,提高吞吐。
    – max.poll.records 可一次返回多条,减少网络往返。

一句话总结
这段脚本启动了一个消费组为 string-consumer-group 的消费者,从 user-events 主题的最旧位置开始,拉取 5 条消息并打印,然后无论成功或 Ctrl-C 都能安全退出。