Flink窗口函数
先聚合一个批次,再写入数据库,减轻数据库的压力,countWindowAll(10)
表示当数据汇总到10个,执行一次。
env.fromSource(**)
.map(new MapFunction<String, String>() {
***
})
.countWindowAll(10)
.apply(new AllWindowFunction<String, List<String>, GlobalWindow>() {
@Override
public void apply(GlobalWindow globalWindow, Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
List<String> skuInfos = Lists.newArrayList(iterable);
if (skuInfos.size() > 0) {
collector.collect(skuInfos);
}
}
})
.addSink(new RichSinkFunction<List<String>>() {
@Override
public void invoke(List<String> value, Context context) {
***
}
});
以下TumblingEventTimeWindows.of(Time.seconds(10))
表示10秒触发一次操作,具体使用哪个根据业务场景决定。
env.fromSource(***)
.map(new MapFunction<String, String>() {
***
})
.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new AllWindowFunction<String, List<String>, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<String> iterable, Collector<List<String>> collector) throws Exception {
List<String> skuInfos = Lists.newArrayList(iterable);
if (skuInfos.size() > 0) {
collector.collect(skuInfos);
}
}
})
.addSink(new RichSinkFunction<List<String>>() {
@Override
public void invoke(List<String> value, Context context) {
***
}
});