articleList

10-Flink 预定义的Source 数据源 案例实战

2025/03/13 posted in  Flink
Tags: 

  • Source来源
    • 元素集合
      • env.fromElements
      • env.fromCollection
      • env.fromSequence(start,end);
    /**
     * Demonstrates Flink's predefined element/collection/sequence sources by printing a stream.
     *
     * <p>Only the {@code fromSequence} source is active. The {@code fromElements} and
     * {@code fromCollection} variants are kept below as commented-out alternatives.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // The execution environment is the job's entry point and holds its global settings.
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // AUTOMATIC lets Flink pick batch or streaming execution (per the Flink runtime-mode docs,
        // based on whether all sources are bounded).
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Alternative sources producing a stream of same-typed elements (uncomment to try):
        //   DataStream<String> ds1 = environment.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");
        //   ds1.print("ds1:");
        //   DataStream<String> ds2 = environment.fromCollection(Arrays.asList("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂"));
        //   ds2.print("ds2:");

        // Source emitting the Long numbers from 1 to 10, printed with the "ds3:" prefix.
        environment.fromSequence(1, 10).print("ds3:");

        // Nothing runs until execute() is called; the argument is the job name.
        environment.execute("source job");
    }
  • 文件/文件系统
    • env.readTextFile(本地文件);
    • env.readTextFile(HDFS文件);
  • 基于Socket
    • env.socketTextStream("ip", 8888);
    /**
     * Demonstrates Flink's predefined socket source: reads text lines from a TCP socket and
     * prints them.
     *
     * <p>Usage: {@code [host] [port]}. Both arguments are optional. They default to
     * {@code 127.0.0.1} and {@code 8888}, so running without arguments behaves as before.
     * Something must be listening on that address, e.g. {@code nc -lk 8888}.
     *
     * @param args optional socket host ({@code args[0]}) and port ({@code args[1]})
     * @throws NumberFormatException if {@code args[1]} is present but not an integer
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // Previously hard-coded; now overridable from the command line with the same defaults.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8888;

        // The execution environment is the job's entry point and holds its global settings.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Uncomment to run the job with a single parallel instance.
        //env.setParallelism(1);

        // Alternative file-based sources (uncomment to try):
        //  DataStream<String> ds = env.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");
        //  DataStream<String> textDS = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");

        DataStream<String> stringDataStream = env.socketTextStream(host, port);
        stringDataStream.print();

        // Nothing runs until execute() is called; the argument is the job name.
        env.execute("source job");
    }