articleList

28-Springboot项目整合spring-kafka监听消费消息

2025/03/13 posted in  Kafka
Tags: 

  • 配置文件修改增加消费者信息
spring:
  kafka:
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      #手工ack,调用ack后立刻提交offset
      ack-mode: manual_immediate
      #容器运行的线程数
      concurrency: 4
  • 代码编写
@Component
public class MQListener {
    /**
     *  消费监听
     * @param record
     */
    @KafkaListener(topics = {"user.register.topic"},groupId = "xdlcass-test-gp")
    public void onMessage1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
        // 打印出消息内容
        System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
        ack.acknowledge();
    }
}