articleList

29-Kafka事务消息-整合SpringBoot实战

2025/03/13 posted in  Kafka
Tags: 

  • Kafka 从 0.11 版本开始引入了事务支持
    -事务可以保证对多个分区写入操作的原子性

    • 操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能
  • 配置

spring:
  kafka:
    bootstrap-servers: 112.74.55.160:9092,112.74.55.160:9093,112.74.55.160:9094
    producer:
      # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
      #retries: 1
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
      #acks: all
      #事务id
      transaction-id-prefix: xdclass-tran
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 4
      #listner负责ack,手动调用Acknowledgment.acknowledge()后立即提交
      ack-mode: manual_immediate
      #避免出现主题未创建报错
      missing-topics-fatal: false
  • SpringBoot代码编写
    /**
     * 注解方式的事务
     * @param num
     */
    @GetMapping("/api/v1/tran1")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMessage2(int num){

        kafkaTemplate.send(TOPIC_NAME,"这个是事务消息 1 i="+num);

        if(num == 0){
            throw new RuntimeException();
        }
        kafkaTemplate.send(TOPIC_NAME,"这个是事务消息 2 i="+num);

    }


    /**
     * 声明式事务
     * @param num
     */
    @GetMapping("/api/v1/tran2")
    public void sendMessage3( int num){

        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {
            @Override
            public Object doInOperations(KafkaOperations<String, Object> kafkaOperations) {

                kafkaOperations.send(TOPIC_NAME,"这个是事务消息 1 i="+num);

                if(num == 0){
                    throw new RuntimeException();
                }
                kafkaOperations.send(TOPIC_NAME,"这个是事务消息 2 i="+num);
                return true;
            }
        });
    }