我們知道 Kafka 網絡通信架構使用到了 Java NIO 以及 Reactor 設計模式。我們先從整體上看一下完整的網絡通信層架構,如下圖所示:
01 Accept 線程1)從上圖中我們可以看出,Kafka 網絡通信架構中用到的組件主要由兩大部分構成:SocketServer 和 RequestHandlerPool。
2)SocketServer 組件是 Kafka 超高并發網絡通信層中最重要的子模塊。它包含 Acceptor 線程、Processor 線程和 RequestChannel 等對象,都是網絡通信的重要組成部分。
3)RequestHandlerPool 組件就是我們常說的 I/O 工作線程池,里面定義了若干個 I/O 線程,主要用來執行真實的請求處理邏輯。
在經典的 Reactor 設計模式有個「Dispatcher」的角色,主要用來接收外部請求并分發給下面的實際處理線程。在 Kafka 網絡架構設計中,這個 Dispatcher 就是「Acceptor 線程」,用來接收和創建外部 TCP 連接的線程。在 Broker 端每個 SocketServer 實例只會創建一個 Acceptor 線程。它的主要功能就是創建連接,并將接收到的 Request 請求傳遞給下游的 Processor 線程處理。
02 Processor線程1)我們可以看出 Acceptor 線程主要使用了 Java NIO 的 Selector 以及 SocketChannel 的方式循環的輪詢準備就緒的I/O事件。
2)將 ServerSocketChannel 通道注冊到nioSelector 上,并關注網絡連接創事件:SelectionKey.OP_ACCEPT。
3)事件注冊好后,一旦后續接收到連接請求后,Acceptor 線程就會指定一個 Processor 線程,并將該請求交給它并創建網絡連接用于后續處理。
Acceptor只是做了請求入口連接處理的,那么,真正創建網絡連接以及分發網絡請求是由 Processor 線程來完成的。而每個 Processor 線程在創建時都會創建 3 個隊列。
03 RequestHandlerPool線程池1)newConnections 隊列:它主要是用來保存要創建的新連接信息,也就是SocketChannel 對象,目前是硬編碼隊列長度大小為20。每當 Processor 線程接收到新的連接請求時,都會將對應的 SocketChannel 對象放入隊列,等到后面創建連接時,從該隊列中獲取 SocketChannel,然后注冊新的連接。
2)inflightResponse 隊列:它是一個臨時的 Response 隊列,當 Processor 線程將 Repsonse 返回給 Client 之后,要將 Response 放入該隊列。它存在的意義:由于有些 Response 回調邏輯要在 Response 被發送回 Request 發送方后,才能執行,因此需要暫存到臨時隊列。
3)ResponseQueue 隊列:它主要是存放需要返回給Request 發送方的所有 Response 對象。每個 Processor 線程都會維護自己的 Response 隊列。
Acceptor 線程和 Processor 線程只是請求和響應的「搬運工」,而「真正處理 Kafka 請求」是KafkaRequestHandlerPool線程池,在上面網絡超高并發通信架構圖,有兩個參數跟整個流程有關系,分別是「num.network.threads」、「num.io.threads」。其中num.io.threads 就是I/O 工作線程池的大小配置。
下面我們結合 Kafka 超高并發網絡架構圖來講解下一個完整請求處理核心流程:
2 了解Kafka高吞吐日志存儲架構是如何設計嗎?1)Clients 發送請求給 Acceptor 線程。
2)Acceptor 線程會創建 NIO Selector 對象,并創建 ServerSocketChannel 通道實例,然后將 Channel 和 OP_ACCEPT 事件綁定到 Selector 多路復用器上。
3)Acceptor 線程默認創建3個Processor 線程參數:num.network.threads, 并輪詢的將請求對象 SocketChannel 放入到連接隊列中。
4)這時候連接隊列就源源不斷有請求數據了,然后不停地執行 NIO Poll, 獲取對應 SocketChannel 上已經準備就緒的 I/O 事件。
5)Processor 線程向 SocketChannel 注冊了 OP_READ/OP_WRITE 事件,這樣 客戶端發過來的請求就會被該 SocketChannel 對象獲取到,具體就是 processCompleteReceives 方法。
6)這個時候客戶端就可以源源不斷進行請求發送了,服務端通過 Selector NIO Poll 不停的獲取準備就緒的 I/O 事件。
7)然后根據Channel中獲取已經完成的Receive對象,構建Request對象,并將其存入到 Requestchannel 的 RequestQueue 請求隊列中 。
8)這個時候就該I/O 線程池上場了,KafkaRequestHandler 線程循環地從請求隊列RequestQueue 中獲取 Request 實例,然后交由KafkaApis 的 handle 方法,執行真正的請求處理邏輯,并最終將數據存儲到磁盤中。
9)待處理完請求后,KafkaRequestHandler 線程會將 Response 對象放入 Processor 線程的 Response 隊列。
10)然后 Processor 線程通過 Request 中的 ProcessorID 不停地從 Response 隊列中來定位并取出 Response 對象,返還給 Request 發送方。
對于 Kafka 來說,它主要用來處理海量數據流,這個場景的特點主要包括:
1) 寫操作:寫并發要求非常高,基本得達到百萬級 TPS,順序追加寫日志即可,無需考慮更新操作。
2)讀操作:相對寫操作來說,比較簡單,只要能按照一定規則高效查詢即可,支持(offset或者時間戳)讀取。
根據上面兩點分析,對于寫操作來說,直接采用「順序追加寫日志」的方式就可以滿足 Kafka 對于百萬TPS寫入效率要求。
如何解決高效查詢這些日志呢?我們可以設想把消息的 Offset 設計成一個有序的字段,這樣消息在日志文件中也就有序存放了,也不需要額外引入哈希表結構,可以直接將消息劃分成若干個塊,對于每個塊我們只需要索引當前塊的第一條消息的 Offset ,這個是不是有點二分查找算法的意思。即先根據 Offset 大小找到對應的塊,然后再從塊中順序查找。如下圖所示:
這樣就可以快速定位到要查找的消息的位置了,在 Kafka 中,我們將這種索引結構叫做「稀疏哈希索引」。
上面得出了 Kafka 最終的存儲實現方案,即基于順序追加寫日志 +稀疏哈希索引。
接下來我們來看看 Kafka 日志存儲結構:
從上圖看出來,Kafka 是基于「主題 +分區 + 副本 + 分段 + 索引」的結構進行日志存儲的。
了解了整體的日志存儲架構,我們來看下 Kafka 日志格式,Kafka 日志格式也經歷了多個版本迭代,這里我們主要看下V2版本的日志格式:
通過上圖可以得出:V2 版本日志格式主要是通過可變長度提高了消息格式的空間使用率,并將某些字段抽取到消息批次(RecordBatch)中,同時消息批次可以存放多條消息,從而在批量發送消息時,可以大幅度地節省了磁盤空間。
接下來我們來看看日志消息寫入磁盤的整體過程如下圖所示:
3 針對 Kafka 線上集群部署方案, 你是怎么做的?這里我們從架構師必備能力出發, 以電商平臺為例講述了 Kafka 生產級容量評估方案該如何做?如何讓公司領導以及運維部門得到認可,獲準你的方案。
詳細可以深讀:八大步驟帶你深度剖析Kafka生產級容量評估方案
4 針對 Kafka 線上系統, 你是如何進行監控的?Kafka 作為大型系統架構中重要的一環,有著舉足輕重的作用,因此 Kafka 集群的穩定性尤為重要,我們要對生產的 Kafka 集群進行全方位的監控,一般線上系統可以從以下五個維度進行監控:
01 主機節點監控所謂主機節點監控就是監控 Kafka 集群 Broker 所在節點機器的性能。主機節點監控對于 Kafka 來說是最重要的,因為很多線上環境問題首先就是由于主機的某些性能出現了問題。
因此對于 Kafka 來說,主機監控通常是發現問題的第一步,主要性能指標如下:
「機器負載(Load)」、「CPU 使用率」、「內存使用率」、「磁盤 I/O 使用率」、「網絡 I/O 使用率」、「TCP 連接數」、「打開文件數」、「inode 使用情況」。
如果想要更好的監控主機性能的話,有以下兩個教程可以學習和參考:
02 JVM 監控另一個重要的監控維度就是 JVM 監控。監控 JVM 進程主要是為了讓你全面地了解Kafka Broker 進程。
要監控JVM 進程,需要關注 3 個指標:
「監控Full GC 發生頻率和時長」、「監控堆上活躍對象大小」、「監控應用線程總數」
03 Kafka 集群監控另外一個重要監控維度就是 Kafka Broker 集群和各類客戶端的監控,主要有3個方法:
04 Kafka 客戶端監控1)查看 Broker 端重要日志:主要包括 Broker 端服務器日志 server.log,控制器日志 controller.log 以及主題分區狀態變更日志 state-change.log。其中,server.log 是最重要的,如果你的 Kafka 集群出現了故障,你要第一時間查看 server.log,定位故障原因。
2)查看 Broker 端關鍵線程運行狀態,例如:
Log Compaction 線程:日志壓縮清理。一旦它掛掉了,所有 Compaction 操作都會中斷,但用戶對此通常是無感知的。
副本拉取消息的線程:主要執行 Follower 副本向 Leader 副本拉取消息的邏輯。如果它們掛掉了,系統會表現為 Follower 副本延遲 Leader 副本越來越大。
3)查看 Broker 端關鍵的 JMX 性能指標:主要有BytesIn/BytesOut、NetworkProcessorAvgIdlePercent、RequestHandlerAvgIdlePercent、UnderReplicatedPartitions、ISRShrink/ISRExpand、ActiveControllerCount 這幾個指標。
客戶端監控主要是生產者和消費者的監控,生產者往 Kafka 發送消息,此時我們要了解客戶端機器與 Broker 機器之間的往返時延 RTT 是多少,對于跨數據中心或者異地集群來說,RTT 會更大,很難支撐很大的 TPS。
Producer角度:request-latency 是需要重點關注的JMX指標,即消息生產請求的延時;另外 Sender 線程的運行狀態也是非常重要的, 如果 Sender 線程掛了,對于用戶是無感知的,表象只是 Producer 端消息發送失敗。
Consumer角度:對于 Consumer Group,需要重點關注 join rate 和 sync rate 指標,它表示 Rebalance 的頻繁程度。另外還包括消息消費偏移量、消息堆積數量等。
05 Broker 之間的監控最后一個監控維度就是 Broker 之間的監控,主要指副本拉取的性能。Follower 副本實時拉取 Leader 副本的數據,此時我們希望拉取過程越快越好。Kafka 提供了一個特別重要的 JMX 指標,叫做「under replicated partitions」,意思就是比如我們規定這條消息,應該在兩個 Broker 上面保存,假設只有一個 Broker 上保存該消息,那么這條消息所在的分區就叫 under replicated partitions,這種情況是特別關注的,因為有可能造成數據的丟失。
另外還有一個比較重要的指標是「activecontrollor count」。在整個 Kafka 集群中應該確保只能有一臺機器的指標是1,其他全應該是0,如果發現有一臺機器大于1,一定是出現腦裂了,此時應該去檢查下是否出現了網絡分區。Kafka本身是不能對抗腦裂的,完全依靠 Zookeeper 來做,但是如果真正出現網絡分區的話,也是沒有辦法處理的,應該讓其快速失敗重啟。
5 針對 Kafka 線上系統, 你是如何進行調優的?對 Kafka 來說,「吞吐量」和「延時」是非常重要的優化指標。
吞吐量 TPS:是指 Broker 端或 Client 端每秒能處理的消息數,越大越好。
延時:表示從 Producer 端發送消息到 Broker 端持久化完成到 Consumer 端成功消費之間的時間間隔。與吞吐量 TPS 相反,延時越短越好。
總之,高吞吐量、低延時是我們調優 Kafka 集群的主要目標。
01 提升吞吐量首先是提升吞吐量參數和措施:
Broker | num.replica.fetchers:表示 Follower 副本用多少個線程來拉取消息,默認1個線程。如果 Broker 端 CPU 資源很充足,適當增加該值「但不要超過CPU核數」,以加快 Follower 副本的同步速度。這是因為在生產環境中,配置了 acks=all 的 Producer 端影響吞吐量的首要因素就是副本同步性能。適當增加該值后通??梢钥吹?Producer 端吞吐量增加 |
replica.lag.time.max.ms:在ISR 中,如果 Follower 長時間未向 Leader 發送通信請求或同步數據,則該 Follower 將被踢出 ISR,該值默認為 30s。 | |
num.network.threads:單個Acceptor創建Processor處理器的線程個數,默認值為3, 可以適當提高該值為9。 | |
num.io.threads:服務器用于處理請求的線程數,可能包括磁盤 I/O,默認值是 8,可以適當提高該值為32。 | |
調優參數以避免頻繁性的 Full GC | |
Producer | batch.size:表示消息批次大小,默認是 16kb。 如果 batch 設置太小,會導致頻繁網絡請求,吞吐量下降; 如果 batch 設置太大,會導致一條消息需要等待很久才能被發送出去,增加網絡延時。 所以適當增加會提高吞吐量,建議從默認的16kb增加到512kb或者1M。 |
buffer.memory:RecordAccumulator 發送消息的緩沖區總大小,默認值是 32M,可以增加到 64M。 | |
linger.ms:表示批次緩存時間,如果數據遲遲未達到 batch.size,sender 等待 linger.ms 之后就會發送數據。單位 ms,默認值是 0,意思就是消息必須立即被發送。 如果設置的太短,會導致頻繁網絡請求,吞吐量下降; 如果設置的太長,會導致一條消息需要等待很久才能被發送出去,增加網絡延時。 所以適當增加會提高吞吐量,建議10~100毫秒。 | |
compression.type:默認是 none,不壓縮,但是也可以使用 lz4 壓縮,效率還是不錯的,壓縮之后可以減小數據量,提升吞吐量,但是會加大 producer 端的 CPU 開銷。支持壓縮類型:none、gzip、snappy、lz4 和 zstd。 | |
設置acks=0/1,retries=0,優化目標是吞吐量,不要設置 acks=all「副本同步時間拉長」及開啟重試「執行時間拉長」。 | |
Consumer | 利用多線程增加整體吞吐量 |
fetch.min.bytes:表示只要 Broker 端積攢了多少數據,就可以返回給 Consumer 端。默認值1字節,適當增大該值為1kb或者更大。 | |
fetch.max.bytes:消費者獲取服務器端一批消息最大的字節數。一批次的大小受 message.max.bytes 【broker 配置】或 max.message.bytes 【topic config】影響,默認是50M。 | |
max.poll.records:表示一次 poll 拉取數據返回消息的最大條數大小,默認是 500 條。 | |
分 區 | 增加分區來提高吞吐量 |
降低延時的目的就是盡量減少端到端的延時。
對比上面提升吞吐量的參數,我們只能調整 Producer 端和 Consumer 端的參數配置。
對于 Producer 端,此時我們希望可以快速的將消息發送出去,必須設置 linger.ms=0,同時關閉壓縮,另外設置 acks = 1,減少副本同步時間。
而對于 Consumer 端我們只保持 fetch.min.bytes=1 ,即 Broker 端只要有能返回的數據,就立即返回給 Consumer,減少延時。
03 合理設置分區數分區數不是越多越好,也不是越少越好,需要搭建完集群,進行壓測,再靈活調整分區個數。
這里可以用 Kafka 官方自帶的腳本,對 Kafka 進行壓測。
1)生產者壓測:kafka-producer-perf-test.sh
2)消費者壓測:kafka-consumer-perf-test.sh
X 關閉
X 關閉