目录

Kafka中的Metadata

Kafka中的Metadata

强烈推荐去看 ,写的太详细了。这里只做个人学习笔记。

1.Metadata

Metadata 是指 Kafka 集群的元数据,包含了 Kafka 集群的各种信息:

public class Metadata implements Closeable {
    //打印日志
    private final Logger log;
    //retry.backoff.ms: 默认值为100ms,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。
    private final long refreshBackoffMs;
    //metadata.max.age.ms: 默认值为300000,如果在这个时间内元数据没有更新的话会被强制更新。
    private final long metadataExpireMs;
    //更新版本号,每更新成功1次,version自增1,主要是用于判断metadata是否更新
    private int updateVersion;  // bumped on every metadata response
    //请求版本号,每发送一次请求,version自增1
    private int requestVersion; // bumped on every new topic addition
    //上一次更新的时间(包含更新失败)
    private long lastRefreshMs;
    //上一次更新成功的时间
    private long lastSuccessfulRefreshMs;
    private KafkaException fatalException;
    //非法的topics
    private Set<String> invalidTopics;
    //未认证的topics
    private Set<String> unauthorizedTopics;
    //// 元数据信息的Cache缓存
    private MetadataCache cache = MetadataCache.empty();
    private boolean needFullUpdate;
    private boolean needPartialUpdate;
    //会收到metadata updates的Listener列表
    private final ClusterResourceListeners clusterResourceListeners;
    private Boolean isClosed;
    // 存储Partition最近一次的leaderEpoch
    private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
public class MetadataCache {
    //kafka集群中的node、topic信息
    private final String clusterId;
    private final Map<Integer, Node> nodes;
    private final Set<String> unauthorizedTopics;
    private final Set<String> invalidTopics;
    private final Set<String> internalTopics;
    private final Node controller;
    private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
    private final Map<String, Uuid> topicIds;

    private Cluster clusterInstance;
public final class Cluster {
    
    private final boolean isBootstrapConfigured;
    //集群的节点列表node
    private final List<Node> nodes;
    //未认证的topic
    private final Set<String> unauthorizedTopics;
    //不合理的topic
    private final Set<String> invalidTopics;
    //内部topic
    private final Set<String> internalTopics;

    private final Node controller;
    // partition对应的信息,如:leader所在节点、所有的副本、ISR中的副本、offline的副本
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    topic和partition对应的信息
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    //topic和可用partition(leader不为null)的对应关系
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    //node和partition的对应关系
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    //节点id和节点的对应关系map
    private final Map<Integer, Node> nodesById;
    //集群信息
    private final ClusterResource clusterResource;
    private final Map<String, Uuid> topicIds;
    private final Map<Uuid, String> topicNames;
public class ClusterResource {

    private final String clusterId;
public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
    private final Node[] offlineReplicas;

Metadata的数据结构主要包含了集群的各种信息,包括节点有哪些、topic有哪些、在哪些partition上,partition的leader在哪个节点上,副本有哪些。

2.Metadata的应用场景

1.比如发送消息时,需要知道分区的情况、topic要发送到哪个分区上,目标分区的leader在哪个节点上这些信息都从metadata里获取。

2.kafka集群中发生了leader选举,或partition的副本发生变化,都需要更新metadata中的数据。

3.Metadata何时更新,更新流程

doSend()方法中调用waitOnMetadata()方法等待更新集群的元数据。(但如果超过了配置的时间metadataExpireMs没有更新, 就会强制更新)。
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
        //获取集群的元数据
        Cluster cluster = this.metadata.fetch();
        if (cluster.invalidTopics().contains(topic)) {
        //判断topic是否合法,如果topic分配到的partition没有leader就是非法
            throw new InvalidTopicException(topic);
        } else {
            //将topic加入到元数据中topic的列表,并设置过期时间为nowMs微秒,如果topic列表中不存在该topic,那么强制更新metadata并设置更新版本requestVersion+1,并将lastRefreshMs时间更新为0,needUpdate设置为true;
            this.metadata.add(topic, nowMs);
            //获取给定topic的分区数量
            Integer partitionsCount = cluster.partitionCountForTopic(topic);
            //如果没有获取到分区数量或指定了分区但是分区数量超过获取的分区数量进入判断 否则直接返回缓存中的cluster信息
            if (partitionsCount == null || partition != null && partition >= partitionsCount) {
                //等待更新metadata的最大时间
                long remainingWaitMs = maxWaitMs;
                long elapsed = 0L;
                
                //循环重试下边的代码,一直等待metadata的更新,直到metadata中有申请的topic和metadata信息,或超过最大等待时间
                do {
                    if (partition != null) {
                        this.log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
                    } else {
                        this.log.trace("Requesting metadata update for topic {}.", topic);
                    }
                    //尝试添加topic
                    this.metadata.add(topic, nowMs + elapsed);
                    //获取上次更新的version,并将needUpdate设为true,强制更新
                    int version = this.metadata.requestUpdateForTopic(topic);
                    //唤醒sender线程, sender线程又换唤醒NetworkClient线程,并发送updateMetadataRequest请求
                    this.sender.wakeup();

                    try {
                        //等待更新metadata,直到当前的updateVersion大于上次的updateVersion或timeout
                        this.metadata.awaitUpdate(version, remainingWaitMs);
                    } catch (TimeoutException var15) {
                        throw new TimeoutException(String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs));
                    }
                    //从缓存后去最新的cluster信息
                    cluster = this.metadata.fetch();
                    elapsed = this.time.milliseconds() - nowMs;
                    if (elapsed >= maxWaitMs) {
                        throw new TimeoutException(partitionsCount == null ? String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs) : String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", partition, topic, partitionsCount, maxWaitMs));
                    }

                    this.metadata.maybeThrowExceptionForTopic(topic);
                    remainingWaitMs = maxWaitMs - elapsed;
                    partitionsCount = cluster.partitionCountForTopic(topic);
                } while(partitionsCount == null || partition != null && partition >= partitionsCount);

                return new ClusterAndWaitTime(cluster, elapsed);
            } else {
                return new ClusterAndWaitTime(cluster, 0L);
            }
        }
    }

总结上述流程:

  • 首先会从缓存中获取 cluster 信息,并从中获取 partition 信息,如果可以取到则返回当前的 cluster 信息,如果不含有所需要的 partition 信息时就会更新 metadata;
  • 更新 metadata 的操作会在一个 do ….while 循环中进行,直到 metadata 中含有所需 partition 的信息,该循环中主要做了以下事情:
    • 调用 metadata.requestUpdateForTopic() 方法来获取 updateVersion,即上一次更新成功时的 version,并将 needUpdate 设为 true,强制更新;
    • 调用 sender.wakeup() 方法来唤醒 Sender 线程,Sender 线程中又会唤醒 NetworkClient 线程,在 NetworkClient 中会对 UpdateMetadataRequest 请求进行操作,待会下面会详细介绍;
    • 调用 metadata.awaitUpdate(version, remainingWaitMs) 方法来等待 metadata 的更新,通过比较当前的 updateVersion 与步骤 1 中获取的 updateVersion 来判断是否更新成功

sender.wakeup() 方法来唤醒 Sender 线程,Sender 线程中又会唤醒 NetworkClient 线程,在 NetworkClient 中会对 UpdateMetadataRequest 请求进行操作。在 NetworkClient 中真正处理请求的是 NetworkClient.poll() 方法。

public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();

        if (!abortedSends.isEmpty()) {
            // If there are aborted sends because of unsupported version exceptions or disconnects,
            // handle them immediately without waiting for Selector#poll.
            List<ClientResponse> responses = new ArrayList<>();
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }

        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutConnections(responses, updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        completeResponses(responses);

        return responses;
    }

这里的maybeUpdate()方法更新metaData,

        public long maybeUpdate(long now) {
            // should we update our metadata?
            //获取下一次更新时间,若needUpdate为true返回0立马更新,否则返回剩余过期时间
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);

            //计算要等待的时间,有正在处理的请求,则返回默认的请求间隔否则返回0
            long waitForMetadataFetch = hasFetchInProgress() ? defaultRequestTimeoutMs : 0;

            long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
            //大于0说明还需等待一段时间才能更新
            if (metadataTimeout > 0) {
                return metadataTimeout;
            }
            
            // Beware that the behavior of this method and the computation of timeouts for poll() are
            // highly dependent on the behavior of leastLoadedNode.
            Node node = leastLoadedNode(now);
            if (node == null) {
                log.debug("Give up sending metadata request since no node is available");
                return reconnectBackoffMs;
            }

            return maybeUpdate(now, node);
        }

总结Metadata 会在下面两种情况下进行更新:

  • 强制更新:调用 Metadata.requestUpdate() 将 needFullUpdate 置为 true 来强制更新。
  • 周期性更新:通过 Metadata 的 lastSuccessfulRefreshMs 和 metadataExpireMs 来实现,一般情况下,默认周期时间就是 metadataExpireMs,5 分钟时长。

在 NetworkClient 的 poll() 方法调用时,会去检查两种更新机制,只要达到一种,就会触发更新操作。

Metadata 的强制更新会在以下几种情况下进行:

  • initConnect 方法调用时,初始化连接;
  • poll() 方法中对 handleDisconnections() 方法调用来处理连接断开的情况,这时会触发强制更新;
  • poll() 方法中对 handleTimedOutRequests() 来处理请求超时时;
  • 发送消息时,如果无法找到 partition 的 leader;
  • 处理 Producer 响应(handleProduceResponse),如果返回关于 Metadata 过期的异常,比如:没有 topic-partition 的相关 meta 或者 client 没有权限获取其 metadata。

强制更新主要是用于处理各种异常情况。

ZooKeeper 在 Kafka 中扮演着重要的角色,用来存储 Kafka 的元数据。ZooKeeper 存储着 Partition 和 Broker 的元数据 ,同时也负责 Kafka Controller 的选举工作。

对于 Kafka 来讲,ZooKeeper 是一套外部系统,要想部署一套 Kafka 集群,就要同时部署、管理、监控 ZooKeeper。ZooKeeper 有自己的配置方式、管理工具,和 Kafka 完全不一样,所以,一起搞两套分布式系统,自然就提升了复杂度,也更容易出现问题。有时工作量还会加倍,例如要开启一些安全特性,Kafka 和 ZooKeeper 中都需要配置。除了复杂度,外部存储也会降低系统效率。

例如:

  • Kafka 集群每次启动的时候,Controller 必须从 ZooKeeper 加载集群的状态信息。
  • 选举出一个新的 Controller 之后也会比较麻烦,因为需要加载元数据,而此时元数据的量可能已经非常大了,这就产生了效率问题。

所以,ZooKeeper 带来的复杂度、系统效率这两个问题已经成为 Kafka 的痛点,Kafka 团队一直在努力去除对 ZooKeeper 的依赖。Kafka 2.8.0 这个版本终于实现了。

Kafka 2.8.0 版本实现了 Raft 分布式一致性机制,意味着可以脱离 ZooKeeper 独立运行了。使用 Raft 模式之后,元数据、配置信息都会保存在 @metadata 这个 Topic 中,自动在集群中复制。这样 Kafka 就会简单轻巧很多。