在 Spring Batch 中,失敗時重新啟動作業並繼續
1. 簡介
Spring Batch 提供了強大的失敗作業重新啟動機制。這些機制允許作業從故障點恢復處理。此功能對於高效處理大規模資料處理任務至關重要。
Spring Batch 內建的JobRepository
會持久保存作業的執行狀態。這允許作業預設具有可重啟性。因此,失敗的作業可以從中斷處精確恢復。這可確保不會發生重複處理或資料遺失。
在本教程中,我們將研究如何有效地配置和重新啟動失敗的 Spring Batch 作業。
2. Maven依賴項
讓我們先將spring-boot-starter-batch
、 [spring-boot-starter-data-jpa](https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-jpa)
和[h2](https://mvnrepository.com/artifact/com.h2database/h2)
依賴項匯入到我們的pom.xml
中:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
</dependency>
我們需要基於檔案的 H2 資料庫來透過在應用程式運行期間保留作業執行狀態來實現作業重新啟動能力。
3. 定義一個簡單的 Spring Batch 作業
在本節中,我們將探索一個 Spring Batch 作業配置,該配置示範了一個簡單的批次工作流程。我們將定義一個只有一個步驟的作業:處理一個 CSV 檔案。
在 Spring Boot 3 中,我們應該避免使用@EnableBatchProcessing
,因為它會停用 Spring Boot 的有用的自動配置(例如建立 Spring Batch 表)。
3.1. 配置
讓我們建立BatchConfig
類,
它設定一個名為simpleJob
的作業:
@Configuration
public class BatchConfig {
@Bean
public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("simpleJob", jobRepository)
.start(step1(jobRepository, transactionManager))
.build();
}
}
simpleJob
bean 使用JobBuilder
定義批次作業。作業包含一個步驟: step1
,用於讀取、處理和寫入 CSV 檔案:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(flatFileItemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
Step 1
使用StepBuilder
定義了一個基於區塊的步驟。此步驟每次處理兩個項目的數據,從FlatFileItemReader
讀取字串,使用ItemProcessor
進行轉換,最後使用ItemWriter
寫入結果-所有操作均在PlatformTransactionManager
控制的事務中進行管理,以確保資料完整性。
JobRepository
保留步驟的執行狀態,包括FlatFileItemReader
的位置,以便在作業失敗時從最後一個未提交的區塊重新啟動。
3.2. 物品閱讀器
讓我們定義提供輸入資料的flatFileItemReader
bean:
@Bean
@StepScope
public FlatFileItemReader<String> flatFileItemReader() {
return new FlatFileItemReaderBuilder<String>()
.name("itemReader")
.resource(new ClassPathResource("data.csv"))
.lineMapper(new PassThroughLineMapper())
.saveState(true)
.build();
}
程式碼定義了一個名為flatFileItemReader
的FlatFileItemReader<String>
bean,並使用@StepScope
進行註釋,以確保每次執行步驟時都會建立一個新實例,從而實現正確的狀態管理和可重新啟動性。它從位於類別路徑下的data.csv
檔案中讀取字串,並使用PassThroughLineMapper
將每一行對應到一個字串。
此外,它還具有saveState(true)
方法來將其讀取位置持久化到ExecutionContext
中。這樣,如果作業失敗,讀取器就可以從最後一行未處理的行繼續執行,並利用JobRepository
進行狀態持久化。
為了讓作業真正可重啟, ItemReader
必須將其狀態持久化到 Spring Batch 的執行上下文中。像[FlatFileItemReader](https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/file/FlatFileItemReader.html)
這樣的讀取器會自動在區塊之間保存關鍵的進度資訊(例如行號或記錄數)。
3.3. 專案處理器
讓我們聲明轉換輸入資料的itemProcessor
bean:
@Bean
public RestartItemProcessor itemProcessor() {
return new RestartItemProcessor();
}
static class RestartItemProcessor implements ItemProcessor<String, String> {
private boolean failOnItem3 = true;
public void setFailOnItem3(boolean failOnItem3) {
this.failOnItem3 = failOnItem3;
}
@Override
public String process(String item) throws Exception {
System.out.println("Processing: " + item + " (failOnItem3=" + failOnItem3 + ")");
if (failOnItem3 && item.equals("Item3")) {
throw new RuntimeException("Simulated failure on Item3");
}
return "PROCESSED " + item;
}
}
它透過在每個項目前加上PROCESSED
來處理每個項目,並模擬Item3.
3.4. 條目寫入器
現在,讓我們建立輸出處理後的資料的itemWriter
bean:
@Bean
public ItemWriter<String> itemWriter() {
return items -> {
System.out.println("Writing items:");
for (String item : items) {
System.out.println("- " + item);
}
};
}
這會將處理過的項目列印到控制台。現在,我們的應用程式已經準備就緒。
4. 重新啟動失敗的 Spring Batch 作業
Spring Batch 作業預設設計為可重啟,允許它們從故障點無縫恢復。因此,無需額外配置即可啟用此功能。
但是,為了使其有效運作,作業狀態必須持久保存在JobRepository
中。此外, JobRepository
必須由資料庫支持,以確保可靠地儲存和擷取作業的執行狀態。
以下小節介紹如何模擬作業失敗並重新啟動它。
4.1. 模擬作業失敗
為了模擬作業失敗, ItemProcessor
配置為在處理Item3
時拋出RuntimeException
。當Item3
發生故障時, JobRepository
會儲存此狀態,並將該作業標記為FAILED
。
使用mvn spring-boot:run
執行應用程式會產生以下內容:
Starting new job execution...
Processing: Item1
Processing: Item2
Writing items:
- PROCESSED Item1
- PROCESSED Item2
Processing: Item3
[Exception: Simulated failure on Item3]
Job started with status: FAILED
此輸出確認Item1
和Item2
已被處理和寫入,但Item3
上的故障導致作業停止,並且狀態將持續到隨後的重新啟動。
4.2. 重新啟動作業
要重新啟動失敗的作業,我們使用CommandLineRunner
透過JobExplorer
和固定的JobParameters
來偵測失敗的作業實例:
@Bean
CommandLineRunner run(JobLauncher jobLauncher, Job job, JobExplorer jobExplorer,
JobOperator jobOperator, BatchConfig.RestartItemProcessor itemProcessor) {
return args -> {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobId", "test-job-" + System.currentTimeMillis())
.toJobParameters();
List<JobInstance> instances = jobExplorer.getJobInstances("simpleJob", 0, 1);
if (!instances.isEmpty()) {
JobInstance lastInstance = instances.get(0);
List<JobExecution> executions = jobExplorer.getJobExecutions(lastInstance);
if (!executions.isEmpty()) {
JobExecution lastExecution = executions.get(0);
if (lastExecution.getStatus() == BatchStatus.FAILED) {
itemProcessor.setFailOnItem3(false);
JobExecution restartedExecution = jobLauncher.run(job, jobParameters);
// final Long restartId = jobOperator.restart(lastExecution.getId());
// final JobExecution restartedExecution = jobExplorer.getJobExecution(restartedExecution);
// ...
}
}
}
};
}
程式碼透過JobExplorer
檢查作業儲存庫中是否存在simpleJob
實例。
它偵測到狀態為FAILED
的失敗執行。它會使用JobLauncher.run()
或JobOperator.restart()
來還原該特定作業。該作業將從其上次持久化的狀態恢復。這確保了先前處理過的項目不會被重新處理。
為了忽略重啟期間Item3
的失敗,我們在啟動重啟的作業之前設定了itemProcessor.setFailOnItem3(false)
,從而允許RestartItemProcessor
處理Item3
而不會引發異常。
現在,我們再次運行該應用程式。
讓我們檢查一下輸出:
Restarting failed job execution with ID: [execution_id]
Processing: Item3
Processing: Item4
Writing items:
- PROCESSED Item3
- PROCESSED Item4
Processing: Item5
Writing items:
- PROCESSED Item5
Restarted job status: COMPLETED
Spring Batch 作業在Item3
上失敗了。它成功地從故障點重新啟動。然後,作業以區塊的形式處理了Item3
到Item5
,寫入了結果,並以COMPLETED
狀態完成。
4.3. 測驗作業重啟
讓我們新增一個單元測試來驗證 Spring Batch 的作業重新啟動功能,而不是使用CommandLineRunner
:
@Test
public void givenItems_whenFailed_thenRestartFromFailure() throws Exception {
// Given
createTestFile("Item1\nItem2\nItem3\nItem4");
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
// When
JobExecution firstExecution = jobLauncherTestUtils.launchJob(jobParameters);
assertEquals(BatchStatus.FAILED, firstExecution.getStatus());
Long executionId = firstExecution.getId();
itemProcessor.setFailOnItem3(false);
// Then
JobExecution restartedExecution = jobLauncherTestUtils.launchJob(jobParameters);
assertEquals(BatchStatus.COMPLETED, restartedExecution.getStatus());
assertEquals(
firstExecution.getJobInstance().getInstanceId(),
restartedExecution.getJobInstance().getInstanceId()
);
}
此測驗方法首先執行一個作業,該作業在處理Item3
時故意失敗(斷言預期的FAILED
狀態)。然後,它修改處理器行為,使其不再在該專案上失敗。最後,它使用相同的參數重新啟動該作業,以確認它能夠從失敗點成功完成。
該測試驗證了三個關鍵方面。首先,它確認初始故障是否如預期發生,從而測試故障模擬。其次,它確保重新啟動的作業從故障點繼續處理,從而驗證重新啟動邏輯。最後,它檢查兩次執行是否屬於同一個作業實例,從而驗證實例追蹤。
4.4. 防止作業重啟
我們可以使用preventRestart()
方法阻止重新啟動作業:
@Bean
public Job simpleJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("simpleJob", jobRepository)
.start(step1(jobRepository, transactionManager))
.preventRestart()
.build();
}
在JobBuilder
中新增.preventRestart()
方法,可以將作業配置為始終從頭開始(例如, Item1
),而不是在重新啟動時從失敗點(例如, Item3
)繼續執行。這將覆寫 Spring Batch 的預設行為,即為了方便重啟,將作業狀態持久保存在JobRepository
中。
5. 結論
Spring Batch 的預設可重新啟動性支援從作業失敗中進行穩健恢復,確保失敗的作業可以從故障點恢復,而無需重新處理已完成的專案或遺失資料。
在本文中,我們建立了一個簡單的作業來示範此重啟功能。我們配置了一個作業來分塊處理項目,模擬Item3
發生故障。重新啟動後,作業從Item3
恢復,處理完Item5
後成功完成,狀態為COMPLETED
。
我們也探討如何使用JobBuilder
上的preventRestart()
來覆蓋此行為。這會強製作業從頭開始,例如Item1
,而不是從失敗點恢復。
與往常一樣,原始碼可在 GitHub 上取得。