<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
spring:
kafka:
bootstrap-servers: 112.74.55.160:9092,112.74.55.160:9093,112.74.55.160:9094
producer:
# 消息重发的次数。
retries: 0
#一个批次可以使用的内存大小
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
private static final String TOPIC_NAME = "user.register.topic";
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 发送消息
* @param phone
*/
@GetMapping("/api/user/{phone}")
public void sendMessage1(@PathVariable("phone") String phone) {
kafkaTemplate.send(TOPIC_NAME, phone).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}