
A custom countAndTimeTrigger for Flink---shengjk1

1. Background

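The project needs a custom trigger that fires on either of two conditions: 1. count, the number of messages: the window fires once the count reaches a configured threshold; 2. time: the window also fires at a fixed interval, regardless of how many messages have arrived.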
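To make the interplay of the two conditions concrete, here is a minimal plain-Java model of the rule. It does not use Flink at all: the class `CountOrTimeModel`, its methods `onElement` and `onTimer`, and the in-memory buffer are hypothetical names invented for illustration, and the model simply drops its buffer once the window end is reached.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative model of a "count or time" firing rule; not Flink code. */
class CountOrTimeModel {
    private final long maxCount;   // fire once this many elements have arrived
    private final long windowEnd;  // fire when the clock reaches this timestamp
    private long count = 0;        // elements seen since the last count-based firing
    private final List<String> buffer = new ArrayList<>();

    CountOrTimeModel(long maxCount, long windowEnd) {
        this.maxCount = maxCount;
        this.windowEnd = windowEnd;
    }

    /** Condition 1: emit and purge the buffer once the count reaches the threshold. */
    List<String> onElement(String element) {
        buffer.add(element);
        count++;
        if (count >= maxCount) {
            List<String> fired = new ArrayList<>(buffer);
            buffer.clear();
            count = 0;
            return fired;
        }
        return Collections.emptyList();
    }

    /** Condition 2: emit whatever is buffered once the window end is reached. */
    List<String> onTimer(long now) {
        if (now >= windowEnd) {
            List<String> fired = new ArrayList<>(buffer);
            buffer.clear();
            count = 0;
            return fired;
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        CountOrTimeModel window = new CountOrTimeModel(3, 1_000L);
        System.out.println(window.onElement("a"));  // []
        System.out.println(window.onElement("b"));  // []
        System.out.println(window.onElement("c"));  // [a, b, c]  (count reached)
        System.out.println(window.onElement("d"));  // []
        System.out.println(window.onTimer(1_000L)); // [d]  (window end reached)
    }
}
```

The point of the model is that a leftover element such as "d" is never stranded: even if the count threshold is not reached again, the time condition still flushes it.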

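2. Code sample

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    /**
     * @author shengjk1
     * @date 2019/9/4
     */
    public class CountAndTimeTrigger extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;

        // Number of elements that triggers a count-based firing.
        private final long maxCount;

        // Per-key, per-window element counter kept in Flink state.
        private final ReducingStateDescriptor<Long> stateDesc =
                new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

        public CountAndTimeTrigger(long maxCount) {
            super();
            this.maxCount = maxCount;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            // Register a processing-time timer for the end of the window.
            // Registering the same timestamp repeatedly is harmless: timers are deduplicated.
            ctx.registerProcessingTimeTimer(window.maxTimestamp());

            ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
            count.add(1L);
            if (count.get() >= maxCount) {
                // Count condition met: reset the counter, emit the window contents and purge them.
                count.clear();
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            // Time condition met: the timer registered in onElement has gone off.
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            // This trigger does not use event-time timers.
            return TriggerResult.CONTINUE;
        }

        @Override
        public boolean canMerge() {
            // Merging windows (e.g. session windows) are not supported,
            // so onMerge below is not invoked while this returns false.
            return false;
        }

        @Override
        public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
            ctx.mergePartitionedState(stateDesc);
            long windowMaxTimestamp = window.maxTimestamp();
            if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
                ctx.registerProcessingTimeTimer(windowMaxTimestamp);
            }
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            // Release the counter state and the pending timer when the window is disposed.
            ctx.getPartitionedState(stateDesc).clear();
            ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        }

        // Adds up the per-element increments stored in the reducing state.
        private static class Sum implements ReduceFunction<Long> {
            private static final long serialVersionUID = 1L;

            @Override
            public Long reduce(Long value1, Long value2) throws Exception {
                return value1 + value2;
            }
        }
    }

---From the Tencent Cloud Community---shengjk1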

