articleList

27-Springboot项目整合spring-kafka依赖发送消息

2025/03/13 posted in  Kafka
Tags: 

  • 添加pom文件
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  • 配置文件修改增加生产者信息
spring:
  kafka:
    bootstrap-servers: 112.74.55.160:9092,112.74.55.160:9093,112.74.55.160:9094
    producer:
      # 消息重发的次数。
      retries: 0
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
  • 发送消息
    private static final String TOPIC_NAME = "user.register.topic";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * 发送消息
     * @param phone
     */
    @GetMapping("/api/user/{phone}")
    public void sendMessage1(@PathVariable("phone") String phone) {
        kafkaTemplate.send(TOPIC_NAME, phone).addCallback(success -> {
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            System.out.println("发送消息失败:" + failure.getMessage());
        });
    }