Kafka-高吞吐量消息中间件
Kafka–高吞吐量消息中间件
Kafka特点和优势
官方网站:
Kafka特点与优势
Kafka特点
1.分布式: 多机实现,单机体现不出kafka的性能
2.分区: 一个消息可以拆分成多个小的消息(化整为零),分别存储在多个位置·
3.多副本: 防止信息丢失,可以实现为一个消息增加多个备份
4.多订阅者: 可以有很多应用连接kafka
5.Zookeeper: 早期版本的Kafka依赖于zookeeper,2021年4月19日Kafka 2.8.0正式发布,
此版本包括了很多重要改动,最主要的是kafka通过自我管理的仲裁来替代ZooKeeper,即Kafka将不再需要ZooKeeper!
Kafka优势
Kafka通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB级别以上的消息存储也能够保持长时间的稳定性能·
Kafka优势
1.复杂度低:
o(1)就是最低的时空复杂度,也就是耗时/耗空间与输入数据大小无关,无论输入数据增大多少倍,耗时/耗空间都不变,
哈希算法就是典型的o(1)时间复杂度,无论数据规模多大,都可以在一次计算后找到目标。
2.高吞吐量:
即使是非常普通的硬件,Kafka也可以支持每秒数百万的消息。支持通过Kafka服务器分区消息。
3.分布式:
Kafka基于分布式集群实现高可用的容错机制,可以实现自动的故障转移。
4.顺序保证:
在大多数使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。
Kafka保证一个Partiton内的消息的有序性(分区间数据是无序的,如果对数据的顺序有要求,应将在创建主题时将分区数partitions设置为1)
5.支持Hadoop并行数据加载。
6.通常用于大数据场合,传递单条消息比较大,而Rabbitmq消息主要是传输业务的指令数据,单条数据较小。
Kafka角色
Producer
Producer:即生产者,消息的产生者,是消息的入口。负责发布消息到Kafka broker。
Consumer
Consumer: 消费者,用于消费消息,即处理消息。
Broker:
Broker是kafka实例,
每个服务器上可以有一个或多个kafka的实例,假设每个broker对应一台服务器。
每个kafka集群内的broker都有一个不重复的编号,如: broker-O、broker-1等…...
Topic
Topic:消息的主题,可以理解为消息的分类,一个Topic相当于数据库中的一张表一条消息相当于关系数据库的一条记录,
一个Topic或者相当于Redis中列表类型的一个Key,一条消息即为列表中的一个元素。kafka的数据就保存在topic。
在每个broker上都可以创建多个topic。物理上不同topic的消息分开存储在不同的文件夹,
逻辑上一个topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处,
topic在逻辑上对record(记录、日志)进行分组保存,消费者需要订阅相应的topic才能消费topic中的消息。
不同的消费者只对某个Topic感兴趣
Consumer group
Consumer group:
每个consumer属于一个特定的consumer group(可为每个consumer指定group name,若不指定group name则属于默认的group),
同一topic的一条消息只能被同一个consumer group内的一个consumer消费,
类似于一对一的单播机制,但多个consumer group可同时消费这一消息,类似于一对多的多播机制。
Partition
Partition:
是物理上的概念,每个topic分割为一个或多个partition,即一个topic切分为多份.
创建topic时可指定partition数量,partition的表现形式就是一个一个的文件夹.
该文件夹下存储该partition的数据和索引文件,分区的作用还可以实现负载均衡,提高kafka的吞吐量。
同一个topic在不同的分区的数据是不重复的,一般Partition数不要超过节点数,
注意同一个partition数据是有顺序的,但不同的partition则是无序的。
Replication
Replication:同样数据的副本,包括leader和follower的副本数
基于数据安全,建议至少2个是Kafka的高可靠性的保障,
和ES的副本有所不同,Kafka中的副本数包括主分片数,而ES中的副本数不包括主分片数。
为了实现数据的高可用,比如将分区0的数据分散到不同的kafka节点,
每一个分区都有一个broker作为Leader和一个broker作为Follower,类似于ES中的主分片和副本分片,
假设分区为3,即分三个分区0-2,副本为3,即每个分区都有一个leader,再加两个follower,
分区0的leader为服务器A,则服务器B和服务器C为A的fllower,
而分区1的leader为服务器B,则服务器A和C为服务器B的follower,
而分区2的leader为C,则服务器A和B为C的follower。
分区和副本的优势
分区和副本的优势
1.实现存储空间的横向扩容,即将多个kafka服务器的空间组合利用
2.提升性能,多服务器并行读写
3.实现高可用,每个分区都有一个主分区即leader分布在不同的kafka服务器,并且有对应follower分布在和leader不同的服务器
Kafka写入流程
Kafka部署
官方部署文档:
单机
$ apt update && apt -y install openjdk-8-jdk
$ tar -xzf kafka_2.13-3.4.0.tgz
$ cd kafka_2.13-3.4.0
# Start the zooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
集群部署
当前版本Kafka依赖Zookeeper服务,但以后将不再依赖
1.环境准备ZooKeeper
安装kafka之前,确保的zookeeper集群节点的启动。
注意:生产中zookeeper和kafka一般是分开独立部署的, kafka安装前需要安装java环境。
2.配置文件说明
Kafka节点配置,配置文件说明
./ conf/server.propertiest
配置文件 ./conf/server.properties内容说明
################################## Server Basics ##################################
broker.id=1 broker的id,值为整数,且必须唯一,在一个集群中不能重复
################################## Socket ServerSettings ###################################
listeners=PLAINTEXT://10.0.0.101:9092 kafka监听端口,默认9092
num.network.threads=3 处理网络请求的线程数量,默认为3个
num.io.threads=8 执行磁盘IO操作的线程数量,默认为8个
socket.send.buffer.bytes=102400 socket服务发送数据的缓冲区大小,默认100KB
socket.receive.buffer.bytes=102400 socket服务接受数据的缓冲区大小,默认100KB
socket.request.max.bytes=104857600 socket服务所能接受的一个请求的最大大小,默认为100M
################################## Log Basics##################################
log.dirs= ../data kafka存储消息数据的目录
num.partitions=1 每个topic默认的partition
default.replication.factor=3 设置副本数量为3,当Leader的Replication故障,会进行故障自动转移。
num.recovery.threads.per.data.dir=1 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量
################################## Log FlushPolicy #################################
log.flush.interval.messages=10000 消息刷新到磁盘中的消息条数阙值
log.flush.interval.ms=1000 消息刷新到磁盘中的最大时间间隔,1s
################################## Log RetentionPolicy ##################################
log.retention.hours=168 日志保留小时数,超时会自动删除,默认为7天
log.retention.bytes=1073741824 日志保留大小,超出大小会自动删除,默认为1G
log.segment.bytes=1073741824 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件
log.retention.check.interval.ms=300000 每隔多长时间检测数据是否达到删除条件,300s
################################## zookeeper##################################
zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181 zk连接信息,如果是zk集群,则以逗号隔开
zookeeper.connection.timeout.ms=6000 连接zookeeper的超时时间,6s
delete.topic.enable=true 是否允许删除topic,默认为false,topic只会标记为marked for deletion
3.各节点部署Kafka
在所有节点上执行安装java
[root@node1 ~]#apt install openjdk-8-jdk -y
在所有节点上执行下载,官方下载
[root@node1 ~]#wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
[root@node1 ~]#wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
解压缩
[root@node1 ~]#tar xf kafka_2.13-2.7.0.tgz -C /usr/local/
[root@node1 ~]#ln -s /usr/local/kafka_2.13-2.7.0/ /usr/local/kafka
配置PATH变量
[root@node1 ~]#echo 'PATH=/usr/local/kafka/bin:SPATH' >/etc/profile.d/kafka.sh
[root@node1 ~]#. /etc/profile.d/kafka.sh
修改配置文件
[root@node1 ~]#vim /usr/loca1/kafka/config/server.properties
broker.id=1 每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的meta.properties文件
listeners=PLAINTEXT://10.0.0.101:9092 指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
log.dirs=/usr/loca1/kafka/data kakfa用于保存数据的目录,所有的消息都会存储在该目录当中
num.partitions=1 设置创建新的topic时默认分区数量,建议和kafka的节点数量一致
default.replication.factor=3 指定默认的副本数为3,可以实现故障的自动转移
log.retention.hours=168 设置kafka中消息保留时间,默认为168小时即7天
zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181 指定连接的zk的地址,zk中存储了broker的元数据信息
zookeeper.connection.timeout.ms=6000 设置连接zookeeper的超时时间,单位为ms ,默认6秒钟
准备数据目录
[root@node1 ~]#mkdir /usr/local/kafka/data
[root@node1 ~]#scp /usr/local/kafka/config/server.properties 10.0.0.102:/usr/1ocal/kafka/config
[root@node1 ~]#scp /usr/local/kafka/config/server.properties 10.0.0.103:/usr/local/kafka/config
修改第2个节点配置
[root@node2~]#vim /usr/local/kafka/config/server.properties
broker.id=2 每个broker在集群中的唯一标识,正整数。
listeners=PLAINTEXT://10.0.0.102:9092 指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
修改第2个节点配置
[root@node3~]#vim /usr/local/kafka/config/server.properties
broker.id=3 每个broker在集群中的唯一标识,正整数。
listeners=PLAINTEXT://10.0.0.103:9092 指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
可以调整内存
[root@node1 ~]#vim /usr/local/kafka/bin/kafka-server-start.sh
......
if[ "×$KAFKA_HEAP_OPTs"="x" ] ; then
export KAFKA_HEAP_OPTS=" -Xmx1G -xms16"
fi
.....
4.启动服务
在所有kafka节点执行下面操作
[root@node1 ~-]#kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
5.确保服务启动
[root@node1 ~]#ss -ntl|grep 9092
LISTEN 0 50 [: :ffff:10.0.0.101] : 9092
[root@node1 ~]#tail /usr/local/kafka/logs/server.log
[2024-02-16 12:10:01,276] INFO [ExpirationReaper-1-AlterAc1s]: starting
(kafka.server.DelayedoperationPurgatory$ExpiredoperationReaper)
[2024-02-16 12:10:01,311] INFO [/config/changes-event-process-thread]: starting(kafka . common.zkNodechangeNotificationListener$changeEventProcessThread)
[2024-02-16 12:10:01,332] INFO [Socketserver brokerId=l] starting socket server acceptors and processors(kafka.ne twork. socketserver)
[2024-02-16 12:10:01,339] INFO [socketserver brokerTd=1] started data-plane acceptor and processor(s) forendpoint : ListenerName(PLAINTEXT)(kafka.ne twork. Socketserver)
[2024-02-16 12:10:01,340] INFO [socketserver brokerTd=1] started socket server acceptors and processors(kafka. network. Socketserver)
[2024-02-16 12:10:01,344] INFO Kafka version: 2.7.0 (org.apache.kafka.common .uti1s.AppInfoParser)
[2024-02-16 12:10:01,344] INFO Kafka commitId: 448719dc99a19793 (org.apache.kafka.common.uti1s.AppInfoParser)
[2024-02-16 12:10:01,344]INFO Kafka startTimews: 1613448601340 (org.apache .kafka .common.uti1s.AppInfoParser)
[2024-02-16 12:10:01,346] INFo [Kafkaserver id=1] started (kafka. server.KafkaServer)
[2024-02-16 12:10:01,391] IMFO [broker-1-to-controller-send-thread] : Recorded new controller,from now on wi11use broker 1 (kafka.server . BrokerTocontro1lerRequestThread)
如果使用id,还需要修改/usr/local/kafka/data/meta.properties
打开zooinspector可以看到三个id
Broker依赖于Zookeeper,
每个Broker的id和Topic、Partition这些元数据信息都会写入Zookeeper的ZNode节点中
consumer依赖于Zookeeper,
Consumer在消费消息时,每消费完一条消息,会将产生的offset保存到Zzookeeper中,
下次消费在当前offset往后继续消费.kafka0.9之前Consumer的offset存储在Zookeeper中,
kafka0.9以后offset存储在本地。
Partition依赖于Zookeeper,
Partition完成Replication备份后,选举出一个Leader,这个是依托于Zookeeper的选举机制实现的。
准备Kafka的service文件
/lib/systemd/ system/ kafka.service
[root@node1 ~]#cat /lib/systemd/system/kafka.service
[Unit]
Description=Apache kafka
After=network.target
[Service]
Type=simple
#Environment=AVA_HOME=/data/server/java
PIDFile=/usr/local/kafka/ kafka.pid
Execstart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server. propertiesExecstop=/bin/kill -TERM ${MAINPID}
Restart=always
Restartsec=20
[Install]
wantedBy=multi-user.target
[root@node1 ~]#systemctl daemon-load
[root@node1 ~]#systemctl restart kafka.service
Kafka读写数据
常见命令
kafka-topics.sh #消息的管理命令
kafka-console-producer.sh #生产者的模拟命令
kafka-console-consumer.sh #消费者的模拟命令
创建Topic
创建topic名为wang,partitions(分区)为3,replication(每个分区的副本数/每个分区的分区因子)为2
新版命令
[root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --create --topic wang --bootstrap-server 10.0.0.101:9092 --partitions 3 --replication-factor 2
在各节点上观察生成的相关数据
[root@node1 ~]#ls /usr/local/kafka/data/
[root@node2 ~]#ls /usr/local/kafka/data/
[root@node3 ~]#ls /usr/local/kafka/data/
旧版命令
[root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper
10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181 --partitions 3 --replication-factor 2 --topic wang
created topic wang.
获取Topic
新版命令
[root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 10.0.0.101:9092
旧版命令
[root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper10.0.0.101:2181,10.0.0.102 :2181,10.0.0.103:2181
wang
验证Topic详情
状态说明: wang有三个分区分别为0、1、2,分区0的leader是3(broker.id);分区0有2个副本,并且状态都为 Isr (In-sync,表示可以参加选举成为leader) .
新版命令
[root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 10.0.0.101:9092 --topi cwang
Topic: wang TopicId: beg6bPXwTowlyp7cuv7F8w PartitionCount: 3 ReplicationFactor: 2 configs:
Topic: wang Partition: 0 Leader: 3 Replicas: 3,1 ISr: 3,1
Topic: wang Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: wang Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
旧版命令
[root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181 --topic wang
Topic: wang Partitioncount: 3 ReplicationFactor: 2 configs:
Topic: wang Partition:0 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: wang Partition:1 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: wang Partition:2 Leader: 2 Replicas: 2,1 Isr: 2,1
生成Topic
kafka-console-producer.sh格式
发送消息命令格式:
kafka-console-producer.sh --broker-list <kafkaIP1>:<端口>,<kafkaIP2> :<端口> --topic <topic名称>
范例
交互式输入消息,按ctrl+C退出
[root@node1 ~]#/usr/local/kafka/bin/kafka-console-producer.sh --broker-list
10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic wang
>message1
>message2
>message3
或者下面方式
[root@node1 ~]#/usr/local/kafka/bin/kafka-console-producer.sh --topic wang --bootstrap-server 10.0.0.101:9092
消费Topic
kafka-console-consumer.sh格式
#接收消息命令格式:
kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名称> --from-beginning --consumer-property group.id=<组名称>
注意:
·生产者先生产消息,消费者后续启动也能收到之前生产的消息
·同一个消息在同一个group内的消费者只有被一个消费者消费,
比如:共100条消息,在一个group内有A,B两个消费者,其中A消费50条,B消费另外的50条消息。
从而实现负载均衡,不同group内的消费者则可以同时消费同一个消息
·--from-beginning表示消费前发布的消息也能收到,默认只能收到消费后发布的新消息
交互式持续接收消息,按Ctrl+C退出
[root@node1 ~]#/usr/local/kafka/bin/kafka-console-consumer.sh --topic wang --bootstrap-server 10.0.0.102:9092 --from-beginning
message1
message3
message2
一个消息同时只能被同一个组内一个消费者消费(单播机制),实现负载均衡,而不能组可以同时消费同一个消息(多播机制)
[root@node2~]#/usr/local/kafka/bin/kafka-console-consumer.sh --topic wang --bootstrap-server 10.0.0.102:9092 --from-beginning --consumer-property group.id=group1
[root@node2~]#/usr/local/kafka/bin/kafka-console-consumer.sh --topic wang --bootstrap-server 10.0.0.102:9092 --from-beginning --consumer-property group.id=group1
删除 Topic
注意:需要修改配置文件server.properties中的delete.topic.enable=true并重启
新版本
[root@node3 ~]#/usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server10.0.0.101:9092,10.0.0.102:9092,10.0.0.103:9092 --topic wang
旧版本
[root@node3 ~-]#/usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181 --topic wang
Topic wang is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
范例:删除zk下面topic test
无需修改配置文件server.properties,此方法很危险
[root@zookeeper-node1 ~]#zkCli.sh -server 10.0.0.103:2181
[zk: 10.0.0.103:2181(CONNECTED)0] ls /brokers/topics
[zk: 10.0.0.103:2181(CONNECTED)0] delete all /brokers/topics/test
[zk: 10.0.0.103:2181(CONNECTED)0] ls /brokers/topics
Docker容器部署
创建zookeeper容器
拉取镜像
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
创建kafka容器
拉取镜像
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=10.0.0.206 \
--env KAFKA_ZOOKEEPER_CONNECT=10.0.0.206:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.0.206:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1
查看kafka启动日志
查看zk和kafka相关进程
ss -tnulp
Java创建生产者消费者
生产者生产Topic
ProducerQuickStart.java
package com.lei.kafkademo.sample;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerQuickStart {
public static void main(String[] args) {
//1.kafka连接配置信息
Properties properties = new Properties();
//配置连接地址
properties.put("bootstrap.servers", "10.0.0.206:9092");
//key和value的序列化
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2.创建kafka生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
//3.发送消息
/**
* 第一个参数:topic
* 第二个参数:消息的key
* 第三个参数:消息的value
*/
// ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic-first", "key-001","hello kafka");
ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic-leilei", "key-002","hello leilei");
producer.send(record);
//4.关系消息通道
producer.close();
}
}
查看生产者创建的topics
找到kafka的容器id
进入到kafka容器内
docker exec -it 4141a15d3c76 /bin/bash
找到kafka的脚本文件
find / -name "*.sh" |grep kafka
kafka-topics.sh --bootstrap-server localhost:9092 --list
消费者消费topic
ConsumerQuickStart.java
package com.lei.kafkademo.sample;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerQuickStart {
public static void main(String[] args) {
//1.kafka的配置信息
Properties properties = new Properties();
//配置连接地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.0.0.206:9092");
//key和value反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//设置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
//2.创建消费者对象
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(properties);
//3.订阅主题
consumer.subscribe(Collections.singletonList("topic-leilei"));
//4.拉取消息
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
}
}
}
}
消费者组
消费者组是 Kafka 中用于协调多个消费者(Consumer)共同消费同一个主题(Topic)数据的机制。
每个消费者组由一个或多个消费者实例组成,这些消费者实例会共同处理分配给这个组的分区消息。
Kafka 消费者组是 Kafka 中实现消息消费负载均衡、并发处理以及容错机制的重要手段。通过消费者组,多个消费者可以协同处理同一个主题的数据,确保高效的消息消费和分区管理,同时支持多个应用程序独立处理相同的消息流。
消费者组的工作原理
消费者组的工作原理
1.组内分区分配:
Kafka 将主题的每个分区分配给同一个消费者组中的消费者。
每个分区只能被同一个消费者组内的一个消费者消费,确保消息不会重复消费。
2.多分区并发消费:
当多个消费者组成同一个消费者组时,Kafka 会将主题中的分区划分给组内的消费者,
每个消费者负责处理一部分分区的消息。这种机制允许通过增加消费者来提高消费能力。
3.组协调器(Group Coordinator):
Kafka 依赖一个称为组协调器的组件来管理消费者组的加入、离开和分区分配。
组协调器是 Kafka Broker 角色的一部分,负责确保分配消费者给分区的一致性。
消费者的工作流程
消费者的工作原理
1.订阅主题:
消费者通过订阅特定的主题来获取消息。
2.读取消息:
消费者从分区中读取消息,可以选择按偏移量读取(默认)或从特定时间点读取。
3.提交偏移量:
消费者在处理完消息后,需要提交偏移量以记录已处理的消息。提交可以是自动的(自动提交)或手动的(手动提交)。
4.消费者组:
如果多个消费者属于同一组,Kafka会为每个消费者分配不同的分区,以实现负载均衡和并发处理。