org.apache.kafka.clients.producer.internals.DefaultPartitioner
- 自定义分区规则
- 创建类,实现 Partitioner 接口,重写 partition 方法
- 在生产者配置中通过 partitioner.class 指定该类的全限定名即可
/**
 * Custom partitioner that pins records keyed {@code "xdclass"} to partition 0
 * and spreads every other keyed record across the topic's partitions by
 * hashing the serialized key.
 *
 * <p>Records without a serialized key are rejected.
 */
public class XdclassPartitioner implements Partitioner {

    /**
     * Chooses the partition for a record.
     *
     * @param topic      topic the record is being sent to
     * @param key        record key; compared against {@code "xdclass"}
     * @param keyBytes   serialized key; must not be {@code null}
     * @param value      record value (not used here)
     * @param valueBytes serialized value (not used here)
     * @param cluster    cluster metadata used to look up the topic's partitions
     * @return {@code 0} for the key {@code "xdclass"}; otherwise the
     *         non-negative murmur2 hash of {@code keyBytes} modulo the number
     *         of partitions reported for {@code topic}
     * @throws IllegalArgumentException if {@code keyBytes} is {@code null}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // A keyless record cannot be routed by this strategy.
        if (keyBytes == null) {
            throw new IllegalArgumentException("key 参数不能为空");
        }

        if (!"xdclass".equals(key)) {
            // Ordinary key: hash the serialized key onto the available partitions.
            int partitionCount = cluster.partitionsForTopic(topic).size();
            int positiveHash = Utils.toPositive(Utils.murmur2(keyBytes));
            return positiveHash % partitionCount;
        }

        // The special key always lands on the first partition.
        return 0;
    }

    /** Intentionally empty. */
    @Override
    public void close() {
    }

    /**
     * Intentionally empty; the supplied configuration is ignored.
     *
     * @param configs configuration passed in by the caller (unused)
     */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
/**
 * Sends ten records through a producer configured with the custom
 * partitioning strategy and prints the metadata (or failure) of each send.
 *
 * <p>Every record uses the key {@code "xdclass"}, which the configured
 * {@code XdclassPartitioner} is expected to route to partition 0.
 */
@Test
public void testSendWithPartitionStrategy() {
    Properties properties = getProperties();
    // Plug in the custom partitioner by its fully-qualified class name.
    properties.put("partitioner.class", "net.xdclass.xdclasskafka.config.XdclassPartitioner");

    // try-with-resources guarantees the producer is closed even when send()
    // throws synchronously (e.g. the partitioner rejecting a record).
    try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("xdclass-v1-sp-topic-test", "xdclass", "xdclass-value" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.err.println("发送状态:" + metadata.toString());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
    }
}