使用反應式 Kafka Stream 和 Spring WebFlux
1. 概述
在本文中,我們將探索 Reactive Kafka Streams ,將它們整合到範例Spring WebFlux 應用程式中,並研究這種組合如何使我們能夠建立具有可擴展性、效率和即時處理能力的完全反應式、資料密集型應用程式.
為了實現這一目標,我們將使用Spring Cloud Stream Reactive Kafka Binder 、 Spring WebFlux和ClickHouse 。
2.Spring Cloud Stream反應式Kafka Binder
Spring Cloud Stream 在基於串流和訊息驅動的微服務上提供了一個抽象層。 Reactive Kafka Binder 透過連接 Kafka 主題、訊息代理程式或 Spring Cloud Stream 應用程式來建立完全反應式管道。這些管道利用 Project Reactor 來回應式處理資料流,確保整個資料流的非阻塞、非同步和背壓感知處理。
與同步運行的傳統 Kafka Streams 不同,Reactive Kafka Streams 使開發人員能夠定義端到端的反應式管道,其中每個資料都可以即時映射、轉換、過濾或減少,同時仍保持高效的資源利用。
這種方法特別適合需要反應性範例以實現更好的可擴展性和響應能力的高吞吐量、事件驅動的應用程式。
2.1. Spring 的反應式 Kafka 流
借助 Spring Cloud Stream Reactive Kafka Binder,我們可以將 Reactive Kafka Streams 無縫整合到 Spring WebFlux 應用程式中,從而實現完全反應式、非阻塞的資料處理。透過利用Project Reactor提供的反應式API,我們可以處理背壓,實現非同步資料流,並在不阻塞執行緒的情況下有效率地處理流。
Reactive Kafka Streams 和 Spring WebFlux 的這種組合為建立需要分散式、即時和反應式資料管道的應用程式提供了強大的解決方案。
接下來,讓我們深入研究範例應用程式來演示這些功能的實際應用。
3. 建立反應式 Kafka 流應用程式
在此範例應用程式中,我們將模擬一個接收、處理和分發股票價格資料的股票分析應用程式。該應用程式將展示 Spring Cloud Stream、Kafka 和響應式編程範例在 Spring 生態系統中如何協同工作。
首先,讓我們取得使用Spring Boot建置此類應用程式所需的所有依賴項:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2023.0.2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
對於此範例,我們將使用Spring Cloud BOM ,它解決所有依賴項的版本。我們還將使用Spring Boot和以下模組:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
這些模組使我們能夠響應式地建立 Web 層和資料攝取管道。儘管我們有資料處理管道,但我們仍需要一些資料持久性來保存其結果。讓我們使用一個簡單且非常強大的分析資料庫來執行此操作:
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-r2dbc</artifactId>
<version>0.7.1</version>
</dependency>
ClickHouse是一個快速、開源、以列導向的資料庫管理系統,可使用 SQL 查詢產生即時分析資料報表。由於我們的目標是建立一個完全響應式的應用程序,因此我們將使用其R2DB 驅動程式。
3.1.反應式 Kafka 生產者設置
為了啟動我們的資料處理管道,我們需要一個生產者負責建立資料並將其提交給我們的應用程式以進行資料攝取。接下來,我們將看到 Spring 如何幫助我們輕鬆定義和使用生產者:
@Component
public class StockPriceProducer {
public static final String[] STOCKS = {"AAPL", "GOOG", "MSFT", "AMZN", "TSLA"};
private static final String CURRENCY = "USD";
private final ReactiveKafkaProducerTemplate<String, StockUpdate> kafkaProducer;
private final NewTopic topic;
private final Random random = new Random();
public StockPriceProducer(KafkaProperties properties,
@Qualifier(TopicConfig.STOCK_PRICES_IN) NewTopic topic) {
this.kafkaProducer = new ReactiveKafkaProducerTemplate<>(
SenderOptions.create(properties.buildProducerProperties())
);
this.topic = topic;
}
public Flux<SenderResult<Void>> produceStockPrices(int count) {
return Flux.range(0, count)
.map(i -> {
String stock = STOCKS[random.nextInt(STOCKS.length)];
double price = 100 + (200 * random.nextDouble());
return MessageBuilder.withPayload(new StockUpdate(stock, price, CURRENCY, Instant.now()))
.setHeader(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
})
.flatMap(stock -> {
var newRecord = new ProducerRecord<>(
topic.name(),
stock.getPayload().symbol(),
stock.getPayload());
stock.getHeaders()
.forEach((key, value) -> newRecord.headers().add(key, value.toString().getBytes()));
return kafkaProducer.send(newRecord);
});
}
}
此類產生股票價格更新並將其發送到我們的 Kafka 主題。
在StockPriceProducer,
我們注入應用程式 YAML 檔案中定義的KafkaProperties
,其中包含連接到 Kafka 叢集所需的所有資訊:
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring:
json:
trusted:
packages: '*'
然後, NewTopic
保存對 Kafka 主題的引用,這就是我們創建ReactiveKafkaProducerTemplate
實例所需的全部內容。此類抽象化了我們的應用程式和 Kafka 主題之間溝通所涉及的大部分複雜性。
在produceStockPrices()
方法中,我們產生StockUpdate
物件並將它們包裝在Message
物件中。 Spring 提供了Message
類,它封裝了基於訊息的系統詳細信息,例如訊息有效負載以及我們可能需要包含為訊息內容類型的任何必要標頭。最後,我們建立一個ProducerRecord
定義訊息的目標主題及其分區鍵,然後發送它。
3.2.反應式 Kafka 串流設置
現在,讓我們假設生產者位於同一應用程式之外。我們需要連接到股票價格更新主題並將股票價格從美元轉換為歐元,以便其他應用程式部分可以使用該數據。同時,我們需要保存特定時間窗口內原始股票價格的歷史記錄。那麼,讓我們來配置我們的資料流管道:
spring:
cloud:
stream:
default-binder: kafka
kafka:
binder:
brokers: localhost:9092
bindings:
default:
content-type: application/json
processStockPrices-in-0:
destination: stock-prices-in
group: live-stock-consumers-x
processStockPrices-out-0:
destination: stock-prices-out
group: live-stock-consumers-y
producer:
useNativeEncoding: true
首先,我們使用default-binder屬性將Kafka定義為我們的預設binder。 Spring Cloud Stream 與供應商無關,允許我們在必要時在同一應用程式中使用不同的訊息系統(例如 Kafka 和 RabbitMQ) 。
接下來,我們配置綁定,它充當訊息系統(例如,Kafka 主題)與應用程式的生產者和消費者之間的橋樑:
- 輸入通道
processStockPrices-in-0
綁定到stock-prices-in
主題,訊息在其中消費。 - 輸出通道
processStockPrices-out-0
綁定到stock-prices-out
主題,在其中發布已處理的訊息。
每個綁定都與processStockPrices()
方法關聯,該方法處理來自輸入通道的資料、應用轉換並將結果傳送到輸出通道。
我們也將內容類型定義為 JSON,確保訊息被序列化和反序列化為 JSON。此外,在生產者中使用useNativeEncoding: true
可確保 Kafka 生產者負責對資料進行編碼和序列化。
群組屬性(例如live-stock-consumers-x
)支援跨消費者的訊息負載平衡。同一組中的所有消費者負責處理來自某個主題的訊息,以防止重複。
3.3.反應式 Kafka 流綁定設置
如前所述,綁定是輸入和輸出通道之間的橋樑,使我們能夠處理傳輸中的資料。 YAML 檔案中定義的名稱至關重要,因為它必須與綁定實作相對應,在我們的例子中,是一個在輸入和輸出訊息之間應用映射的函數。
接下來我們來看看Spring是怎麼做的:
@Configuration
public class StockPriceProcessor {
private static final String USD = "USD";
private static final String EUR = "EUR";
@Bean
public Function<Flux<Message<StockUpdate>>, Flux<Message<StockUpdate>>> processStockPrices(
ClickHouseRepository repository,
CurrencyRate currencyRate
) {
return stockPrices -> stockPrices.flatMapSequential(message -> {
StockUpdate stockUpdate = message.getPayload();
return repository.saveStockPrice(stockUpdate)
.flatMap(success -> Boolean.TRUE.equals(success) ? Mono.just(stockUpdate) : Mono.empty())
.flatMap(stock -> currencyRate.convertRate(USD, EUR, stock.price()))
.map(newPrice -> convertPrice(stockUpdate, newPrice))
.map(priceInEuro -> MessageBuilder.withPayload(priceInEuro)
.setHeader(KafkaHeaders.KEY, stockUpdate.symbol())
.copyHeaders(message.getHeaders())
.build());
});
}
private StockUpdate convertPrice(StockUpdate stockUpdate, double newPrice) {
return new StockUpdate(stockUpdate.symbol(), newPrice, EUR, stockUpdate.timestamp());
}
}
此配置示範如何在兩個 Kafka 主題之間被動地處理和轉換股票價格更新。 processStockPrices()
函數將輸入stock-prices-in
主題綁定到輸出stock-prices-out
主題,並在它們之間添加一個處理層。流程如下:
- 訊息處理:使用
flatMapSequential()
順序處理來自輸入主題的每個傳入StockUpdate
訊息。這可確保處理順序與輸入訊息的順序相匹配,這對於保持一致性非常重要。 - 資料庫持久性:每個庫存更新都使用
ClickHouseRepository
儲存到資料庫中以供日後參考。只有成功保存的更新才會繼續進行。 - 貨幣轉換:股票價格最初以美元為單位,使用
CurrencyRate
服務轉換為歐元。 - 訊息轉換:轉換後的價格被包裝在一個新的
StockUpdate
物件中,透過KafkaHeaders.KEY
保留原始交易品種作為 Kafka 訊息鍵。這確保了 Kafka 主題中正確的訊息分區。 - 反應式管道:整個流程是反應式的,利用 Project Reactor 的非阻塞非同步功能來實現可擴展性和效率。
3.4.輔助服務
ClickHouseRepository
和CurrencyRate
是簡單的接口,為我們提供了一個簡單的實作來說明範例應用程式:
public interface CurrencyRate {
Mono<Double> convertRate(String from, String to, double amount);
}
public interface ClickHouseRepository {
Mono<Boolean> saveStockPrice(StockUpdate stockUpdate);
Flux<StockUpdate> findMinuteAvgStockPrices(Instant from, Instant to);
}
這些功能向我們展示了應用程式在處理此類資料管道時可以應用的業務邏輯。
3.5.反應式 Kafka Streams 消費者設置
處理後,發送到輸出通道的資料可以由同一應用程式或任何其他應用程式使用。這樣的消費者也可以使用反應式 Kafka 模板來實現:
@Component
public class StockPriceConsumer {
private final ReactiveKafkaConsumerTemplate<String, StockUpdate> kafkaConsumerTemplate;
public StockPriceConsumer(@NonNull KafkaProperties properties,
@Qualifier(TopicConfig.STOCK_PRICES_OUT) NewTopic topic) {
var receiverOptions = ReceiverOptions
.<String, StockUpdate>create(properties.buildConsumerProperties())
.subscription(List.of(topic.name()));
this.kafkaConsumerTemplate = new ReactiveKafkaConsumerTemplate<>(receiverOptions);
}
@PostConstruct
public void consume() {
kafkaConsumerTemplate
.receiveAutoAck()
.doOnNext(consumerRecord -> {
// simulate processing
log.info(
"received key={}, value={} from topic={}, offset={}, partition={}", consumerRecord.key(),
consumerRecord.value(),
consumerRecord.topic(),
consumerRecord.offset(),
consumerRecord.partition());
})
.doOnError(e -> log.error("Consumer error", e))
.doOnComplete(() -> log.info("Consumed all messages"))
.subscribe();
}
}
StockPriceConsumer
示範了以反應方式使用來自 stock-prices-out 主題的數據:
- 初始化:建構函式使用 YAML 配置中的 Kafka 屬性建立
ReceiverOptions
。它訂閱stock-prices-out
主題並明確分配所有分區。 - 訊息處理: Consumer 方法使用
receiveAutoAck()
訂閱輸出通道 (processStockPrices-out-0
)。每個訊息都記錄有鍵、值、主題、偏移量和分區詳細信息,模擬資料處理。 - 響應式功能:消費者在訊息到達時開始回應式處理訊息,利用非阻塞、背壓感知處理。它還記錄錯誤
doOnError()
並追蹤完成doOnComplete()
。
以下屬性配置我們的消費者:
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
group-id: my-group
properties:
reactiveAutoCommit: true
該消費者反應式地處理stock-prices-out
主題,該實現突出了反應式編程與 Kafka 的無縫集成,以實現高效的流處理。
3.6.反應式 WebFlux 應用程式
最後,現在數據已保存在我們的資料庫中,我們可以向用戶充分提供此類信息,因為數據已緩存在我們的服務中,並且可以根據需要進行處理:
@RestController
public class StocksApi {
private final ClickHouseRepository repository;
@Autowired
public StocksApi(ClickHouseRepository repository) {
this.repository = repository;
}
@GetMapping("/stock-prices-out")
public Flux<StockUpdate> getAvgStockPrices(@RequestParam("from") @NotNull Instant from,
@RequestParam("to") @NotNull Instant to) {
if (from.isAfter(to)) {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "'from' must come before 'to'");
}
return repository.findMinuteAvgStockPrices(from, to);
}
}
4. 連接點
我們用最少的程式碼實現了完全反應式的資料處理管道,連接兩個 Kafka 主題,應用業務邏輯,並確保高吞吐量處理。這種方法非常適合需要即時資料轉換的事件驅動系統。 Spring Cloud Stream 和 Kafka 形成了一個強大的組合,具有超越我們在此介紹的廣泛功能。
例如,綁定支援多個輸入和輸出,而死信佇列 ( DLQ ) 可以增強管道的穩健性。還可以整合各種訊息傳遞提供者、啟用通道之間的事務處理等等。
Spring Cloud Stream 是一個多功能工具。將其與反應式範例結合,可以釋放具有彈性和高吞吐量的強大資料管道。本文只觸及了使用響應式 Kafka Streams 和 Spring WebFlux 的皮毛,還有更多內容有待探索,但到目前為止我們已經觀察到了主要好處:
- 即時轉換:實現事件流的即時轉換和豐富。
- 背壓管理:動態處理資料流,避免系統過載。
- 無縫整合:將 Kafka 的事件驅動能力與 Spring WebFlux 的非阻塞功能結合。
- 可擴展設計:支援具有強大容錯機制(如 DLQ)的高吞吐量系統。
儘管這種方法提供了許多好處,正如本文所討論的,但也有一些需要注意的地方。
4.1.實際陷阱和最佳實踐
雖然反應式 Kafka 管道提供了許多優勢,但它們也帶來了挑戰:
- 反壓處理:未能管理反壓將導致記憶體膨脹或訊息遺失。我們需要在適當的情況下使用
.onBackpressureBuffer()
或.onBackpressureDrop()
。 - 序列化問題:生產者和消費者之間的架構不匹配可能會導致反序列化失敗。我們必須確保模式相容性。
- 錯誤恢復:我們必須確保適當的重試機製或使用 DLQ 來有效處理瞬態問題。
- 資源管理:低效率的訊息處理可能會壓垮應用程式管道。在這種情況下,我們可以利用
.limitRate()
或.take(
) 運算子來控制反應管道內的處理速率。我們還可以設定 Kafka 消費者獲取大小和輪詢間隔,以調整從 Kafka 檢索訊息的速率,並避免壓垮應用程式管道。 - 資料一致性:如果沒有原子操作或適當的重試處理,可能會出現不一致的資料處理。我們可以使用 Kafka 事務來實現原子性或/並編寫冪等消費者邏輯來安全地處理重試。
- 架構演進:在沒有適當版本控制的情況下演進架構可能會導致相容性問題。我們可以使用模式註冊表進行版本控制和應用向後相容的變更(例如,新增選用欄位)。
- 監控和可觀察性:監控不充分可能會使識別管道中的瓶頸或故障變得困難。我們必須整合Micrometer和Grafana (或任何其他首選提供者)等工具來進行指標和監控。我們還可以將追蹤 ID 新增到 Kafka 訊息中以進行分散式追蹤。
關注這些要點可以確保我們的系統具有非常穩定且可擴展的資料處理管道。
5. 結論
在本文中,我們示範了反應式 Kafka Streams 如何與 Spring WebFlux 集成,實現完全反應式、可擴展、高效且能夠即時處理的資料密集型管道。透過利用反應式範式,我們在 Kafka 主題之間建立了無縫資料流,應用了業務邏輯,並以最少的程式碼實現了高吞吐量、事件驅動的處理。這種強大的組合強調了現代反應技術在創建專為即時資料轉換而客製化的強大且可擴展的系統方面的潛力。
與往常一樣,本文中使用的所有程式碼範例都可以在 GitHub 上取得。