articleList

11-Flink自定义的Source 数据源案例-订单来源实战

2025/03/13 posted in  Flink
Tags: 

  • 自定义Source,实现接口自定义数据源
    • 并行度为1
      • SourceFunction
      • RichSourceFunction
    • 并行度大于1
      • ParallelSourceFunction
      • RichParallelSourceFunction
    • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
  • 创建接口
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
    private String tradeNo;
    private String title;
    private int money;
    private int userId;
    private Date createTime;
}
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {

    private volatile Boolean flag = true;

    private Random random = new Random();

    private static List<String> list = new ArrayList<>();

    static {
        list.add("spring boot2.x课程");
        list.add("微服务SpringCloud课程");
        list.add("RabbitMQ消息队列");
        list.add("Kafka课程");
        list.add("小滴课堂面试专题第一季");
        list.add("Flink流式技术课程");
        list.add("工业级微服务项目大课训练营");
        list.add("Linux课程");
    }
    
    /**
     * run 方法调用前 用于初始化连接
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("-----open-----");
    }

    /**
     * 用于清理之前
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        System.out.println("-----close-----");
    }
    
    /**
     * 产生数据的逻辑
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {

        while (flag) {
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            VideoOrder videoOrder = new VideoOrder(id, title, money, userId, new Date());

            ctx.collect(videoOrder);
        }
    }

    /**
     * 控制任务取消
     */
    @Override
    public void cancel() {
        flag = false;
    }
}
  • 案例
    public static void main(String[] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        DataStream<VideoOrder> videoOrderDS =  env.addSource(new VideoOrderSource());

        videoOrderDS.print();

        //DataStream需要调用execute,可以取个名称
        env.execute("source job");
    }