溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么進行kafka的原理分析

發布時間:2021-12-15 09:16:28 來源:億速云 閱讀:155 作者:柒染 欄目:大數據

今天就跟大家聊聊有關怎么進行kafka的原理分析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

簡介

kafka是一個分布式消息隊列。具有高性能、持久化、多副本備份、橫向擴展能力。生產者往隊列里寫消息,消費者從隊列里取消息進行業務邏輯。一般在架構設計中起到解耦、削峰、異步處理的作用。

kafka對外使用topic的概念,生產者往topic里寫消息,消費者從讀消息。為了做到水平擴展,一個topic實際是由多個partition組成的,遇到瓶頸時,可以通過增加partition的數量來進行橫向擴容。單個parition內是保證消息有序。

每新寫一條消息,kafka就是在對應的文件append寫,所以性能非常高。

kafka的總體數據流是這樣的:

怎么進行kafka的原理分析

kafka data flow

大概用法就是,Producers往Brokers里面的指定Topic中寫消息,Consumers從Brokers里面拉去指定Topic的消息,然后進行業務處理。
圖中有兩個topic,topic 0有兩個partition,topic 1有一個partition,三副本備份??梢钥吹絚onsumer gourp 1中的consumer 2沒有分到partition處理,這是有可能出現的,下面會講到。

關于broker、topics、partitions的一些元信息用zk來存,監控和路由啥的也都會用到zk。


生產

基本流程是這樣的:

怎么進行kafka的原理分析

kafka sdk product flow.png

創建一條記錄,記錄中一個要指定對應的topic和value,key和partition可選。先序列化,然后按照topic和partition,放進對應的發送隊列中。kafka produce都是批量請求,會積攢一批,然后一起發送,不是調send()就進行立刻進行網絡發包。
如果partition沒填,那么情況會是這樣的:

  1. key有填
    按照key進行哈希,相同key去一個partition。(如果擴展了partition的數量那么就不能保證了)

  2. key沒填
    round-robin來選partition

這些要發往同一個partition的請求按照配置,攢一波,然后由一個單獨的線程一次性發過去。

API

有high level api,替我們把很多事情都干了,offset,路由啥都替我們干了,用以來很簡單。
還有simple api,offset啥的都是要我們自己記錄。

partition

當存在多副本的情況下,會盡量把多個副本,分配到不同的broker上。kafka會為partition選出一個leader,之后所有該partition的請求,實際操作的都是leader,然后再同步到其他的follower。當一個broker歇菜后,所有leader在該broker上的partition都會重新選舉,選出一個leader。(這里不像分布式文件存儲系統那樣會自動進行復制保持副本數)

然后這里就涉及兩個細節:怎么分配partition,怎么選leader。

關于partition的分配,還有leader的選舉,總得有個執行者。在kafka中,這個執行者就叫controller。kafka使用zk在broker中選出一個controller,用于partition分配和leader選舉。

partition的分配
  1. 將所有Broker(假設共n個Broker)和待分配的Partition排序

  2. 將第i個Partition分配到第(i mod n)個Broker上 (這個就是leader)

  3. 將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上

leader容災

controller會在Zookeeper的/brokers/ids節點上注冊Watch,一旦有broker宕機,它就能知道。當broker宕機后,controller就會給受到影響的partition選出新leader。controller從zk的/brokers/topics/[topic]/partitions/[partition]/state中,讀取對應partition的ISR(in-sync replica已同步的副本)列表,選一個出來做leader。
選出leader后,更新zk,然后發送LeaderAndISRRequest給受影響的broker,讓它們改變知道這事。為什么這里不是使用zk通知,而是直接給broker發送rpc請求,我的理解可能是這樣做zk有性能問題吧。

如果ISR列表是空,那么會根據配置,隨便選一個replica做leader,或者干脆這個partition就是歇菜。如果ISR列表的有機器,但是也歇菜了,那么還可以等ISR的機器活過來。

多副本同步

這里的策略,服務端這邊的處理是follower從leader批量拉取數據來同步。但是具體的可靠性,是由生產者來決定的。
生產者生產消息的時候,通過request.required.acks參數來設置數據的可靠性。

ackswhat happen
0which means that the producer never waits for an acknowledgement from the broker.發過去就完事了,不關心broker是否處理成功,可能丟數據。
1which means that the producer gets an acknowledgement after the leader replica has received the data. 當寫Leader成功后就返回,其他的replica都是通過fetcher去同步的,所以kafka是異步寫,主備切換可能丟數據。
-1which means that the producer gets an acknowledgement after all in-sync replicas have received the data. 要等到isr里所有機器同步成功,才能返回成功,延時取決于最慢的機器。強一致,不會丟數據。

在acks=-1的時候,如果ISR少于min.insync.replicas指定的數目,那么就會返回不可用。

這里ISR列表中的機器是會變化的,根據配置replica.lag.time.max.ms,多久沒同步,就會從ISR列表中剔除。以前還有根據落后多少條消息就踢出ISR,在1.0版本后就去掉了,因為這個值很難取,在高峰的時候很容易出現節點不斷的進出ISR列表。

從ISA中選出leader后,follower會從把自己日志中上一個高水位后面的記錄去掉,然后去和leader拿新的數據。因為新的leader選出來后,follower上面的數據,可能比新leader多,所以要截取。這里高水位的意思,對于partition和leader,就是所有ISR中都有的最新一條記錄。消費者最多只能讀到高水位;

從leader的角度來說高水位的更新會延遲一輪,例如寫入了一條新消息,ISR中的broker都fetch到了,但是ISR中的broker只有在下一輪的fetch中才能告訴leader。

也正是由于這個高水位延遲一輪,在一些情況下,kafka會出現丟數據和主備數據不一致的情況,0.11開始,使用leader epoch來代替高水位。(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Scenario1:HighWatermarkTruncationfollowedbyImmediateLeaderElection)

思考:
當acks=-1時

  1. 是follwers都來fetch就返回成功,還是等follwers第二輪fetch?

  2. leader已經寫入本地,但是ISR中有些機器失敗,那么怎么處理呢?


消費

訂閱topic是以一個消費組來訂閱的,一個消費組里面可以有多個消費者。同一個消費組中的兩個消費者,不會同時消費一個partition。換句話來說,就是一個partition,只能被消費組里的一個消費者消費,但是可以同時被多個消費組消費。因此,如果消費組內的消費者如果比partition多的話,那么就會有個別消費者一直空閑。

怎么進行kafka的原理分析

untitled_page.png

API

訂閱topic時,可以用正則表達式,如果有新topic匹配上,那能自動訂閱上。

offset的保存

一個消費組消費partition,需要保存offset記錄消費到哪,以前保存在zk中,由于zk的寫性能不好,以前的解決方法都是consumer每隔一分鐘上報一次。這里zk的性能嚴重影響了消費的速度,而且很容易出現重復消費。
在0.10版本后,kafka把這個offset的保存,從zk總剝離,保存在一個名叫__consumeroffsets topic的topic中。寫進消息的key由groupid、topic、partition組成,value是偏移量offset。topic配置的清理策略是compact??偸潜A糇钚碌膋ey,其余刪掉。一般情況下,每個key的offset都是緩存在內存中,查詢的時候不用遍歷partition,如果沒有緩存,第一次就會遍歷partition建立緩存,然后查詢返回。

確定consumer group位移信息寫入__consumers_offsets的哪個partition,具體計算公式:

__consumers_offsets partition =           Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默認是50個分區。

思考:
如果正在跑的服務,修改了offsets.topic.num.partitions,那么offset的保存是不是就亂套了?

分配partition--reblance

生產過程中broker要分配partition,消費過程這里,也要分配partition給消費者。類似broker中選了一個controller出來,消費也要從broker中選一個coordinator,用于分配partition。
下面從頂向下,分別闡述一下

  1. 怎么選coordinator。

  2. 交互流程。

  3. reblance的流程。

選coordinator
  1. 看offset保存在那個partition

  2. 該partition leader所在的broker就是被選定的coordinator

這里我們可以看到,consumer group的coordinator,和保存consumer group offset的partition leader是同一臺機器。

交互流程

把coordinator選出來之后,就是要分配了
整個流程是這樣的:

  1. consumer啟動、或者coordinator宕機了,consumer會任意請求一個broker,發送ConsumerMetadataRequest請求,broker會按照上面說的方法,選出這個consumer對應coordinator的地址。

  2. consumer 發送heartbeat請求給coordinator,返回IllegalGeneration的話,就說明consumer的信息是舊的了,需要重新加入進來,進行reblance。返回成功,那么consumer就從上次分配的partition中繼續執行。

reblance流程
  1. consumer給coordinator發送JoinGroupRequest請求。

  2. 這時其他consumer發heartbeat請求過來時,coordinator會告訴他們,要reblance了。

  3. 其他consumer發送JoinGroupRequest請求。

  4. 所有記錄在冊的consumer都發了JoinGroupRequest請求之后,coordinator就會在這里consumer中隨便選一個leader。然后回JoinGroupRespone,這會告訴consumer你是follower還是leader,對于leader,還會把follower的信息帶給它,讓它根據這些信息去分配partition

5、consumer向coordinator發送SyncGroupRequest,其中leader的SyncGroupRequest會包含分配的情況。
6、coordinator回包,把分配的情況告訴consumer,包括leader。

當partition或者消費者的數量發生變化時,都得進行reblance。
列舉一下會reblance的情況:

  1. 增加partition

  2. 增加消費者

  3. 消費者主動關閉

  4. 消費者宕機了

  5. coordinator自己也宕機了


消息投遞語義

kafka支持3種消息投遞語義
At most once:最多一次,消息可能會丟失,但不會重復
At least once:最少一次,消息不會丟失,可能會重復
Exactly once:只且一次,消息不丟失不重復,只且消費一次(0.11中實現,僅限于下游也是kafka)

在業務中,常常都是使用At least once的模型,如果需要可重入的話,往往是業務自己實現。

At least once

先獲取數據,再進行業務處理,業務處理成功后commit offset。
1、生產者生產消息異常,消息是否成功寫入不確定,重做,可能寫入重復的消息
2、消費者處理消息,業務處理成功后,更新offset失敗,消費者重啟的話,會重復消費

At most once

先獲取數據,再commit offset,最后進行業務處理。
1、生產者生產消息異常,不管,生產下一個消息,消息就丟了
2、消費者處理消息,先更新offset,再做業務處理,做業務處理失敗,消費者重啟,消息就丟了

Exactly once

思路是這樣的,首先要保證消息不丟,再去保證不重復。所以盯著At least once的原因來搞。首先想出來的:

  1. 生產者重做導致重復寫入消息----生產保證冪等性

  2. 消費者重復消費---消滅重復消費,或者業務接口保證冪等性重復消費也沒問題

由于業務接口是否冪等,不是kafka能保證的,所以kafka這里提供的exactly once是有限制的,消費者的下游也必須是kafka。所以一下討論的,沒特殊說明,消費者的下游系統都是kafka(注:使用kafka conector,它對部分系統做了適配,實現了exactly once)。

生產者冪等性好做,沒啥問題。

解決重復消費有兩個方法:

  1. 下游系統保證冪等性,重復消費也不會導致多條記錄。

  2. 把commit offset和業務處理綁定成一個事務。

本來exactly once實現第1點就ok了。

但是在一些使用場景下,我們的數據源可能是多個topic,處理后輸出到多個topic,這時我們會希望輸出時要么全部成功,要么全部失敗。這就需要實現事務性。既然要做事務,那么干脆把重復消費的問題從根源上解決,把commit offset和輸出到其他topic綁定成一個事務。

生產冪等性

思路是這樣的,為每個producer分配一個pid,作為該producer的唯一標識。producer會為每一個<topic,partition>維護一個單調遞增的seq。類似的,broker也會為每個<pid,topic,partition>記錄下最新的seq。當req_seq == broker_seq+1時,broker才會接受該消息。因為:

  1. 消息的seq比broker的seq大超過時,說明中間有數據還沒寫入,即亂序了。

  2. 消息的seq不比broker的seq小,那么說明該消息已被保存。

    怎么進行kafka的原理分析


    解決重復生產

事務性/原子性廣播

場景是這樣的:

  1. 先從多個源topic中獲取數據。

  2. 做業務處理,寫到下游的多個目的topic。

  3. 更新多個源topic的offset。

其中第2、3點作為一個事務,要么全成功,要么全失敗。這里得益與offset實際上是用特殊的topic去保存,這兩點都歸一為寫多個topic的事務性處理。

怎么進行kafka的原理分析

基本思路是這樣的:
引入tid(transaction id),和pid不同,這個id是應用程序提供的,用于標識事務,和producer是誰并沒關系。就是任何producer都可以使用這個tid去做事務,這樣進行到一半就死掉的事務,可以由另一個producer去恢復。
同時為了記錄事務的狀態,類似對offset的處理,引入transaction coordinator用于記錄transaction log。在集群中會有多個transaction coordinator,每個tid對應唯一一個transaction coordinator。
注:transaction log刪除策略是compact,已完成的事務會標記成null,compact后不保留。

做事務時,先標記開啟事務,寫入數據,全部成功就在transaction log中記錄為prepare commit狀態,否則寫入prepare abort的狀態。之后再去給每個相關的partition寫入一條marker(commit或者abort)消息,標記這個事務的message可以被讀取或已經廢棄。成功后在transaction log記錄下commit/abort狀態,至此事務結束。

數據流:

怎么進行kafka的原理分析

Kafka Transactions Data Flow.png

  1. 首先使用tid請求任意一個broker(代碼中寫的是負載最小的broker),找到對應的transaction coordinator。

  2. 請求transaction coordinator獲取到對應的pid,和pid對應的epoch,這個epoch用于防止僵死進程復活導致消息錯亂,當消息的epoch比當前維護的epoch小時,拒絕掉。tid和pid有一一對應的關系,這樣對于同一個tid會返回相同的pid。

  1. client先請求transaction coordinator記錄<topic,partition>的事務狀態,初始狀態是BEGIN,如果是該事務中第一個到達的<topic,partition>,同時會對事務進行計時;client輸出數據到相關的partition中;client再請求transaction coordinator記錄offset的<topic,partition>事務狀態;client發送offset commit到對應offset partition。

  2. client發送commit請求,transaction coordinator記錄prepare commit/abort,然后發送marker給相關的partition。全部成功后,記錄commit/abort的狀態,最后這個記錄不需要等待其他replica的ack,因為prepare不丟就能保證最終的正確性了。

這里prepare的狀態主要是用于事務恢復,例如給相關的partition發送控制消息,沒發完就宕機了,備機起來后,producer發送請求獲取pid時,會把未完成的事務接著完成。

當partition中寫入commit的marker后,相關的消息就可被讀取。所以kafka事務在prepare commit到commit這個時間段內,消息是逐漸可見的,而不是同一時刻可見。

詳細細節可看:https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-TransactionalGuarantees

消費事務

前面都是從生產的角度看待事務。還需要從消費的角度去考慮一些問題。
消費時,partition中會存在一些消息處于未commit狀態,即業務方應該看不到的消息,需要過濾這些消息不讓業務看到,kafka選擇在消費者進程中進行過來,而不是在broker中過濾,主要考慮的還是性能。kafka高性能的一個關鍵點是zero copy,如果需要在broker中過濾,那么勢必需要讀取消息內容到內存,就會失去zero copy的特性。


文件組織

kafka的數據,實際上是以文件的形式存儲在文件系統的。topic下有partition,partition下有segment,segment是實際的一個個文件,topic和partition都是抽象概念。

在目錄/${topicName}-{$partitionid}/下,存儲著實際的log文件(即segment),還有對應的索引文件。

每個segment文件大小相等,文件名以這個segment中最小的offset命名,文件擴展名是.log;segment對應的索引的文件名字一樣,擴展名是.index。有兩個index文件,一個是offset index用于按offset去查message,一個是time index用于按照時間去查,其實這里可以優化合到一起,下面只說offset index??傮w的組織是這樣的:

怎么進行kafka的原理分析

kafka 文件組織.png

為了減少索引文件的大小,降低空間使用,方便直接加載進內存中,這里的索引使用稀疏矩陣,不會每一個message都記錄下具體位置,而是每隔一定的字節數,再建立一條索引。索引包含兩部分,分別是baseOffset,還有position。

baseOffset:意思是這條索引對應segment文件中的第幾條message。這樣做方便使用數值壓縮算法來節省空間。例如kafka使用的是varint。

position:在segment中的絕對位置。

查找offset對應的記錄時,會先用二分法,找出對應的offset在哪個segment中,然后使用索引,在定位出offset在segment中的大概位置,再遍歷查找message。


常用配置項

broker配置

配置項作用
broker.idbroker的唯一標識
auto.create.topics.auto設置成true,就是遇到沒有的topic自動創建topic。
log.dirslog的目錄數,目錄里面放partition,當生成新的partition時,會挑目錄里partition數最少的目錄放。

topic配置

配置項作用
num.partitions新建一個topic,會有幾個partition。
log.retention.ms對應的還有minutes,hours的單位。日志保留時間,因為刪除是文件維度而不是消息維度,看的是日志文件的mtime。
log.retention.bytespartion最大的容量,超過就清理老的。注意這個是partion維度,就是說如果你的topic有8個partition,配置1G,那么平均分配下,topic理論最大值8G。
log.segment.bytes一個segment的大小。超過了就滾動。
log.segment.ms一個segment的打開時間,超過了就滾動。
message.max.bytesmessage最大多大

關于日志清理,默認當前正在寫的日志,是怎么也不會清理掉的。
還有0.10之前的版本,時間看的是日志文件的mtime,但這個指是不準確的,有可能文件被touch一下,mtime就變了。因此在0.10版本開始,改為使用該文件最新一條消息的時間來判斷。
按大小清理這里也要注意,Kafka在定時任務中嘗試比較當前日志量總大小是否超過閾值至少一個日志段的大小。如果超過但是沒超過一個日志段,那么就不會刪除。

看完上述內容,你們對怎么進行kafka的原理分析有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女