在上一篇文章中,我们学习了 Spring Batch 的错误处理机制(Skip、Retry、Restart 和 Listener),掌握了如何提升作业的健壮性。随着数据量的增加,批处理任务的性能成为关键挑战。Spring Batch 提供了强大的并行处理功能,包括多线程 Step、分区(Partitioning)和并行 Job,能够显著缩短运行时间。此外,性能优化还涉及 Chunk 大小、缓冲区配置等细节。
本文将聚焦以下内容:
- 多线程 Step:使用线程池并行执行 Step。
- 分区(Partitioning):将大数据集分割为多个子集并行处理。
- 并行 Job:同时运行多个独立 Job。
- 性能优化技巧:调整 Chunk 大小、优化数据库交互等。
- 通过代码示例和 Mermaid 图表展示并行处理和优化的实现。
通过本文,你将学会如何利用 Spring Batch 的并行机制处理海量数据,并优化作业性能,为生产环境提供高效的批处理解决方案。
Spring Batch 的并行处理旨在通过并发执行任务来提高吞吐量,主要包括以下方式:
- 多线程 Step:在单个 Step 内使用线程池并行处理 Chunk,适合 CPU 密集型或 IO 密集型任务。
- 分区(Partitioning):将大数据集分割为多个子集,每个子集由独立的 Step 处理,可分布在多线程或多节点上。
- 并行 Job:同时运行多个独立 Job,适合无依赖关系的任务。
- 异步执行:通过异步 JobLauncher 并发启动 Job。
这些机制依赖 Spring Batch 的任务执行器(TaskExecutor)和分区管理器(PartitionHandler)。性能优化的关键在于合理配置线程数、Chunk 大小和数据源访问。
以下是用 Mermaid 绘制的 Spring Batch 并行处理概览图,展示多线程 Step 和分区的关系:
graph TD
A[Job] --> B[Partitioned Step]
A --> C[Multi-Threaded Step]
B --> D[Partitioner]
D --> E[Slave Step 1]
D --> F[Slave Step 2]
D --> G[Slave Step N]
E --> H[ItemReader]
E --> I[ItemProcessor]
E --> J[ItemWriter]
C --> K[Thread 1: Chunk]
C --> L[Thread 2: Chunk]
C --> M[Thread N: Chunk]
K --> N[ItemReader]
K --> O[ItemProcessor]
K --> P[ItemWriter]
A --> Q[JobRepository]
Q -->|存储元数据| R[Database]
说明:
- Partitioned Step:通过 Partitioner 将数据分割,分配给多个 Slave Step 并行执行。
- Multi-Threaded Step:单个 Step 使用线程池并行处理 Chunk。
- JobRepository 记录所有执行状态,确保数据一致性。
多线程 Step 使用线程池在单个 Step 内并行处理 Chunk,适合数据量适中且任务可并行的场景(如文件读取、简单转换)。
通过 StepBuilder
的 .taskExecutor()
配置线程池。
示例:多线程读取 CSV 并写入数据库
假设 products.csv
包含大量记录,我们希望通过多线程加速处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
public class BatchConfiguration {
@Bean
public FlatFileItemReader<Product> reader() {
return new FlatFileItemReaderBuilder<Product>()
.name("productReader")
.resource(new ClassPathResource("products.csv"))
.delimited()
.names("id", "name", "price")
.targetType(Product.class)
.build();
}
@Bean
public ProductItemProcessor processor() {
return new ProductItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Product>()
.sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
.dataSource(dataSource)
.beanMapped()
.build();
}
@Bean
public Step multiThreadedStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("multiThreadedStep", jobRepository)
.<Product, Product>chunk(100) // 每 100 条一个 Chunk
.reader(reader())
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.taskExecutor(taskExecutor())
.throttleLimit(4) // 最大 4 个线程
.build();
}
@Bean
public Job multiThreadedJob(JobRepository jobRepository, Step multiThreadedStep) {
return new JobBuilder("multiThreadedJob", jobRepository)
.start(multiThreadedStep)
.build();
}
@Bean
public org.springframework.core.task.TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
executor.setConcurrencyLimit(4); // 限制最大线程数
return executor;
}
}
|
Processor 实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;
public class ProductItemProcessor implements ItemProcessor<Product, Product> {
private static final double EXCHANGE_RATE = 0.14;
@Override
public Product process(Product item) {
if (item.getPrice() <= 0) {
return null;
}
item.setPrice(item.getPrice() * EXCHANGE_RATE);
return item;
}
}
|
实体类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| package com.example.springbatchdemo.entity;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.Data;
@Entity
@Data
public class Product {
@Id
private Long id;
private String name;
private Double price;
}
|
说明:
.taskExecutor(taskExecutor())
指定线程池,SimpleAsyncTaskExecutor
创建新线程。.throttleLimit(4)
限制最大并发线程数为 4。chunk(100)
设置每个 Chunk 处理 100 条记录。- 每个线程独立处理一个 Chunk,Reader/Processor/Writer 必须线程安全。
运行结果:
- 4 个线程并行处理 CSV 文件,显著缩短运行时间。
- 数据库写入仍按 Chunk 提交事务,保证一致性。
流程图:
graph TD
A[Multi-Threaded Step] --> B[Thread Pool]
B --> C[Thread 1: Chunk]
B --> D[Thread 2: Chunk]
B --> E[Thread 3: Chunk]
B --> F[Thread 4: Chunk]
C --> G[ItemReader]
C --> H[ItemProcessor]
C --> I[ItemWriter]
I --> J[Database]
注意事项:
- 线程安全:
FlatFileItemReader
默认线程不安全,建议使用同步代理(如 SynchronizedItemStreamReader
): 1
2
3
4
5
6
7
8
9
10
11
| @Bean
public ItemReader<Product> reader() {
FlatFileItemReader<Product> reader = new FlatFileItemReaderBuilder<Product>()
.name("productReader")
.resource(new ClassPathResource("products.csv"))
.delimited()
.names("id", "name", "price")
.targetType(Product.class)
.build();
return new SynchronizedItemStreamReader<>(reader);
}
|
- 资源竞争:多线程写入数据库可能导致锁竞争,需优化数据库配置(如连接池大小)。
- 线程数选择:根据 CPU 核心数和任务类型设置,通常为
CPU 核心数 * 2
或稍高。
适用场景:
- 数据量适中(百万级以下)。
- 任务可并行(如文件读取、简单计算)。
- 单机环境,无需分布式处理。
分区 将大数据集分割为多个子集,每个子集由独立的 Slave Step 处理。分区支持单机多线程或分布式环境(如多节点),适合海量数据处理。
分区涉及以下组件:
- Partitioner:定义如何分割数据,生成分区元数据(如文件路径、数据库范围)。
- PartitionHandler:管理 Slave Step 的执行,可使用本地线程池或远程节点。
- Slave Step:处理单个分区的数据,包含独立的 Reader/Processor/Writer。
示例:分区处理大型数据库表
假设 source_product
表包含千万条记录,我们按 ID 范围分区处理,转换为 product
表。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
| package com.example.springbatchdemo.config;
import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class PartitionConfiguration {
@Bean
public Partitioner partitioner() {
return new RangePartitioner();
}
@Bean
@StepScope
public JdbcCursorItemReader<Product> partitionReader(@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId,
DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<Product>()
.name("partitionReader")
.dataSource(dataSource)
.sql("SELECT id, name, price FROM source_product WHERE id >= ? AND id <= ?")
.preparedStatementSetter((ps) -> {
ps.setLong(1, minId);
ps.setLong(2, maxId);
})
.beanRowMapper(Product.class)
.build();
}
@Bean
public ProductItemProcessor processor() {
return new ProductItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Product> partitionWriter(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Product>()
.sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
.dataSource(dataSource)
.beanMapped()
.build();
}
@Bean
public Step slaveStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("slaveStep", jobRepository)
.<Product, Product>chunk(1000)
.reader(partitionReader(null, null, dataSource))
.processor(processor())
.writer(partitionWriter(dataSource))
.transactionManager(transactionManager)
.build();
}
@Bean
public Step masterStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
Step slaveStep) {
return new StepBuilder("masterStep", jobRepository)
.partitioner("slaveStep", partitioner())
.step(slaveStep)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Job partitionedJob(JobRepository jobRepository, Step masterStep) {
return new JobBuilder("partitionedJob", jobRepository)
.start(masterStep)
.build();
}
@Bean
public org.springframework.core.task.TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
executor.setConcurrencyLimit(4); // 4 个分区并行
return executor;
}
}
|
自定义 Partitioner:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| package com.example.springbatchdemo.config;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new HashMap<>();
Long minId = jdbcTemplate.queryForObject("SELECT MIN(id) FROM source_product", Long.class);
Long maxId = jdbcTemplate.queryForObject("SELECT MAX(id) FROM source_product", Long.class);
long targetSize = (maxId - minId) / gridSize + 1;
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putLong("minId", minId + i * targetSize);
context.putLong("maxId", minId + (i + 1) * targetSize - 1);
result.put("partition" + i, context);
}
return result;
}
}
|
Processor 实现(同前):
1
2
3
4
5
6
7
8
9
10
11
12
| public class ProductItemProcessor implements ItemProcessor<Product, Product> {
private static final double EXCHANGE_RATE = 0.14;
@Override
public Product process(Product item) {
if (item.getPrice() <= 0) {
return null;
}
item.setPrice(item.getPrice() * EXCHANGE_RATE);
return item;
}
}
|
说明:
- Partitioner:
RangePartitioner
根据 ID 范围分割数据,生成 4 个分区(gridSize=4)。 - Master Step:协调分区,分配给 Slave Step。
- Slave Step:每个分区使用独立的 Reader(按 ID 范围查询),处理自己的数据子集。
- TaskExecutor:4 个线程并行执行分区。
chunk(1000)
:每个分区内每 1000 条记录提交一次事务。
流程图:
graph TD
A[Master Step] --> B[Partitioner]
B --> C[Partition 1: ID 1-250k]
B --> D[Partition 2: ID 250k-500k]
B --> E[Partition 3: ID 500k-750k]
B --> F[Partition 4: ID 750k-1M]
C --> G[Slave Step]
D --> H[Slave Step]
E --> I[Slave Step]
F --> J[Slave Step]
G --> K[ItemReader: Query Range]
G --> L[ItemProcessor]
G --> M[ItemWriter]
M --> N[Database]
运行结果:
- 数据被分割为 4 个 ID 范围,4 个线程并行处理。
- 每个分区独立读取、处理和写入,性能接近线性提升。
注意事项:
- 数据隔离:确保分区之间无重叠(如 ID 范围互斥)。
- 资源限制:线程数不宜过多,建议与数据库连接池大小匹配。
- 分布式分区:通过
MessageChannelPartitionHandler
可将 Slave Step 分发到远程节点(需要 Spring Integration)。
适用场景:
- 数据量巨大(千万级以上)。
- 数据可分割(如按 ID、日期、文件)。
- 单机或分布式环境。
并行 Job 允许同时运行多个独立 Job,适合无依赖关系的任务(如处理不同文件的 ETL)。
通过 FlowBuilder
和 SplitState
配置并行 Job。
示例:并行处理两个 CSV 文件
假设有两个文件 products1.csv
和 products2.csv
,分别由两个 Job 处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
| package com.example.springbatchdemo.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
public class ParallelJobConfiguration {
@Bean
@JobScope
public FlatFileItemReader<Product> reader1(@Value("#{jobParameters['fileName']}") String fileName) {
return new FlatFileItemReaderBuilder<Product>()
.name("reader1")
.resource(new ClassPathResource(fileName))
.delimited()
.names("id", "name", "price")
.targetType(Product.class)
.build();
}
@Bean
@JobScope
public FlatFileItemReader<Product> reader2(@Value("#{jobParameters['fileName']}") String fileName) {
return new FlatFileItemReaderBuilder<Product>()
.name("reader2")
.resource(new ClassPathResource(fileName))
.delimited()
.names("id", "name", "price")
.targetType(Product.class)
.build();
}
@Bean
public ProductItemProcessor processor() {
return new ProductItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Product>()
.sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
.dataSource(dataSource)
.beanMapped()
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Product, Product>chunk(100)
.reader(reader1("products1.csv"))
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.build();
}
@Bean
public Step step2(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step2", jobRepository)
.<Product, Product>chunk(100)
.reader(reader2("products2.csv"))
.processor(processor())
.writer(writer(dataSource))
.transactionManager(transactionManager)
.build();
}
@Bean
public Job parallelJob(JobRepository jobRepository, Step step1, Step step2) {
Flow flow1 = new FlowBuilder<Flow>("flow1").start(step1).build();
Flow flow2 = new FlowBuilder<Flow>("flow2").start(step2).build();
return new JobBuilder("parallelJob", jobRepository)
.start(new FlowBuilder<Flow>("splitFlow")
.split(taskExecutor())
.add(flow1, flow2)
.build())
.end()
.build();
}
@Bean
public org.springframework.core.task.TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor();
}
}
|
说明:
FlowBuilder
定义两个 Flow,分别包含 step1
和 step2
。.split(taskExecutor())
并行执行两个 Flow。- 每个 Step 处理一个 CSV 文件,写入同一数据库表。
流程图:
graph TD
A[Parallel Job] --> B[Split Flow]
B --> C[Flow 1: Step 1]
B --> D[Flow 2: Step 2]
C --> E[Reader: products1.csv]
C --> F[Processor]
C --> G[Writer]
D --> H[Reader: products2.csv]
D --> I[Processor]
D --> J[Writer]
G --> K[Database]
J --> K
运行结果:
products1.csv
和 products2.csv
并行处理,写入数据库。- 性能接近两倍提升(受 IO 和数据库限制)。
注意事项:
- 独立性:Job 之间应无依赖,否则需使用条件流。
- 资源竞争:多 Job 写入同一表可能导致锁竞争,需优化数据库。
- 线程管理:避免创建过多线程,建议限制 TaskExecutor 的并发数。
适用场景:
- 多个独立任务(如不同文件的 ETL)。
- 任务间无顺序依赖。
- 资源充足(如多核 CPU、大内存)。
除了并行处理,性能优化还涉及以下方面:
- Chunk 大小:影响内存使用和 IO 效率。
- 小 Chunk(10-100):适合内存受限或数据量小,减少事务开销。
- 大 Chunk(1000-10000):适合大数据量,减少提交频率。
- 测试方法:逐步调整 Chunk 大小(如 100、500、1000),监控运行时间和内存使用。
示例:
- 批量写入:使用
JdbcBatchItemWriter
而非逐条插入。 - 连接池:配置足够大的数据库连接池(如 HikariCP,默认 10 个连接)。
- 索引优化:为频繁查询的字段(如 ID)添加索引。
- 分页读取:使用
JpaPagingItemReader
或 JdbcPagingItemReader
避免全表扫描。
示例:分页读取:
1
2
3
4
5
6
7
8
9
10
11
12
| @Bean
public JdbcPagingItemReader<Product> pagingReader(DataSource dataSource) {
return new JdbcPagingItemReaderBuilder<Product>()
.name("pagingReader")
.dataSource(dataSource)
.selectClause("SELECT id, name, price")
.fromClause("FROM source_product")
.sortKeys(Map.of("id", Order.ASCENDING))
.pageSize(1000)
.beanRowMapper(Product.class)
.build();
}
|
- Reader 缓冲:配置
FlatFileItemReader
的 bufferedReader
大小,减少 IO。 - Writer 缓存:启用数据库批量写入缓存(如
spring.jpa.properties.hibernate.jdbc.batch_size=50
)。 - Processor 优化:避免复杂计算,必要时使用缓存(如内存 Map)。
使用 AsyncItemProcessor
和 AsyncItemWriter
异步处理和写入。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| @Bean
public AsyncItemProcessor<Product, Product> asyncProcessor() {
AsyncItemProcessor<Product, Product> processor = new AsyncItemProcessor<>();
processor.setDelegate(processor());
processor.setTaskExecutor(taskExecutor());
return processor;
}
@Bean
public AsyncItemWriter<Product> asyncWriter(DataSource dataSource) {
AsyncItemWriter<Product> writer = new AsyncItemWriter<>();
writer.setDelegate(writer(dataSource));
return writer;
}
@Bean
public Step asyncStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("asyncStep", jobRepository)
.<Product, Product>chunk(100)
.reader(reader())
.processor(asyncProcessor())
.writer(asyncWriter(dataSource))
.transactionManager(transactionManager)
.build();
}
|
说明:
AsyncItemProcessor
和 AsyncItemWriter
将处理和写入操作放入线程池。- 提高吞吐量,但增加复杂性,需确保线程安全。
- 日志:启用详细日志(如
spring.batch.*=DEBUG
),分析瓶颈。 - 监控工具:使用 JMX、Prometheus 或 Spring Actuator 监控 Job 性能。
- 性能测试:模拟生产数据量,比较不同配置的效果。
以下是一个综合示例,结合分区和多线程处理数据库表:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
| @Configuration
public class OptimizedConfiguration {
@Bean
public Partitioner partitioner(DataSource dataSource) {
return new RangePartitioner(dataSource);
}
@Bean
@StepScope
public JdbcPagingItemReader<Product> partitionReader(@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId,
DataSource dataSource) {
return new JdbcPagingItemReaderBuilder<Product>()
.name("partitionReader")
.dataSource(dataSource)
.selectClause("SELECT id, name, price")
.fromClause("FROM source_product")
.whereClause("WHERE id >= :minId AND id <= :maxId")
.parameterValues(Map.of("minId", minId, "maxId", maxId))
.sortKeys(Map.of("id", Order.ASCENDING))
.pageSize(1000)
.beanRowMapper(Product.class)
.build();
}
@Bean
public AsyncItemProcessor<Product, Product> asyncProcessor() {
AsyncItemProcessor<Product, Product> processor = new AsyncItemProcessor<>();
processor.setDelegate(new ProductItemProcessor());
processor.setTaskExecutor(taskExecutor());
return processor;
}
@Bean
public AsyncItemWriter<Product> asyncWriter(DataSource dataSource) {
AsyncItemWriter<Product> writer = new AsyncItemWriter<>();
writer.setDelegate(new JdbcBatchItemWriterBuilder<Product>()
.sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
.dataSource(dataSource)
.beanMapped()
.build());
return writer;
}
@Bean
public Step slaveStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("slaveStep", jobRepository)
.<Product, Product>chunk(1000)
.reader(partitionReader(null, null, dataSource))
.processor(asyncProcessor())
.writer(asyncWriter(dataSource))
.transactionManager(transactionManager)
.build();
}
@Bean
public Step masterStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
Step slaveStep) {
return new StepBuilder("masterStep", jobRepository)
.partitioner("slaveStep", partitioner(dataSource))
.step(slaveStep)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Job optimizedJob(JobRepository jobRepository, Step masterStep) {
return new JobBuilder("optimizedJob", jobRepository)
.start(masterStep)
.build();
}
@Bean
public org.springframework.core.task.TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
executor.setConcurrencyLimit(8); // 8 个线程
return executor;
}
}
|
说明:
- 分区:按 ID 范围分割数据。
- 分页读取:使用
JdbcPagingItemReader
优化数据库查询。 - 异步处理:
AsyncItemProcessor
和 AsyncItemWriter
提高吞吐量。 - 多线程:8 个线程并行处理分区。
多线程 Step:
- 确保 Reader/Processor/Writer 线程安全。
- 限制线程数,避免资源耗尽。
- 测试不同线程数,找到最佳配置。
分区:
- 设计合理的分区策略(如按 ID、时间)。
- 使用分页读取优化数据库性能。
- 考虑分布式分区,扩展到多节点。
并行 Job:
- 确保 Job 独立,避免数据冲突。
- 监控数据库锁和连接池使用。
性能优化:
- 调整 Chunk 大小,平衡内存和效率。
- 优化数据库配置(如批量写入、索引)。
- 使用异步处理提高吞吐量。
Q:多线程 Step 和分区如何选择?
A:多线程 Step 适合简单并行任务,配置简单;分区适合大数据量,可扩展到分布式环境。
Q:并行处理导致数据重复怎么办?
A:确保分区互斥(如 ID 范围不重叠),使用事务隔离写入。
Q:如何调试性能瓶颈?
A:启用详细日志,分析 Reader/Processor/Writer 的耗时,使用监控工具定位问题。
本文详细讲解了 Spring Batch 的并行处理机制(多线程 Step、分区、并行 Job)和性能优化技巧。通过示例和 Mermaid 图表,你学会了如何加速批处理任务。下一篇文章将聚焦 Spring Batch 与数据库集成,内容包括:
- 使用 JDBC、JPA 和 MyBatis 读写数据库。
- 配置事务管理。
- 优化数据库性能的实践。