了解多個分區的訊息傳遞
1.概述
Apache Kafka 是一個分散式串流處理平台,它透過基於分區的架構處理高吞吐量資料。當我們向 Kafka 主題發送訊息時,它們會分佈在多個分區上進行平行處理。這種設計使 Kafka 能夠在保持效能的同時進行水平擴展。
了解 Kafka 如何跨分區傳遞訊息對於建立可靠的分散式系統至關重要。分區策略會影響訊息排序、消費者擴充以及整體系統行為。在本文中,我們將探討當主題包含多個分區時 Kafka 如何傳遞訊息,重點關注路由策略、排序保證和消費者協調。
2. 訊息路由到分區
Kafka 使用兩種主要策略來根據訊息是否包含鍵來決定哪個分區接收訊息。這項決策從根本上影響了訊息的分發和處理方式。
2.1. 基於鍵的分區
當我們發送帶有鍵的訊息時,Kafka 會使用確定性雜湊函數(很可能是 Murmur2 雜湊)將其一致地路由到同一分區。這確保了相關訊息保持在一起:
public void sendMessagesWithKey() {
String key = "user-123";
for (int i = 0; i <= 5; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("user-events", key, "Event " + i);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
logger.info("Key: {}, Partition: {}, Offset: {}", key, metadata.partition(),
metadata.offset());
}
});
}
producer.flush();
}
Kafka 對鍵應用 MurmurHash2 演算法,並使用分區計數進行模運算來選擇目標分區。所有帶有鍵“user-123”的訊息將始終位於同一分區,從而確保它們按順序處理。當我們需要維護特定實體的狀態或順序時,這尤其有用。
2.2. 無密鑰訊息的循環機制
沒有金鑰的訊息使用黏性分區進行分發,這是一種透過有效地批次訊息來提高吞吐量的策略:
public Map<Integer, Integer> sendMessagesWithoutKey() {
Map<Integer, Integer> partitionCounts = new HashMap<>();
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("events", null, // no key
"Message " + i);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
synchronized (partitionCounts) {
partitionCounts.merge(metadata.partition(), 1, Integer::sum);
}
}
});
}
producer.flush();
logger.info("Distribution across partitions: {}", partitionCounts);
return partitionCounts;
}
由於沒有按鍵,Kafka 會先將批次填入一個分割區,然後再移動到下一個分割區。與純循環分配相比,這減少了網路請求並提高了壓縮率。黏性行為會持續到批次已滿或延遲時間到期。
3. 跨分區的排序保證
Kafka 的順序保證完全依賴分區結構。理解分區結構對於設計正確處理順序操作的系統至關重要。
3.1. 分區排序
每個分區通過順序偏移分配來保持嚴格的順序。訊息將附加到分區日誌中,並按照該順序進行使用:
public void demonstratePartitionOrdering() throws InterruptedException {
String orderId = "order-789";
String[] events = { "created", "validated", "paid", "shipped", "delivered" };
for (String event : events) {
ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderId, event);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
logger.info("Event: {} -> Partition: {}, Offset: {}", event, metadata.partition(),
metadata.offset());
}
});
// small delay to demonstrate sequential processing
Thread.sleep(100);
}
producer.flush();
}
由於所有訊息共用相同的金鑰,它們會路由到同一個分區,並在消費過程中保持其順序。此保證在同一個分區內是絕對的:消費者將始終按照事件的生成順序讀取它們。
3.2. 無跨分區排序
具有不同鍵的訊息可能會落在不同的分區中,且 Kafka 不提供跨分區的排序保證:
public void demonstrateCrossPartitionBehavior() {
long startTime = System.currentTimeMillis();
// these will likely go to different partitions
producer.send(new ProducerRecord<>("events", "key-A", "First at " + (System.currentTimeMillis() - startTime) + "ms"));
producer.send(new ProducerRecord<>("events", "key-B", "Second at " + (System.currentTimeMillis() - startTime) + "ms"));
producer.send(new ProducerRecord<>("events", "key-C", "Third at " + (System.currentTimeMillis() - startTime) + "ms"));
producer.flush();
}
即使我們按順序發送這些訊息,由於它們位於不同的分區,消費者也可能無序地處理它們。由於消費者負載或網路狀況,一個分區的處理速度可能比另一個分區更快。
4.消費者組協調
Kafka 透過在群組中的消費者之間分配分區來實現水平擴展。這種協調是 Kafka 可擴展性模型的基礎。
4.1. 組內分區分配
當多個消費者加入同一個群組時,Kafka 會將每個分區分配給一個消費者,從而防止群組內重複處理:
public void createConsumerGroup() {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", "order-processors");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
int recordCount = 0;
while (recordCount < 10) { // process limited records for demo
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
logger.info("Consumer: {}, Partition: {}, Offset: {}, Value: {}", Thread.currentThread()
.getName(), record.partition(), record.offset(), record.value());
recordCount++;
}
consumer.commitSync();
}
consumer.close();
}
如果群組中有六個分區和三個消費者,則每個消費者通常處理兩個分區。這種分配方式可確保負載平衡,且群組內不會出現訊息重複。 Kafka 的群組協調器會自動管理這些分配。
4.2. 多組用於扇出
不同的消費者群組可以獨立處理相同的訊息,使多個應用程式能夠對相同的事件做出反應:
public void startMultipleGroups() {
String[] groupIds = { "analytics-group", "audit-group", "notification-group" };
CountDownLatch latch = new CountDownLatch(groupIds.length);
for (String groupId : groupIds) {
startConsumerGroup(groupId, latch);
}
try {
latch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
}
}
private void startConsumerGroup(String groupId, CountDownLatch latch) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
// other properties
new Thread(() -> {
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Arrays.asList("orders"));
int recordCount = 0;
while (recordCount < 5) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
recordCount += processRecordsForGroup(groupId, records);
}
} finally {
latch.countDown();
}
}).start();
}
每個組維護各自的偏移量跟踪,允許不同的服務按照各自的節奏處理訊息。此模式支援事件驅動架構,其中多個系統可以對相同業務事件做出回應。
5. 處理消費者重新平衡
當消費者加入或離開某個群組時,Kafka 會重新平衡分區分配。此程序可確保持續運行,但可能造成暫時中斷。我們可以使用協作式重新平衡來最大程度地降低影響:
public void configureCooperativeRebalancing() {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", "cooperative-group");
props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection partitions) {
logger.info("Revoked partitions: {}", partitions);
// complete processing of current records
}
@Override
public void onPartitionsAssigned(Collection partitions) {
logger.info("Assigned partitions: {}", partitions);
// initialize any partition-specific state
}
});
// process a few records to demonstrate
int recordCount = 0;
while (recordCount < 5) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
recordCount += records.count();
}
consumer.close();
}
協作式重新平衡允許未受影響的消費者繼續處理,同時僅重新分配必要的分區。這顯著降低了擴展操作的影響。
6. 加工保證
為了實現可靠的訊息處理,我們通常透過手動控制偏移提交來實現至少一次傳遞:
public void processWithManualCommit() {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", "manual-commit-group");
props.put("enable.auto.commit", "false");
props.put("max.poll.records", "10");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
int totalProcessed = 0;
while (totalProcessed < 10) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processOrder(record);
totalProcessed++;
} catch (Exception e) {
logger.error("Processing failed for offset: {}", record.offset(), e);
break;
}
}
if (!records.isEmpty()) {
consumer.commitSync();
logger.info("Committed {} records", records.count());
}
}
consumer.close();
}
這種方法可以確保我們的訊息在處理失敗時不會遺失,儘管我們必須設計處理邏輯來優雅地處理潛在的重複訊息。
7. 整合
當生產者向 Kafka 發送訊息時,該過程從分區選擇開始。帶鍵的訊息使用一致性雜湊演算法來確保相關資料保持在同一分區中。無鍵訊息使用黏性分區來提高批次效率。在每個分區內,Kafka 會分配連續的偏移量以保持嚴格的順序,但跨分區不存在全域排序。
每個分區分配給每個群組的一個消費者,從而實現並行處理,避免重複。不同的消費者群組獨立消費相同的訊息,每個群組都有單獨的偏移量追蹤。當消費者加入或離開時,Kafka 會使用協作策略重新平衡分區分配,以減少中斷。這種設計使 Kafka 能夠水平擴展,同時保持每個分區內的順序。
9. 結論
在本文中,我們探討了 Kafka 基於分區的架構如何在處理訊息傳遞的同時,在最重要的位置保持排序保證。我們已經看到,Kafka 優先考慮可擴展性和吞吐量,而不是全域排序,從而提供了符合大多數實際需求的分區級保證。
關鍵在於理解分區是 Kafka 中並行性和排序的單位。透過圍繞這些約束設計應用程序,我們可以建立可擴展的系統,高效處理數百萬個訊息。本文相關的程式碼可在 GitHub 上取得。