Spring Batch 专题系列(四):配置与调度 Spring Batch 作业

Spring Batch 专题系列(四):配置与调度 Spring Batch 作业

1. 引言

在上一篇文章中,我们详细探讨了 Spring Batch 的核心组件(Job、Step、Chunk、ItemReader、ItemProcessor、ItemWriter),并通过示例展示了它们的协作方式。掌握了这些组件后,接下来需要了解如何灵活配置 Spring Batch 作业,并通过调度机制控制作业的执行时机。本文将聚焦以下内容:

  • Spring Batch 的配置方式:XML 配置和 Java 配置的对比与实现。
  • JobParameters 的定义和使用,用于动态传递运行时参数。
  • 调度 Spring Batch 作业:使用 Spring Scheduler、Quartz 或手动触发。
  • 通过代码示例和 Mermaid 图表展示配置和调度的完整流程。

通过本文,你将学会如何根据项目需求配置 Spring Batch 作业,并实现定时或手动触发,为生产环境部署奠定基础。

2. Spring Batch 配置方式

Spring Batch 支持两种主要配置方式:XML 配置Java 配置。Java 配置因其类型安全和现代化特性在 Spring Boot 项目中更常见,但 XML 配置在遗留系统或特定场景中仍有使用价值。以下分别介绍这两种方式。

2.1 Java 配置

Java 配置使用 Spring 的 @Configuration 注解和流式 API(如 JobBuilderStepBuilder)定义 Job 和 Step。上一篇文章的示例已展示了 Java 配置,这里回顾并扩展一个更复杂的配置。

示例:Java 配置多 Step 作业

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class BatchConfiguration {

    @Bean
    public FlatFileItemReader<Product> reader() {
        return new FlatFileItemReaderBuilder<Product>()
                .name("productReader")
                .resource(new ClassPathResource("products.csv"))
                .delimited()
                .names("id", "name", "price")
                .targetType(Product.class)
                .build();
    }

    @Bean
    public ProductItemProcessor processor() {
        return new ProductItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Product>()
                .sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    @Bean
    public Step importStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("importStep", jobRepository)
                .<Product, Product>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer(dataSource))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Step logStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("logStep", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("Job completed successfully!");
                    return RepeatStatus.FINISHED;
                })
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Job importProductsJob(JobRepository jobRepository, Step importStep, Step logStep) {
        return new JobBuilder("importProductsJob", jobRepository)
                .start(importStep)
                .next(logStep)
                .build();
    }
}

Processor 实现(为完整性重复):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;

public class ProductItemProcessor implements ItemProcessor<Product, Product> {
    private static final double EXCHANGE_RATE = 0.14;

    @Override
    public Product process(Product item) {
        if (item.getPrice() <= 0) {
            return null;
        }
        item.setPrice(item.getPrice() * EXCHANGE_RATE);
        return item;
    }
}

说明

  • 使用 @Bean 定义 Reader、Processor、Writer、Step 和 Job。
  • JobBuilderStepBuilder 提供流式 API,清晰定义作业结构。
  • 支持条件流(如 .on("COMPLETED").to(nextStep)),后续文章会深入。

优点

  • 类型安全,编译期检查错误。
  • 与 Spring Boot 集成紧密,易于调试。
  • 代码清晰,适合现代开发。

2.2 XML 配置

XML 配置使用 Spring 的 XML 配置文件定义 Job 和 Step,常见于早期 Spring 项目。以下是将上述 Java 配置转换为 XML 的等效实现。

示例:XML 配置

创建 batch-config.xml(放置在 src/main/resources):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/batch
                           http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- ItemReader -->
    <bean id="productReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource" value="classpath:products.csv"/>
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="id,name,price"/>
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
                        <property name="targetType" value="com.example.springbatchdemo.entity.Product"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>

    <!-- ItemProcessor -->
    <bean id="productProcessor" class="com.example.springbatchdemo.config.ProductItemProcessor"/>

    <!-- ItemWriter -->
    <bean id="productWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource"/>
        <property name="sql" value="INSERT INTO product (id, name, price) VALUES (:id, :name, :price)"/>
        <property name="itemPreparedStatementSetter">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
        </property>
    </bean>

    <!-- Step -->
    <batch:step id="importStep">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="10"/>
        </batch:tasklet>
    </batch:step>

    <!-- Tasklet Step -->
    <batch:step id="logStep">
        <batch:tasklet transaction-manager="transactionManager">
            <bean class="com.example.springbatchdemo.config.LogTasklet"/>
        </batch:tasklet>
    </batch:step>

    <!-- Job -->
    <batch:job id="importProductsJob" job-repository="jobRepository">
        <batch:step id="step1" next="step2">
            <batch:tasklet ref="importStep"/>
        </batch:step>
        <batch:step id="step2">
            <batch:tasklet ref="logStep"/>
        </batch:step>
    </batch:job>

    <!-- Tasklet 实现 -->
    <bean id="logTasklet" class="com.example.springbatchdemo.config.LogTasklet"/>
</beans>

LogTasklet 实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
package com.example.springbatchdemo.config;

import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

public class LogTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        System.out.println("Job completed successfully!");
        return RepeatStatus.FINISHED;
    }
}

加载 XML 配置(在 Spring Boot 项目中):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
package com.example.springbatchdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication
@ImportResource("classpath:batch-config.xml")
public class SpringBatchDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchDemoApplication.class, args);
    }
}

说明

  • <batch:job><batch:step> 定义 Job 和 Step。
  • <batch:chunk> 配置 Chunk 模型,指定 Reader、Processor、Writer 和 commit-interval。
  • XML 配置需要显式定义所有 bean(如 LineMapper、FieldSetMapper)。

优点

  • 适合遗留系统,易于与现有 XML 配置集成。
  • 便于动态修改(无需重新编译)。

缺点

  • 配置冗长,易出错。
  • 缺乏类型安全,调试困难。

选择建议

  • 新项目:优先使用 Java 配置,简洁且现代化。
  • 遗留项目:如果已有 XML 配置,可继续使用或逐步迁移到 Java 配置。

3. JobParameters 的使用

JobParameters 是 Spring Batch 用于区分 Job 实例的运行时参数(如执行日期、文件路径)。每次运行 Job 时,Spring Batch 会根据 JobParameters 创建唯一的 JobInstance,确保作业可重复运行。

3.1 定义 JobParameters

JobParameters 通过 JobParametersBuilder 创建,常见参数类型包括:

  • String
  • Long
  • Double
  • Date

示例:手动传递 JobParameters

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.example.springbatchdemo;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class JobRunner implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importProductsJob;

    @Override
    public void run(String... args) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("inputFile", "products.csv")
                .addLong("runTime", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importProductsJob, jobParameters);
    }
}

说明

  • runTime 作为唯一标识,确保每次运行创建新的 JobInstance。
  • inputFile 可用于动态指定输入文件。

3.2 在 Job 中访问 JobParameters

JobParameters 可以通过 ChunkContext@JobScope/@StepScope 访问。

示例:动态读取文件路径

修改 reader Bean,使用 @StepScope 注入 JobParameters:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Bean
@StepScope
public FlatFileItemReader<Product> reader(@Value("#{jobParameters['inputFile']}") String inputFile) {
    return new FlatFileItemReaderBuilder<Product>()
            .name("productReader")
            .resource(new ClassPathResource(inputFile))
            .delimited()
            .names("id", "name", "price")
            .targetType(Product.class)
            .build();
}

说明

  • @StepScope 确保 Reader 在 Step 执行时创建,允许注入动态参数。
  • #{jobParameters['inputFile']} 从 JobParameters 获取参数值。

注意

  • 使用 @StepScope@JobScope 的 Bean 必须是原型作用域,不能是单例。
  • 参数访问需要在 Job 执行时提供,否则抛出异常。

4. 调度 Spring Batch 作业

Spring Batch 本身不提供调度功能,但可以通过以下方式实现作业调度:

  1. Spring Scheduler:使用 Spring 的 @Scheduled 注解,适合简单定时任务。
  2. Quartz Scheduler:功能强大的外部调度器,适合复杂调度需求。
  3. 手动触发:通过 API 或命令行触发,适合开发测试或按需运行。

4.1 使用 Spring Scheduler

Spring Boot 提供 @EnableScheduling@Scheduled 注解,轻松实现定时任务。

示例:每分钟运行 Job

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.example.springbatchdemo;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@EnableScheduling
public class ScheduledJobRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importProductsJob;

    @Scheduled(fixedRate = 60000) // 每分钟运行
    public void runJob() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("inputFile", "products.csv")
                .addLong("runTime", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importProductsJob, jobParameters);
    }
}

主应用类

1
2
3
4
5
6
@SpringBootApplication
public class SpringBatchDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchDemoApplication.class, args);
    }
}

说明

  • @EnableScheduling 启用调度支持。
  • @Scheduled(fixedRate = 60000) 表示每 60 秒运行一次。
  • 每次运行生成唯一的 runTime 参数,避免 JobInstance 重复。

局限性

  • 适合简单定时任务。
  • 不支持动态调度(如 CRON 表达式的高级配置)或分布式调度。

4.2 使用 Quartz Scheduler

Quartz 是一个功能强大的调度框架,支持复杂的调度策略和分布式环境。Spring Batch 提供与 Quartz 的集成模块。

添加依赖pom.xml):

1
2
3
4
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

配置 Quartz Job

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.example.springbatchdemo.config;

import org.quartz.JobExecutionContext;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class QuartzBatchJob extends QuartzJobBean {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importProductsJob;

    @Override
    protected void executeInternal(JobExecutionContext context) throws org.quartz.JobExecutionException {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("inputFile", "products.csv")
                    .addLong("runTime", System.currentTimeMillis())
                    .toJobParameters();
            jobLauncher.run(importProductsJob, jobParameters);
        } catch (Exception e) {
            throw new org.quartz.JobExecutionException(e);
        }
    }
}

配置 Quartz Scheduler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.example.springbatchdemo.config;

import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QuartzConfig {

    @Bean
    public JobDetail jobDetail() {
        return JobBuilder.newJob(QuartzBatchJob.class)
                .withIdentity("batchJob")
                .storeDurably()
                .build();
    }

    @Bean
    public Trigger trigger() {
        return TriggerBuilder.newTrigger()
                .forJob(jobDetail())
                .withIdentity("batchTrigger")
                .withSchedule(CronScheduleBuilder.cronSchedule("0 * * * * ?")) // 每分钟运行
                .build();
    }
}

说明

  • QuartzBatchJob 继承 QuartzJobBean,定义 Job 执行逻辑。
  • JobDetailTrigger 配置 Quartz 作业,使用 CRON 表达式 0 * * * * ? 表示每分钟运行。
  • Quartz 支持更复杂的调度(如每日、每周)。

优点

  • 支持 CRON 表达式、动态调度和分布式环境。
  • 提供作业持久化、恢复和集群支持。

4.3 手动触发

手动触发适合开发测试或按需运行。可以通过 REST API、命令行或界面触发 Job。

示例:REST API 触发

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.example.springbatchdemo;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class JobController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job importProductsJob;

    @GetMapping("/run-job")
    public String runJob(@RequestParam String inputFile) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("inputFile", inputFile)
                .addLong("runTime", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(importProductsJob, jobParameters);
        return "Job started with inputFile: " + inputFile;
    }
}

测试: 访问 http://localhost:8080/run-job?inputFile=products.csv,触发 Job。

5. 配置与调度流程图

以下是用 Mermaid 绘制的配置与调度流程图,展示 Job 的定义和触发过程:

graph TD
    A[Spring Application] --> B[Configuration: Java/XML]
    B --> C[Job: importProductsJob]
    C --> D[Step 1: importStep]
    C --> E[Step 2: logStep]
    D --> F[ItemReader: read CSV]
    F --> G[ItemProcessor: convert price]
    G --> H[ItemWriter: write to DB]
    E --> I[Tasklet: log completion]
    A --> J[Scheduler: Spring/Quartz]
    J -->|触发| K[JobLauncher]
    K -->|运行| C
    C --> L[JobRepository]
    L -->|存储元数据| M[Database]
    K -->|传递| N[JobParameters]
    N -->|动态配置| F

说明

  • 配置(Java/XML)定义 Job 和 Step 的结构。
  • Scheduler(Spring/Quartz)或手动触发通过 JobLauncher 启动 Job。
  • JobParameters 动态配置 Reader(如文件路径)。
  • JobRepository 记录执行状态。

6. 最佳实践

  1. 配置选择

    • 新项目使用 Java 配置,简洁且类型安全。
    • 遗留项目可继续使用 XML,或逐步迁移。
  2. JobParameters

    • 始终包含唯一标识(如时间戳),避免 JobInstance 重复。
    • 使用 @StepScope 注入动态参数,保持灵活性。
  3. 调度策略

    • 简单定时任务使用 Spring Scheduler。
    • 复杂调度(如 CRON、分布式)使用 Quartz 或 Spring Cloud Data Flow。
  4. 性能优化

    • 避免频繁调度导致资源竞争,合理设置间隔。
    • 使用异步 JobLauncher(SimpleAsyncJobLauncher)提高吞吐量。

7. 常见问题与解答

  • Q:Java 配置和 XML 配置可以混合使用吗?
    A:可以,但不推荐。混合使用可能导致配置复杂,建议统一风格。

  • Q:如何避免 JobInstance 重复运行?
    A:确保 JobParameters 唯一(如添加时间戳),Spring Batch 会自动检查。

  • Q:调度失败如何处理?
    A:Quartz 支持失败重试和恢复,Spring Scheduler 需要手动实现重试逻辑。

8. 下一步

本文详细讲解了 Spring Batch 的配置(Java 和 XML)和调度(Spring Scheduler、Quartz、手动触发)机制。通过示例和 Mermaid 图表,你学会了如何定义作业、传递参数和控制执行时机。下一篇文章将聚焦 错误处理与重试机制,内容包括:

  • 配置 Skip、Retry 和 Restart。
  • 使用监听器(Listener)捕获错误。
  • 实现自定义错误处理逻辑。

如果你想提前探索其他主题(如并行处理、数据库集成),可以告诉我!


总结

通过本文,你掌握了 Spring Batch 的配置和调度核心知识。Java 配置提供了现代化的开发体验,XML 配置适用于遗留系统;JobParameters 实现了动态化运行;Spring Scheduler 和 Quartz 提供了灵活的调度方式。你现在可以:

  • 根据项目需求选择合适的配置方式。
  • 使用 JobParameters 动态控制作业行为。
  • 配置定时任务或手动触发作业。

尝试修改示例代码,比如更改调度频率、添加新参数,或用 REST API 触发 Job,体验 Spring Batch 的灵活性!

updatedupdated2025-04-172025-04-17