生產環境下,為了盡可能提升Kafka的整體吞吐量,可以對Kafka的相關配置參數進行調整,以達到提升整體性能的目的。本文主要從Kafka的不同組件出發,講解各組件涉及的配置參數和參數含義。
一、生產者
1、acks:Producer需要Leader確認的Producer請求的應答數。
(1)acks = 0: 表示Producer請求立即返回,不需要等待Leader的任何確認。這種方案有最高的吞吐率,但是不保證消息是否真的發送成功。
(2)acks = -1: 表示分區Leader必須等待消息被成功寫入到所有的ISR副本(同步副本)中才認為Producer請求成功。這種方案提供最高的消息持久性保證,但是理論上吞吐率也是最差的。
(3)acks = 1: 表示Leader副本必須應答此Producer請求并寫入消息到本地日志,之后Producer請求被認為成功。如果此時Leader副本應答請求之后掛掉了,消息會丟失。這個方案,提供了不錯的持久性保證和吞吐。
2、buffer.memory:該參數用于指定Producer端用于緩存消息的緩沖區大小,單位為字節,默認值為:33554432即32MB。
3、compression.type:壓縮器,目前支持none(不壓縮),gzip,snAppy和lz4。
4、retries:Producer發送消息失敗重試的次數。重試時Producer會重新發送之前由于瞬時原因出現失敗的消息。瞬時失敗的原因可能包括:元數據信息失效、副本數量不足、超時、位移越界或未知分區等。倘若設置了retries > 0,那么這些情況下Producer會嘗試重新發送。
5、batch.size:默認值為16KB,Producer按照batch進行發送,當batch滿了后,Producer會把消息發送出去。
6、linger.ms:Producer是按照batch進行發送的,但是還要看linger.ms的值,默認是0,表示不做停留。為了減少了網絡IO,提升整體的性能。建議設置5-100ms。
二、Broker
1、replica.lag.time.max.ms:ISR中,如果Follower長時間未向Leader發送通信請求或同步數據,則該Follower將被踢出ISR。該時間閾值,默認30s。
2、auto.leader.rebalance.enable:默認是true。自動Leader Partition 平衡。
3、leader.imbalance.per.broker.percentage:默認是10%。每個Broker允許的不平衡的Leader的比率。如果每個Broker超過了這個值,控制器會觸發Leader的平衡。
4、leader.imbalance.check.interval.seconds:默認值300秒。檢查Leader負載是否平衡的間隔時間。
5、log.segment.bytes:Kafka中log日志是分成一塊塊存儲的,此配置是指log日志劃分 成塊的大小,默認值1G。
6、log.index.interval.bytes:默認4KB,Kafka里面每當寫入了4KB大小的日志(.log),然后就往index文件里面記錄一個索引。
7、log.retention.hours:Kafka中數據保存的時間,默認7天。
8、log.retention.minutes:Kafka中數據保存的時間,分鐘級別,默認關閉。
9、log.retention.ms:Kafka中數據保存的時間,毫秒級別,默認關閉。
10、log.retention.check.interval.ms:檢查數據是否保存超時的間隔,默認是5分鐘。
11、log.retention.bytes:默認等于-1,表示無窮大。超過設置的所有日志總大小,刪除最早的segment。
12、log.cleanup.policy:默認是delete,表示所有數據啟用刪除策略;如果設置值為compact,表示所有數據啟用壓縮策略。
13、num.io.threads:默認是8。負責寫磁盤的線程數。整個參數值要占總核數的50%。
14、num.replica.fetchers:副本拉取線程數,這個參數占總核數的50%的1/3。
15、num.NETwork.threads:默認是3。數據傳輸線程數,這個參數占總核數的50%的2/3 。
16、log.flush.interval.messages:強制頁緩存刷寫到磁盤的條數,默認是long的最大值,9223372036854775807。一般不建議修改,交給系統自己管理。
17、log.flush.interval.ms:每隔多久,刷數據到磁盤,默認是null。一般不建議修改,交給系統自己管理。
三、消費者
1、bootstrap.servers:向Kafka集群建立初始連接用到的host/port列表。
2、key.deserializer和value.deserializer:指定接收消息的key和value的反序列化類型。一定要寫全類名。
3、group.id:標記消費者所屬的消費者組。
4、enable.auto.commit:默認值為true,消費者會自動周期性地向服務器提交偏移量。
5、auto.commit.interval.ms:如果設置了 enable.auto.commit 的值為true, 則該值定義了消費者偏移量向Kafka提交的頻率,默認5s。
6、auto.offset.reset:當Kafka中沒有初始偏移量或當前偏移量在服務器中不存在(如,數據被刪除了),該如何處理?
(1)earliest:自動重置偏移量到最早的偏移量。
(2)latest:默認,自動重置偏移量為最新的偏移量。
(3)none:如果消費組原來的偏移量不存在,則向消費者拋異常。
7、offsets.topic.num.partitions:__consumer_offsets的分區數,默認是50個分區。
8、heartbeat.interval.ms:Kafka消費者和coordinator之間的心跳時間,默認3s。該條目的值必須小于 session.timeout.ms ,也不應該高于 session.timeout.ms 的1/3。
9、session.timeout.ms:Kafka消費者和coordinator之間連接超時時間,默認45s。超過該值,該消費者被移除,消費者組執行再平衡。
10、max.poll.interval.ms:消費者處理消息的最大時長,默認是5分鐘。超過該值,該消費者被移除,消費者組執行再平衡。
11、fetch.min.bytes:默認1個字節。消費者獲取服務器端一批消息最小的字節數。
12、fetch.max.wAIt.ms:默認500ms。如果沒有從服務器端獲取到一批數據的最小字節數。該時間到,仍然會返回數據。
13、fetch.max.bytes:默認值: 52428800字節,即50MB。消費者獲取服務器端一批消息最大的字節數。如果服務器端一批次的數據大于該值仍然可以拉取回來這批數據,因此,這不是一個絕對最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影響。
14、max.poll.records:一次poll拉取數據返回消息的最大條數,默認500條。
四、總結
本文總結了Kafka參數,包含了Producer、Broker和Consumer的參數,并且給出了調優Kafka的關鍵參數配置,可以直接用于生產環境。