<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.13.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<!--Flink client-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--Scala API-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--Java API-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!--Streaming, Scala API-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--Streaming, Java API-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--Logging-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!--JSON dependency-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
</dependencies>
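- To sanity-check that the Flink dependencies above resolve, a minimal sketch like the one below can be run; it prints the Flink version found on the classpath. EnvironmentInformation arrives transitively via flink-clients, and the class name VersionCheck is just illustrative.
import org.apache.flink.runtime.util.EnvironmentInformation;

public class VersionCheck {
    public static void main(String[] args) {
        // Should print the version pinned in the pom, e.g. "1.13.1"
        System.out.println("Flink version: " + EnvironmentInformation.getVersion());
    }
}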
- Log configuration: resources/log4j.properties
### Root logger: a level followed by the appender names
log4j.rootLogger = debug, debugFile, errorFile
### Logs at info level and above go to: src/logs/flink.log
log4j.appender.debugFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.debugFile.File = src/logs/flink.log
log4j.appender.debugFile.Append = true
# The Threshold property sets the minimum level this appender outputs
log4j.appender.debugFile.Threshold = info
log4j.appender.debugFile.layout = org.apache.log4j.PatternLayout
log4j.appender.debugFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %n%m%n
### Logs at error level and above go to src/logs/error.log
log4j.appender.errorFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.errorFile.File = src/logs/error.log
log4j.appender.errorFile.Append = true
log4j.appender.errorFile.Threshold = error
log4j.appender.errorFile.layout = org.apache.log4j.PatternLayout
log4j.appender.errorFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %n%m%n
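- With slf4j-log4j12 on the runtime classpath, application code logs through the SLF4J facade and the log4j.properties above decides where each message lands. A minimal sketch (the class name LoggingDemo is illustrative):
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingDemo {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingDemo.class);

    public static void main(String[] args) {
        LOG.info("reaches src/logs/flink.log only (debugFile Threshold = info)");
        LOG.error("reaches both flink.log and error.log");
    }
}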
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlatMapDemo {

    public static void main(String[] args) throws Exception {
        // Build the execution environment: the entry point of the job, which also holds global configuration
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Source: a stream of elements of the same type
        DataStream<String> stringDS = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");
        stringDS.print("before");
        // FlatMapFunction<String, String>: the first type parameter is the input type, the second is the
        // type collected by the Collector (see the source javadoc); it is also the element type of the
        // resulting DataStream<String>
        DataStream<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> collector) throws Exception {
                String[] arr = value.split(",");
                for (String str : arr) {
                    collector.collect(str);
                }
            }
        });
        // Sink: print the results
        flatMapDS.print("after");
        // Nothing runs until execute() is called on the environment; the argument is an optional job name
        env.execute("flat map job");
    }
}
// before> java,SpringBoot
// after> java
// after> SpringBoot
// before> spring cloud,redis
// after> spring cloud
// after> redis
// before> kafka,小滴课堂
// after> kafka
// after> 小滴课堂
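- The anonymous FlatMapFunction above can also be written as a Java 8 lambda. Because of type erasure, Flink then needs an explicit output-type hint via returns(...). A minimal sketch, assuming the same stringDS stream as above:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// Same splitting logic as the anonymous class; returns(Types.STRING) supplies
// the output type that erasure removes from the lambda.
DataStream<String> lambdaDS = stringDS
        .flatMap((String value, Collector<String> out) -> {
            for (String str : value.split(",")) {
                out.collect(str);
            }
        })
        .returns(Types.STRING);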
- Note
- Since Flink 1.12, stream and batch processing are unified, and the DataSet API is no longer recommended; the examples here therefore prefer the DataStream streaming API (see the sketch below for running it in batch mode).
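- For reference, since Flink 1.12 the same DataStream program can execute with batch semantics on bounded input by switching the runtime mode. A minimal sketch:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Default is RuntimeExecutionMode.STREAMING; BATCH runs bounded sources
// with batch-style scheduling instead of continuous streaming.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);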