- Source来源
- 元素集合
- env.fromElements
- env.fromCollection
- env.fromSequence(start,end);
/**
 * Demonstrates element/collection-style Flink sources.
 *
 * <p>Builds a stream of the longs 1 through 10 with {@code fromSequence}, prints every
 * element prefixed with "ds3:", and submits the job under the name "source job".
 * The commented-out lines show the equivalent {@code fromElements} and
 * {@code fromCollection} sources.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job fails to build or execute
 */
public static void main(String[] args) throws Exception {
    // Entry point for building and launching the job; it also carries job-wide settings.
    final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

    // Alternative source: a stream of same-typed elements listed explicitly.
    // environment.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂").print("ds1:");

    // Alternative source: a stream of same-typed elements taken from an existing collection.
    // environment.fromCollection(Arrays.asList("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂")).print("ds2:");

    // Number-sequence source from 1 to 10; each element is printed with the "ds3:" prefix.
    environment.fromSequence(1, 10).print("ds3:");

    // Nothing runs until execute() is called; the argument is the job name.
    environment.execute("source job");
}
- 文件/文件系统
- env.readTextFile(本地文件);
- env.readTextFile(HDFS文件);
- 基于Socket
- env.socketTextStream("ip", 8888);
/**
 * Demonstrates a socket-based Flink source.
 *
 * <p>Reads text from a socket at 127.0.0.1:8888, prints every received element to
 * standard output, and submits the job under the name "source job". The commented-out
 * lines show file-based alternatives (local file system and HDFS).
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job fails to build or execute
 */
public static void main(String[] args) throws Exception {
    // Entry point for building and launching the job; it also carries job-wide settings.
    StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    executionEnv.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    // executionEnv.setParallelism(1);

    // Alternative file sources: a local file, or a file on HDFS.
    // DataStream<String> ds = executionEnv.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");
    // DataStream<String> textDS = executionEnv.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");

    // Socket source: connect to the given host/port and print what arrives.
    executionEnv.socketTextStream("127.0.0.1", 8888).print();

    // Nothing runs until execute() is called; the argument is the job name.
    executionEnv.execute("source job");
}