Spring Batch 专题系列(三):Spring Batch 的核心组件详解

Spring Batch 专题系列(三):Spring Batch 的核心组件详解

1. 引言

在上一篇文章中,我们通过一个简单的示例(从 CSV 文件读取商品数据,处理后写入数据库)快速搭建并运行了一个 Spring Batch 作业,初步接触了 Job、Step、ItemReader、ItemProcessor 和 ItemWriter 等核心组件。本文将进一步深入这些组件,详细讲解它们的定义、作用、实现方式及常见用法。我们将:

  • 分析 Job 和 Step 的结构与配置。
  • 对比 Chunk-Oriented Step 和 Tasklet Step 的使用场景。
  • 探索 ItemReader、ItemProcessor 和 ItemWriter 的多种实现。
  • 通过代码示例和 Mermaid 图表展示组件的协作方式。

通过本文,你将对 Spring Batch 的核心组件有系统性的理解,为后续学习错误处理、并行处理等高级主题打下坚实基础。

2. Spring Batch 核心组件概览

Spring Batch 的设计围绕模块化和可扩展性,其核心组件共同协作完成批处理任务。以下是主要组件的简要回顾:

  • Job:一个完整的批处理任务,包含一个或多个 Step。
  • Step:Job 的独立执行单元,分为 Chunk-Oriented Step(基于块的处理)和 Tasklet Step(自定义任务)。
  • Chunk:Spring Batch 的核心处理模型,将数据分成块(Chunk)进行读取、处理和写入。
  • ItemReader:从数据源读取数据的组件(如文件、数据库、消息队列)。
  • ItemProcessor:对读取的数据进行转换或业务逻辑处理(可选)。
  • ItemWriter:将处理后的数据写入目标(如数据库、文件)。
  • JobRepository:存储 Job 和 Step 的元数据,通常使用数据库实现。
  • JobLauncher:启动 Job 的组件。

本文将重点讲解 Job、Step 和 Chunk 模型的实现细节,并深入 ItemReader、ItemProcessor 和 ItemWriter 的多种用法。

组件协作流程图

以下是用 Mermaid 绘制的 Spring Batch 核心组件协作流程图,展示 Job、Step 和 Chunk 模型的关系:

graph TD
    A[JobLauncher] -->|启动| B[Job]
    B --> C[Step 1: Chunk-Oriented]
    B --> D[Step 2: Tasklet]
    C --> E[ItemReader]
    C --> F[ItemProcessor]
    C --> G[ItemWriter]
    E -->|读取数据| F
    F -->|处理数据| G
    G -->|写入数据| H[Data Source]
    D --> I[Tasklet: Custom Logic]
    B --> J[JobRepository]
    J -->|存储元数据| K[Database]

说明

  • JobLauncher 触发 Job
  • Job 包含多个 Step,可以是 Chunk-Oriented Step(包含 Reader、Processor、Writer)或 Tasklet Step(自定义逻辑)。
  • JobRepository 记录执行状态,存储在数据库中。

3. Job 和 Step 详解

3.1 Job

Job 是 Spring Batch 的顶级抽象,表示一个完整的批处理任务。每个 Job 由一个或多个 Step 组成,按照定义的顺序执行。Job 的主要职责包括:

  • 定义 Step 的执行顺序(顺序执行、条件分支或并行)。
  • 管理 JobParameters(运行时参数,如执行日期)。
  • 记录执行状态(如 COMPLETED、FAILED)。

配置示例

1
2
3
4
5
6
7
@Bean
public Job sampleJob(JobRepository jobRepository, Step step1, Step step2) {
    return new JobBuilder("sampleJob", jobRepository)
            .start(step1)
            .next(step2)
            .build();
}

说明

  • JobBuilder 用于创建 Job,指定名称和 JobRepository。
  • .start(step1).next(step2) 定义 Step 的顺序执行。
  • Job 支持条件流(如根据 Step 的状态跳转),后续文章会深入讲解。

3.2 Step

Step 是 Job 的独立执行单元,每个 Step 包含具体的处理逻辑。Spring Batch 支持两种 Step 类型:

  1. Chunk-Oriented Step
    基于 Chunk 模型,适合处理大量数据。每次读取、处理和写入一批数据(Chunk),通过事务管理确保数据一致性。
    典型场景:ETL 任务、数据迁移。

  2. Tasklet Step
    执行自定义逻辑,适合简单或非数据驱动的任务。Tasklet 是一个接口,开发者实现具体逻辑。
    典型场景:调用存储过程、清理临时文件。

Chunk-Oriented Step 配置示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@Bean
public Step chunkStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                      ItemReader<Product> reader, ItemProcessor<Product, Product> processor,
                      ItemWriter<Product> writer) {
    return new StepBuilder("chunkStep", jobRepository)
            .<Product, Product>chunk(10) // 每 10 条数据提交一次事务
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .transactionManager(transactionManager)
            .build();
}

Tasklet Step 配置示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Bean
public Step taskletStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("taskletStep", jobRepository)
            .tasklet((contribution, chunkContext) -> {
                System.out.println("Executing custom tasklet logic...");
                // 自定义逻辑,如调用存储过程
                return RepeatStatus.FINISHED;
            })
            .transactionManager(transactionManager)
            .build();
}

对比

  • Chunk-Oriented Step:适合批量数据处理,自动管理读/写事务。
  • Tasklet Step:适合一次性或非结构化任务,逻辑完全自定义。
  • 选择建议:优先使用 Chunk-Oriented Step 处理数据驱动任务;使用 Tasklet Step 处理简单控制逻辑或初始化/清理任务。

4. Chunk 模型详解

Chunk 是 Spring Batch 的核心处理模型,专为高效处理大量数据设计。Chunk-Oriented Step 将数据分成固定大小的块(Chunk),每个 Chunk 包含以下步骤:

  1. 读取:ItemReader 读取一批数据(逐条读取,直到达到 Chunk 大小)。
  2. 处理:ItemProcessor(可选)对每条数据进行转换或业务逻辑处理。
  3. 写入:ItemWriter 将处理后的 Chunk 批量写入目标。

Chunk 模型的优势在于:

  • 事务管理:每个 Chunk 提交一个事务,失败时只回滚当前 Chunk。
  • 性能优化:批量读写减少 IO 开销。
  • 可扩展性:支持多线程和分区处理(后续文章会讲解)。

以下是用 Mermaid 绘制的 Chunk 处理流程图:

graph TD
    A[Start Chunk] --> B[ItemReader: Read Item]
    B -->|逐条读取| C{Items < Chunk Size?}
    C -->|是| B
    C -->|否| D[ItemProcessor: Process Items]
    D --> E[ItemWriter: Write Chunk]
    E -->|提交事务| F{More Data?}
    F -->|是| B
    F -->|否| G[End Chunk]

说明

  • ItemReader 逐条读取数据,累积到 Chunk 大小(如 10 条)。
  • ItemProcessor(可选)处理每条数据。
  • ItemWriter 批量写入 Chunk,提交事务。
  • 如果有更多数据,重复上述过程。

配置 Chunk 大小

1
.chunk(10) // 每 10 条数据一个 Chunk
  • 选择建议:Chunk 大小需要根据数据量、内存和性能权衡。过大可能导致内存溢出,过小可能降低效率。常见范围:10-1000。

5. ItemReader 详解

ItemReader 负责从数据源读取数据,是 Chunk 模型的起点。Spring Batch 提供了多种内置 ItemReader 实现,覆盖常见数据源。

5.1 常见 ItemReader 实现

  1. FlatFileItemReader
    读取平面文件(如 CSV、TXT)。
    场景:从 CSV 文件导入数据。
    示例(上一篇文章已展示):

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    
    @Bean
    public FlatFileItemReader<Product> reader() {
        return new FlatFileItemReaderBuilder<Product>()
                .name("productReader")
                .resource(new ClassPathResource("products.csv"))
                .delimited()
                .names("id", "name", "price")
                .targetType(Product.class)
                .build();
    }
    
  2. JdbcCursorItemReader
    使用数据库游标逐行读取数据,适合大数据量。
    场景:从数据库表读取记录。
    示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    @Bean
    public JdbcCursorItemReader<Product> jdbcReader(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<Product>()
                .name("jdbcReader")
                .dataSource(dataSource)
                .sql("SELECT id, name, price FROM product")
                .beanRowMapper(Product.class)
                .build();
    }
    
  3. JpaPagingItemReader
    使用 JPA 分页读取数据,适合与 Hibernate 集成。
    场景:从 JPA 实体读取数据。
    示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    @Bean
    public JpaPagingItemReader<Product> jpaReader(EntityManagerFactory entityManagerFactory) {
        return new JpaPagingItemReaderBuilder<Product>()
                .name("jpaReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("SELECT p FROM Product p")
                .pageSize(100)
                .build();
    }
    
  4. JsonItemReader
    读取 JSON 文件或流。
    场景:处理 API 返回的 JSON 数据。
    示例:

    1
    2
    3
    4
    5
    6
    7
    8
    
    @Bean
    public JsonItemReader<Product> jsonReader() {
        return new JsonItemReaderBuilder<Product>()
                .name("jsonReader")
                .resource(new ClassPathResource("products.json"))
                .jsonObjectReader(new JacksonJsonObjectReader<>(Product.class))
                .build();
    }
    
  5. StaxEventItemReader
    读取 XML 文件,基于流式解析。
    场景:处理大型 XML 文件。

  6. QueueItemReader
    从消息队列(如 RabbitMQ、Kafka)读取数据。
    场景:消费队列中的消息。

5.2 自定义 ItemReader

如果内置实现无法满足需求,可以实现 ItemReader 接口:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public class CustomItemReader implements ItemReader<String> {
    private final List<String> items = Arrays.asList("Item1", "Item2", "Item3");
    private int index = 0;

    @Override
    public String read() {
        if (index < items.size()) {
            return items.get(index++);
        }
        return null; // 返回 null 表示读取结束
    }
}

场景:从自定义数据源(如内存列表、API 调用)读取数据。

6. ItemProcessor 详解

ItemProcessor 是可选组件,负责对读取的数据进行转换、过滤或执行业务逻辑。ItemProcessor 在 ItemReader 和 ItemWriter 之间充当“中间人”。

6.1 基本用法

实现 ItemProcessor 接口,定义处理逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public class ProductItemProcessor implements ItemProcessor<Product, Product> {
    private static final double EXCHANGE_RATE = 0.14;

    @Override
    public Product process(Product item) {
        if (item.getPrice() <= 0) {
            return null; // 过滤无效记录
        }
        item.setPrice(item.getPrice() * EXCHANGE_RATE); // 转换为美元
        return item;
    }
}

说明

  • 返回 null 表示跳过该记录。
  • 可以返回不同类型的对象(如 <Product, Order>),实现类型转换。

6.2 高级用法

  1. 复合处理器(CompositeItemProcessor)
    组合多个处理器,按顺序执行。
    示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    @Bean
    public CompositeItemProcessor<Product, Product> compositeProcessor() {
        CompositeItemProcessor<Product, Product> processor = new CompositeItemProcessor<>();
        processor.setDelegates(Arrays.asList(
                new ProductItemProcessor(), // 转换价格
                new ValidationProcessor()   // 校验数据
        ));
        return processor;
    }
    
  2. 过滤处理器
    专门用于过滤数据,返回 null 跳过记录。
    场景:过滤不符合条件的记录(如空值、非法格式)。

  3. 转换处理器
    将输入类型转换为输出类型。
    场景:将 CSV 数据的字符串字段转换为实体对象。

7. ItemWriter 详解

ItemWriter 负责将处理后的数据写入目标,是 Chunk 模型的终点。Spring Batch 提供了多种内置 ItemWriter 实现。

7.1 常见 ItemWriter 实现

  1. JdbcBatchItemWriter
    批量写入数据库,高效且支持事务。
    示例:

    1
    2
    3
    4
    5
    6
    7
    8
    
    @Bean
    public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Product>()
                .sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }
    
  2. JpaItemWriter
    使用 JPA 写入数据,适合与 Hibernate 集成。
    示例:

    1
    2
    3
    4
    5
    6
    
    @Bean
    public JpaItemWriter<Product> jpaWriter(EntityManagerFactory entityManagerFactory) {
        JpaItemWriter<Product> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(entityManagerFactory);
        return writer;
    }
    
  3. FlatFileItemWriter
    写入平面文件(如 CSV、TXT)。
    场景:生成报表文件。
    示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    @Bean
    public FlatFileItemWriter<Product> fileWriter() {
        return new FlatFileItemWriterBuilder<Product>()
                .name("productWriter")
                .resource(new FileSystemResource("output.csv"))
                .delimited()
                .names("id", "name", "price")
                .build();
    }
    
  4. JsonItemWriter
    写入 JSON 文件或流。
    场景:生成 JSON 格式的输出。

  5. CompositeItemWriter
    组合多个 Writer,数据写入多个目标。
    示例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    @Bean
    public CompositeItemWriter<Product> compositeWriter() {
        CompositeItemWriter<Product> writer = new CompositeItemWriter<>();
        writer.setDelegates(Arrays.asList(
                jdbcWriter(dataSource()), // 写入数据库
                fileWriter()              // 写入文件
        ));
        return writer;
    }
    

7.2 自定义 ItemWriter

实现 ItemWriter 接口,定义自定义写入逻辑:

1
2
3
4
5
6
7
8
public class CustomItemWriter implements ItemWriter<String> {
    @Override
    public void write(Chunk<? extends String> chunk) {
        for (String item : chunk.getItems()) {
            System.out.println("Writing: " + item); // 模拟写入
        }
    }
}

场景:将数据写入非标准目标(如 API、日志)。

8. 综合示例:多 Step 作业

以下是一个综合示例,展示一个包含 Chunk-Oriented Step 和 Tasklet Step 的 Job:

  • Step 1(Chunk-Oriented):从 CSV 读取商品数据,转换价格后写入数据库。
  • Step 2(Tasklet):记录作业完成日志。

8.1 代码实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Configuration
public class BatchConfiguration {

    @Bean
    public FlatFileItemReader<Product> reader() {
        return new FlatFileItemReaderBuilder<Product>()
                .name("productReader")
                .resource(new ClassPathResource("products.csv"))
                .delimited()
                .names("id", "name", "price")
                .targetType(Product.class)
                .build();
    }

    @Bean
    public ItemProcessor<Product, Product> processor() {
        return item -> {
            if (item.getPrice() <= 0) return null;
            item.setPrice(item.getPrice() * 0.14);
            return item;
        };
    }

    @Bean
    public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Product>()
                .sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    @Bean
    public Step chunkStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("chunkStep", jobRepository)
                .<Product, Product>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer(dataSource()))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Step taskletStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("taskletStep", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("Job completed successfully!");
                    return RepeatStatus.FINISHED;
                })
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Job multiStepJob(JobRepository jobRepository, Step chunkStep, Step taskletStep) {
        return new JobBuilder("multiStepJob", jobRepository)
                .start(chunkStep)
                .next(taskletStep)
                .build();
    }
}

8.2 流程图

以下是用 Mermaid 绘制的多 Step 作业流程图:

graph TD
    A[Job: multiStepJob] --> B[Step 1: chunkStep]
    B --> C[ItemReader: read CSV]
    C --> D[ItemProcessor: convert price]
    D --> E[ItemWriter: write to DB]
    B -->|完成后| F[Step 2: taskletStep]
    F --> G[Tasklet: log completion]
    A --> H[JobRepository]
    H -->|存储元数据| I[Database]

说明

  • chunkStep 处理 CSV 数据并写入数据库。
  • taskletStep 记录作业完成日志。
  • JobRepository 记录两个 Step 的状态。

9. 常见问题与解答

  • Q:如何选择 Chunk-Oriented Step 和 Tasklet Step?
    A:Chunk-Oriented Step 适合批量数据处理(如 ETL);Tasklet Step 适合简单控制逻辑(如初始化、清理)。

  • Q:ItemProcessor 是否必须实现?
    A:ItemProcessor 是可选的。如果不需要数据转换,可以直接从 Reader 传递到 Writer。

  • Q:如何处理大型数据集的性能问题?
    A:调整 Chunk 大小、使用分页读取(如 JpaPagingItemReader)、启用多线程或分区(后续文章会讲解)。

10. 下一步

本文详细讲解了 Spring Batch 的核心组件,包括 Job、Step、Chunk 模型和 Reader/Processor/Writer 的实现方式。通过示例和 Mermaid 图表,你应该对组件的协作方式有了深入理解。下一篇文章将聚焦 配置与调度 Spring Batch 作业,内容包括:

  • XML 和 Java 配置的对比。
  • 使用 Spring Scheduler 或外部调度器(如 Quartz)触发作业。
  • 传递和使用 JobParameters。
updatedupdated2025-04-172025-04-17