猪猪吧博客2 外汇 正文

Flink笔记-延迟数据处理


Flink的窗口处理流式数据虽然提供了基础EventTime的WaterMark机制,但是只能在一定程度上解决数据乱序问题。而某些极端情况下数据延迟会非常严重,即便通过WaterMark机制也无法等到数据全部进入窗口再进行处理。默认情况下,Flink会将这些严重迟到的数据丢弃掉;如果用户希望即使数据延迟到达,也能够按照流程处理并输出结果,此时可以借助Allowed Lateness机制来对迟到的数据进行额外的处理。







其都是为了处理乱序问题而产生的概念,区别如下:







通过watermark机制来处理out-of-order的问题,属于第一层防护,属于全局性的防护,通常说的乱序问题的解决办法,就是指这类;



通过窗口上的allowedLateness机制来处理out-of-order的问题,属于第二层防护,属于特定window operator的防护,late element的问题就是指这类。



AllowedLateness&OutputTag







DataStream API提供了allowedLateness方法来指定是否对迟到数据进行处理,指定后,Flink窗口计算过程中会将window的Endtime加上该时间作为窗口最后被释放的时间,当接入的数据中EventTime未超过窗口最后被释放的时间,但WaterMark已经超过Window的EndTime时,直接触发窗口计算。相反,如果事件时间超过了窗口最后被释放的时间(最大延时时间),则只能对数据进行丢弃处理。



默认情况下,GlobleWindow的最大Lateness时间为Long.MAX_VALUE,即不超时,因此数据会源源不断累积到窗口中,等待被触发。







demo



public class AllowLateness {



    // def OutputTag



    private static final OutputTag<Tuple2<String, Integer>> myTag = new OutputTag<Tuple2<String, Integer>>("myTag") {



    };



    public static void main(String[] args) throws Exception {



        List<Tuple2<String, Integer>> source = Lists.newArrayList();



        source.add(new Tuple2<>("qingh1", 1));



        source.add(new Tuple2<>("qingh2", 2));



        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();



        DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromCollection(source);







        env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);



        //env.enableCheckpointing(20000);



        env.getCheckpointConfig() //  清除策略



                .enableExternalizedCheckpoints(CheckpointConfig.



                        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);



        env.setRestartStrategy(RestartStrategies.



                fixedDelayRestart(3,



                        10000));



        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);



        SingleOutputStreamOperator<String> result = dataStreamSource.assignTimestampsAndWatermarks(



                new AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>>() {



                    @Nullable



                    @Override



                    public Watermark checkAndGetNextWatermark(Tuple2<String, Integer> lastElement, long extractedTimestamp) {



                        return new Watermark(System.currentTimeMillis()-500);



                    }







                    @Override



                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {



                        return System.currentTimeMillis()-1000;



                    }



                }



        ).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {



            @Override



            public String getKey(Tuple2<String, Integer> value) throws Exception {



                return "key";



            }



        }).timeWindow(Time.milliseconds(10)).allowedLateness(Time.milliseconds(10)).



                sideOutputLateData(myTag)



                //.sum(1);



                .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>(){



                    @Override



                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception {



                        for (Tuple2<String, Integer> element : elements) {



                            out.collect(element.f0);



                        }



                    }



                });



        DataStream<Tuple2<String, Integer>> sideOutput = result.getSideOutput(myTag);



        //只输出outPutTag中内容



        sideOutput.print();



        env.execute("qinghh Demo");



    }



}







测试结果:










为什么第一条数据没有被展示出来?问题好像在这里org.apache.flink.streaming.api.operators.InternalTimeServiceManager#advanceWatermark。测试时也可以手动改变isSkippedElement的值为true,简单mock 窗口没有late,也就是说isSkippedElement 反应的是当前窗口是否late,即窗口的清除时间(eventtime类型下:窗口中数据最大时间戳+allowedLateness)小于当前水位线。










加上 result.print();之后结果如下:










关于测输出(OutputTag)







OutputTag是一个带有名称及类型信息的side output标识;flink允许ProcessFunction、CoProcessFunction、ProcessWindowFunction、ProcessAllWindowFunction这些function输出side output,这些function的Context有一个output(OutputTag outputTag, X value)方法用于输出元素到side output



SingleOutputStreamOperator提供了getSideOutput方法,可以根据OutputTag来获取之前在function里输出的site output;WindowOperator的processElement方法在最后会判断,如果isSkippedElement为true而且isElementLate也为true,则在lateDataOutputTag不为null的情况下会将late的element输出到side output







demo如下:



~~~~.timeWindow(Time.milliseconds(10)).allowedLateness(Time.milliseconds(10)).



                sideOutputLateData(myTag)



                .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>(){



                    @Override



                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception {



                        for (Tuple2<String, Integer> element : elements) {



                           //忽略正常邏輯



                            //向outPutTag中輸出數據



                            context.output(myTag,element);



                        }



                    }



                });



        DataStream<Tuple2<String, Integer>> sideOutput = result.getSideOutput(myTag);







重点是 context.output(myTag,element);





本文转载自互联网,版权归原作者所有,转载目的在于传递更多的信息,并不代表本网站的观点和立场。 如发现本站文章存在内容、版权或其它问题,烦请联系,我们将及时删除。

评论列表

    快捷回复: