真实分析Flink滑动窗口原理

真实分析Flink滑动窗口原理美图欣赏 闲谈一刻 一名大二学生 Flink 技术的热爱者 学习过程中 一定要学会深入思考 划重点 一 窗口背景 Flink 的窗口机制是其底层核心之一 也是高效流处理的关键 A 窗口需求 在 Streaming 应用程序的情况下

大家好,我是讯享网,很高兴认识大家。

美图欣赏:


讯享网

 

闲谈一刻

一名大二学生,Flink技术的热爱者。 

学习过程中,一定要学会深入思考 (划重点)


一.窗口背景

Flink的窗口机制是其底层核心之一,也是高效流处理的关键。

A.窗口需求

在Streaming应用程序的情况下,数据是连续的,因此我们不能等待在开始处理之前流式传输整个数据。当然,我们可以处理每个传入的事件,然后转移到下一个事件,但在某些情况下,我们需要对传入的数据进行某种聚合 - 例如,有多少用户在过去10分钟内点击了您网页上的链接。在这种情况下,我们必须定义一个窗口并对窗口内的数据进行处理

Flink窗口主要分为滚动(tumbling)、滑动(sliding)和会话(session)窗口三大类

这里重点分析下滑动窗口 Sliding Window

(1)滚动窗口Tumbling window

滚动窗口滚动数据流。这种类型的窗口是不重叠的 - 即,一个窗口中的事件/数据不会在其他窗口中重叠/出现。

(2)滑动窗口 Sliding Window

滑动窗口与翻滚窗口相对,滑过数据流。因此,滑动窗口可以重叠,它可以对输入的数据流进行更平滑的聚合 - 因为您不是从一组输入跳转到下一组输入,而是滑过输入的数据流。

(3)会话窗口 session Window

会话窗口根据Session gap切分不同的窗口,当一个窗口在大于Session gap的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的。我们可以设置定长的Session gap,也可以使用SessionWindowTimeGapExtractor动态地确定Session gap的长度。

 

二.Flink滑动窗口原理

原由:今天在运行滑动窗口Demo时,发现设置的timeWindow俩时间参数,运行结果跟想象的不一样。

自己思考后,找到了原因,因此来分享下。

 

滑动窗口俩时间参数设置:

先看下Flink的timeWindow源码

  • 第一个参数表示:滑动大小
  • 第二个参数表示:滑动步长

Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,Slide等于窗口的Size时,相邻窗口不重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。

public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) { if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { return window(SlidingProcessingTimeWindows.of(size, slide)); } else { return window(SlidingEventTimeWindows.of(size, slide)); } } 

讯享网

窗口时间参数图解:

自己代码参数设置

结论

  • 滑动窗口直接从当前窗口开始算
  • 结果运行的显示,取决于第二个Slide参数时间设置

这里设置的是20秒,从开始运行到第20秒后,结果才能显示控制台。

举个例子:因为在第二个参数设置20秒内,如果17秒的时候突然有数据过来了,在过三秒后达到了20秒啊,就会立即计算执行(亲测)。因为Flink是流式计算。

会出现俩个相同计算结果。第一个计算结果出现后,在过20秒出现与第一个结果相同。

讯享网 .timeWindow(Time.seconds(40), Time.seconds(20)

 

代码实现

import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; / * Author : Jackson * Version : 2020/4/21 & 1.0 */ public class DataStreamWordCount { public static void main(String[] args) throws Exception { //创建Flink的流式计算环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //监听本地9000端口 DataStreamSource<String> text = env.socketTextStream("192.168.1.125", 9000, "\n"); //将接收的数据进行拆分,分组,窗口计算并且聚合输出 SingleOutputStreamOperator<WordWithCount> word = text.flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String line, Collector<WordWithCount> out) throws Exception { for (String word : line.split("\\s")) { out.collect(new WordWithCount(word, 1L)); } } }) .keyBy("word") .timeWindow(Time.seconds(40), Time.seconds(20)) .reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception { return new WordWithCount(a.word, a.count + b.count); } }); //打印结果 word.printToErr(); env.execute("Socket Window WordCount"); } public static class WordWithCount { public String word; public long count; public WordWithCount() { } public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { return word + ";" + count; } } }

运行结果:

 

                                                         ————保持饥饿,保持学习

                                                                              Jackson_MVP

 

 

 

小讯
上一篇 2025-02-05 21:13
下一篇 2025-03-02 15:36

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/65200.html