在上一篇文章中,我们学习了 Spring Batch 与数据库的集成(JDBC、JPA、MyBatis),掌握了如何高效读写数据、配置事务和优化性能。Spring Batch 的强大之处不仅在于其内置功能,还在于其高度可扩展性。通过自定义组件、监听器、拦截器和动态配置,你可以根据具体业务需求灵活定制批处理流程。
本文将聚焦以下内容:
- 自定义 ItemReader、ItemProcessor 和 ItemWriter,适配非标准数据源和目标。
- 使用监听器(Listener)和拦截器(ItemStream、StepExecutionListener 等)增强监控和控制。
- 动态配置 Job 和 Step,支持运行时调整流程。
- 通过代码示例和 Mermaid 图表展示扩展与定制的实现。
通过本文,你将学会如何将 Spring Batch 应用于复杂业务场景,打造高度定制化的批处理解决方案。
Spring Batch 的扩展性体现在其模块化设计,允许开发者通过以下方式定制功能:
- 自定义组件:实现 ItemReader、ItemProcessor、ItemWriter 接口,处理特殊数据源(如 API、文件系统)或复杂逻辑。
- 监听器:通过 JobExecutionListener、StepExecutionListener、ChunkListener 等捕获生命周期事件,记录日志或干预执行。
- 拦截器:使用 ItemStream 或自定义拦截器在读/写/处理阶段插入逻辑。
- 动态配置:通过 JobParameters、Spring 表达式或代码动态生成 Job 和 Step,适应变化的需求。
这些机制依赖 Spring 的依赖注入和 AOP,支持无缝集成到现有项目。
以下是用 Mermaid 绘制的 Spring Batch 扩展流程图,展示自定义组件和监听器的协作:
graph TD
A[Job] --> B[Step]
B --> C[Custom ItemReader]
B --> D[Custom ItemProcessor]
B --> E[Custom ItemWriter]
B --> F[ChunkListener]
B --> G[StepExecutionListener]
C -->|读取| H[External Data Source]
D -->|处理| I[Business Logic]
E -->|写入| J[Target System]
C --> K[ItemStream: Open/Close]
E --> L[ItemStream: Open/Close]
A --> M[JobExecutionListener]
A --> N[JobRepository]
N -->|存储元数据| O[Database]
说明:
- 自定义组件(Reader/Processor/Writer)处理非标准数据流。
- 监听器(Chunk/Step/Job)捕获事件,记录或干预。
- 拦截器(ItemStream)管理资源(如文件句柄、连接)。
- JobRepository 记录状态,支持动态调整。
ItemReader 负责读取数据,自定义 Reader 可以适配非标准数据源,如 REST API、Kafka、文件系统等。
示例:从 REST API 读取数据
假设我们需要从外部 API 读取商品数据,每次调用返回一页 JSON。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| package com.example.springbatchdemo.reader;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemReader;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
import java.util.List;
public class ApiItemReader implements ItemReader<Product> {
private final RestTemplate restTemplate;
private final String apiUrl;
private int page = 0;
private List<Product> currentPage;
private int currentIndex = 0;
public ApiItemReader(String apiUrl) {
this.restTemplate = new RestTemplate();
this.apiUrl = apiUrl;
}
@Override
public Product read() {
if (currentPage == null || currentIndex >= currentPage.size()) {
// 获取下一页
ResponseEntity<ProductPage> response = restTemplate.getForEntity(
apiUrl + "?page=" + page, ProductPage.class);
if (!response.getStatusCode().is2xxSuccessful() || response.getBody() == null) {
return null; // API 错误或无数据
}
currentPage = response.getBody().getProducts();
if (currentPage.isEmpty()) {
return null; // 没有更多数据
}
currentIndex = 0;
page++;
}
return currentPage.get(currentIndex++);
}
}
|
辅助类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| package com.example.springbatchdemo.reader;
import com.example.springbatchdemo.entity.Product;
import java.util.List;
public class ProductPage {
private List<Product> products;
public List<Product> getProducts() {
return products;
}
public void setProducts(List<Product> products) {
this.products = products;
}
}
|
配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import com.example.springbatchdemo.reader.ApiItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
public class ApiReaderConfiguration {
@Bean
public ApiItemReader apiReader() {
return new ApiItemReader("http://api.example.com/products");
}
@Bean
public ProductItemProcessor processor() {
return new ProductItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Product>()
.sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
.dataSource(dataSource)
.beanMapped()
.build();
}
@Bean
public Step apiStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("apiStep", jobRepository)
.<Product, Product>chunk(100)
.reader(apiReader())
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.build();
}
@Bean
public Job apiJob(JobRepository jobRepository, Step apiStep) {
return new JobBuilder("apiJob", jobRepository)
.start(apiStep)
.build();
}
}
|
Processor 实现(同前):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;
public class ProductItemProcessor implements ItemProcessor<Product, Product> {
private static final double EXCHANGE_RATE = 0.14;
@Override
public Product process(Product item) {
if (item.getPrice() <= 0) {
return null;
}
item.setPrice(item.getPrice() * EXCHANGE_RATE);
return item;
}
}
|
实体类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| package com.example.springbatchdemo.entity;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.Data;
@Entity
@Data
public class Product {
@Id
private Long id;
private String name;
private Double price;
}
|
说明:
ApiItemReader
每次调用 API 获取一页数据,逐条返回 Product
。- 返回
null
表示数据读取结束。 - 使用
RestTemplate
模拟 HTTP 请求,实际项目可替换为生产级客户端(如 WebClient)。
适用场景:
- 外部 API(如 REST、GraphQL)。
- 消息队列(如 Kafka、RabbitMQ)。
- 非标准文件(如 Excel、XML)。
ItemProcessor 负责数据转换或业务逻辑,自定义 Processor 可以实现复杂处理,如调用外部服务、数据校验等。
示例:调用外部服务校验数据
假设需要调用外部服务校验商品名称是否合规。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| package com.example.springbatchdemo.processor;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
public class ValidationItemProcessor implements ItemProcessor<Product, Product> {
private final RestTemplate restTemplate;
private final String validationApiUrl;
public ValidationItemProcessor(String validationApiUrl) {
this.restTemplate = new RestTemplate();
this.validationApiUrl = validationApiUrl;
}
@Override
public Product process(Product item) {
// 调用外部服务校验名称
ResponseEntity<Boolean> response = restTemplate.getForEntity(
validationApiUrl + "?name=" + item.getName(), Boolean.class);
if (!response.getStatusCode().is2xxSuccessful() || !response.getBody()) {
return null; // 名称不合规,跳过
}
return item;
}
}
|
配置:
1
2
3
4
| @Bean
public ValidationItemProcessor validationProcessor() {
return new ValidationItemProcessor("http://api.example.com/validate");
}
|
说明:
ValidationItemProcessor
调用外部 API 校验名称,返回 null
跳过不合规记录。- 可扩展为复杂逻辑,如格式转换、聚合计算。
适用场景:
- 数据校验(如正则、外部服务)。
- 复杂转换(如多源合并)。
- 业务规则应用。
ItemWriter 负责写入数据,自定义 Writer 可以适配非标准目标,如文件、API、队列等。
示例:写入 Kafka 队列
假设处理后的数据需要发送到 Kafka。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| package com.example.springbatchdemo.writer;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.kafka.core.KafkaTemplate;
public class KafkaItemWriter implements ItemWriter<Product> {
private final KafkaTemplate<String, Product> kafkaTemplate;
private final String topic;
public KafkaItemWriter(KafkaTemplate<String, Product> kafkaTemplate, String topic) {
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
}
@Override
public void write(Chunk<? extends Product> chunk) {
for (Product item : chunk.getItems()) {
kafkaTemplate.send(topic, String.valueOf(item.getId()), item);
}
}
}
|
添加依赖(pom.xml
):
1
2
3
4
| <dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
|
配置:
1
2
3
4
5
6
7
8
9
| @Bean
public KafkaItemWriter kafkaWriter(KafkaTemplate<String, Product> kafkaTemplate) {
return new KafkaItemWriter(kafkaTemplate, "products-topic");
}
@Bean
public KafkaTemplate<String, Product> kafkaTemplate(ProducerFactory<String, Product> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
|
说明:
KafkaItemWriter
将每个 Chunk 的数据发送到 Kafka 主题。- 使用
KafkaTemplate
简化消息发送。 - 事务由 Spring Batch 管理,确保 Chunk 级别一致性。
适用场景:
- 消息队列(如 Kafka、RabbitMQ)。
- 外部系统(如 API、FTP)。
- 自定义文件格式。
监听器(Listener)捕获 Job、Step 或 Chunk 的事件,用于日志记录、监控或干预执行。
示例:记录 Step 执行时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| package com.example.springbatchdemo.listener;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
public class TimingStepListener implements StepExecutionListener {
private long startTime;
@Override
public void beforeStep(StepExecution stepExecution) {
startTime = System.currentTimeMillis();
System.out.println("Step started: " + stepExecution.getStepName());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
long duration = System.currentTimeMillis() - startTime;
System.out.println("Step ended: " + stepExecution.getStepName() +
", status: " + stepExecution.getStatus() +
", duration: " + duration + "ms");
return stepExecution.getExitStatus();
}
}
|
配置:
1
2
3
4
5
6
7
8
9
10
11
| @Bean
public Step apiStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("apiStep", jobRepository)
.<Product, Product>chunk(100)
.reader(apiReader())
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.listener(new TimingStepListener())
.build();
}
|
说明:
beforeStep
记录开始时间。afterStep
计算并打印执行时间。
示例:监控 Chunk 进度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| package com.example.springbatchdemo.listener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.listener.ChunkListenerSupport;
public class ProgressChunkListener extends ChunkListenerSupport {
@Override
public void afterChunk(ChunkContext context) {
long writeCount = context.getStepContext().getStepExecution().getWriteCount();
System.out.println("Chunk completed, items written: " + writeCount);
}
@Override
public void afterChunkError(ChunkContext context) {
System.out.println("Chunk failed: " + context.getStepContext().getStepName());
}
}
|
配置:
1
2
3
4
5
6
7
8
9
10
11
12
| @Bean
public Step apiStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("apiStep", jobRepository)
.<Product, Product>chunk(100)
.reader(apiReader())
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.listener(new TimingStepListener())
.listener(new ProgressChunkListener())
.build();
}
|
适用场景:
- 日志记录(执行时间、进度)。
- 监控和报警(失败通知)。
- 干预执行(如动态调整参数)。
ItemStream 是一个扩展接口,允许在读写操作前后执行初始化或清理逻辑,常用于管理资源(如文件句柄、连接)。
示例:管理 API 连接
扩展 ApiItemReader
支持 ItemStream:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
| package com.example.springbatchdemo.reader;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
import java.util.List;
public class ApiItemReader implements ItemStreamReader<Product> {
private final RestTemplate restTemplate;
private final String apiUrl;
private int page = 0;
private List<Product> currentPage;
private int currentIndex = 0;
private boolean initialized = false;
public ApiItemReader(String apiUrl) {
this.restTemplate = new RestTemplate();
this.apiUrl = apiUrl;
}
@Override
public Product read() {
if (!initialized) {
throw new IllegalStateException("Reader not initialized");
}
if (currentPage == null || currentIndex >= currentPage.size()) {
ResponseEntity<ProductPage> response = restTemplate.getForEntity(
apiUrl + "?page=" + page, ProductPage.class);
if (!response.getStatusCode().is2xxSuccessful() || response.getBody() == null) {
return null;
}
currentPage = response.getBody().getProducts();
if (currentPage.isEmpty()) {
return null;
}
currentIndex = 0;
page++;
}
return currentPage.get(currentIndex++);
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
page = executionContext.getInt("page", 0);
initialized = true;
System.out.println("Opening API connection, starting at page: " + page);
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putInt("page", page);
}
@Override
public void close() throws ItemStreamException {
initialized = false;
System.out.println("Closing API connection");
}
}
|
说明:
open
:初始化连接,从 ExecutionContext 恢复页码。update
:保存当前页码,支持重启。close
:清理资源。ExecutionContext
存储状态,支持作业中断后恢复。
适用场景:
- 管理外部资源(如数据库连接、文件流)。
- 保存读取进度(如页码、偏移量)。
- 支持重启和状态恢复。
动态配置允许在运行时根据参数或条件调整 Job 和 Step,例如根据输入选择数据源或处理逻辑。
示例:根据参数选择 Reader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import com.example.springbatchdemo.reader.ApiItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.Map;
@Configuration
public class DynamicConfiguration {
@Bean
@StepScope
public ItemReader<Product> dynamicReader(@Value("#{jobParameters['source']}") String source,
DataSource dataSource) {
if ("api".equals(source)) {
return new ApiItemReader("http://api.example.com/products");
} else {
return new JdbcPagingItemReaderBuilder<Product>()
.name("jdbcReader")
.dataSource(dataSource)
.selectClause("SELECT id, name, price")
.fromClause("FROM source_product")
.sortKeys(Map.of("id", Order.ASCENDING))
.pageSize(1000)
.beanRowMapper(Product.class)
.build();
}
}
@Bean
public ProductItemProcessor processor() {
return new ProductItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Product>()
.sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
.dataSource(dataSource)
.beanMapped()
.build();
}
@Bean
public Step dynamicStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("dynamicStep", jobRepository)
.<Product, Product>chunk(100)
.reader(dynamicReader(null, dataSource))
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.build();
}
@Bean
public Job dynamicJob(JobRepository jobRepository, Step dynamicStep) {
return new JobBuilder("dynamicJob", jobRepository)
.start(dynamicStep)
.build();
}
}
|
运行:
1
2
3
4
5
| JobParameters jobParameters = new JobParametersBuilder()
.addString("source", "api") // 或 "db"
.addLong("runTime", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(dynamicJob, jobParameters);
|
说明:
@StepScope
确保 Reader 在运行时根据 JobParameters 创建。- 根据
source
参数选择 ApiItemReader
或 JdbcPagingItemReader
。 - 支持动态切换数据源。
示例:根据文件数量生成 Step
假设有多个 CSV 文件,动态创建 Step 处理每个文件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
| package com.example.springbatchdemo.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;
import java.io.File;
@Configuration
public class DynamicStepConfiguration {
@Bean
public Step fileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
String filePath) {
return new StepBuilder("fileStep-" + filePath, jobRepository)
.<Product, Product>chunk(100)
.reader(new FlatFileItemReaderBuilder<Product>()
.name("fileReader")
.resource(new FileSystemResource(filePath))
.delimited()
.names("id", "name", "price")
.targetType(Product.class)
.build())
.processor(new ProductItemProcessor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.build();
}
@Bean
public Job dynamicFileJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
JobBuilder jobBuilder = new JobBuilder("dynamicFileJob", jobRepository);
File[] files = new File("/path/to/files").listFiles((dir, name) -> name.endsWith(".csv"));
if (files == null) {
return jobBuilder.start(new StepBuilder("emptyStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
System.out.println("No files to process");
return RepeatStatus.FINISHED;
})
.transactionManager(transactionManager)
.build())
.build();
}
FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("dynamicFlow");
for (File file : files) {
Step step = fileStep(jobRepository, transactionManager, file.getAbsolutePath());
flowBuilder.start(step);
}
return jobBuilder.start(flowBuilder.build()).end().build();
}
}
|
说明:
- 动态扫描 CSV 文件,生成对应的 Step。
- 使用
FlowBuilder
构建动态流程。 - 支持运行时扩展(如新文件自动处理)。
适用场景:
- 动态数据源(如多文件、多表)。
- 条件流程(如根据参数选择 Step)。
- 可变业务逻辑。
以下是一个综合示例,结合自定义 Reader、Processor、Writer、Listener 和动态配置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
| @Configuration
public class CustomBatchConfiguration {
@Bean
@StepScope
public ApiItemReader apiReader(@Value("#{jobParameters['apiUrl']}") String apiUrl) {
return new ApiItemReader(apiUrl);
}
@Bean
public ValidationItemProcessor validationProcessor() {
return new ValidationItemProcessor("http://api.example.com/validate");
}
@Bean
public KafkaItemWriter kafkaWriter(KafkaTemplate<String, Product> kafkaTemplate) {
return new KafkaItemWriter(kafkaTemplate, "products-topic");
}
@Bean
public TimingStepListener stepListener() {
return new TimingStepListener();
}
@Bean
public ProgressChunkListener chunkListener() {
return new ProgressChunkListener();
}
@Bean
public Step customStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("customStep", jobRepository)
.<Product, Product>chunk(100)
.reader(apiReader(null))
.processor(validationProcessor())
.writer(kafkaWriter(kafkaTemplate(null)))
.transactionManager(transactionManager)
.listener(stepListener())
.listener(chunkListener())
.build();
}
@Bean
public Job customJob(JobRepository jobRepository, Step customStep) {
return new JobBuilder("customJob", jobRepository)
.start(customStep)
.build();
}
}
|
运行:
1
2
3
4
5
| JobParameters jobParameters = new JobParametersBuilder()
.addString("apiUrl", "http://api.example.com/products")
.addLong("runTime", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(customJob, jobParameters);
|
说明:
- Reader:从动态 API URL 读取数据。
- Processor:校验数据合规性。
- Writer:发送到 Kafka。
- Listener:记录时间和进度。
- 动态参数支持灵活配置。
流程图:
graph TD
A[Job: customJob] --> B[Step: customStep]
B --> C[ApiItemReader]
B --> D[ValidationItemProcessor]
B --> E[KafkaItemWriter]
B --> F[TimingStepListener]
B --> G[ProgressChunkListener]
C -->|读取| H[External API]
D -->|校验| I[Validation Service]
E -->|写入| J[Kafka Topic]
A --> K[JobRepository]
K -->|存储元数据| L[Database]
自定义组件:
- 保持单一职责(如 Reader 只负责读取)。
- 实现
ItemStream
支持状态管理和重启。 - 测试线程安全,适应多线程场景。
监听器:
- 使用 Listener 记录关键事件(如失败、进度)。
- 避免复杂逻辑,防止性能瓶颈。
- 结合监控工具(如 Prometheus)分析数据。
动态配置:
- 使用
@StepScope
注入 JobParameters。 - 设计灵活的流程,支持扩展。
- 测试动态场景,确保稳定性。
扩展性:
- 优先扩展现有组件(如装饰者模式)。
- 使用 Spring 依赖注入管理自定义组件。
- 编写单元测试验证逻辑。
Q:自定义 Reader 如何支持重启?
A:实现 ItemStream
,在 update
方法保存进度(如偏移量),在 open
方法恢复。
Q:监听器影响性能怎么办?
A:保持 Listener 轻量,异步记录日志(如使用队列),避免阻塞主线程。
Q:动态 Job 如何管理复杂流程?
A:使用 FlowBuilder
和 JobExecutionDecider
定义条件流,结合参数动态调整。
本文详细讲解了 Spring Batch 的扩展与定制,包括自定义组件、监听器、拦截器和动态配置。通过示例和 Mermaid 图表,你学会了如何适配复杂业务场景。下一篇文章将聚焦 Spring Batch 生产实践,内容包括:
- 部署和管理 Job(Spring Boot、容器化)。
- 监控和报警(JMX、Prometheus)。
- 常见生产问题与解决方案。