在上一篇文章中,我们介绍了 Spring Batch 的核心概念,包括 Job、Step、Chunk、ItemReader、ItemProcessor 和 ItemWriter 等。本文将通过一个简单的示例,带你从零开始构建一个 Spring Batch 作业,体验其基本功能。你将学习如何:
- 配置 Spring Boot 和 Spring Batch 环境。
- 实现从 CSV 文件读取数据、处理数据并写入数据库的完整流程。
- 定义 Job 和 Step。
- 运行并验证作业结果。
我们将实现一个 ETL(Extract-Transform-Load)任务:
- 输入:一个包含商品信息的 CSV 文件(字段:商品 ID、名称、价格)。
- 处理:将价格统一转换为美元(假设输入为人民币,乘以汇率 0.14),并过滤掉价格低于 0 的记录。
- 输出:将处理后的商品数据写入数据库的
product
表。
- Spring Boot:简化项目配置和依赖管理。
- Spring Batch:提供批处理功能。
- H2 数据库:嵌入式数据库,便于测试(生产环境可替换为 MySQL、PostgreSQL 等)。
- Maven:依赖管理工具。
创建一个 Spring Boot 项目,添加以下依赖(pom.xml
):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>spring-batch-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-batch-demo</name>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.5</version>
<relativePath/>
</parent>
<dependencies>
<!-- Spring Boot Starter Batch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- H2 数据库 -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Spring Boot Starter Data JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- Lombok(可选,简化实体类代码) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
|
说明:
spring-boot-starter-batch
:包含 Spring Batch 的核心依赖。spring-boot-starter-data-jpa
:用于数据库操作。h2
:嵌入式数据库,便于测试。
在项目资源目录(src/main/resources
)下创建一个 CSV 文件 products.csv
,内容如下:
1
2
3
4
5
| id,name,price
1,Laptop,10000
2,Phone,5000
3,Headphones,-100
4,Tablet,8000
|
说明:
- 每行表示一个商品,包含
id
(编号)、name
(名称)、price
(人民币价格)。 - 第三行价格为负数,将在处理时被过滤。
定义一个 product
表存储处理后的商品数据。Spring Boot 会自动根据实体类创建表结构。实体类如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| package com.example.springbatchdemo.entity;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.Data;
@Entity
@Data
public class Product {
@Id
private Long id;
private String name;
private Double price; // 美元价格
}
|
以下是用 Mermaid 绘制的本次作业流程图,展示数据从 CSV 到数据库的处理过程:
graph TD
A[Job: importProductsJob] --> B[Step: processProductsStep]
B --> C[ItemReader: read from products.csv]
C -->|逐条读取| D[ItemProcessor: convert price to USD, filter invalid]
D -->|处理后数据| E[ItemWriter: write to product table]
E --> F[H2 Database]
A --> G[JobRepository]
G -->|存储元数据| F
说明:
- Job 名为
importProductsJob
,包含一个 Step(processProductsStep
)。 - ItemReader 从
products.csv
读取数据。 - ItemProcessor 将价格转换为美元并过滤无效记录。
- ItemWriter 将数据写入数据库的
product
表。 - JobRepository 记录作业状态,存储在 H2 数据库。
创建一个 Spring Batch 配置类,定义 Job 和 Step:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
public class BatchConfiguration {
@Bean
public FlatFileItemReader<Product> reader() {
return new FlatFileItemReaderBuilder<Product>()
.name("productReader")
.resource(new ClassPathResource("products.csv"))
.delimited()
.names("id", "name", "price")
.targetType(Product.class)
.build();
}
@Bean
public ProductItemProcessor processor() {
return new ProductItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Product>()
.sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
.dataSource(dataSource)
.beanMapped()
.build();
}
@Bean
public Job importProductsJob(JobRepository jobRepository, Step processProductsStep) {
return new JobBuilder("importProductsJob", jobRepository)
.start(processProductsStep)
.build();
}
@Bean
public Step processProductsStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
FlatFileItemReader<Product> reader,
ProductItemProcessor processor,
JdbcBatchItemWriter<Product> writer) {
return new StepBuilder("processProductsStep", jobRepository)
.<Product, Product>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.transactionManager(transactionManager)
.build();
}
}
|
代码说明:
- ItemReader:使用
FlatFileItemReader
读取 products.csv
,自动映射到 Product
类。 - ItemProcessor:自定义处理器(稍后实现),转换价格并过滤。
- ItemWriter:使用
JdbcBatchItemWriter
批量写入数据库。 - Step:定义一个 Chunk 模式的 Step,每 10 条数据提交一次事务。
- Job:定义一个名为
importProductsJob
的作业,包含一个 Step。
创建一个 ProductItemProcessor
类,负责价格转换和过滤:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;
public class ProductItemProcessor implements ItemProcessor<Product, Product> {
private static final double EXCHANGE_RATE = 0.14; // 人民币到美元汇率
@Override
public Product process(Product item) {
// 过滤价格小于等于 0 的记录
if (item.getPrice() <= 0) {
return null; // 返回 null 表示跳过该记录
}
// 转换价格到美元
item.setPrice(item.getPrice() * EXCHANGE_RATE);
return item;
}
}
|
说明:
- 如果价格 ≤ 0,返回
null
,Spring Batch 会自动跳过该记录。 - 否则,将价格乘以汇率 0.14,转换为美元。
在 application.properties
中配置数据库和 Spring Batch:
1
2
3
4
5
6
| spring.datasource.url=jdbc:h2:mem:testdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect
spring.batch.jdbc.initialize-schema=always
|
说明:
spring.batch.jdbc.initialize-schema=always
:自动创建 Spring Batch 所需的元数据表。- H2 数据库使用内存模式,适合测试。
确保主类启用 Spring Batch:
1
2
3
4
5
6
7
8
9
10
11
| package com.example.springbatchdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBatchDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchDemoApplication.class, args);
}
}
|
运行 SpringBatchDemoApplication
主类。Spring Boot 会自动启动 importProductsJob
(因为 Spring Batch 默认会运行所有定义的 Job)。
控制台输出
你将看到 Spring Batch 的日志,显示 Job 的执行状态,如:
Completed Job: [importProductsJob] - [JobExecution: id=0, status=COMPLETED, ...]
数据库数据
连接 H2 数据库(访问 http://localhost:8080/h2-console
,使用 jdbc:h2:mem:testdb
),查询 product
表:
预期结果:
ID | NAME | PRICE
1 | Laptop | 1400.0
2 | Phone | 700.0
4 | Tablet | 1120.0
说明:
- 第三条记录(Headphones,价格 -100)被过滤。
- 价格已转换为美元(10000 * 0.14 = 1400,依此类推)。
元数据表
Spring Batch 会在数据库中创建元数据表(如 BATCH_JOB_EXECUTION
、BATCH_STEP_EXECUTION
),记录作业的执行状态。查询示例:
1
| SELECT * FROM BATCH_JOB_EXECUTION;
|
通过这个示例,我们完成了以下工作:
- 配置:搭建了 Spring Boot 和 Spring Batch 环境。
- 读取:从 CSV 文件读取商品数据。
- 处理:将价格转换为美元,过滤无效记录。
- 写入:将数据批量写入数据库。
- 运行:通过 JobLauncher 自动运行作业。
Mermaid 流程图清晰展示了数据流向,Chunk 模式确保了高效的批量处理。
Q:为什么第三条记录没有写入数据库?
A:ItemProcessor
返回 null
表示跳过该记录,价格 ≤ 0 的记录被过滤。
Q:如何手动触发 Job?
A:可以通过 REST API 或 Spring Scheduler 调用 JobLauncher.run(job, parameters)
,后续文章会讲解。
Q:Chunk 大小如何选择?
A:Chunk 大小(如 10)需要根据数据量和性能权衡,过大可能占用内存,过小影响效率。
本示例展示了 Spring Batch 的基本用法,但还有更多功能等待探索。在下一篇文章中,我们将深入讲解 Spring Batch 的核心组件,包括:
- ItemReader 和 ItemWriter 的多种实现(如数据库、JSON 文件)。
- Tasklet Step 的使用场景。
- 如何配置多 Step 的复杂 Job。