消費者負責從訂閱的主題上拉取消息,消費組是邏輯概念。壹個消費者只屬於壹個消費組,壹個消費組包壹個或多個消費者。當消息發布到主題後,會被投遞到每個消費組,但每個消費組中只有壹個消費者能消費給消息。
消費者如何知道該消費哪個分區?當消費組內消費者個數發生變化時,分區分配是如何變化的呢?
按照消費者總數和分區總數進行整除運算來獲得壹個跨度,然後將分區按照跨度進行平均分配, 以保證分區盡可能均勻地分配給所有的消費者。對於 每壹個主題 該策略會將消費組內所有的消費者按照名稱的字典序排序然後為每個消費者劃分固定的分區範圍,如果不夠平均分配,那麽字典序靠前的消費者會被多分配壹個分區。
假設n=分區數/消費者數量,m=分區數%消費者數量,那麽前m個消費者每個分配n+1分區,後面的每個消費者分配n個分區。
如圖所示主題中***有7個分區,此時消費組內只有壹個消費者C0,C0訂閱7個分區。
隨著消費組內消費者不斷加入,分區逐漸從C0分配到C1~C6,當最後壹個消費者C7加入後,此時總***有8個消費者但是只有7個分區,因此C7由於分配不到分區而無法消費任何消息。
消費者並非越多越好,消費者數量應小於等於分區數量,否則會造成資源的浪費
缺點:
當壹個消費組訂閱兩個分別包含四個分區的主題時,分區分配結果如下,比較均勻。
但當兩個主題各有3個分區時,則會出現如下分區不均的問題。類似情況擴大的話,可能出現消費者過載問題。
將消費組內所有消費者及消費者訂閱的所有主題的分區按照字典序排序,然後通過輪詢方式將分區依次分配給每個消費者。如果消費組內消費者的訂閱信息都是相同的,那麽分區分配會比較均勻。如壹個消費組兩個消費者,分別訂閱兩個都有3的分區的主題,如圖。
但是當消費組內消費者的訂閱信息不同時,則會出現分配不均問題。如圖,假設消費組內有三個消費者,主題1/2/3分別有1/2/3個分區,C0訂閱主題1,C1訂閱主題1和2,C2訂閱主題1/2/3,分區結果將會如下圖所示。
後來引入的策略,主要目的:
假設三個消費者,訂閱了4個主題,每個主題有兩個分區,那麽初始分區分配結果如下:
乍壹看,跟RoundRobin分配策略結果相同,但此時如果C1下線,那麽消費組會執行再均衡操作,重新分配消息分區。如果是RoundRobin策略,分配結果如下:
而如果是Sticky分配策略,則結果如下:
StickyAssignor保留了上壹次對C0和C2的分配結果,將C1的分區分配給C0和C2使其均衡。
如果發生分區重分配,那麽對於同壹個分區而 ,有可能之前的消費者和新指派的消費者不是同壹個,之前消費者進行到壹半的處理還要在新指派的消費者中再次復現壹遍,造成重復消費。StickyAssignor分配策略如同其名稱中的"sticky"壹 樣,讓分配策略具備的“黏性”,盡可能地讓前後兩次分配相同,進而減少系統資源的損耗及其他異常情況的發生。
再來看下,消費者訂閱信息不相同的情況,拿RoundRobinAssignor中的實例來說。
假設消費組內有三個消費者,主題1/2/3分別有1/2/3個分區,C0訂閱主題1,C1訂閱主題1和2,C2訂閱主題1/2/3,RoundRobinAssignor分區結果將會如下圖所示。
而采用StickyAssignor時,分區分配結果如下:
若此時C0下線,RoundRobinAssignor重分配的結果如下:
而StickyAssignor重分配結果如下:
綜上:
StickyAssignor分配策略的優點就是可以使分區重分配具備 “黏性”,減少不必要的分區移動(壹個分區剝離之前的消費者 ,轉而分配給另壹個新的消費者)。
Kafka中的消息消費是基於拉模式。
Kafka每次拉取壹組消息,每條消息的格式如下:
在每次拉取方法時,它返回的是還沒有被消費過的消息集。要實現這個功能,就需要知道上次消費時的消費位移,消費者在消費完消息後要進行消費位移提交動作,且消費位移要進行持久化,消費位移保存在__consumer_offsets主題中。
當前拉取消息的最大offset為x,消費者消費完成提交位移的是offset其實為x+1,表示下次拉取消息的起始位置。
自動提交
默認采用自動提交,默認每隔5s會將拉取到的每個分區的最大的消息位移進行提交。真正的提交動作是在拉取消息的邏輯完成,每次拉取消息前會判斷是否可以進行位移提交,如果可以則提交上壹次的位移。這裏會有兩個問題,如下圖所示。
重復消費:當前拉取消息x+2,x+7,當前消費到X+5,在提交消費位移前,消費者宕機;新的消費者還是會從X+2開始拉取消息, 因此導致重復消費。
消息丟失:當前拉取消息x+2,x+7,當前消費X+5,到下次拉取的時候,消費位移已經提交為X+8,若此時消費者宕機,新的消費者會從X+8處開始消費,導致X+5 ~ X+7的消息沒有被消費,導致消息的丟失。
手動提交
同步提交和異步提交。
同步提交默認提交本次拉取分區消息的最大偏移量,如本次拉取X+2,X+7的消息,同步提交默認提交X+8的位置;當時同步提交也可指定提交的偏移量,比如消費壹條提交1次,因為提交本身為同步操作,所以會耗費壹定的性能。
同步提交也會導致重復消費的問題,如消費完成後,提交前消費者宕機。
異步提交消費者線程不會被阻塞,使性能得到增強,但異步提交失敗重試可能會導致提交位移被覆蓋的問題,如本次異步提交offset=X失敗,下次異步提交offset=X+y成功;此時前壹次提交重試再次提交offset=x,如果業務上沒有重試校驗,會導致offset被覆蓋,最終導致重復消費。
當新的消費組建立、消費者訂閱新的主題或之前提交的位移信息因為過期被刪除等,此時查不到紀錄的消費位移。Kafka可配置從最新或從最早處開始消費。
Kafka還支持從特定位移處開始消費,可以實現回溯消費,Kafka內部提供了Seek()方法,來重置消費位移。
當需要回溯指定時間後的消息時,可先用offsetsForTimes方法查到指定時間後第壹條消息的位移,然後再用seek重置位移。
分區的所屬權從壹個消費者轉移到另壹消費者的行為,它為消費組具備高可用性和伸縮性提供保障,使我們可以既方便又安全地刪除或添加消費者。
Kfaka提供了組協調器(GroupCoordinator)和消費者協調器(ConsumerCoordinator),前者負責管理消費組,後者負責與前者交互,兩者最重要的職責就是負責再均衡的操作。
舉例說明,當消費者加入消費組時,消費者、消費組和組協調器之間壹般會經歷以下幾個階段。
第壹階段(FIND COORDINATOR)
消費者需要確定它所屬的消費組對應的GroupCoordinator所在的broker並創建與該broker 相互通信的網絡連接。
消費者會向集群中的某個節點發送FindCoordinatorRequest請求來查找對應的組協調器。
Kafka根據請求中的coordinator_key(也就是groupld )的哈希值計算__consumer_offsets中的分區編號,如下圖所示。找到對應的分區之後,在尋找此分區leader副本所在的broker節點,該節點即為當前消費組所在的組協調器節點。
消費組最終的分區分配方案及組內消費者所提交的消費位移信息都會發送給該broker節點。該broker節點既扮演GroupCoordinato的角色又扮演保存分區分配方案和組內消費者位移的角色,這樣可以省去很多不必要的中間輪轉所帶來的開銷。
第二階段(JOIN GROUP)
在成功找到消費組所對應的GroupCoordinator之後就進入加入消費組的階段,在此階段的 消費者會向GroupCoordinator發送JoinGroupRequest請求,並處理響應。
組協調器內部主要做了以下幾件事:
選舉消費組的****leader
如果當前組內沒有leader,那麽第壹個加入消費組的則為leader。如果leader掛掉,組協調器會從內部維護的HashMap(消費者信息,key為member_id)中選擇第壹個key作為新的leader。
選舉分區分配策略
前面說的每個消費者可能會上報多個分區分配策略,選舉過程如下:
第三階段(SYNC GROUP)
leader消費者根據在第二階段中得到的分區分配策略來實施分區分配,然後將分配結果同步到組協調器。各個消費者會向組協調器發送SyncGroupRequest請求來同步分配方案。
請求結構如圖,leader發送的請求才會有group_assignment。
其中包含了各個消費者對應的具體分配方案,member_id表示消費者的唯壹標識,而 member_assignment是與消費者對應的分配方案,如圖
消費者收到具體的分區分配方案後,會開啟心跳任務,定期向組協調器發送心跳請求確定彼此在線。
第四階段(HEARTBEAT)
在正式消費之前,消費者還需要確定拉取消息的起始位置。假設之前已經將最後的消費位移提交成功,那麽消費者會請求獲取上次提交的消費位移並從此處繼續消費。
心跳線程是壹個獨立的線程,可以在輪詢消息的空檔發送。如果消費者停發送心跳的時間足夠長,組協調器會認為這個消費者已經死亡,則觸發壹次再均衡行為。