Spring Batch 专题系列(二):快速入门:构建第一个 Spring Batch 作业

Spring Batch 专题系列(二):快速入门:构建第一个 Spring Batch 作业

1. 引言

在上一篇文章中,我们介绍了 Spring Batch 的核心概念,包括 Job、Step、Chunk、ItemReader、ItemProcessor 和 ItemWriter 等。本文将通过一个简单的示例,带你从零开始构建一个 Spring Batch 作业,体验其基本功能。你将学习如何:

  • 配置 Spring Boot 和 Spring Batch 环境。
  • 实现从 CSV 文件读取数据、处理数据并写入数据库的完整流程。
  • 定义 Job 和 Step。
  • 运行并验证作业结果。

示例场景

我们将实现一个 ETL(Extract-Transform-Load)任务:

  • 输入:一个包含商品信息的 CSV 文件(字段:商品 ID、名称、价格)。
  • 处理:将价格统一转换为美元(假设输入为人民币,乘以汇率 0.14),并过滤掉价格低于 0 的记录。
  • 输出:将处理后的商品数据写入数据库的 product 表。

2. 准备工作

2.1 技术栈

  • Spring Boot:简化项目配置和依赖管理。
  • Spring Batch:提供批处理功能。
  • H2 数据库:嵌入式数据库,便于测试(生产环境可替换为 MySQL、PostgreSQL 等)。
  • Maven:依赖管理工具。

2.2 项目依赖

创建一个 Spring Boot 项目,添加以下依赖(pom.xml):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>spring-batch-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-batch-demo</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.5</version>
        <relativePath/>
    </parent>

    <dependencies>
        <!-- Spring Boot Starter Batch -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- H2 数据库 -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- Spring Boot Starter Data JPA -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!-- Lombok(可选,简化实体类代码) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

说明

  • spring-boot-starter-batch:包含 Spring Batch 的核心依赖。
  • spring-boot-starter-data-jpa:用于数据库操作。
  • h2:嵌入式数据库,便于测试。

2.3 输入文件

在项目资源目录(src/main/resources)下创建一个 CSV 文件 products.csv,内容如下:

1
2
3
4
5
id,name,price
1,Laptop,10000
2,Phone,5000
3,Headphones,-100
4,Tablet,8000

说明

  • 每行表示一个商品,包含 id(编号)、name(名称)、price(人民币价格)。
  • 第三行价格为负数,将在处理时被过滤。

2.4 数据库表

定义一个 product 表存储处理后的商品数据。Spring Boot 会自动根据实体类创建表结构。实体类如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
package com.example.springbatchdemo.entity;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.Data;

@Entity
@Data
public class Product {
    @Id
    private Long id;
    private String name;
    private Double price; // 美元价格
}

3. 实现 Spring Batch 作业

3.1 作业流程图

以下是用 Mermaid 绘制的本次作业流程图,展示数据从 CSV 到数据库的处理过程:

graph TD
    A[Job: importProductsJob] --> B[Step: processProductsStep]
    B --> C[ItemReader: read from products.csv]
    C -->|逐条读取| D[ItemProcessor: convert price to USD, filter invalid]
    D -->|处理后数据| E[ItemWriter: write to product table]
    E --> F[H2 Database]
    A --> G[JobRepository]
    G -->|存储元数据| F

说明

  • Job 名为 importProductsJob,包含一个 StepprocessProductsStep)。
  • ItemReaderproducts.csv 读取数据。
  • ItemProcessor 将价格转换为美元并过滤无效记录。
  • ItemWriter 将数据写入数据库的 product 表。
  • JobRepository 记录作业状态,存储在 H2 数据库。

3.2 配置 Spring Batch

创建一个 Spring Batch 配置类,定义 Job 和 Step:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class BatchConfiguration {

    @Bean
    public FlatFileItemReader<Product> reader() {
        return new FlatFileItemReaderBuilder<Product>()
                .name("productReader")
                .resource(new ClassPathResource("products.csv"))
                .delimited()
                .names("id", "name", "price")
                .targetType(Product.class)
                .build();
    }

    @Bean
    public ProductItemProcessor processor() {
        return new ProductItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Product>()
                .sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    @Bean
    public Job importProductsJob(JobRepository jobRepository, Step processProductsStep) {
        return new JobBuilder("importProductsJob", jobRepository)
                .start(processProductsStep)
                .build();
    }

    @Bean
    public Step processProductsStep(JobRepository jobRepository,
                                   PlatformTransactionManager transactionManager,
                                   FlatFileItemReader<Product> reader,
                                   ProductItemProcessor processor,
                                   JdbcBatchItemWriter<Product> writer) {
        return new StepBuilder("processProductsStep", jobRepository)
                .<Product, Product>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .transactionManager(transactionManager)
                .build();
    }
}

代码说明

  • ItemReader:使用 FlatFileItemReader 读取 products.csv,自动映射到 Product 类。
  • ItemProcessor:自定义处理器(稍后实现),转换价格并过滤。
  • ItemWriter:使用 JdbcBatchItemWriter 批量写入数据库。
  • Step:定义一个 Chunk 模式的 Step,每 10 条数据提交一次事务。
  • Job:定义一个名为 importProductsJob 的作业,包含一个 Step。

3.3 实现 ItemProcessor

创建一个 ProductItemProcessor 类,负责价格转换和过滤:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;

public class ProductItemProcessor implements ItemProcessor<Product, Product> {

    private static final double EXCHANGE_RATE = 0.14; // 人民币到美元汇率

    @Override
    public Product process(Product item) {
        // 过滤价格小于等于 0 的记录
        if (item.getPrice() <= 0) {
            return null; // 返回 null 表示跳过该记录
        }

        // 转换价格到美元
        item.setPrice(item.getPrice() * EXCHANGE_RATE);
        return item;
    }
}

说明

  • 如果价格 ≤ 0,返回 null,Spring Batch 会自动跳过该记录。
  • 否则,将价格乘以汇率 0.14,转换为美元。

3.4 应用配置

application.properties 中配置数据库和 Spring Batch:

1
2
3
4
5
6
spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.batch.jdbc.initialize-schema=always

说明

  • spring.batch.jdbc.initialize-schema=always:自动创建 Spring Batch 所需的元数据表。
  • H2 数据库使用内存模式,适合测试。

3.5 主应用类

确保主类启用 Spring Batch:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
package com.example.springbatchdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBatchDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchDemoApplication.class, args);
    }
}

4. 运行与验证

4.1 运行项目

运行 SpringBatchDemoApplication 主类。Spring Boot 会自动启动 importProductsJob(因为 Spring Batch 默认会运行所有定义的 Job)。

4.2 验证结果

  1. 控制台输出
    你将看到 Spring Batch 的日志,显示 Job 的执行状态,如:

    Completed Job: [importProductsJob] - [JobExecution: id=0, status=COMPLETED, ...]
    
  2. 数据库数据
    连接 H2 数据库(访问 http://localhost:8080/h2-console,使用 jdbc:h2:mem:testdb),查询 product 表:

    1
    
    SELECT * FROM product;
    

    预期结果:

    ID | NAME       | PRICE
    1  | Laptop     | 1400.0
    2  | Phone      | 700.0
    4  | Tablet     | 1120.0
    

    说明

    • 第三条记录(Headphones,价格 -100)被过滤。
    • 价格已转换为美元(10000 * 0.14 = 1400,依此类推)。
  3. 元数据表
    Spring Batch 会在数据库中创建元数据表(如 BATCH_JOB_EXECUTIONBATCH_STEP_EXECUTION),记录作业的执行状态。查询示例:

    1
    
    SELECT * FROM BATCH_JOB_EXECUTION;
    

5. 代码与流程回顾

通过这个示例,我们完成了以下工作:

  • 配置:搭建了 Spring Boot 和 Spring Batch 环境。
  • 读取:从 CSV 文件读取商品数据。
  • 处理:将价格转换为美元,过滤无效记录。
  • 写入:将数据批量写入数据库。
  • 运行:通过 JobLauncher 自动运行作业。

Mermaid 流程图清晰展示了数据流向,Chunk 模式确保了高效的批量处理。

6. 常见问题与解答

  • Q:为什么第三条记录没有写入数据库?
    A:ItemProcessor 返回 null 表示跳过该记录,价格 ≤ 0 的记录被过滤。

  • Q:如何手动触发 Job?
    A:可以通过 REST API 或 Spring Scheduler 调用 JobLauncher.run(job, parameters),后续文章会讲解。

  • Q:Chunk 大小如何选择?
    A:Chunk 大小(如 10)需要根据数据量和性能权衡,过大可能占用内存,过小影响效率。

7. 下一步

本示例展示了 Spring Batch 的基本用法,但还有更多功能等待探索。在下一篇文章中,我们将深入讲解 Spring Batch 的核心组件,包括:

  • ItemReader 和 ItemWriter 的多种实现(如数据库、JSON 文件)。
  • Tasklet Step 的使用场景。
  • 如何配置多 Step 的复杂 Job。
updatedupdated2025-04-172025-04-17