Spring Batch 专题系列(六):并行处理与性能优化

Spring Batch 专题系列(六):并行处理与性能优化

1. 引言

在上一篇文章中,我们学习了 Spring Batch 的错误处理机制(Skip、Retry、Restart 和 Listener),掌握了如何提升作业的健壮性。随着数据量的增加,批处理任务的性能成为关键挑战。Spring Batch 提供了强大的并行处理功能,包括多线程 Step、分区(Partitioning)和并行 Job,能够显著缩短运行时间。此外,性能优化还涉及 Chunk 大小、缓冲区配置等细节。

本文将聚焦以下内容:

  • 多线程 Step:使用线程池并行执行 Step。
  • 分区(Partitioning):将大数据集分割为多个子集并行处理。
  • 并行 Job:同时运行多个独立 Job。
  • 性能优化技巧:调整 Chunk 大小、优化数据库交互等。
  • 通过代码示例和 Mermaid 图表展示并行处理和优化的实现。

通过本文,你将学会如何利用 Spring Batch 的并行机制处理海量数据,并优化作业性能,为生产环境提供高效的批处理解决方案。

2. 并行处理的核心概念

Spring Batch 的并行处理旨在通过并发执行任务来提高吞吐量,主要包括以下方式:

  • 多线程 Step:在单个 Step 内使用线程池并行处理 Chunk,适合 CPU 密集型或 IO 密集型任务。
  • 分区(Partitioning):将大数据集分割为多个子集,每个子集由独立的 Step 处理,可分布在多线程或多节点上。
  • 并行 Job:同时运行多个独立 Job,适合无依赖关系的任务。
  • 异步执行:通过异步 JobLauncher 并发启动 Job。

这些机制依赖 Spring Batch 的任务执行器(TaskExecutor)和分区管理器(PartitionHandler)。性能优化的关键在于合理配置线程数、Chunk 大小和数据源访问。

并行处理流程图

以下是用 Mermaid 绘制的 Spring Batch 并行处理概览图,展示多线程 Step 和分区的关系:

graph TD
    A[Job] --> B[Partitioned Step]
    A --> C[Multi-Threaded Step]
    B --> D[Partitioner]
    D --> E[Slave Step 1]
    D --> F[Slave Step 2]
    D --> G[Slave Step N]
    E --> H[ItemReader]
    E --> I[ItemProcessor]
    E --> J[ItemWriter]
    C --> K[Thread 1: Chunk]
    C --> L[Thread 2: Chunk]
    C --> M[Thread N: Chunk]
    K --> N[ItemReader]
    K --> O[ItemProcessor]
    K --> P[ItemWriter]
    A --> Q[JobRepository]
    Q -->|存储元数据| R[Database]

说明

  • Partitioned Step:通过 Partitioner 将数据分割,分配给多个 Slave Step 并行执行。
  • Multi-Threaded Step:单个 Step 使用线程池并行处理 Chunk。
  • JobRepository 记录所有执行状态,确保数据一致性。

3. 多线程 Step

多线程 Step 使用线程池在单个 Step 内并行处理 Chunk,适合数据量适中且任务可并行的场景(如文件读取、简单转换)。

3.1 配置多线程 Step

通过 StepBuilder.taskExecutor() 配置线程池。

示例:多线程读取 CSV 并写入数据库

假设 products.csv 包含大量记录,我们希望通过多线程加速处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class BatchConfiguration {

    @Bean
    public FlatFileItemReader<Product> reader() {
        return new FlatFileItemReaderBuilder<Product>()
                .name("productReader")
                .resource(new ClassPathResource("products.csv"))
                .delimited()
                .names("id", "name", "price")
                .targetType(Product.class)
                .build();
    }

    @Bean
    public ProductItemProcessor processor() {
        return new ProductItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Product>()
                .sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    @Bean
    public Step multiThreadedStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("multiThreadedStep", jobRepository)
                .<Product, Product>chunk(100) // 每 100 条一个 Chunk
                .reader(reader())
                .processor(processor())
                .writer(writer(dataSource))
                .transactionManager(transactionManager)
                .taskExecutor(taskExecutor())
                .throttleLimit(4) // 最大 4 个线程
                .build();
    }

    @Bean
    public Job multiThreadedJob(JobRepository jobRepository, Step multiThreadedStep) {
        return new JobBuilder("multiThreadedJob", jobRepository)
                .start(multiThreadedStep)
                .build();
    }

    @Bean
    public org.springframework.core.task.TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        executor.setConcurrencyLimit(4); // 限制最大线程数
        return executor;
    }
}

Processor 实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;

public class ProductItemProcessor implements ItemProcessor<Product, Product> {
    private static final double EXCHANGE_RATE = 0.14;

    @Override
    public Product process(Product item) {
        if (item.getPrice() <= 0) {
            return null;
        }
        item.setPrice(item.getPrice() * EXCHANGE_RATE);
        return item;
    }
}

实体类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
package com.example.springbatchdemo.entity;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.Data;

@Entity
@Data
public class Product {
    @Id
    private Long id;
    private String name;
    private Double price;
}

说明

  • .taskExecutor(taskExecutor()) 指定线程池,SimpleAsyncTaskExecutor 创建新线程。
  • .throttleLimit(4) 限制最大并发线程数为 4。
  • chunk(100) 设置每个 Chunk 处理 100 条记录。
  • 每个线程独立处理一个 Chunk,Reader/Processor/Writer 必须线程安全。

运行结果

  • 4 个线程并行处理 CSV 文件,显著缩短运行时间。
  • 数据库写入仍按 Chunk 提交事务,保证一致性。

流程图

graph TD
    A[Multi-Threaded Step] --> B[Thread Pool]
    B --> C[Thread 1: Chunk]
    B --> D[Thread 2: Chunk]
    B --> E[Thread 3: Chunk]
    B --> F[Thread 4: Chunk]
    C --> G[ItemReader]
    C --> H[ItemProcessor]
    C --> I[ItemWriter]
    I --> J[Database]

注意事项

  • 线程安全FlatFileItemReader 默认线程不安全,建议使用同步代理(如 SynchronizedItemStreamReader):
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    @Bean
    public ItemReader<Product> reader() {
        FlatFileItemReader<Product> reader = new FlatFileItemReaderBuilder<Product>()
                .name("productReader")
                .resource(new ClassPathResource("products.csv"))
                .delimited()
                .names("id", "name", "price")
                .targetType(Product.class)
                .build();
        return new SynchronizedItemStreamReader<>(reader);
    }
    
  • 资源竞争:多线程写入数据库可能导致锁竞争,需优化数据库配置(如连接池大小)。
  • 线程数选择:根据 CPU 核心数和任务类型设置,通常为 CPU 核心数 * 2 或稍高。

适用场景

  • 数据量适中(百万级以下)。
  • 任务可并行(如文件读取、简单计算)。
  • 单机环境,无需分布式处理。

4. 分区(Partitioning)

分区 将大数据集分割为多个子集,每个子集由独立的 Slave Step 处理。分区支持单机多线程或分布式环境(如多节点),适合海量数据处理。

4.1 分区工作原理

分区涉及以下组件:

  • Partitioner:定义如何分割数据,生成分区元数据(如文件路径、数据库范围)。
  • PartitionHandler:管理 Slave Step 的执行,可使用本地线程池或远程节点。
  • Slave Step:处理单个分区的数据,包含独立的 Reader/Processor/Writer。

4.2 配置分区

示例:分区处理大型数据库表

假设 source_product 表包含千万条记录,我们按 ID 范围分区处理,转换为 product 表。

代码实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class PartitionConfiguration {

    @Bean
    public Partitioner partitioner() {
        return new RangePartitioner();
    }

    @Bean
    @StepScope
    public JdbcCursorItemReader<Product> partitionReader(@Value("#{stepExecutionContext['minId']}") Long minId,
                                                        @Value("#{stepExecutionContext['maxId']}") Long maxId,
                                                        DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<Product>()
                .name("partitionReader")
                .dataSource(dataSource)
                .sql("SELECT id, name, price FROM source_product WHERE id >= ? AND id <= ?")
                .preparedStatementSetter((ps) -> {
                    ps.setLong(1, minId);
                    ps.setLong(2, maxId);
                })
                .beanRowMapper(Product.class)
                .build();
    }

    @Bean
    public ProductItemProcessor processor() {
        return new ProductItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Product> partitionWriter(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Product>()
                .sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    @Bean
    public Step slaveStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("slaveStep", jobRepository)
                .<Product, Product>chunk(1000)
                .reader(partitionReader(null, null, dataSource))
                .processor(processor())
                .writer(partitionWriter(dataSource))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Step masterStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                           Step slaveStep) {
        return new StepBuilder("masterStep", jobRepository)
                .partitioner("slaveStep", partitioner())
                .step(slaveStep)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Job partitionedJob(JobRepository jobRepository, Step masterStep) {
        return new JobBuilder("partitionedJob", jobRepository)
                .start(masterStep)
                .build();
    }

    @Bean
    public org.springframework.core.task.TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        executor.setConcurrencyLimit(4); // 4 个分区并行
        return executor;
    }
}

自定义 Partitioner

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.example.springbatchdemo.config;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

public class RangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public RangePartitioner(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<>();
        Long minId = jdbcTemplate.queryForObject("SELECT MIN(id) FROM source_product", Long.class);
        Long maxId = jdbcTemplate.queryForObject("SELECT MAX(id) FROM source_product", Long.class);
        long targetSize = (maxId - minId) / gridSize + 1;

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putLong("minId", minId + i * targetSize);
            context.putLong("maxId", minId + (i + 1) * targetSize - 1);
            result.put("partition" + i, context);
        }
        return result;
    }
}

Processor 实现(同前):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public class ProductItemProcessor implements ItemProcessor<Product, Product> {
    private static final double EXCHANGE_RATE = 0.14;

    @Override
    public Product process(Product item) {
        if (item.getPrice() <= 0) {
            return null;
        }
        item.setPrice(item.getPrice() * EXCHANGE_RATE);
        return item;
    }
}

说明

  • PartitionerRangePartitioner 根据 ID 范围分割数据,生成 4 个分区(gridSize=4)。
  • Master Step:协调分区,分配给 Slave Step。
  • Slave Step:每个分区使用独立的 Reader(按 ID 范围查询),处理自己的数据子集。
  • TaskExecutor:4 个线程并行执行分区。
  • chunk(1000):每个分区内每 1000 条记录提交一次事务。

流程图

graph TD
    A[Master Step] --> B[Partitioner]
    B --> C[Partition 1: ID 1-250k]
    B --> D[Partition 2: ID 250k-500k]
    B --> E[Partition 3: ID 500k-750k]
    B --> F[Partition 4: ID 750k-1M]
    C --> G[Slave Step]
    D --> H[Slave Step]
    E --> I[Slave Step]
    F --> J[Slave Step]
    G --> K[ItemReader: Query Range]
    G --> L[ItemProcessor]
    G --> M[ItemWriter]
    M --> N[Database]

运行结果

  • 数据被分割为 4 个 ID 范围,4 个线程并行处理。
  • 每个分区独立读取、处理和写入,性能接近线性提升。

注意事项

  • 数据隔离:确保分区之间无重叠(如 ID 范围互斥)。
  • 资源限制:线程数不宜过多,建议与数据库连接池大小匹配。
  • 分布式分区:通过 MessageChannelPartitionHandler 可将 Slave Step 分发到远程节点(需要 Spring Integration)。

适用场景

  • 数据量巨大(千万级以上)。
  • 数据可分割(如按 ID、日期、文件)。
  • 单机或分布式环境。

5. 并行 Job

并行 Job 允许同时运行多个独立 Job,适合无依赖关系的任务(如处理不同文件的 ETL)。

5.1 配置并行 Job

通过 FlowBuilderSplitState 配置并行 Job。

示例:并行处理两个 CSV 文件

假设有两个文件 products1.csvproducts2.csv,分别由两个 Job 处理。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
package com.example.springbatchdemo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class ParallelJobConfiguration {

    @Bean
    @JobScope
    public FlatFileItemReader<Product> reader1(@Value("#{jobParameters['fileName']}") String fileName) {
        return new FlatFileItemReaderBuilder<Product>()
                .name("reader1")
                .resource(new ClassPathResource(fileName))
                .delimited()
                .names("id", "name", "price")
                .targetType(Product.class)
                .build();
    }

    @Bean
    @JobScope
    public FlatFileItemReader<Product> reader2(@Value("#{jobParameters['fileName']}") String fileName) {
        return new FlatFileItemReaderBuilder<Product>()
                .name("reader2")
                .resource(new ClassPathResource(fileName))
                .delimited()
                .names("id", "name", "price")
                .targetType(Product.class)
                .build();
    }

    @Bean
    public ProductItemProcessor processor() {
        return new ProductItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Product>()
                .sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    @Bean
    public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("step1", jobRepository)
                .<Product, Product>chunk(100)
                .reader(reader1("products1.csv"))
                .processor(processor())
                .writer(writer(dataSource))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Step step2(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("step2", jobRepository)
                .<Product, Product>chunk(100)
                .reader(reader2("products2.csv"))
                .processor(processor())
                .writer(writer(dataSource))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Job parallelJob(JobRepository jobRepository, Step step1, Step step2) {
        Flow flow1 = new FlowBuilder<Flow>("flow1").start(step1).build();
        Flow flow2 = new FlowBuilder<Flow>("flow2").start(step2).build();
        return new JobBuilder("parallelJob", jobRepository)
                .start(new FlowBuilder<Flow>("splitFlow")
                        .split(taskExecutor())
                        .add(flow1, flow2)
                        .build())
                .end()
                .build();
    }

    @Bean
    public org.springframework.core.task.TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }
}

说明

  • FlowBuilder 定义两个 Flow,分别包含 step1step2
  • .split(taskExecutor()) 并行执行两个 Flow。
  • 每个 Step 处理一个 CSV 文件,写入同一数据库表。

流程图

graph TD
    A[Parallel Job] --> B[Split Flow]
    B --> C[Flow 1: Step 1]
    B --> D[Flow 2: Step 2]
    C --> E[Reader: products1.csv]
    C --> F[Processor]
    C --> G[Writer]
    D --> H[Reader: products2.csv]
    D --> I[Processor]
    D --> J[Writer]
    G --> K[Database]
    J --> K

运行结果

  • products1.csvproducts2.csv 并行处理,写入数据库。
  • 性能接近两倍提升(受 IO 和数据库限制)。

注意事项

  • 独立性:Job 之间应无依赖,否则需使用条件流。
  • 资源竞争:多 Job 写入同一表可能导致锁竞争,需优化数据库。
  • 线程管理:避免创建过多线程,建议限制 TaskExecutor 的并发数。

适用场景

  • 多个独立任务(如不同文件的 ETL)。
  • 任务间无顺序依赖。
  • 资源充足(如多核 CPU、大内存)。

6. 性能优化技巧

除了并行处理,性能优化还涉及以下方面:

6.1 调整 Chunk 大小

  • Chunk 大小:影响内存使用和 IO 效率。
    • 小 Chunk(10-100):适合内存受限或数据量小,减少事务开销。
    • 大 Chunk(1000-10000):适合大数据量,减少提交频率。
  • 测试方法:逐步调整 Chunk 大小(如 100、500、1000),监控运行时间和内存使用。

示例

1
.chunk(1000) // 适合大数据量

6.2 优化数据库交互

  • 批量写入:使用 JdbcBatchItemWriter 而非逐条插入。
  • 连接池:配置足够大的数据库连接池(如 HikariCP,默认 10 个连接)。
  • 索引优化:为频繁查询的字段(如 ID)添加索引。
  • 分页读取:使用 JpaPagingItemReaderJdbcPagingItemReader 避免全表扫描。

示例:分页读取

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@Bean
public JdbcPagingItemReader<Product> pagingReader(DataSource dataSource) {
    return new JdbcPagingItemReaderBuilder<Product>()
            .name("pagingReader")
            .dataSource(dataSource)
            .selectClause("SELECT id, name, price")
            .fromClause("FROM source_product")
            .sortKeys(Map.of("id", Order.ASCENDING))
            .pageSize(1000)
            .beanRowMapper(Product.class)
            .build();
}

6.3 缓冲区和缓存

  • Reader 缓冲:配置 FlatFileItemReaderbufferedReader 大小,减少 IO。
  • Writer 缓存:启用数据库批量写入缓存(如 spring.jpa.properties.hibernate.jdbc.batch_size=50)。
  • Processor 优化:避免复杂计算,必要时使用缓存(如内存 Map)。

6.4 异步执行

使用 AsyncItemProcessorAsyncItemWriter 异步处理和写入。

示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Bean
public AsyncItemProcessor<Product, Product> asyncProcessor() {
    AsyncItemProcessor<Product, Product> processor = new AsyncItemProcessor<>();
    processor.setDelegate(processor());
    processor.setTaskExecutor(taskExecutor());
    return processor;
}

@Bean
public AsyncItemWriter<Product> asyncWriter(DataSource dataSource) {
    AsyncItemWriter<Product> writer = new AsyncItemWriter<>();
    writer.setDelegate(writer(dataSource));
    return writer;
}

@Bean
public Step asyncStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("asyncStep", jobRepository)
            .<Product, Product>chunk(100)
            .reader(reader())
            .processor(asyncProcessor())
            .writer(asyncWriter(dataSource))
            .transactionManager(transactionManager)
            .build();
}

说明

  • AsyncItemProcessorAsyncItemWriter 将处理和写入操作放入线程池。
  • 提高吞吐量,但增加复杂性,需确保线程安全。

6.5 监控与分析

  • 日志:启用详细日志(如 spring.batch.*=DEBUG),分析瓶颈。
  • 监控工具:使用 JMX、Prometheus 或 Spring Actuator 监控 Job 性能。
  • 性能测试:模拟生产数据量,比较不同配置的效果。

7. 综合示例:分区 + 多线程

以下是一个综合示例,结合分区和多线程处理数据库表:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@Configuration
public class OptimizedConfiguration {

    @Bean
    public Partitioner partitioner(DataSource dataSource) {
        return new RangePartitioner(dataSource);
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Product> partitionReader(@Value("#{stepExecutionContext['minId']}") Long minId,
                                                        @Value("#{stepExecutionContext['maxId']}") Long maxId,
                                                        DataSource dataSource) {
        return new JdbcPagingItemReaderBuilder<Product>()
                .name("partitionReader")
                .dataSource(dataSource)
                .selectClause("SELECT id, name, price")
                .fromClause("FROM source_product")
                .whereClause("WHERE id >= :minId AND id <= :maxId")
                .parameterValues(Map.of("minId", minId, "maxId", maxId))
                .sortKeys(Map.of("id", Order.ASCENDING))
                .pageSize(1000)
                .beanRowMapper(Product.class)
                .build();
    }

    @Bean
    public AsyncItemProcessor<Product, Product> asyncProcessor() {
        AsyncItemProcessor<Product, Product> processor = new AsyncItemProcessor<>();
        processor.setDelegate(new ProductItemProcessor());
        processor.setTaskExecutor(taskExecutor());
        return processor;
    }

    @Bean
    public AsyncItemWriter<Product> asyncWriter(DataSource dataSource) {
        AsyncItemWriter<Product> writer = new AsyncItemWriter<>();
        writer.setDelegate(new JdbcBatchItemWriterBuilder<Product>()
                .sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
                .dataSource(dataSource)
                .beanMapped()
                .build());
        return writer;
    }

    @Bean
    public Step slaveStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("slaveStep", jobRepository)
                .<Product, Product>chunk(1000)
                .reader(partitionReader(null, null, dataSource))
                .processor(asyncProcessor())
                .writer(asyncWriter(dataSource))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Step masterStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                           Step slaveStep) {
        return new StepBuilder("masterStep", jobRepository)
                .partitioner("slaveStep", partitioner(dataSource))
                .step(slaveStep)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Job optimizedJob(JobRepository jobRepository, Step masterStep) {
        return new JobBuilder("optimizedJob", jobRepository)
                .start(masterStep)
                .build();
    }

    @Bean
    public org.springframework.core.task.TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        executor.setConcurrencyLimit(8); // 8 个线程
        return executor;
    }
}

说明

  • 分区:按 ID 范围分割数据。
  • 分页读取:使用 JdbcPagingItemReader 优化数据库查询。
  • 异步处理AsyncItemProcessorAsyncItemWriter 提高吞吐量。
  • 多线程:8 个线程并行处理分区。

8. 最佳实践

  1. 多线程 Step

    • 确保 Reader/Processor/Writer 线程安全。
    • 限制线程数,避免资源耗尽。
    • 测试不同线程数,找到最佳配置。
  2. 分区

    • 设计合理的分区策略(如按 ID、时间)。
    • 使用分页读取优化数据库性能。
    • 考虑分布式分区,扩展到多节点。
  3. 并行 Job

    • 确保 Job 独立,避免数据冲突。
    • 监控数据库锁和连接池使用。
  4. 性能优化

    • 调整 Chunk 大小,平衡内存和效率。
    • 优化数据库配置(如批量写入、索引)。
    • 使用异步处理提高吞吐量。

9. 常见问题与解答

  • Q:多线程 Step 和分区如何选择?
    A:多线程 Step 适合简单并行任务,配置简单;分区适合大数据量,可扩展到分布式环境。

  • Q:并行处理导致数据重复怎么办?
    A:确保分区互斥(如 ID 范围不重叠),使用事务隔离写入。

  • Q:如何调试性能瓶颈?
    A:启用详细日志,分析 Reader/Processor/Writer 的耗时,使用监控工具定位问题。

10. 下一步

本文详细讲解了 Spring Batch 的并行处理机制(多线程 Step、分区、并行 Job)和性能优化技巧。通过示例和 Mermaid 图表,你学会了如何加速批处理任务。下一篇文章将聚焦 Spring Batch 与数据库集成,内容包括:

  • 使用 JDBC、JPA 和 MyBatis 读写数据库。
  • 配置事务管理。
  • 优化数据库性能的实践。
updatedupdated2025-04-172025-04-17