articleList

13-Kafka 生产者自定义partition分区规则实战

2025/03/13 posted in  Kafka
Tags: 

  • 源码解读默认分区器
org.apache.kafka.clients.producer.internals.DefaultPartitioner
  • 自定义分区规则
    • 创建类,实现Partitioner接口,重写方法
    • 在生产者配置中将 partitioner.class 设置为该类的全限定类名即可
/**
 * Producer partitioner that pins one well-known key to a fixed partition and spreads
 * every other key across the topic's partitions by hashing the serialized key.
 *
 * <p>Records must carry a key: a {@code null} serialized key is rejected.
 */
public class XdclassPartitioner implements Partitioner {

    /**
     * Chooses the partition for a record.
     *
     * @param topic      topic the record is being sent to
     * @param key        record key; compared against the literal {@code "xdclass"}
     * @param keyBytes   serialized key, must not be {@code null}
     * @param value      record value (not used)
     * @param valueBytes serialized value (not used)
     * @param cluster    cluster metadata used to look up the topic's partitions
     * @return {@code 0} when the key equals {@code "xdclass"}; otherwise the hash of
     *         {@code keyBytes} modulo the number of partitions reported for the topic
     * @throws IllegalArgumentException if {@code keyBytes} is {@code null}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Keyless records are not supported by this strategy.
        if (keyBytes == null) {
            throw new IllegalArgumentException("key 参数不能为空");
        }

        // The special key always goes to partition 0; everything else is hashed.
        return "xdclass".equals(key)
                ? 0
                : hashToPartition(keyBytes, cluster.partitionsForTopic(topic).size());
    }

    /** Maps the serialized key onto {@code [0, partitionCount)} via murmur2. */
    private static int hashToPartition(byte[] keyBytes, int partitionCount) {
        int positiveHash = Utils.toPositive(Utils.murmur2(keyBytes));
        return positiveHash % partitionCount;
    }

    /** No resources are held, so there is nothing to release. */
    @Override
    public void close() {
        // intentionally empty
    }

    /** This partitioner reads no configuration. */
    @Override
    public void configure(Map<String, ?> configs) {
        // intentionally empty
    }
}
  • 发送消息到自定义分区
    /**
     * Sends messages through a producer that uses the custom partition strategy.
     *
     * <p>Ten records with the key {@code "xdclass"} are sent to
     * {@code xdclass-v1-sp-topic-test}; the metadata of each successful send (or the
     * failure's stack trace) is printed from the send callback. With the
     * {@code XdclassPartitioner} configured here, that key is mapped to partition 0.
     */
    @Test
    public void testSendWithPartitionStrategy() {

        Properties properties = getProperties();

        // Replace the default partitioner with the custom one (fully-qualified class name).
        properties.put("partitioner.class", "net.xdclass.xdclasskafka.config.XdclassPartitioner");

        // try-with-resources guarantees the producer is closed even when send() throws
        // part-way through the loop; the original only closed it on the happy path.
        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("xdclass-v1-sp-topic-test", "xdclass", "xdclass-value" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.err.println("发送状态:" + metadata.toString());
                        } else {
                            exception.printStackTrace();
                        }
                    }
                });
            }
        }
    }