在上一篇文章中,我们学习了 Spring Batch 的配置方式(Java 和 XML)以及调度机制(Spring Scheduler、Quartz、手动触发),掌握了如何定义和运行作业。在实际生产环境中,批处理任务难免会遇到异常,如数据格式错误、数据库连接失败或外部服务不可用。Spring Batch 提供了强大的错误处理机制,包括跳过(Skip)、重试(Retry)、重启(Restart)和监听器(Listener),确保作业在异常情况下依然可靠运行。
本文将聚焦以下内容:
- 跳过(Skip):忽略无效记录,继续处理后续数据。
- 重试(Retry):自动重试失败的操作,如网络超时。
- 重启(Restart):恢复中断的作业,从上次失败点继续执行。
- 监听器(Listener):捕获和记录错误信息,自定义错误处理逻辑。
- 通过代码示例和 Mermaid 图表展示错误处理流程。
通过本文,你将学会如何配置 Spring Batch 的错误处理机制,提升作业的健壮性和可维护性。
Spring Batch 的错误处理机制旨在平衡任务的可靠性与性能,主要包括以下功能:
- Skip:当某些记录导致异常时,跳过这些记录,继续处理后续数据。适合处理数据格式错误等非致命异常。
- Retry:当操作失败时(如网络问题),自动重试指定次数。适合处理临时性错误。
- Restart:允许从上次失败的点恢复作业,依赖 JobRepository 存储的状态。
- Listener:通过监听器捕获 Job 或 Step 的生命周期事件,记录错误或执行自定义逻辑。
这些机制可以通过配置或编程方式实现,Spring Batch 提供了灵活的 API 支持。
以下是用 Mermaid 绘制的 Spring Batch 错误处理流程图,展示异常发生时的处理逻辑:
graph TD
A[Start Chunk] --> B[ItemReader: Read Item]
B --> C[ItemProcessor: Process Item]
C --> D[ItemWriter: Write Chunk]
D -->|异常| E{Retry Configured?}
E -->|是| F[Retry Operation]
F -->|成功| G[Commit Transaction]
F -->|失败| H{Skip Configured?}
E -->|否| H
H -->|是| I[Skip Item]
I --> J[Log Skip]
J --> K{More Items?}
H -->|否| L[Fail Job]
L --> M[Save State to JobRepository]
K -->|是| B
K -->|否| G
G --> N[End Chunk]
M --> O[Restart Possible]
说明:
- 异常发生时,先检查是否配置了 Retry,如果有则重试。
- 重试失败或无重试配置时,检查是否配置了 Skip,如果是则跳过记录并记录日志。
- 如果既无 Retry 也无 Skip,作业失败,状态保存到 JobRepository。
- 保存的状态支持后续 Restart。
Skip 允许 Spring Batch 在遇到特定异常时跳过当前记录,继续处理后续数据。适用于数据质量问题(如格式错误、空值),避免因单个记录失败导致整个作业终止。
通过 StepBuilder
的 .faultTolerant()
和 .skip()
方法配置跳过策略。
示例:跳过格式错误的记录
假设 CSV 文件 products.csv
包含以下数据:
1
2
3
4
5
| id,name,price
1,Laptop,10000
2,Phone,invalid
3,Headphones,-100
4,Tablet,8000
|
如果 price
字段无法解析为数字(如 "invalid"),我们希望跳过该记录。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
public class BatchConfiguration {
@Bean
public FlatFileItemReader<Product> reader() {
FlatFileItemReader<Product> reader = new FlatFileItemReaderBuilder<Product>()
.name("productReader")
.resource(new ClassPathResource("products.csv"))
.targetType(Product.class)
.build();
// 自定义 LineMapper 处理格式错误
DefaultLineMapper<Product> lineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("id", "name", "price");
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSet -> {
Product product = new Product();
product.setId(fieldSet.readLong("id"));
product.setName(fieldSet.readString("name"));
try {
product.setPrice(fieldSet.readDouble("price"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid price format: " + fieldSet.readString("price"));
}
return product;
});
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
public ProductItemProcessor processor() {
return new ProductItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Product>()
.sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
.dataSource(dataSource)
.beanMapped()
.build();
}
@Bean
public Step importStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("importStep", jobRepository)
.<Product, Product>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.faultTolerant()
.skip(IllegalArgumentException.class) // 跳过价格格式错误
.skipLimit(10) // 最多跳过 10 条记录
.build();
}
@Bean
public Job importProductsJob(JobRepository jobRepository, Step importStep) {
return new JobBuilder("importProductsJob", jobRepository)
.start(importStep)
.build();
}
}
|
Processor 实现(与之前一致):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;
public class ProductItemProcessor implements ItemProcessor<Product, Product> {
private static final double EXCHANGE_RATE = 0.14;
@Override
public Product process(Product item) {
if (item.getPrice() <= 0) {
return null; // 过滤负价格
}
item.setPrice(item.getPrice() * EXCHANGE_RATE);
return item;
}
}
|
实体类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| package com.example.springbatchdemo.entity;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.Data;
@Entity
@Data
public class Product {
@Id
private Long id;
private String name;
private Double price;
}
|
说明:
reader
自定义 LineMapper
,在解析 price
时抛出 IllegalArgumentException
表示格式错误。.faultTolerant()
启用容错模式。.skip(IllegalArgumentException.class)
指定跳过的异常类型。.skipLimit(10)
限制最多跳过 10 条记录,超出后作业失败。
运行结果:
- 第二条记录(price="invalid")抛出
IllegalArgumentException
,被跳过。 - 第三条记录(price=-100)在 Processor 中被过滤(返回
null
)。 - 数据库最终包含第一条和第四条记录(价格转换为美元)。
日志输出(示例):
Skipped item due to: java.lang.IllegalArgumentException: Invalid price format: invalid
为了记录跳过的记录,可以使用 SkipListener
捕获跳过事件。
示例:记录跳过记录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.core.SkipListener;
import org.springframework.stereotype.Component;
@Component
public class ProductSkipListener implements SkipListener<Product, Product> {
@Override
public void onSkipInRead(Throwable t) {
System.out.println("Skipped in read: " + t.getMessage());
}
@Override
public void onSkipInProcess(Product item, Throwable t) {
System.out.println("Skipped in process: " + item + ", error: " + t.getMessage());
}
@Override
public void onSkipInWrite(Product item, Throwable t) {
System.out.println("Skipped in write: " + item + ", error: " + t.getMessage());
}
}
|
修改 Step 配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| @Bean
public Step importStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
ProductSkipListener skipListener) {
return new StepBuilder("importStep", jobRepository)
.<Product, Product>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.faultTolerant()
.skip(IllegalArgumentException.class)
.skipLimit(10)
.listener(skipListener)
.build();
}
|
说明:
SkipListener
分别捕获读取、处理和写入阶段的跳过事件。- 可以将跳过记录写入日志文件、数据库或发送通知。
最佳实践:
- 限制
skipLimit
,避免无限跳过导致数据丢失。 - 使用
SkipListener
记录跳过详情,便于后续分析。 - 仅跳过非致命异常(如数据格式错误),致命异常(如数据库连接失败)应触发作业失败。
Retry 允许 Spring Batch 在遇到特定异常时自动重试操作,适合处理临时性错误(如网络超时、锁竞争)。Spring Batch 使用 Spring Retry 库提供重试功能。
通过 .retry()
和 .retryLimit()
配置重试策略。
示例:重试数据库写入失败
假设写入数据库时可能因锁竞争抛出 DataAccessException
,我们希望重试 3 次。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| @Bean
public Step importStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
ProductSkipListener skipListener) {
return new StepBuilder("importStep", jobRepository)
.<Product, Product>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.faultTolerant()
.skip(IllegalArgumentException.class)
.skipLimit(10)
.retry(org.springframework.dao.DataAccessException.class) // 重试数据库异常
.retryLimit(3) // 最多重试 3 次
.listener(skipListener)
.build();
}
|
说明:
.retry(DataAccessException.class)
指定重试的异常类型。.retryLimit(3)
设置最多重试 3 次。- 如果重试仍失败,触发 Skip(如果配置了)或作业失败。
日志输出(示例):
Retrying due to: org.springframework.dao.DataAccessException: ...
Retry attempt 1 of 3
对于更复杂的重试需求,可以使用 Spring Retry 的 @Retryable
注解。
示例:重试 Processor 中的外部服务调用
假设 Processor 需要调用外部 API 转换价格,可能因网络问题失败。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;
@Component
public class ProductItemProcessor implements ItemProcessor<Product, Product> {
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 1000))
public double convertPrice(double price) {
// 模拟外部服务调用,可能抛出异常
if (Math.random() > 0.7) {
throw new RuntimeException("Temporary service failure");
}
return price * 0.14;
}
@Override
public Product process(Product item) {
if (item.getPrice() <= 0) {
return null;
}
item.setPrice(convertPrice(item.getPrice()));
return item;
}
}
|
启用 Retry:
在主应用类添加 @EnableRetry
:
1
2
3
4
5
6
7
8
9
10
11
12
13
| package com.example.springbatchdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.retry.annotation.EnableRetry;
@SpringBootApplication
@EnableRetry
public class SpringBatchDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchDemoApplication.class, args);
}
}
|
说明:
@Retryable
指定重试逻辑,maxAttempts=3
表示最多重试 3 次。@Backoff(delay = 1000)
设置重试间隔 1 秒。- 如果重试失败,抛出异常,触发 Skip 或作业失败。
最佳实践:
- 仅对临时性异常(如网络问题)启用 Retry,永久性错误(如数据格式错误)应跳过。
- 设置合理的重试间隔(如指数退避),避免过载。
- 记录重试日志,便于调试。
Restart 允许从上次失败的点恢复作业,依赖 JobRepository 存储的元数据。Spring Batch 默认支持重启,前提是 Job 是 restartable(默认启用)。
确保 JobRepository 使用持久化数据库(如 H2、MySQL),内存数据库不支持重启。
示例:模拟作业失败并重启
修改 Processor 模拟随机失败:
1
2
3
4
5
6
7
8
9
10
11
| @Override
public Product process(Product item) {
if (item.getPrice() <= 0) {
return null;
}
if (item.getId() == 4 && Math.random() > 0.5) {
throw new RuntimeException("Simulated failure on item 4");
}
item.setPrice(item.getPrice() * 0.14);
return item;
}
|
运行与重启:
第一次运行:
1
2
3
4
5
| JobParameters jobParameters = new JobParametersBuilder()
.addString("inputFile", "products.csv")
.addLong("runTime", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(importProductsJob, jobParameters);
|
如果 ID=4 的记录触发异常,作业失败,状态保存到 JobRepository。
重启作业:
使用相同的 JobParameters 重启:
1
| jobLauncher.run(importProductsJob, jobParameters);
|
Spring Batch 从上次失败的 Chunk 继续执行。
说明:
- JobRepository 记录已处理的 Chunk 和记录,Restart 从上次提交的 Chunk 开始。
- 如果 Job 已完成(COMPLETED),无法重启,除非使用新的 JobParameters。
配置非重启作业:
1
2
3
4
5
6
7
| @Bean
public Job importProductsJob(JobRepository jobRepository, Step importStep) {
return new JobBuilder("importProductsJob", jobRepository)
.start(importStep)
.preventRestart() // 禁用重启
.build();
}
|
最佳实践:
- 使用持久化数据库(如 MySQL、PostgreSQL)存储 JobRepository。
- 确保 JobParameters 一致,避免创建新 JobInstance。
- 测试重启场景,确保数据一致性。
Listener 允许捕获 Job 或 Step 的生命周期事件(如开始、结束、错误),用于记录日志、发送通知或执行清理逻辑。
StepListener
是一个接口,常用子接口包括:
ItemReadListener
:监听读取事件。ItemProcessListener
:监听处理事件。ItemWriteListener
:监听写入事件。ChunkListener
:监听 Chunk 生命周期。SkipListener
:监听跳过事件(已展示)。
示例:记录 Chunk 完成时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| package com.example.springbatchdemo.config;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.stereotype.Component;
@Component
public class ProductChunkListener implements ChunkListener {
@Override
public void beforeChunk(ChunkContext context) {
System.out.println("Before chunk: " + context.getStepContext().getStepName());
}
@Override
public void afterChunk(ChunkContext context) {
System.out.println("After chunk: " + context.getStepContext().getStepName() +
", items processed: " + context.getStepContext().getWriteCount());
}
@Override
public void afterChunkError(ChunkContext context) {
System.out.println("Chunk error: " + context.getStepContext().getStepName());
}
}
|
修改 Step 配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| @Bean
public Step importStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
ProductSkipListener skipListener, ProductChunkListener chunkListener) {
return new StepBuilder("importStep", jobRepository)
.<Product, Product>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.faultTolerant()
.skip(IllegalArgumentException.class)
.skipLimit(10)
.retry(org.springframework.dao.DataAccessException.class)
.retryLimit(3)
.listener(skipListener)
.listener(chunkListener)
.build();
}
|
日志输出(示例):
Before chunk: importStep
After chunk: importStep, items processed: 10
JobExecutionListener
捕获 Job 的开始和结束事件。
示例:记录 Job 执行时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| package com.example.springbatchdemo.config;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;
@Component
public class ProductJobListener implements JobExecutionListener {
private long startTime;
@Override
public void beforeJob(JobExecution jobExecution) {
startTime = System.currentTimeMillis();
System.out.println("Job started: " + jobExecution.getJobInstance().getJobName());
}
@Override
public void afterJob(JobExecution jobExecution) {
long duration = System.currentTimeMillis() - startTime;
System.out.println("Job ended: " + jobExecution.getJobInstance().getJobName() +
", status: " + jobExecution.getStatus() +
", duration: " + duration + "ms");
}
}
|
修改 Job 配置:
1
2
3
4
5
6
7
8
| @Bean
public Job importProductsJob(JobRepository jobRepository, Step importStep,
ProductJobListener jobListener) {
return new JobBuilder("importProductsJob", jobRepository)
.start(importStep)
.listener(jobListener)
.build();
}
|
日志输出(示例):
Job started: importProductsJob
Job ended: importProductsJob, status: COMPLETED, duration: 1250ms
最佳实践:
- 使用 Listener 记录详细日志,便于监控和调试。
- 避免在 Listener 中执行复杂逻辑,防止影响性能。
- 结合监控工具(如 Prometheus、Grafana)收集 Listener 数据。
以下是一个综合示例,展示 Skip、Retry 和 Listener 的协同工作:
- 场景:从 CSV 读取商品数据,跳过格式错误,写入数据库时重试,重启失败的作业,记录所有错误。
完整配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
| @Configuration
public class BatchConfiguration {
@Bean
public FlatFileItemReader<Product> reader() {
FlatFileItemReader<Product> reader = new FlatFileItemReaderBuilder<Product>()
.name("productReader")
.resource(new ClassPathResource("products.csv"))
.targetType(Product.class)
.build();
DefaultLineMapper<Product> lineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("id", "name", "price");
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSet -> {
Product product = new Product();
product.setId(fieldSet.readLong("id"));
product.setName(fieldSet.readString("name"));
try {
product.setPrice(fieldSet.readDouble("price"));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid price format: " + fieldSet.readString("price"));
}
return product;
});
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
public ProductItemProcessor processor() {
return new ProductItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Product>()
.sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
.dataSource(dataSource)
.beanMapped()
.build();
}
@Bean
public Step importStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
ProductSkipListener skipListener, ProductChunkListener chunkListener) {
return new StepBuilder("importStep", jobRepository)
.<Product, Product>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.faultTolerant()
.skip(IllegalArgumentException.class)
.skipLimit(10)
.retry(org.springframework.dao.DataAccessException.class)
.retryLimit(3)
.listener(skipListener)
.listener(chunkListener)
.build();
}
@Bean
public Job importProductsJob(JobRepository jobRepository, Step importStep,
ProductJobListener jobListener) {
return new JobBuilder("importProductsJob", jobRepository)
.start(importStep)
.listener(jobListener)
.build();
}
}
|
运行场景:
- 格式错误的记录(如 "invalid")被跳过,触发
SkipListener
。 - 数据库写入失败(如锁竞争)触发 3 次重试,重试失败后作业失败。
- 作业失败后,使用相同 JobParameters 重启,从上次失败的 Chunk 继续。
JobListener
和 ChunkListener
记录执行时间和 Chunk 状态。
Skip:
- 仅跳过数据相关的非致命异常。
- 设置合理的
skipLimit
,避免忽略过多错误。 - 使用
SkipListener
记录跳过详情。
Retry:
- 针对临时性异常(如网络、数据库锁)配置 Retry。
- 设置重试间隔和次数,防止无限循环。
- 结合监控工具分析重试频率。
Restart:
- 使用持久化 JobRepository(如 MySQL)。
- 保留 JobParameters,确保重启一致性。
- 测试重启场景,验证数据完整性。
Listener:
- 记录关键事件(如跳过、重试、失败),便于审计。
- 避免复杂逻辑,确保性能。
Q:Skip 和返回 null 的区别?
A:返回 null
在 Processor 中表示过滤记录,不计入 Skip 统计;Skip 处理异常记录,计入 skipLimit
。
Q:如何调试 Retry 失败的原因?
A:启用详细日志(如 spring.batch.*=DEBUG
),结合 RetryListener
记录重试详情。
Q:重启失败时如何处理?
A:检查 JobParameters 是否一致,验证 JobRepository 数据完整性,可能需要清理元数据。
本文详细讲解了 Spring Batch 的错误处理机制,包括 Skip、Retry、Restart 和 Listener 的配置与应用。通过示例和 Mermaid 图表,你学会了如何提升作业的健壮性。下一篇文章将聚焦 并行处理与性能优化,内容包括:
- 配置多线程 Step。
- 实现分区(Partitioning)处理。
- 性能调优技巧(如 Chunk 大小、缓冲区)。