在上一篇文章中,我们详细探讨了 Spring Batch 的核心组件(Job、Step、Chunk、ItemReader、ItemProcessor、ItemWriter),并通过示例展示了它们的协作方式。掌握了这些组件后,接下来需要了解如何灵活配置 Spring Batch 作业,并通过调度机制控制作业的执行时机。本文将聚焦以下内容:
- Spring Batch 的配置方式:XML 配置和 Java 配置的对比与实现。
- JobParameters 的定义和使用,用于动态传递运行时参数。
- 调度 Spring Batch 作业:使用 Spring Scheduler、Quartz 或手动触发。
- 通过代码示例和 Mermaid 图表展示配置和调度的完整流程。
通过本文,你将学会如何根据项目需求配置 Spring Batch 作业,并实现定时或手动触发,为生产环境部署奠定基础。
Spring Batch 支持两种主要配置方式:XML 配置 和 Java 配置。Java 配置因其类型安全和现代化特性在 Spring Boot 项目中更常见,但 XML 配置在遗留系统或特定场景中仍有使用价值。以下分别介绍这两种方式。
Java 配置使用 Spring 的 @Configuration
注解和流式 API(如 JobBuilder
、StepBuilder
)定义 Job 和 Step。上一篇文章的示例已展示了 Java 配置,这里回顾并扩展一个更复杂的配置。
示例:Java 配置多 Step 作业
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
public class BatchConfiguration {
@Bean
public FlatFileItemReader<Product> reader() {
return new FlatFileItemReaderBuilder<Product>()
.name("productReader")
.resource(new ClassPathResource("products.csv"))
.delimited()
.names("id", "name", "price")
.targetType(Product.class)
.build();
}
@Bean
public ProductItemProcessor processor() {
return new ProductItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Product>()
.sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
.dataSource(dataSource)
.beanMapped()
.build();
}
@Bean
public Step importStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("importStep", jobRepository)
.<Product, Product>chunk(10)
.reader(reader())
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.build();
}
@Bean
public Step logStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("logStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
System.out.println("Job completed successfully!");
return RepeatStatus.FINISHED;
})
.transactionManager(transactionManager)
.build();
}
@Bean
public Job importProductsJob(JobRepository jobRepository, Step importStep, Step logStep) {
return new JobBuilder("importProductsJob", jobRepository)
.start(importStep)
.next(logStep)
.build();
}
}
|
Processor 实现(为完整性重复):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;
public class ProductItemProcessor implements ItemProcessor<Product, Product> {
private static final double EXCHANGE_RATE = 0.14;
@Override
public Product process(Product item) {
if (item.getPrice() <= 0) {
return null;
}
item.setPrice(item.getPrice() * EXCHANGE_RATE);
return item;
}
}
|
说明:
- 使用
@Bean
定义 Reader、Processor、Writer、Step 和 Job。 JobBuilder
和 StepBuilder
提供流式 API,清晰定义作业结构。- 支持条件流(如
.on("COMPLETED").to(nextStep)
),后续文章会深入。
优点:
- 类型安全,编译期检查错误。
- 与 Spring Boot 集成紧密,易于调试。
- 代码清晰,适合现代开发。
XML 配置使用 Spring 的 XML 配置文件定义 Job 和 Step,常见于早期 Spring 项目。以下是将上述 Java 配置转换为 XML 的等效实现。
示例:XML 配置
创建 batch-config.xml
(放置在 src/main/resources
):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
| <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch.xsd">
<!-- ItemReader -->
<bean id="productReader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="classpath:products.csv"/>
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="id,name,price"/>
</bean>
</property>
<property name="fieldSetMapper">
<bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="targetType" value="com.example.springbatchdemo.entity.Product"/>
</bean>
</property>
</bean>
</property>
</bean>
<!-- ItemProcessor -->
<bean id="productProcessor" class="com.example.springbatchdemo.config.ProductItemProcessor"/>
<!-- ItemWriter -->
<bean id="productWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="INSERT INTO product (id, name, price) VALUES (:id, :name, :price)"/>
<property name="itemPreparedStatementSetter">
<bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
</property>
</bean>
<!-- Step -->
<batch:step id="importStep">
<batch:tasklet transaction-manager="transactionManager">
<batch:chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="10"/>
</batch:tasklet>
</batch:step>
<!-- Tasklet Step -->
<batch:step id="logStep">
<batch:tasklet transaction-manager="transactionManager">
<bean class="com.example.springbatchdemo.config.LogTasklet"/>
</batch:tasklet>
</batch:step>
<!-- Job -->
<batch:job id="importProductsJob" job-repository="jobRepository">
<batch:step id="step1" next="step2">
<batch:tasklet ref="importStep"/>
</batch:step>
<batch:step id="step2">
<batch:tasklet ref="logStep"/>
</batch:step>
</batch:job>
<!-- Tasklet 实现 -->
<bean id="logTasklet" class="com.example.springbatchdemo.config.LogTasklet"/>
</beans>
|
LogTasklet 实现:
1
2
3
4
5
6
7
8
9
10
11
12
| package com.example.springbatchdemo.config;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
public class LogTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
System.out.println("Job completed successfully!");
return RepeatStatus.FINISHED;
}
}
|
加载 XML 配置(在 Spring Boot 项目中):
1
2
3
4
5
6
7
8
9
10
11
12
13
| package com.example.springbatchdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;
@SpringBootApplication
@ImportResource("classpath:batch-config.xml")
public class SpringBatchDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchDemoApplication.class, args);
}
}
|
说明:
<batch:job>
和 <batch:step>
定义 Job 和 Step。<batch:chunk>
配置 Chunk 模型,指定 Reader、Processor、Writer 和 commit-interval。- XML 配置需要显式定义所有 bean(如 LineMapper、FieldSetMapper)。
优点:
- 适合遗留系统,易于与现有 XML 配置集成。
- 便于动态修改(无需重新编译)。
缺点:
选择建议:
- 新项目:优先使用 Java 配置,简洁且现代化。
- 遗留项目:如果已有 XML 配置,可继续使用或逐步迁移到 Java 配置。
JobParameters 是 Spring Batch 用于区分 Job 实例的运行时参数(如执行日期、文件路径)。每次运行 Job 时,Spring Batch 会根据 JobParameters 创建唯一的 JobInstance,确保作业可重复运行。
JobParameters 通过 JobParametersBuilder
创建,常见参数类型包括:
示例:手动传递 JobParameters
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| package com.example.springbatchdemo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class JobRunner implements CommandLineRunner {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importProductsJob;
@Override
public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("inputFile", "products.csv")
.addLong("runTime", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(importProductsJob, jobParameters);
}
}
|
说明:
runTime
作为唯一标识,确保每次运行创建新的 JobInstance。inputFile
可用于动态指定输入文件。
JobParameters 可以通过 ChunkContext
或 @JobScope
/@StepScope
访问。
示例:动态读取文件路径
修改 reader
Bean,使用 @StepScope
注入 JobParameters:
1
2
3
4
5
6
7
8
9
10
11
| @Bean
@StepScope
public FlatFileItemReader<Product> reader(@Value("#{jobParameters['inputFile']}") String inputFile) {
return new FlatFileItemReaderBuilder<Product>()
.name("productReader")
.resource(new ClassPathResource(inputFile))
.delimited()
.names("id", "name", "price")
.targetType(Product.class)
.build();
}
|
说明:
@StepScope
确保 Reader 在 Step 执行时创建,允许注入动态参数。#{jobParameters['inputFile']}
从 JobParameters 获取参数值。
注意:
- 使用
@StepScope
或 @JobScope
的 Bean 必须是原型作用域,不能是单例。 - 参数访问需要在 Job 执行时提供,否则抛出异常。
Spring Batch 本身不提供调度功能,但可以通过以下方式实现作业调度:
- Spring Scheduler:使用 Spring 的
@Scheduled
注解,适合简单定时任务。 - Quartz Scheduler:功能强大的外部调度器,适合复杂调度需求。
- 手动触发:通过 API 或命令行触发,适合开发测试或按需运行。
Spring Boot 提供 @EnableScheduling
和 @Scheduled
注解,轻松实现定时任务。
示例:每分钟运行 Job
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| package com.example.springbatchdemo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@EnableScheduling
public class ScheduledJobRunner {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importProductsJob;
@Scheduled(fixedRate = 60000) // 每分钟运行
public void runJob() throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("inputFile", "products.csv")
.addLong("runTime", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(importProductsJob, jobParameters);
}
}
|
主应用类:
1
2
3
4
5
6
| @SpringBootApplication
public class SpringBatchDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchDemoApplication.class, args);
}
}
|
说明:
@EnableScheduling
启用调度支持。@Scheduled(fixedRate = 60000)
表示每 60 秒运行一次。- 每次运行生成唯一的
runTime
参数,避免 JobInstance 重复。
局限性:
- 适合简单定时任务。
- 不支持动态调度(如 CRON 表达式的高级配置)或分布式调度。
Quartz 是一个功能强大的调度框架,支持复杂的调度策略和分布式环境。Spring Batch 提供与 Quartz 的集成模块。
添加依赖(pom.xml
):
1
2
3
4
| <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
|
配置 Quartz Job:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| package com.example.springbatchdemo.config;
import org.quartz.JobExecutionContext;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
public class QuartzBatchJob extends QuartzJobBean {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importProductsJob;
@Override
protected void executeInternal(JobExecutionContext context) throws org.quartz.JobExecutionException {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addString("inputFile", "products.csv")
.addLong("runTime", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(importProductsJob, jobParameters);
} catch (Exception e) {
throw new org.quartz.JobExecutionException(e);
}
}
}
|
配置 Quartz Scheduler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| package com.example.springbatchdemo.config;
import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QuartzConfig {
@Bean
public JobDetail jobDetail() {
return JobBuilder.newJob(QuartzBatchJob.class)
.withIdentity("batchJob")
.storeDurably()
.build();
}
@Bean
public Trigger trigger() {
return TriggerBuilder.newTrigger()
.forJob(jobDetail())
.withIdentity("batchTrigger")
.withSchedule(CronScheduleBuilder.cronSchedule("0 * * * * ?")) // 每分钟运行
.build();
}
}
|
说明:
QuartzBatchJob
继承 QuartzJobBean
,定义 Job 执行逻辑。JobDetail
和 Trigger
配置 Quartz 作业,使用 CRON 表达式 0 * * * * ?
表示每分钟运行。- Quartz 支持更复杂的调度(如每日、每周)。
优点:
- 支持 CRON 表达式、动态调度和分布式环境。
- 提供作业持久化、恢复和集群支持。
手动触发适合开发测试或按需运行。可以通过 REST API、命令行或界面触发 Job。
示例:REST API 触发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| package com.example.springbatchdemo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class JobController {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importProductsJob;
@GetMapping("/run-job")
public String runJob(@RequestParam String inputFile) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("inputFile", inputFile)
.addLong("runTime", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(importProductsJob, jobParameters);
return "Job started with inputFile: " + inputFile;
}
}
|
测试:
访问 http://localhost:8080/run-job?inputFile=products.csv
,触发 Job。
以下是用 Mermaid 绘制的配置与调度流程图,展示 Job 的定义和触发过程:
graph TD
A[Spring Application] --> B[Configuration: Java/XML]
B --> C[Job: importProductsJob]
C --> D[Step 1: importStep]
C --> E[Step 2: logStep]
D --> F[ItemReader: read CSV]
F --> G[ItemProcessor: convert price]
G --> H[ItemWriter: write to DB]
E --> I[Tasklet: log completion]
A --> J[Scheduler: Spring/Quartz]
J -->|触发| K[JobLauncher]
K -->|运行| C
C --> L[JobRepository]
L -->|存储元数据| M[Database]
K -->|传递| N[JobParameters]
N -->|动态配置| F
说明:
- 配置(Java/XML)定义 Job 和 Step 的结构。
- Scheduler(Spring/Quartz)或手动触发通过 JobLauncher 启动 Job。
- JobParameters 动态配置 Reader(如文件路径)。
- JobRepository 记录执行状态。
配置选择:
- 新项目使用 Java 配置,简洁且类型安全。
- 遗留项目可继续使用 XML,或逐步迁移。
JobParameters:
- 始终包含唯一标识(如时间戳),避免 JobInstance 重复。
- 使用
@StepScope
注入动态参数,保持灵活性。
调度策略:
- 简单定时任务使用 Spring Scheduler。
- 复杂调度(如 CRON、分布式)使用 Quartz 或 Spring Cloud Data Flow。
性能优化:
- 避免频繁调度导致资源竞争,合理设置间隔。
- 使用异步 JobLauncher(
SimpleAsyncJobLauncher
)提高吞吐量。
Q:Java 配置和 XML 配置可以混合使用吗?
A:可以,但不推荐。混合使用可能导致配置复杂,建议统一风格。
Q:如何避免 JobInstance 重复运行?
A:确保 JobParameters 唯一(如添加时间戳),Spring Batch 会自动检查。
Q:调度失败如何处理?
A:Quartz 支持失败重试和恢复,Spring Scheduler 需要手动实现重试逻辑。
本文详细讲解了 Spring Batch 的配置(Java 和 XML)和调度(Spring Scheduler、Quartz、手动触发)机制。通过示例和 Mermaid 图表,你学会了如何定义作业、传递参数和控制执行时机。下一篇文章将聚焦 错误处理与重试机制,内容包括:
- 配置 Skip、Retry 和 Restart。
- 使用监听器(Listener)捕获错误。
- 实现自定义错误处理逻辑。
如果你想提前探索其他主题(如并行处理、数据库集成),可以告诉我!
通过本文,你掌握了 Spring Batch 的配置和调度核心知识。Java 配置提供了现代化的开发体验,XML 配置适用于遗留系统;JobParameters 实现了动态化运行;Spring Scheduler 和 Quartz 提供了灵活的调度方式。你现在可以:
- 根据项目需求选择合适的配置方式。
- 使用 JobParameters 动态控制作业行为。
- 配置定时任务或手动触发作业。
尝试修改示例代码,比如更改调度频率、添加新参数,或用 REST API 触发 Job,体验 Spring Batch 的灵活性!