articleList

18-Consumer手工提交offset配置和从头消费配置

2025/03/13 posted in  Kafka
Tags: 

  • 如果需要从头消费partition消息,怎操作?

    • auto.offset.reset 配置策略即可
    • 默认是latest,需要改为 earliest 且消费者组名变更 ,即可实现从头消费
      props.put("auto.offset.reset", "earliest");
  • 自动提交offset问题

    • 没法控制消息是否正常被消费
    • 适合非严谨的场景,比如日志收集发送
  • 手工提交offset配置和测试

    • 初次启动消费者会请求broker获取当前消费的offset值
  • 手工提交offset

    • 同步 commitSync 阻塞当前线程 (自动失败重试)
    • 异步 commitAsync 不会阻塞当前线程 (没有失败重试,回调callback函数获取提交信息,记录日志)
public class KafkaConsumerTest {
    
    public static Properties getProperties() {

        Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "112.74.55.160:9092");

        //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
        props.put("group.id", "xdclass-g1");

        //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效
        props.put("auto.offset.reset", "earliest");

        //开启自动提交offset
        //props.put("enable.auto.commit", "true");
        props.put("enable.auto.commit", "false");

        //自动提交offset延迟时间
        //props.put("auto.commit.interval.ms", "1000");

        //反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }
    
    @Test
    public void simpleConsumerTest() {

        Properties properties = getProperties();

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //订阅主题
        kafkaConsumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME));

        while (true) {
            //领取时间,阻塞超时时间
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord record : records) {
                System.err.printf("topic=%s, offset=%d,key=%s,value=%s %n", record.topic(), record.offset(), record.key(), record.value());
            }

            //同步阻塞提交offset
            //kafkaConsumer.commitSync();

            if (!records.isEmpty()) {
                //异步提交offset
                kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {

                        if (exception == null) {
                            System.err.println("手工提交offset成功:" + offsets.toString());
                        } else {
                            System.err.println("手工提交offset失败:" + offsets.toString());
                        }
                    }
                });
            }
        }
    }
    
}