spring:
kafka:
bootstrap-servers: 112.74.55.160:9092,112.74.55.160:9093,112.74.55.160:9094
producer:
# 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
#retries: 1
#一个批次可以使用的内存大小
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
#acks: all
#事务id
transaction-id-prefix: xdclass-tran
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 4
#listner负责ack,手动调用Acknowledgment.acknowledge()后立即提交
ack-mode: manual_immediate
#避免出现主题未创建报错
missing-topics-fatal: false
/**
* 注解方式的事务
* @param num
*/
@GetMapping("/api/v1/tran1")
@Transactional(rollbackFor = RuntimeException.class)
public void sendMessage2(int num){
kafkaTemplate.send(TOPIC_NAME,"这个是事务消息 1 i="+num);
if(num == 0){
throw new RuntimeException();
}
kafkaTemplate.send(TOPIC_NAME,"这个是事务消息 2 i="+num);
}
/**
* 声明式事务
* @param num
*/
@GetMapping("/api/v1/tran2")
public void sendMessage3( int num){
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {
@Override
public Object doInOperations(KafkaOperations<String, Object> kafkaOperations) {
kafkaOperations.send(TOPIC_NAME,"这个是事务消息 1 i="+num);
if(num == 0){
throw new RuntimeException();
}
kafkaOperations.send(TOPIC_NAME,"这个是事务消息 2 i="+num);
return true;
}
});
}