17 - Flink Source: Hands-On Integration with the Kafka Message Queue Connector

2025/03/13 posted in Flink

  • Previously we implemented a custom SourceFunction; Flink also ships official connectors for integrating external systems, such as reading from Kafka.

  • The connector officially provided by Flink

    • Add the dependency (a sketch of the assumed version properties follows the snippet)
     <!--kafka connector-->
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-kafka_${scala.version}</artifactId>
         <version>${flink.version}</version>
     </dependency>
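
    The two placeholders are Maven properties assumed to be defined elsewhere in the pom; a minimal sketch (the concrete versions are illustrative assumptions, not taken from the original post):

     <!-- assumed pom properties; versions are illustrative -->
     <properties>
         <flink.version>1.13.1</flink.version>
         <scala.version>2.12</scala.version>
     </properties>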
    
  • Write the code; sketches of alternative start positions and a test producer follow it

    import java.util.Properties;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class FlinkKafkaSourceJob {

        public static void main(String[] args) throws Exception {

            //build the execution environment: the entry point of the job, it also holds global parameters
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

            Properties props = new Properties();
            //Kafka broker address
            props.setProperty("bootstrap.servers", "localhost:9092");
            //consumer group id
            props.setProperty("group.id", "video-order-group");
            //key/value deserializers (plain strings)
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            //where to start when no committed offset exists
            props.setProperty("auto.offset.reset", "latest");
            //commit offsets back to Kafka automatically, every 2s
            props.setProperty("enable.auto.commit", "true");
            props.setProperty("auto.commit.interval.ms", "2000");
            //a background thread checks for Kafka partition changes every 10s
            props.setProperty("flink.partition-discovery.interval-millis", "10000");

            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("xdclass-topic", new SimpleStringSchema(), props);

            //start from the offsets committed by this consumer group (the default behavior)
            consumer.setStartFromGroupOffsets();

            DataStream<String> kafkaDS = env.addSource(consumer);

            kafkaDS.print("kafka:");

            //a DataStream job only runs once execute() is called; the argument names the job
            env.execute("kafka source job");
        }
    }
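
  • Besides setStartFromGroupOffsets(), FlinkKafkaConsumer exposes other start positions. A quick sketch of the alternatives (the timestamp value is an illustrative assumption):

    //ignore committed offsets and read the topic from the beginning
    consumer.setStartFromEarliest();

    //read only records that arrive after the job starts
    consumer.setStartFromLatest();

    //start at the first record whose timestamp is >= the given epoch millis
    consumer.setStartFromTimestamp(1678694400000L); //illustrative value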
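
  • To verify the pipeline end to end, push a few test messages into xdclass-topic. A minimal producer sketch using the plain kafka-clients API, which the connector pulls in transitively (the broker address matches the consumer above; the TestProducer class name is hypothetical):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            //try-with-resources closes (and flushes) the producer automatically
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 5; i++) {
                    producer.send(new ProducerRecord<>("xdclass-topic", "video-order-" + i));
                }
            }
        }
    }

    Each message should then show up in the Flink job's standard output, prefixed with the kafka: label passed to print().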