Spring Cloud AWS 中的 FIFO 佇列支援
1. 概述
AWS SQS 中的 FIFO(先進先出)佇列旨在確保訊息按照傳送的確切順序進行處理,並且每個訊息僅傳送一次。
Spring Cloud AWS v3 透過易於使用的抽象來支援此功能,使開發人員能夠使用最少的樣板程式碼處理 FIFO 佇列功能,例如訊息排序和重複資料刪除。
在本教程中,我們將在金融交易處理系統中探索 FIFO 佇列的三個實際用例:
- 確保同一帳戶內交易的訊息嚴格排序
- 並行處理來自不同帳戶的交易,同時維護每個帳戶的 FIFO 語義
- 在處理失敗的情況下處理訊息重試,確保重試遵循原始訊息順序
我們將透過設定事件驅動的應用程式並建立即時測試來示範這些場景,以斷言行為符合預期,利用 Spring Cloud AWS SQS V3 介紹文章中的環境和測試設定。
2. 依賴關係
首先,我們將透過匯入Spring Cloud AWS 物料清單(BOM) 來管理相依性並確保版本相容性:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.2.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
接下來,我們新增必要的Spring Cloud AWS 啟動器以實現核心功能和SQS 整合:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
我們還將包括Spring Boot Web Starter 。由於我們使用 Spring Cloud AWS BOM,因此不需要指定其版本:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
最後,為了進行測試,我們將使用 JUnit 5 添加LocalStack 和 TestContainers的依賴項、非同步操作驗證的 Awaitility 以及Spring Boot Test Starter :
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
3.建置本地測試環境
接下來,我們將使用Testcontainers
和LocalStack
配置本地測試環境。我們將建立一個SqsLiveTestConfiguration
類別:
@Configuration
public class SqsLiveTestConfiguration {
private static final String LOCAL_STACK_VERSION = "localstack/localstack:3.4.0";
@Bean
@ServiceConnection
LocalStackContainer localStackContainer() {
return new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
}
}
在此類中,我們將 LocalStack 測試容器聲明為 Spring Bean,並使用@ServiceConnection
註解代表我們處理連線。
4. 設定隊列名稱
我們將利用 Spring Boot 的設定外部化功能在application.yml
檔案中定義 SQS 佇列名稱:
events:
queues:
fifo:
transactions-queue: "transactions-queue.fifo"
slow-queue: "slow-queue.fifo"
failure-queue: "failure-queue.fifo"
此結構將我們的佇列名稱組織在分層結構下,以便在我們的應用程式程式碼中輕鬆管理和存取它們。對於 SQS 中的FIFO
佇列, .fifo
後綴是必要的**。**
5. 設定應用程式
讓我們透過使用事務微服務的實際範例來說明這些概念。該服務將處理**TransactionEvent
訊息,代表每個帳戶內必須維持有序的金融交易。**
首先,我們定義Transaction
實體:
public record Transaction(UUID transactionId, UUID accountId, double amount, TransactionType type) {}
連同TransactionType
枚舉:
public enum TransactionType {
DEPOSIT,
WITHDRAW
}
接下來,我們建立TransactionEvent
:
public record TransactionEvent(UUID transactionId, UUID accountId, double amount, TransactionType type) {
public Transaction toEntity() {
return new Transaction(transactionId, accountId, amount, type);
}
}
TransactionService
類別處理處理邏輯並維護一個模擬儲存庫以用於測試目的:
@Service
public class TransactionService {
private static final Logger logger = LoggerFactory.getLogger(TransactionService.class);
private final ConcurrentHashMap<UUID, List<Transaction>> processedTransactions =
new ConcurrentHashMap<>();
public void processTransaction(Transaction transaction) {
logger.info("Processing transaction: {} for account {}",
transaction.transactionId(), transaction.accountId());
processedTransactions.computeIfAbsent(transaction.accountId(), k -> new ArrayList<>())
.add(transaction);
}
public List<Transaction> getProcessedTransactionsByAccount(UUID accountId) {
return processedTransactions.getOrDefault(accountId, new ArrayList<>());
}
}
6. 按順序處理事件
在我們的第一個場景中,我們將建立一個處理事件的偵聽器,並將建立一個測試來斷言我們按照事件發送的順序接收事件。我們將使用@RepeatedTest
註釋運行測試 100 次以確保其一致性,並查看它在標準 SQS 佇列(而不是 FIFO)中的行為。
6.1.建立監聽器
讓我們建立第一個偵聽器來按順序接收和處理事件。 我們將使用@SqsListener
註釋,利用 Spring 的佔位符解析來解析application.yml
檔案中的佇列名稱:
@Component
public class TransactionListener {
private final TransactionService transactionService;
public TransactionListener(TransactionService transactionService) {
this.transactionService = transactionService;
}
@SqsListener("${events.queues.fifo.transactions-queue}")
public void processTransaction(TransactionEvent transactionEvent) {
transactionService.processTransaction(transactionEvent.toEntity());
}
}
請注意,無需進一步設定。在幕後,框架將偵測佇列類型是否為 FIFO,並進行所有必要的調整以確保偵聽器方法以正確的順序接收訊息。
6.2.建立測試
讓我們建立一個測試,斷言接收訊息的順序與發送訊息的順序完全相同。我們從一個測試套件開始,該套件擴展了我們之前創建的BaseSqsLiveTest
:
@SpringBootTest
public class SpringCloudAwsSQSTransactionProcessingTest extends BaseSqsLiveTest {
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private TransactionService transactionService;
@Value("${events.queues.fifo.transactions-queue}")
String transactionsQueue;
@Test
void givenTransactionsFromSameAccount_whenSend_shouldReceiveInOrder() {
var accountId = UUID.randomUUID();
var transactions = List.of(createDeposit(accountId, 100.0),
createWithdraw(accountId, 50.0), createDeposit(accountId, 25.0));
var messages = createTransactionMessages(accountId, transactions);
sqsTemplate.sendMany(transactionsQueue, messages);
await().atMost(Duration.ofSeconds(5))
.until(() -> transactionService.getProcessedTransactionsByAccount(accountId),
isEqual(eventsToEntities(transactions)));
}
}
在此測試中,我們利用SqsTemplate
的sendMany()
方法,該方法使我們能夠在同一批次中發送最多 10 個訊息。然後,我們等待最多 5 秒以按順序接收訊息。
我們還將創建一些輔助方法來幫助我們保持測試邏輯的整潔。 sendMany()
方法需要一個List<Pojo>
,因此createTransactionMessages()
方法將accountId
的每個事務對應到一則訊息:
private List<Message<TransactionEvent>> createTransactionMessages(UUID accountId,
Collection<TransactionEvent> transactions) {
return transactions.stream()
.map(transaction -> MessageBuilder.withPayload(transaction)
.setHeader(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER,
accountId.toString())
.build())
.toList();
}
在 SQS FIFO 中, MessageGroupId
屬性用於通知哪些訊息應分組在一起並按順序接收。在我們的場景中,我們必須確保一個帳戶的交易保持有序,但我們不需要帳戶之間的任何排序,因此我們將使用accountId
作為MessageGroupId
。為此,我們可以使用SqsHeaders
中的標頭,框架會將它們對應到 SQS 訊息屬性:
其餘的輔助方法是將事件對應到交易並建立TransactionEvents
的簡單方法:
private List<Transaction> eventsToEntities(List<TransactionEvent> transactionEvents) {
return transactionEvents.stream()
.map(TransactionEvent::toEntity)
.toList();
}
private TransactionEvent createWithdraw(UUID accountId1, double amount) {
return new TransactionEvent(UUID.randomUUID(), accountId1, amount, TransactionType.WITHDRAW);
}
private TransactionEvent createDeposit(UUID accountId1, double amount) {
return new TransactionEvent(UUID.randomUUID(), accountId1, amount, TransactionType.DEPOSIT);
}
6.3.運行測試
當我們執行測試時,我們將看到測試通過並產生與此類似的日誌,事務發生的順序與我們聲明的順序相同:
TransactionService : Processing transaction: DEPOSIT:100.0 for account f97876f9-5ef9-4b62-a69d-a5d87b5b8e7e
TransactionService : Processing transaction: WITHDRAW:50.0 for account f97876f9-5ef9-4b62-a69d-a5d87b5b8e7e
TransactionService : Processing transaction: DEPOSIT:25.0 for account f97876f9-5ef9-4b62-a69d-a5d87b5b8e7e
如果我們仍然不相信並且想確定這不是巧合,我們可以添加RepeatableTest
註解來運行測試 100 次
@RepeatedTest(100)
void givenTransactionsFromSameAccount_whenSend_shouldReceiveInOrder() {
// ...test remains the same
}
所有 100 次運行都應該同樣通過,且日誌的順序相同。
為了進行額外的健全性檢查,讓我們使用標準佇列而不是 FIFO 並驗證它的行為。
為此,我們需要從 application.yml 中的佇列名稱中刪除.fifo
後綴application.yml:
transactions-queue: "transactions-queue"
接下來,我們將註解掉在createTransactionMessages()
方法中加入MessageId
標頭的程式碼,因為Standard
SQS 佇列不支援該屬性:
// .setHeader(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, accountId.toString())
現在,讓我們再次執行測試 100 次。您會注意到,測試有時會通過,因為訊息恰好按預期順序到達,但有時測試會失敗,因為無法保證標準佇列中的訊息順序。
在結束本節之前,讓我們撤銷這些變更並在佇列中加入.fifo
後綴,刪除@RepeatedTest
註釋,並取消註釋MessageGroupId
程式碼。
7. 並行處理多個訊息組
在SQS FIFO中,為了最大化訊息消費吞吐量,我們可以並行處理來自不同訊息群組的訊息,同時保持訊息群組內的訊息順序。 Spring Cloud AWS SQS 開箱即用地支援該行為,無需進一步配置。
為了說明該行為,讓我們為TransactionService
添加一個方法來模擬慢速連接:
public void simulateSlowProcessing(Transaction transaction) {
try {
processTransaction(transaction);
Thread.sleep(Double.valueOf(100)
.intValue());
logger.info("Transaction processing completed: {}:{} for account {}",
transaction.type(), transaction.amount(), transaction.accountId());
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
throw new RuntimeException(e);
}
}
緩慢的連線將幫助我們斷言來自不同帳戶的訊息正在並行處理,同時保留每個帳戶內的交易順序。
現在,讓我們建立一個監聽器,它將使用TransactionListener
類別中的新方法:
@SqsListener("${events.queues.fifo.slow-queue}")
public void processParallelTransaction(TransactionEvent transactionEvent) {
transactionService.simulateSlowProcessing(transactionEvent.toEntity());
}
最後,讓我們建立一個測試來斷言該行為:
@Test
void givenTransactionsFromDifferentAccounts_whenSend_shouldProcessInParallel() {
var accountId1 = UUID.randomUUID();
var accountId2 = UUID.randomUUID();
var account1Transactions = List.of(createDeposit(accountId1, 100.0),
createWithdraw(accountId1, 50.0), createDeposit(accountId1, 25.0));
var account2Transactions = List.of(createDeposit(accountId2, 50.0),
createWithdraw(accountId2, 25.0), createDeposit(accountId2, 50.0));
var allMessages = Stream.concat(createTransactionMessages(accountId1, account1Transactions).stream(),
createTransactionMessages(accountId2, account2Transactions).stream()).toList();
sqsTemplate.sendMany(slowQueue, allMessages);
await().atMost(Duration.ofSeconds(5))
.until(() -> transactionService.getProcessedTransactionsByAccount(accountId1),
isEqual(eventsToEntities(account1Transactions)));
await().atMost(Duration.ofSeconds(5))
.until(() -> transactionService.getProcessedTransactionsByAccount(accountId2),
isEqual(eventsToEntities(account2Transactions)));
}
在此測試中,我們為兩個不同的帳戶發送兩組交易事件。我們再次利用sendMany()
方法在同一批次中發送所有訊息,並斷言訊息正在按預期順序接收。
當我們執行測試時,我們應該會看到類似這樣的日誌:
TransactionService : Processing transaction: DEPOSIT:50.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Processing transaction: DEPOSIT:100.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
TransactionService : Transaction processing completed: DEPOSIT:100.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
TransactionService : Transaction processing completed: DEPOSIT:50.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Processing transaction: WITHDRAW:50.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
TransactionService : Processing transaction: WITHDRAW:25.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Transaction processing completed: WITHDRAW:50.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
TransactionService : Transaction processing completed: WITHDRAW:25.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Processing transaction: DEPOSIT:50.0 for account 639eba64-a40d-458a-be74-2457dff9d6d1
TransactionService : Processing transaction: DEPOSIT:25.0 for account 1a813756-520c-4713-a0ed-791b66e4551c
我們可以看到兩個帳戶正在並行處理,同時保持每個帳戶內的順序,這也通過測試通過得到了驗證。
8. 依序重試處理
在最後一個場景中,我們將模擬網路故障並確保處理順序保持一致。當偵聽器方法引發錯誤時,框架會停止該訊息群組的執行,並且不會確認訊息。可見性視窗到期後,SQS 再次提供剩餘訊息。
為了說明該行為,我們將向TransactionService
新增一個新方法,該方法在第一次處理訊息時總是會失敗。
首先,我們新增一個Set
來保存已經失敗的 ID:
private final Set<UUID> failedTransactions = ConcurrentHashMap.newKeySet();
然後我們加入processTransactionWithFailure()
方法:
public void processTransactionWithFailure(Transaction transaction) {
if (!failedTransactions.contains(transaction.transactionId())) {
failedTransactions.add(transaction.transactionId());
throw new RuntimeException("Simulated failure for transaction " +
transaction.type() + ":" + transaction.amount());
}
processTransaction(transaction);
}
此方法將在第一次處理交易時拋出錯誤,但在後續重試中將正常處理。
現在,讓我們新增監聽器來處理訊息。我們將messageVisibilitySeconds
設為 1,以縮小可見性視窗並加快測試中的重試速度:
@SqsListener(value = "${events.queues.fifo.failure-queue}", messageVisibilitySeconds = "1")
public void retryFailedTransaction(TransactionEvent transactionEvent) {
transactionService.processTransactionWithFailure(transactionEvent.toEntity());
}
最後,讓我們建立一個測試來斷言行為符合預期:
@Test
void givenTransactionProcessingFailure_whenSend_shouldRetryInOrder() {
var accountId = UUID.randomUUID();
var transactions = List.of(createDeposit(accountId, 100.0),
createWithdraw(accountId, 50.0), createDeposit(accountId, 25.0));
var messages = createTransactionMessages(accountId, transactions);
sqsTemplate.sendMany(failureQueue, messages);
await().atMost(Duration.ofSeconds(10))
.until(() -> transactionService.getProcessedTransactionsByAccount(accountId),
isEqual(eventsToEntities(transactions)));
}
在此測試中,我們發送三個事件並斷言它們已按預期順序處理。
當我們運行測試時,我們應該在異常堆疊追蹤中看到類似以下的日誌:
Caused by: java.lang.RuntimeException: Simulated failure for transaction DEPOSIT:100.0
其次是:
TransactionService : Processing transaction: DEPOSIT:100.0 for account 3f684ccb-80e8-4e40-9136-c3b59bdd980b
表示該事件在第二次嘗試時已成功處理。
我們應該會在接下來的事件中看到 2 個相似的對:
Caused by: java.lang.RuntimeException: Simulated failure for transaction WITHDRAW:50.0
TransactionService : Processing transaction: WITHDRAW:50.0 for account 3f684ccb-80e8-4e40-9136-c3b59bdd980b
Caused by: java.lang.RuntimeException: Simulated failure for transaction DEPOSIT:25.0
TransactionService : Processing transaction: DEPOSIT:25.0 for account 3f684ccb-80e8-4e40-9136-c3b59bdd980b
這表示即使有故障,事件也按照正確的順序處理。
9. 結論
在本文中,我們探討了 Spring Cloud AWS v3 對 FIFO 佇列的支援。我們創建了一個事務處理服務,該服務依賴於按順序處理的事件,並且斷言訊息順序在三種不同的場景中受到尊重:處理單一訊息群組、並行處理多個訊息群組以及在失敗後重試訊息。
我們透過設定本地測試環境並創建即時測試來測試每個場景來斷言我們的邏輯。
與往常一樣,本文中使用的完整程式碼可以 在 GitHub 上找到。