Flink以兩種方式處理障礙:
關鍵是getNextNonBlocked方法。
當這個動作在沒有屏障對齊的情況下完成時,currentBuffered == null,currentBuffered是當前要處理的緩沖區。當buffer為數據時,會正常消耗數據並遵循Flink消耗消息的全過程,遇到barrier時,開始處理barrier。
numBarriersReceived的默認值是0,所以當第壹個屏障進來時,它將進入beginNewAlignment方法。
當另壹個相同的屏障進入時,barrierId == currentCheckpointId為真,直到NumBarriersReceived+NumClosedChannels = = TotalNumberofInputChannels,觸發notifyCheckpoint,並報告對齊緩沖區和對齊時間。(彩蛋:檢查點稍後更新。歡迎全程關註)。
如果其他通道中的障礙被延遲,即numbar riers Received+NumClosedChannels!= totalNumberOfInputChannels,接收屏障對應的通道數據將進入bufferBlocker。
BufferBlocker是通過ArrayDeque實現的
當numbar riers Received+NumClosedChannels = = TotalNumberofInputChannels時,首先執行releaseBlocksAndResetBarriers(),然後執行notifyCheckpoint。
releaseBlocksAndResetBarriers的主要目的是首先使用已經添加到緩存中的數據。
當releaseBlocksAndResetBarriers方法完成時,currentBuffered!=null,將輸入
然後直接消費數據。
緩存中的數據將壹直被使用(該過程將阻止inputGate中的數據被使用),直到使用完成。
當它完成的時候,就像程序第壹次在這裏運行壹樣,來來回回。