目录

SpringBoot集成Kafka

目录

SpringBoot集成Kafka

1、maven依赖

<!-- 无需指定版本,跟随springboot版本自动引入适配版本 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2、application.yml 配置

spring:
  kafka:
    bootstrap-servers: 10.10.62.32:31175,10.10.62.32:31032,10.10.62.32:32224
    consumer:
      group-id: 'default-group-id'
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      max-poll-records: 100
      auto-offset-reset: latest
    listener:
      ack-mode: manual
      type: BATCH
    properties:
      security.protocol: PLAINTEXT
      #sasl.mechanism: PLAIN
    #jaas:
      #config: #config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="Wt1g2+UNRPnV";'

3、消息生产者Producer

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;


@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        //异步消息
        ListenableFuture<SendResult<String, String>> asyncResult = kafkaTemplate.send(topic, message);
        asyncResult.addCallback(
                success -> {
                    String topic1 = success.getRecordMetadata().topic();
                    int partition = success.getRecordMetadata().partition();
                    long offset = success.getRecordMetadata().offset();
                    System.out.println("异步发送成功: topic=" + topic + ", partition=" + partition + ", offset=" + offset);
                },
                failure -> {
                    System.out.println("异步发送失败: " + failure.getMessage());
                    // 可以记录日志、重试、告警等
                }
        );

        //同步消息
        try {
            SendResult<String, String> syncResult =  kafkaTemplate.send(topic, message).get();
            System.out.println("同步发送成功: " + syncResult.getRecordMetadata());
        } catch (Exception e) {
            System.out.println("发送失败: " + e.getMessage());
        }
    }
}

4、消息消费者Consumer

import org.example.springboot.starter.common.User;
import org.springframework.stereotype.Service;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

import java.util.List;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "policy-sr-group")
    public void listen(String message, Acknowledgment ack) {
        try {
            System.out.println("消费者Received Message: " + message);
            // 处理业务逻辑
        } finally {
            // 手动提交 offset
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = "user-topic", groupId = "policy-sr-group")
    public void listenUser(List<User> user, Acknowledgment ack) {
        System.out.println("消费者Received User: " + user);
        ack.acknowledge();
    }

}

🧩 @KafkaListener 核心属性一览

属性类型说明
topicsString[]监听的 topic 名称
topicPatternString正则表达式匹配 topic(动态监听)
idString消费者 ID(用于日志、管理)
groupIdString消费者组 ID(覆盖配置文件)
concurrencyString并发线程数(控制吞吐)
containerFactoryString指定 KafkaListenerContainerFactory
errorHandlerClass自定义错误处理器
ackModeString手动提交模式(配合 Acknowledgment)
clientIdPrefixString客户端 ID 前缀
batchboolean是否启用批量消费
propertiesString[]动态设置消费者属性