03 - A Quick Hands-On with Your First Flink Example

2025/03/13 posted in  Flink

  • Create the project

    • Maven project
    • JDK 8 or 11
  • Add the dependencies

    <properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.13.1</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>

        <!-- Flink client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Scala API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Streaming API for Scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Streaming API for Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        
        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <!-- JSON library -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>
    </dependencies>
  • Logging configuration: resources/log4j.properties
### Root logger: the level comes first, followed by the appender names
log4j.rootLogger = INFO, debugFile, errorFile

### Logs at INFO level and above go to src/logs/flink.log
log4j.appender.debugFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.debugFile.File = src/logs/flink.log
log4j.appender.debugFile.Append = true
# The Threshold property sets the minimum level this appender writes
log4j.appender.debugFile.Threshold = info
log4j.appender.debugFile.layout = org.apache.log4j.PatternLayout
log4j.appender.debugFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %n%m%n

### Logs at ERROR level and above go to src/logs/error.log
log4j.appender.errorFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.errorFile.File = src/logs/error.log
log4j.appender.errorFile.Append = true
log4j.appender.errorFile.Threshold = error
log4j.appender.errorFile.layout = org.apache.log4j.PatternLayout
log4j.appender.errorFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %n%m%n
  • Requirement

    • Split each input string on commas and print the resulting parts
  • Code

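    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    // The class name is illustrative; the original listing shows only main()
    public class FlatMapJob {

        public static void main(String[] args) throws Exception {

            // Build the execution environment: the entry point for launching the job, which also holds global parameters
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Set the parallelism
            env.setParallelism(1);

            // Source: a data stream of elements that all share the same type
            DataStream<String> stringDS = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");

            stringDS.print("before");

            // FlatMapFunction<String, String>: the first type parameter is the input type, the second is the
            // type the Collector emits (see the source Javadoc); it is also the element type of the resulting DataStream<String>
            DataStream<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> collector) throws Exception {
                    String[] arr = value.split(",");
                    for (String str : arr) {
                        collector.collect(str);
                    }
                }
            });

            // Sink: print the result
            flatMapDS.print("after");

            // A DataStream job only runs once execute() is called; the job name is optional
            env.execute("flat map job");
        }
    }

// Console output:
// before> java,SpringBoot
// after> java
// after> SpringBoot
// before> spring cloud,redis
// after> spring cloud
// after> redis
// before> kafka,小滴课堂
// after> kafka
// after> 小滴课堂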
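
To make the two type parameters of FlatMapFunction easier to see, here is a minimal plain-Java sketch that needs no Flink runtime. `SimpleCollector`, `SimpleFlatMap`, `applyFlatMap`, and `FlatMapSketch` are hypothetical stand-ins written for this illustration, not Flink classes; they only mimic the idea that one input element can produce zero or more output elements through a collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlatMapSketch {

    // Hypothetical stand-in for Flink's Collector<T>: receives each output record.
    interface SimpleCollector<T> {
        void collect(T record);
    }

    // Hypothetical stand-in for Flink's FlatMapFunction<IN, OUT>:
    // IN is the input element type, OUT is the type handed to the collector.
    interface SimpleFlatMap<IN, OUT> {
        void flatMap(IN value, SimpleCollector<OUT> out);
    }

    // Applies the function to every input and gathers whatever it collects, in order.
    public static <IN, OUT> List<OUT> applyFlatMap(List<IN> inputs, SimpleFlatMap<IN, OUT> fn) {
        List<OUT> results = new ArrayList<>();
        for (IN input : inputs) {
            fn.flatMap(input, results::add);
        }
        return results;
    }

    // Same comma split as the Flink example: one input string, one or more outputs.
    public static List<String> splitAll(List<String> inputs) {
        return applyFlatMap(inputs, (String value, SimpleCollector<String> out) -> {
            for (String part : value.split(",")) {
                out.collect(part);
            }
        });
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");
        for (String part : splitAll(inputs)) {
            System.out.println(part);
        }
    }
}
```

Three input strings go in and six parts come out, in input order. This is the one-to-many behavior that separates flatMap from a one-to-one map.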
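  • Note
    • Flink 1.12 introduced unified stream and batch processing in the DataStream API, and the DataSet API is no longer recommended, so the examples here prefer the DataStream streaming API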