更新時間:2021年01月15日17時49分 來源:傳智教育 瀏覽次數(shù):
問題分析
假如我們自己寫一個流式框架。我們該如何處理消息。正常情況下,我們看到消息按照順序一個個發(fā)送,接受后按照順序處理,這是沒有什么問題的。然而也要考慮到一些特殊情況下,消息不在是按照順序發(fā)送,產(chǎn)生了亂序,這時候該怎么處理?
核心問題講解
(1)watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來實(shí)現(xiàn)。我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)。但是對于late element,我們又不能無限期的等下去,必須要有個機(jī)制來保證一個特定的時間后,必須觸發(fā)window去進(jìn)行計算。這個特別的機(jī)制,就是watermark。
(2)通常,在接收到source的數(shù)據(jù)后,應(yīng)該立刻生成watermark;但是,也可以在接收source后,應(yīng)用簡單的map或者filter操作,然后再生成watermark。
(3)如果延遲的數(shù)據(jù)有業(yè)務(wù)需要,則設(shè)置好允許延遲的時間,因為我們不能無限期的等下去。每個窗口都有屬于自己的最大等待延遲數(shù)據(jù)的時間限制,窗口結(jié)束時間+延遲時間=最大waterMark值,即當(dāng)waterMark值大于的上述計算出的最大waterMark值,該窗口內(nèi)的數(shù)據(jù)就屬于遲到的數(shù)據(jù),無法參與window計算。
問題擴(kuò)展
結(jié)合項目中使用
watermark如何處理亂序數(shù)據(jù)?
假如我們設(shè)置10s的時間窗口(window),那么0~10s,10~20s都是一個窗口,以0~10s為例,0位start-time,10為end-time。假如有4個數(shù)據(jù)的event-time分別是8(A),12.5(B),9(C),13.5(D),我們設(shè)置Watermarks為當(dāng)前所有到達(dá)數(shù)據(jù)event-time的最大值減去延遲值3.5秒。
當(dāng)A到達(dá)的時候,Watermarks為max{8}-3.5=8-3.5 = 4.5 < 10,不會觸發(fā)計算;
當(dāng)B到達(dá)的時候,Watermarks為max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不會觸發(fā)計算;
當(dāng)C到達(dá)的時候,Watermarks為max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不會觸發(fā)計算;
當(dāng)D到達(dá)的時候,Watermarks為max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,觸發(fā)計算;
觸發(fā)計算的時候,會將AC(因為他們都小于10)都計算進(jìn)去。
通過上面這種方式,我們就將遲到的C計算進(jìn)去了。這里的延遲3.5s是我們假設(shè)一個數(shù)據(jù)到達(dá)的時候,比他早3.5s的數(shù)據(jù)肯定也都到達(dá)了,這個是需要根據(jù)經(jīng)驗推算的,加入D到達(dá)以后有到達(dá)了一個E,event-time=6,但是由于0~10的時間窗口已經(jīng)開始計算了,所以E就丟了。
猜你喜歡: