美图欣赏:
闲谈一刻
一名大二学生,Flink技术的热爱者。
学习过程中,一定要学会深入思考 (划重点)
一.窗口背景
Flink的窗口机制是其底层核心之一,也是高效流处理的关键。
A.窗口需求
在Streaming应用程序的情况下,数据是连续的,因此我们不能等待在开始处理之前流式传输整个数据。当然,我们可以处理每个传入的事件,然后转移到下一个事件,但在某些情况下,我们需要对传入的数据进行某种聚合 - 例如,有多少用户在过去10分钟内点击了您网页上的链接。在这种情况下,我们必须定义一个窗口并对窗口内的数据进行处理
Flink窗口主要分为滚动(tumbling)、滑动(sliding)和会话(session)窗口三大类
这里重点分析下滑动窗口 Sliding Window
(1)滚动窗口Tumbling window
滚动窗口滚动数据流。这种类型的窗口是不重叠的 - 即,一个窗口中的事件/数据不会在其他窗口中重叠/出现。
(2)滑动窗口 Sliding Window
滑动窗口与翻滚窗口相对,滑过数据流。因此,滑动窗口可以重叠,它可以对输入的数据流进行更平滑的聚合 - 因为您不是从一组输入跳转到下一组输入,而是滑过输入的数据流。
(3)会话窗口 session Window
会话窗口根据Session gap切分不同的窗口,当一个窗口在大于Session gap的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的。我们可以设置定长的Session gap,也可以使用SessionWindowTimeGapExtractor动态地确定Session gap的长度。
二.Flink滑动窗口原理
原由:今天在运行滑动窗口Demo时,发现设置的timeWindow俩时间参数,运行结果跟想象的不一样。
自己思考后,找到了原因,因此来分享下。
滑动窗口俩时间参数设置:
先看下Flink的timeWindow源码:
- 第一个参数表示:滑动大小
- 第二个参数表示:滑动步长
Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,Slide等于窗口的Size时,相邻窗口不重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) { if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { return window(SlidingProcessingTimeWindows.of(size, slide)); } else { return window(SlidingEventTimeWindows.of(size, slide)); } }
讯享网
窗口时间参数图解:

自己代码参数设置
结论:
- 滑动窗口直接从当前窗口开始算
- 结果运行的显示,取决于第二个Slide参数时间设置
这里设置的是20秒,从开始运行到第20秒后,结果才能显示控制台。
举个例子:因为在第二个参数设置20秒内,如果17秒的时候突然有数据过来了,在过三秒后达到了20秒啊,就会立即计算执行(亲测)。因为Flink是流式计算。
会出现俩个相同计算结果。第一个计算结果出现后,在过20秒出现与第一个结果相同。
讯享网 .timeWindow(Time.seconds(40), Time.seconds(20)
代码实现
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; / * Author : Jackson * Version : 2020/4/21 & 1.0 */ public class DataStreamWordCount { public static void main(String[] args) throws Exception { //创建Flink的流式计算环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //监听本地9000端口 DataStreamSource<String> text = env.socketTextStream("192.168.1.125", 9000, "\n"); //将接收的数据进行拆分,分组,窗口计算并且聚合输出 SingleOutputStreamOperator<WordWithCount> word = text.flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String line, Collector<WordWithCount> out) throws Exception { for (String word : line.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(40), Time.seconds(20)) .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception { return new WordWithCount(a.word, a.count + b.count); } }); //打印结果 word.printToErr(); env.execute("Socket Window WordCount"); } public static class WordWithCount { public String word; public long count; public WordWithCount() { } public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return word + ";" + count; } } }
运行结果:

————保持饥饿,保持学习
Jackson_MVP


版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/65200.html