- 自定义Source,实现接口自定义数据源
- 并行度为1
- SourceFunction
- RichSourceFunction
- 并行度大于1
- ParallelSourceFunction
- RichParallelSourceFunction
- Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
- 创建接口
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
private String tradeNo;
private String title;
private int money;
private int userId;
private Date createTime;
}
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {
private volatile Boolean flag = true;
private Random random = new Random();
private static List<String> list = new ArrayList<>();
static {
list.add("spring boot2.x课程");
list.add("微服务SpringCloud课程");
list.add("RabbitMQ消息队列");
list.add("Kafka课程");
list.add("小滴课堂面试专题第一季");
list.add("Flink流式技术课程");
list.add("工业级微服务项目大课训练营");
list.add("Linux课程");
}
/**
* run 方法调用前 用于初始化连接
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("-----open-----");
}
/**
* 用于清理之前
*
* @throws Exception
*/
@Override
public void close() throws Exception {
System.out.println("-----close-----");
}
/**
* 产生数据的逻辑
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<VideoOrder> ctx) throws Exception {
while (flag) {
Thread.sleep(1000);
String id = UUID.randomUUID().toString();
int userId = random.nextInt(10);
int money = random.nextInt(100);
int videoNum = random.nextInt(list.size());
String title = list.get(videoNum);
VideoOrder videoOrder = new VideoOrder(id, title, money, userId, new Date());
ctx.collect(videoOrder);
}
}
/**
* 控制任务取消
*/
@Override
public void cancel() {
flag = false;
}
}
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
videoOrderDS.print();
//DataStream需要调用execute,可以取个名称
env.execute("source job");
}