日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

消息引擎的核心職責就是將生產(chǎn)者生產(chǎn)的消息傳輸?shù)较M者,設(shè)計消息格式是各大消息引擎框架的關(guān)鍵問題,因為消息格式?jīng)Q定了消息引擎的性能和效率。本文帶大家探究消息引擎kafka當前所用的message格式是什么。

一、Kafka message format

kafka從0.11.0版本開始所使用的消息格式版本為v2,參考了 Protocol Buffer而引入了變長整型(Varints)和 ZigZag 編碼。Varints是使用一個或多個字節(jié)來序列化整數(shù)的一種方法,數(shù)值越小,其所占用的字節(jié)數(shù)就越少。ZigZag編碼以一種鋸齒形(zig-zags)的方式來回穿梭于正負整數(shù)之間, 以使得帶符號整數(shù)映射為無符號整數(shù),這樣可以使得絕對值較小的負數(shù)仍然享有較小的Varints編碼值,比如-1編碼為1,1編碼為2,-2編碼為3。

kafka v0和v1版本的消息格式,如果消息本身沒有key,那么key length字段為-1,int類型的需要4個字節(jié)來保存,而如果采用Varints來編碼則只需要一個字節(jié)。根據(jù)Varints的規(guī)則可以推導(dǎo)出0-63之間的數(shù)字占1個字節(jié),64-8191之間的數(shù)字占2個字節(jié),8192-1048575之間的數(shù)字占3個字節(jié)。而kafka broker的配置message.max.bytes的默認大小為1000012(Varints編碼占3個字節(jié)),如果消息格式中與長度有關(guān)的字段采用Varints的編碼的話, 絕大多數(shù)情況下都會節(jié)省空間,而v2版本的消息格式也正是這樣做的。不過需要注意的是Varints并非一直會省空間,一個int32最長會占用5個字節(jié)(大于默認的4字節(jié)), 一個int64最長會占用10字節(jié)(大于默認的8字節(jié))。

因為Kafka的message經(jīng)歷過幾次的版本迭代更改,本文以v2版本為例講述。

二、Record Batch

在Kafka中,數(shù)據(jù)是按照topic和partition的方式進行組織和存儲的。每個partition的數(shù)據(jù)被分成一個或多個segment文件,并且每個segment文件包含若干個Record Batch。因此,Record Batch也是Kafka中重要的數(shù)據(jù)結(jié)構(gòu)之一。

在Kafka中,Record Batch指的是一組相關(guān)的消息集合,它們具有相同的key、value類型和所屬的topic和partition。每個Record Batch包含若干條消息(Record),并且這些消息被順序地寫入到磁盤中,以提高讀取效率。

具體而言,Record Batch由以下幾部分構(gòu)成:

Record Batch Header:包含了當前Batch的元數(shù)據(jù),如Magic Code、Batch Size、First Offset等信息。

Record Header:每個Record都附帶有一個Header,用于描述該Record的元數(shù)據(jù)信息,例如時間戳、壓縮類型、CRC校驗值等。

Record Body:記錄具體的消息內(nèi)容,包括Key、Value等字段。

需要注意的是,Kafka的Record Batch通常具有比較大的體積(默認大小為16KB),因此可以將多個相關(guān)的消息打包在一起進行傳輸和處理,從而提高了消息的傳輸效率和吞吐量。另外,Kafka還支持對Record Batch進行壓縮和批量操作,以進一步提高數(shù)據(jù)的傳輸效率和性能。

總的來說,Record Batch是Kafka中定義的一個重要數(shù)據(jù)結(jié)構(gòu),用于管理和組織消息,提高消息的讀寫效率和傳輸性能。

baseoffset: int64 標識當前的batch的起始偏移量

batchLength: int32 該batch的長度

partitionLeaderEpoch: int32 確保數(shù)據(jù)可靠性

magic: int8 魔法數(shù)字,當前為2,也即當前的message版本為v2版本

crc: int32 crc校驗

attributes: int16 消息屬性

bit 0~2: 是否壓縮和壓縮的格式

0: no compression

1: gzip

2: snAppy

3: lz4

4: zstd

bit 3: timestampType

bit 4: isTransactional (0 means not transactional)

bit 5: isControlBatch (0 means not a control batch)

bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)

bit 7~15: unused

lastOffsetDelta: int32 RecordBatch中最后一個Record的offset與first offset的差值

baseTimestamp: int64 第一條時間戳

maxTimestamp: int64 最大的時間戳,保證消息組裝時的正確性

producerId: int64 支持冪等性

producerEpoch: int16 支持冪等性

baseSequence: int32 支持冪等性,消息序號

records: [Record] Record個數(shù)

用以下圖表示 V2 版本消息批次的樣子:

三、Record

在Kafka中,Record Batch和Record是兩種不同的數(shù)據(jù)結(jié)構(gòu),但它們之間存在著緊密的關(guān)系。

Record是指Kafka中的一條消息,通常由Key、Value、Timestamp等字段組成。而Record Batch是指將多個相關(guān)的Record打包在一起進行傳輸和處理的數(shù)據(jù)結(jié)構(gòu),每個Record Batch通常包含若干條記錄,并且這些記錄具有相同的key、value類型和所屬的topic和partition。

具體來說,每個Record Batch中的Record都被依次存儲在一個連續(xù)的二進制數(shù)據(jù)塊中,每個Record包含自己的Header和Body部分。而Record Batch則包含了當前Batch的元數(shù)據(jù)信息和所有記錄的元數(shù)據(jù)信息,如Batch Size、First Offset、Last Offset、CRC校驗值等。

消息格式如下所示:

#消息長度

length: varint

#消息屬性

attributes: int8

# 時間戳增量

bit 0~7: unusedtimestampDelta: varlong

#偏移量增量

offsetDelta: varint

#key長度

keyLength: varint

#key值

key: byte[]

#value長度

valueLen: varint

#value值

value: byte[]

#header信息

Headers => [Header]

Record信息通過如下方式封裝

public static int writeTo(DataOutputStream out,

int offsetDelta,

long timestampDelta,

ByteBuffer key,

ByteBuffer value,

Header[] headers) throws IOException {

// 消息總數(shù)

int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);

ByteUtils.writeVarint(sizeInBytes, out);

// 屬性

byte attributes = 0; // there are no used record attributes at the moment

out.write(attributes);

// 時間增量

ByteUtils.writeVarlong(timestampDelta, out);

// 位移增量

ByteUtils.writeVarint(offsetDelta, out);

// key

if (key == null) {

ByteUtils.writeVarint(-1, out);

} else {

int keySize = key.remAIning();

// key size

ByteUtils.writeVarint(keySize, out);

// key

Utils.writeTo(out, key, keySize);

}

// Value

if (value == null) {

ByteUtils.writeVarint(-1, out);

} else {

int valueSize = value.remaining();

// value size

ByteUtils.writeVarint(valueSize, out);

// value

Utils.writeTo(out, value, valueSize);

}

// header

ByteUtils.writeVarint(headers.length, out);

for (Header header : headers) {

// header key

String headerKey = header.key();

byte[] utf8Bytes = Utils.utf8(headerKey);

// header key 長度

ByteUtils.writeVarint(utf8Bytes.length, out);

// header key 值

out.write(utf8Bytes);

// header value

byte[] headerValue = header.value();

if (headerValue == null) {

ByteUtils.writeVarint(-1, out);

} else {

// header value 長度

ByteUtils.writeVarint(headerValue.length, out);

// header value 值

out.write(headerValue);

}

}

return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;

}

根據(jù)以上代碼邏輯,用以下圖表示 V2 版本消息格式的樣子:

四、總結(jié)

message(又稱record)總是分批寫入的。一批消息的技術(shù)術(shù)語是一個record batch:

 

  • 一個record batch包含一個或多個record。
  • 在退化的情況下,我們可以有一個包含單個record的record batch。
  • record batch和record有它們自己的headers。

分享到:
標簽:Kafka
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨大挑戰(zhàn)2018-06-03

數(shù)獨一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運動步數(shù)有氧達人2018-06-03

記錄運動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定