-
Flink怎么操作redis?
- 方式一:自定义sink
- 方式二:使用connector
-
Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法
- getCommandDescription 选择对应的数据结构和key名称配置
- getKeyFromData 获取key
- getValueFromData 获取value
-
使用
<!--redis connector-->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
-
编码
public class VideoOrderCounterSink implements RedisMapper<Tuple2<String, Integer>> {
/***
* 选择需要用到的命令,和key名称
* @return
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
}
/**
* 获取对应的key或者filed
*
* @param data
* @return
*/
@Override
public String getKeyFromData(Tuple2<String, Integer> data) {
System.out.println("getKeyFromData=" + data.f0);
return data.f0;
}
/**
* 获取对应的值
*
* @param data
* @return
*/
@Override
public String getValueFromData(Tuple2<String, Integer> data) {
System.out.println("getValueFromData=" + data.f1.toString());
return data.f1.toString();
}
}
- docker部署redis6.x
docker run -d -p 6379:6379 redis
- 编码实战
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//数据源 source
// DataStream<VideoOrder> ds = env.fromElements(
// new VideoOrder("21312","java",32,5,new Date()),
// new VideoOrder("314","java",32,5,new Date()),
// new VideoOrder("542","springboot",32,5,new Date()),
// new VideoOrder("42","redis",32,5,new Date()),
// new VideoOrder("4252","java",32,5,new Date()),
// new VideoOrder("42","springboot",32,5,new Date()),
// new VideoOrder("554232","flink",32,5,new Date()),
// new VideoOrder("23323","java",32,5,new Date())
// );
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
//transformation
DataStream<Tuple2<String, Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
return new Tuple2<>(value.getTitle(), 1);
}
});
// DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
// @Override
// public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
// out.collect(new Tuple2<>(value.getTitle(),1));
// }
// });
//分组
KeyedStream<Tuple2<String, Integer>, String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//统计每组有多少个
DataStream<Tuple2<String, Integer>> sumDS = keyByDS.sum(1);
//控制台打印
sumDS.print();
//单机redis
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();
sumDS.addSink(new RedisSink<>(conf, new VideoOrderCounterSink()));
//DataStream需要调用execute,可以取个名称
env.execute("custom redis sink job");
}