
14-Flink Custom Sink: Hands-On Example of Storing Product Orders in MySQL

2025/03/13 posted in  Flink

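  • Custom sinks

    • SinkFunction
    • RichSinkFunction
      • The Rich variant has a richer API: it adds open() and close() methods, used for tasks such as initializing connections
  • Ways to connect Flink to MySQL (all of them require the JDBC driver)

    • Option 1: the built-in flink-connector-jdbc, which requires an extra dependency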
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-jdbc_2.12</artifactId>
         <version>1.12.0</version>
     </dependency>
    
    • Option 2: a custom sink
  • Saving video orders to MySQL

    • Set up a MySQL environment locally yourself
    • Create the table
     CREATE TABLE `video_order` (
       `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
       `user_id` int(11) DEFAULT NULL,
       `money` int(11) DEFAULT NULL,
       `title` varchar(32) DEFAULT NULL,
       `trade_no` varchar(64) DEFAULT NULL,
       `create_time` date DEFAULT NULL,
       PRIMARY KEY (`id`)
     ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
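
The sink code in this post works with a `VideoOrder` class that is never shown. Below is a minimal sketch of what it might look like. The field names and types are assumptions inferred from the getters the sink calls (`getUserId`, `getMoney`, `getTitle`, `getTradeNo`, `getCreateTime`) and from the table columns above; the real class may differ. The auto-increment `id` column is left out because the INSERT statement does not set it.

```java
import java.util.Date;

// Hypothetical sketch: field names and types are inferred, not taken from the original project.
// In a real Flink job, declare this class public in its own VideoOrder.java file.
class VideoOrder {
    private int userId;       // maps to user_id
    private int money;        // maps to money
    private String title;     // maps to title
    private String tradeNo;   // maps to trade_no
    private Date createTime;  // maps to create_time

    // No-argument constructor, which Flink expects of POJO types.
    public VideoOrder() {
    }

    public VideoOrder(int userId, int money, String title, String tradeNo, Date createTime) {
        this.userId = userId;
        this.money = money;
        this.title = title;
        this.tradeNo = tradeNo;
        this.createTime = createTime;
    }

    public int getUserId() { return userId; }
    public void setUserId(int userId) { this.userId = userId; }

    public int getMoney() { return money; }
    public void setMoney(int money) { this.money = money; }

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }

    public String getTradeNo() { return tradeNo; }
    public void setTradeNo(String tradeNo) { this.tradeNo = tradeNo; }

    public Date getCreateTime() { return createTime; }
    public void setCreateTime(Date createTime) { this.createTime = createTime; }

    @Override
    public String toString() {
        return "VideoOrder{userId=" + userId + ", money=" + money + ", title='" + title
                + "', tradeNo='" + tradeNo + "', createTime=" + createTime + "}";
    }
}
```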
    
    • Add the JDBC driver dependency
     <!-- MySQL driver -->
     <dependency>
         <groupId>mysql</groupId>
         <artifactId>mysql-connector-java</artifactId>
         <version>8.0.25</version>
     </dependency>
    
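    • Code
     import org.apache.flink.configuration.Configuration;
     import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

     import java.sql.Connection;
     import java.sql.Date;
     import java.sql.DriverManager;
     import java.sql.PreparedStatement;

     public class MysqlSink extends RichSinkFunction<VideoOrder> {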
    
         private Connection conn;
    
         private PreparedStatement ps;
    
         /**
          * Initialize the JDBC connection and precompile the INSERT statement
          *
          * @param parameters
          * @throws Exception
          */
         @Override
         public void open(Configuration parameters) throws Exception {
             System.out.println("open=======");
             conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/xd_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "xdclass.net");
    
             String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
             ps = conn.prepareStatement(sql);
         }
    
         /**
          * Close the connection
          *
          * @throws Exception
          */
         @Override
         public void close() throws Exception {
             System.out.println("close=======");
             // Close the statement before the connection that owns it
             if (ps != null) {
                 ps.close();
             }
             if (conn != null) {
                 conn.close();
             }
         }
    
         /**
          * Execute the INSERT for each incoming record
          *
          * @param value
          * @param context
          * @throws Exception
          */
         @Override
         public void invoke(VideoOrder value, Context context) throws Exception {
    
             ps.setInt(1, value.getUserId());
             ps.setInt(2, value.getMoney());
             ps.setString(3, value.getTitle());
             ps.setString(4, value.getTradeNo());
             ps.setDate(5, new Date(value.getCreateTime().getTime()));
    
             ps.executeUpdate();
         }
     }
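
Because `create_time` is a `date` column and the sink binds it with `setDate`, only the calendar date is stored and the time of day is dropped. The sketch below uses only the JDK to show the difference between `java.sql.Date` and `java.sql.Timestamp`. It assumes `getCreateTime()` returns a `java.util.Date`; the class name `CreateTimeDemo` and its helper methods are made up for illustration.

```java
import java.sql.Timestamp;

// Hypothetical helper class, not part of the original project.
class CreateTimeDemo {

    // The conversion used by the sink: suitable for a DATE column.
    static java.sql.Date toSqlDate(java.util.Date d) {
        return new java.sql.Date(d.getTime());
    }

    // An alternative for a DATETIME or TIMESTAMP column: keeps the time of day.
    static Timestamp toSqlTimestamp(java.util.Date d) {
        return new Timestamp(d.getTime());
    }

    public static void main(String[] args) {
        java.util.Date now = new java.util.Date();
        System.out.println(toSqlDate(now));      // printed as a date only
        System.out.println(toSqlTimestamp(now)); // printed with date and time
    }
}
```

If the time of day matters, one option would be to declare the column as `datetime` and bind it with `ps.setTimestamp(5, ...)` instead.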
    
  • Using the custom sink in a Flink job

     public static void main(String[] args) throws Exception {
    
         //Build the execution environment, the entry point for launching the job; it also holds global parameters
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
         env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
         env.setParallelism(1);
    
         DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
    
         DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
             @Override
             public boolean filter(VideoOrder videoOrder) throws Exception {
                 return videoOrder.getMoney() > 50;
             }
         });
    
         filterDS.print();
    
         filterDS.addSink(new MysqlSink());
    
         //A DataStream job must call execute(); a job name can be passed in
         env.execute("custom mysql sink job");
     }
    
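  • Custom MysqlSink

    • Extending RichSinkFunction is recommended
      • open() initializes the JDBC connection
      • invoke() runs the precompiled SQL statement (PreparedStatement) for each record
      • close() does the cleanup
    • If you implement plain SinkFunction instead, there are no open()/close() lifecycle hooks, so a JDBC connection ends up being created for every record written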
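
The lifecycle difference can be illustrated without Flink or MySQL. The following is a simplified stand-in, not Flink's API: `FakeConnection`, `OpenOnceSink`, `PerRecordSink`, and `SinkLifecycleDemo` are hypothetical names, and the counter only simulates how many JDBC connections each approach would open.

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for a JDBC connection; counts how many instances were created.
class FakeConnection implements AutoCloseable {
    static int opened = 0;

    FakeConnection() {
        opened++;
    }

    void write(String record) {
        // Pretend to run an INSERT here.
    }

    @Override
    public void close() {
        // Nothing to release in this simulation.
    }
}

// Mirrors the RichSinkFunction pattern: open() once, invoke() per record, close() once.
class OpenOnceSink {
    private FakeConnection conn;

    void open() {
        conn = new FakeConnection();
    }

    void invoke(String record) {
        conn.write(record);
    }

    void close() {
        if (conn != null) {
            conn.close();
        }
    }
}

// Mirrors a sink without lifecycle hooks: every invoke() creates its own connection.
class PerRecordSink {
    void invoke(String record) {
        try (FakeConnection conn = new FakeConnection()) {
            conn.write(record);
        }
    }
}

class SinkLifecycleDemo {
    static int connectionsWithOpenOnce(List<String> records) {
        FakeConnection.opened = 0;
        OpenOnceSink sink = new OpenOnceSink();
        sink.open();
        try {
            for (String r : records) {
                sink.invoke(r);
            }
        } finally {
            sink.close();
        }
        return FakeConnection.opened;
    }

    static int connectionsPerRecord(List<String> records) {
        FakeConnection.opened = 0;
        PerRecordSink sink = new PerRecordSink();
        for (String r : records) {
            sink.invoke(r);
        }
        return FakeConnection.opened;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("order-1", "order-2", "order-3");
        System.out.println(connectionsWithOpenOnce(records)); // prints 1
        System.out.println(connectionsPerRecord(records));    // prints 3
    }
}
```

With three records, the open-once sink creates a single connection while the per-record sink creates three, which is why the connection setup belongs in open().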