竹笋

首页 » 问答 » 环境 » 大数据培训Flink窗口的开始时间的计算
TUhjnbcbe - 2024/8/17 0:26:00
中科医院曝光 https://m.39.net/pf/a_5131641.html

我还记得的在我刚学习flink的时候,B站的老师说过,Flink窗口的开始时间和结束时间和你想的不一样。那个时候我好像记得老师说过,flink的窗口大小会根据你的时间单位来进行修正。

然后在现如今,很多人还是不是很了解窗口机制,以及watermark。更别提什么窗口什么时候,什么时候结束。所以呢,今天从源码角度给大家普及一下窗口什么时候开始,什么时候结束。

我们可以来编写一个简单的代码,来看一下效果,我习惯用java来写flink,所以也就使用java了。

Override

publicCollectionTimeWindowassignWindows(Objectelement,longtimestamp,WindowAssignerContextcontext){

if(timestampLong.MIN_VALUE){

ListTimeWindowwindows=newArrayList((int)(size/slide));

//获取窗口开始时间

longlastStart=TimeWindow.getWindowStartWithOffset(timestamp,offset,slide);

for(longstart=lastStart;

starttimestamp-size;

start-=slide){

windows.add(newTimeWindow(start,start+size));

}

returnwindows;

}else{

thrownewRuntimeException("RecordhasLong.MIN_VALUEtimestamp(=notimestampmarker)."+

"IsthetimecharacteristicsettoProcessingTime,ordidyouforgettocall"+

"DataStream.assignTimestampsAndWatermarks(...)?");

}

}

publicstaticlonggetWindowStartWithOffset(longtimestamp,longoffset,longwindowSize){

//窗口开始计算的时间

returntimestamp-(timestamp-offset+windowSize)%windowSize;

}

我们可以看出来窗口开始时间,是取模过后的时间,我们来简单的分析一番。

假如我们第一条数据的时间戳是,offset暂时不需要管,因为他是时间的偏移量例如,东八区什么的。我们假如窗口大小是5s,

那么接下来的公式计算也就是-(-0+)%,那么我们可以计算出来的结果就是0,也就是说,窗口开始的时间是0.更大的时间窗口大小,各位大佬可以下面自己算一下。

也就是说开始时间是0,结束的时间窗口也就是,因为到的时候就触发计算了。那么我们接下来就进行验证一番和我们分析的是否一致。

接下来我们写一个简单的wordcount,因为在多并行度下,不是很好分析,我们设置为单并行读。

/**

*

author:qingzhi.wu

*

date:/1/:36下午

*/

publicclassWindowSizeTest{

publicstaticvoidmain(String[]args)throwsException{

StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStreamSourceStringsource=env.socketTextStream("localhost",);

SingleOutputStreamOperatorStringsource1=source.assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractorString(Time.of(0,TimeUnit.MILLISECONDS)){

Override

publiclongextractTimestamp(Strings){

returnLong.parseLong(s.split(",")[0]);

}

});

SingleOutputStreamOperatorTuple2String,Integermap=source1.map(newMapFunctionString,Tuple2String,Integer(){

Override

publicTuple2String,Integermap(Strings)throwsException{

returnTuple2.of(s.split(",")[1],1);

}

});

WindowedStreamTuple2String,Integer,Tuple,TimeWindowwindow=map.keyBy(0).window(TumblingEventTimeWindows.of(Time.of(,TimeUnit.MILLISECONDS)));

window.sum(1).print();

env.execute();

}

}

接下来我们,就看看我们分析的是否正确

很明显是正确的。那么一天的窗口大小你会计算吗?

文章来源于好胖子的大数据之路

1
查看完整版本: 大数据培训Flink窗口的开始时间的计算