Spring Batch 专题系列(七):Spring Batch 与数据库集成

Spring Batch 专题系列(七):Spring Batch 与数据库集成

1. 引言

在上一篇文章中,我们学习了 Spring Batch 的并行处理机制(多线程 Step、分区、并行 Job)和性能优化技巧,掌握了如何处理海量数据。在批处理任务中,数据库操作是最常见的场景之一,例如从源表读取数据,经过转换后写入目标表(ETL)、批量更新记录或生成报表。Spring Batch 提供了强大的数据库集成支持,与主流 ORM 框架(如 JDBC、JPA、MyBatis)无缝协作。

本文将聚焦以下内容:

  • 使用 JDBC 实现高效的数据库读写。
  • 使用 JPA 处理复杂实体关系。
  • 使用 MyBatis 提供灵活的 SQL 控制。
  • 配置事务管理,确保数据一致性。
  • 优化数据库性能的实践(如批量操作、分页读取)。
  • 通过代码示例和 Mermaid 图表展示数据库集成流程。

通过本文,你将学会如何在 Spring Batch 中高效操作数据库,为生产环境中的 ETL、数据迁移等任务提供可靠支持。

2. 数据库集成的核心概念

Spring Batch 的数据库集成主要涉及以下组件:

  • ItemReader:从数据库读取数据,如 JdbcCursorItemReaderJpaPagingItemReaderMyBatisCursorItemReader
  • ItemWriter:将数据写入数据库,如 JdbcBatchItemWriterJpaItemWriterMyBatisBatchItemWriter
  • 事务管理:通过 Spring 的事务管理器(PlatformTransactionManager)确保 Chunk 级别的事务一致性。
  • JobRepository:存储 Job 和 Step 的元数据,通常使用数据库实现,支持重启和状态追踪。

数据库操作的性能关键在于:

  • 批量处理:减少 IO 开销,提高吞吐量。
  • 分页或游标:避免一次性加载大数据集。
  • 事务优化:合理配置 Chunk 大小和隔离级别。

数据库交互流程图

以下是用 Mermaid 绘制的 Spring Batch 数据库交互流程图,展示从源表读取到目标表写入的过程:

graph TD
    A[Job] --> B[Step]
    B --> C[ItemReader]
    B --> D[ItemProcessor]
    B --> E[ItemWriter]
    C -->|读取| F[Source Database]
    D -->|转换| G[Business Logic]
    E -->|写入| H[Target Database]
    B --> I[Transaction Manager]
    I -->|控制事务| H
    A --> J[JobRepository]
    J -->|存储元数据| K[Meta Database]

说明

  • ItemReader 从源数据库读取数据。
  • ItemProcessor 执行转换或业务逻辑。
  • ItemWriter 将处理后的数据写入目标数据库。
  • Transaction Manager 在每个 Chunk 结束时提交事务。
  • JobRepository 记录作业状态,存储在元数据库(可以与目标数据库相同)。

3. 使用 JDBC 集成数据库

JDBC 是 Spring Batch 最直接的数据库集成方式,提供了高性能的读写能力,适合简单表结构和大规模数据处理。

3.1 JdbcCursorItemReader

JdbcCursorItemReader 使用数据库游标逐行读取数据,适合大数据量,内存占用低。

示例:从 source_product 表读取

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
public class JdbcReaderConfig {

    @Bean
    public JdbcCursorItemReader<Product> jdbcReader(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<Product>()
                .name("jdbcReader")
                .dataSource(dataSource)
                .sql("SELECT id, name, price FROM source_product")
                .beanRowMapper(Product.class)
                .build();
    }
}

说明

  • .sql() 定义查询语句。
  • .beanRowMapper(Product.class) 自动映射结果到 Product 实体。
  • 游标读取适合顺序扫描,但不支持分页或动态查询。

3.2 JdbcPagingItemReader

JdbcPagingItemReader 使用分页查询,适合需要排序或动态条件的场景,内存占用可控。

示例:分页读取

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@Bean
public JdbcPagingItemReader<Product> pagingReader(DataSource dataSource) {
    return new JdbcPagingItemReaderBuilder<Product>()
            .name("pagingReader")
            .dataSource(dataSource)
            .selectClause("SELECT id, name, price")
            .fromClause("FROM source_product")
            .sortKeys(Map.of("id", Order.ASCENDING))
            .pageSize(1000)
            .beanRowMapper(Product.class)
            .build();
}

说明

  • .pageSize(1000) 每次读取 1000 条记录。
  • .sortKeys() 确保数据顺序,防止重复或遗漏。
  • 适合大数据量,分页查询减少数据库压力。

3.3 JdbcBatchItemWriter

JdbcBatchItemWriter 批量写入数据库,高效且支持事务。

示例:写入 product 表

1
2
3
4
5
6
7
8
@Bean
public JdbcBatchItemWriter<Product> jdbcWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Product>()
            .sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
            .dataSource(dataSource)
            .beanMapped()
            .build();
}

说明

  • .sql() 定义插入语句,使用命名参数。
  • .beanMapped() 自动映射 Product 字段到 SQL 参数。
  • 批量写入减少数据库 IO。

3.4 综合示例:JDBC ETL

以下是一个完整的 ETL 作业,从 source_product 读取数据,转换价格后写入 product 表。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.Map;

@Configuration
public class JdbcBatchConfiguration {

    @Bean
    public JdbcPagingItemReader<Product> reader(DataSource dataSource) {
        return new JdbcPagingItemReaderBuilder<Product>()
                .name("productReader")
                .dataSource(dataSource)
                .selectClause("SELECT id, name, price")
                .fromClause("FROM source_product")
                .sortKeys(Map.of("id", Order.ASCENDING))
                .pageSize(1000)
                .beanRowMapper(Product.class)
                .build();
    }

    @Bean
    public ProductItemProcessor processor() {
        return new ProductItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Product>()
                .sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    @Bean
    public Step jdbcStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("jdbcStep", jobRepository)
                .<Product, Product>chunk(1000)
                .reader(reader(dataSource))
                .processor(processor())
                .writer(writer(dataSource))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Job jdbcJob(JobRepository jobRepository, Step jdbcStep) {
        return new JobBuilder("jdbcJob", jobRepository)
                .start(jdbcStep)
                .build();
    }
}

Processor 实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;

public class ProductItemProcessor implements ItemProcessor<Product, Product> {
    private static final double EXCHANGE_RATE = 0.14;

    @Override
    public Product process(Product item) {
        if (item.getPrice() <= 0) {
            return null;
        }
        item.setPrice(item.getPrice() * EXCHANGE_RATE);
        return item;
    }
}

实体类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
package com.example.springbatchdemo.entity;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.Data;

@Entity
@Data
public class Product {
    @Id
    private Long id;
    private String name;
    private Double price;
}

说明

  • Reader:使用 JdbcPagingItemReader 分页读取 source_product
  • Processor:将价格转换为美元,过滤负值。
  • Writer:使用 JdbcBatchItemWriter 批量写入 product
  • Chunk:每 1000 条记录提交一次事务。

流程图

graph TD
    A[Step: jdbcStep] --> B[JdbcPagingItemReader]
    B -->|读取| C[Source Database: source_product]
    B --> D[ItemProcessor]
    D -->|转换价格| E[JdbcBatchItemWriter]
    E -->|批量写入| F[Target Database: product]
    A --> G[Transaction Manager]
    G -->|提交事务| F

适用场景

  • 简单表结构,无复杂关系。
  • 高性能要求(如 ETL、数据迁移)。
  • 大数据量,需分页或批量处理。

4. 使用 JPA 集成数据库

JPA(通过 Hibernate 等实现)适合处理复杂实体关系,支持对象映射和查询语言(JPQL)。

4.1 JpaPagingItemReader

JpaPagingItemReader 使用分页查询 JPA 实体,适合复杂查询和关系映射。

示例:读取 Product 实体

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.example.springbatchdemo.config;

import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.persistence.EntityManagerFactory;

@Configuration
public class JpaReaderConfig {

    @Bean
    public JpaPagingItemReader<Product> jpaReader(EntityManagerFactory entityManagerFactory) {
        return new JpaPagingItemReaderBuilder<Product>()
                .name("jpaReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("SELECT p FROM Product p ORDER BY p.id")
                .pageSize(1000)
                .build();
    }
}

说明

  • .queryString() 使用 JPQL 查询。
  • .pageSize(1000) 每次读取 1000 条记录。
  • 自动映射到 Product 实体。

4.2 JpaItemWriter

JpaItemWriter 使用 JPA 持久化实体,适合插入或更新操作。

示例:写入 Product 实体

1
2
3
4
5
6
@Bean
public JpaItemWriter<Product> jpaWriter(EntityManagerFactory entityManagerFactory) {
    JpaItemWriter<Product> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManagerFactory);
    return writer;
}

说明

  • 自动调用 EntityManager.persist()merge()
  • 每个 Chunk 共享一个 EntityManager,事务由 Spring Batch 管理。

4.3 综合示例:JPA ETL

以下是一个 JPA 实现的 ETL 作业。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.persistence.EntityManagerFactory;

@Configuration
public class JpaBatchConfiguration {

    @Bean
    public JpaPagingItemReader<Product> jpaReader(EntityManagerFactory entityManagerFactory) {
        return new JpaPagingItemReaderBuilder<Product>()
                .name("jpaReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("SELECT p FROM Product p WHERE p.source = 'legacy' ORDER BY p.id")
                .pageSize(1000)
                .build();
    }

    @Bean
    public ProductItemProcessor processor() {
        return new ProductItemProcessor();
    }

    @Bean
    public JpaItemWriter<Product> jpaWriter(EntityManagerFactory entityManagerFactory) {
        JpaItemWriter<Product> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(entityManagerFactory);
        return writer;
    }

    @Bean
    public Step jpaStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("jpaStep", jobRepository)
                .<Product, Product>chunk(1000)
                .reader(jpaReader(entityManagerFactory))
                .processor(processor())
                .writer(jpaWriter(entityManagerFactory))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Job jpaJob(JobRepository jobRepository, Step jpaStep) {
        return new JobBuilder("jpaJob", jobRepository)
                .start(jpaStep)
                .build();
    }
}

修改实体类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package com.example.springbatchdemo.entity;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.Data;

@Entity
@Data
public class Product {
    @Id
    private Long id;
    private String name;
    private Double price;
    private String source; // 新增字段,标识数据来源
}

说明

  • Reader:读取 source='legacy' 的记录。
  • Processor:转换价格(同前)。
  • Writer:将处理后的实体写入数据库。
  • Chunk:每 1000 条提交事务。

适用场景

  • 复杂实体关系(如一对多、多对多)。
  • 需要 JPQL 或 Criteria 查询。
  • 对象模型优先的开发风格。

注意事项

  • 性能:JPA 比 JDBC 慢,适合中小数据量。
  • 缓存:启用 Hibernate 二级缓存,减少查询开销。
  • 批量:配置 hibernate.jdbc.batch_size 启用批量插入。

5. 使用 MyBatis 集成数据库

MyBatis 提供灵活的 SQL 控制,适合需要精细优化的场景。Spring Batch 提供 MyBatisCursorItemReaderMyBatisBatchItemWriter

5.1 添加依赖

pom.xml 中添加 MyBatis 依赖:

1
2
3
4
5
6
7
8
9
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>3.0.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-integration</artifactId>
</dependency>

5.2 MyBatis Mapper

创建 ProductMapper 接口和 XML 映射文件。

ProductMapper.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
package com.example.springbatchdemo.mapper;

import com.example.springbatchdemo.entity.Product;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.cursor.Cursor;

@Mapper
public interface ProductMapper {
    Cursor<Product> selectProducts();
    void insertProduct(Product product);
}

ProductMapper.xmlsrc/main/resources/mapper/ProductMapper.xml):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.springbatchdemo.mapper.ProductMapper">

    <select id="selectProducts" resultType="com.example.springbatchdemo.entity.Product">
        SELECT id, name, price FROM source_product ORDER BY id
    </select>

    <insert id="insertProduct" parameterType="com.example.springbatchdemo.entity.Product">
        INSERT INTO product (id, name, price)
        VALUES (#{id}, #{name}, #{price})
    </insert>

</mapper>

5.3 MyBatis Reader 和 Writer

MyBatisCursorItemReader 使用游标读取,MyBatisBatchItemWriter 批量写入。

示例:MyBatis 配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import com.example.springbatchdemo.mapper.ProductMapper;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.builder.MyBatisCursorItemReaderBuilder;
import org.springframework.batch.item.database.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class MyBatisBatchConfiguration {

    @Bean
    public MyBatisCursorItemReader<Product> myBatisReader(SqlSessionFactory sqlSessionFactory) {
        return new MyBatisCursorItemReaderBuilder<Product>()
                .sqlSessionFactory(sqlSessionFactory)
                .queryId("com.example.springbatchdemo.mapper.ProductMapper.selectProducts")
                .build();
    }

    @Bean
    public ProductItemProcessor processor() {
        return new ProductItemProcessor();
    }

    @Bean
    public MyBatisBatchItemWriter<Product> myBatisWriter(SqlSessionFactory sqlSessionFactory) {
        return new MyBatisBatchItemWriterBuilder<Product>()
                .sqlSessionFactory(sqlSessionFactory)
                .statementId("com.example.springbatchdemo.mapper.ProductMapper.insertProduct")
                .build();
    }

    @Bean
    public Step myBatisStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("myBatisStep", jobRepository)
                .<Product, Product>chunk(1000)
                .reader(myBatisReader(sqlSessionFactory))
                .processor(processor())
                .writer(myBatisWriter(sqlSessionFactory))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Job myBatisJob(JobRepository jobRepository, Step myBatisStep) {
        return new JobBuilder("myBatisJob", jobRepository)
                .start(myBatisStep)
                .build();
    }
}

说明

  • Reader:调用 selectProducts 查询,使用游标逐行读取。
  • Writer:调用 insertProduct 批量插入。
  • Chunk:每 1000 条提交事务。
  • MyBatis 提供灵活的 SQL 控制,适合复杂查询。

适用场景

  • 需要动态 SQL 或复杂映射。
  • 偏好 XML 或注解定义查询。
  • 性能要求介于 JDBC 和 JPA 之间。

注意事项

  • 批量优化:启用 MyBatis 批量模式(executorType: BATCH)。
  • 游标管理:确保游标及时关闭,防止内存泄漏。
  • 事务:Spring Batch 管理事务,无需 MyBatis 单独配置。

6. 事务管理

Spring Batch 使用 PlatformTransactionManager 管理事务,默认在每个 Chunk 结束时提交。数据库集成需要确保事务配置正确。

6.1 配置事务管理器

JDBC 和 MyBatis

1
2
3
4
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
    return new DataSourceTransactionManager(dataSource);
}

JPA

1
2
3
4
@Bean
public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
    return new JpaTransactionManager(entityManagerFactory);
}

说明

  • 每个 Chunk 开启一个事务,Writer 完成后提交。
  • 如果异常发生,当前 Chunk 回滚,状态保存到 JobRepository。

6.2 事务隔离级别

默认隔离级别为 ISOLATION_DEFAULT(数据库默认,通常为 READ_COMMITTED)。可通过 StepBuilder 配置:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Bean
public Step jdbcStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("jdbcStep", jobRepository)
            .<Product, Product>chunk(1000)
            .reader(reader(dataSource))
            .processor(processor())
            .writer(writer(dataSource))
            .transactionManager(transactionManager)
            .transactionIsolation(Isolation.READ_COMMITTED)
            .build();
}

选择建议

  • READ_COMMITTED:适合大多数 ETL 任务,防止脏读。
  • REPEATABLE_READ:需要一致性读(如报表生成)。
  • SERIALIZABLE:高并发写入,避免冲突(性能较低)。

7. 优化数据库性能

数据库操作是批处理性能的瓶颈,以下是优化实践:

7.1 批量操作

  • JDBCJdbcBatchItemWriter 默认批量写入,配置 spring.jdbc.batch_size=1000
  • JPA:启用批量插入:
    1
    2
    
    spring.jpa.properties.hibernate.jdbc.batch_size=50
    spring.jpa.properties.hibernate.order_inserts=true
    
  • MyBatis:设置 executorType: BATCH
    1
    
    mybatis.configuration.default-executor-type=BATCH
    

7.2 分页读取

  • 使用 JdbcPagingItemReaderJpaPagingItemReader,设置合理 pageSize(如 1000-10000)。
  • 确保查询使用索引,避免全表扫描。

7.3 索引优化

  • WHEREORDER BY 字段添加索引:
    1
    
    CREATE INDEX idx_source_product_id ON source_product(id);
    
  • 避免频繁更新索引字段,降低写入开销。

7.4 连接池优化

使用 HikariCP(Spring Boot 默认):

1
2
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.minimum-idle=5

建议:连接数与线程数匹配(如 4 线程配 8-12 连接)。

7.5 事务优化

  • Chunk 大小:根据数据量和内存调整(如 1000-10000)。
  • 隔离级别:选择最低必要级别,减少锁竞争。
  • 只读事务:读取时启用只读模式:
    1
    
    .readerIsTransactionalQueue(true)
    

8. 综合示例:JDBC + JPA 混合

以下是一个混合示例,从 source_product(JDBC)读取,写入 product(JPA)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Configuration
public class MixedBatchConfiguration {

    @Bean
    public JdbcPagingItemReader<Product> jdbcReader(DataSource dataSource) {
        return new JdbcPagingItemReaderBuilder<Product>()
                .name("jdbcReader")
                .dataSource(dataSource)
                .selectClause("SELECT id, name, price")
                .fromClause("FROM source_product")
                .sortKeys(Map.of("id", Order.ASCENDING))
                .pageSize(1000)
                .beanRowMapper(Product.class)
                .build();
    }

    @Bean
    public ProductItemProcessor processor() {
        return new ProductItemProcessor();
    }

    @Bean
    public JpaItemWriter<Product> jpaWriter(EntityManagerFactory entityManagerFactory) {
        JpaItemWriter<Product> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(entityManagerFactory);
        return writer;
    }

    @Bean
    public Step mixedStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("mixedStep", jobRepository)
                .<Product, Product>chunk(1000)
                .reader(jdbcReader(dataSource))
                .processor(processor())
                .writer(jpaWriter(entityManagerFactory))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Job mixedJob(JobRepository jobRepository, Step mixedStep) {
        return new JobBuilder("mixedJob", jobRepository)
                .start(mixedStep)
                .build();
    }
}

说明

  • Reader:JDBC 分页读取,高效处理大数据。
  • Writer:JPA 写入,适合复杂实体。
  • 事务:由 JpaTransactionManager 管理。

9. 最佳实践

  1. 选择合适的框架

    • JDBC:高性能,简单表结构。
    • JPA:复杂关系,对象模型。
    • MyBatis:灵活 SQL,动态查询。
  2. 性能优化

    • 启用批量写入,减少 IO。
    • 使用分页或游标读取,避免内存溢出。
    • 优化索引和连接池。
  3. 事务管理

    • 合理设置 Chunk 大小,平衡性能和一致性。
    • 选择适当隔离级别,减少锁冲突。
    • 确保 JobRepository 持久化,支持重启。
  4. 错误处理

    • 配置 Skip 和 Retry,处理数据错误。
    • 使用 Listener 记录异常,便于调试。

10. 常见问题与解答

  • Q:JDBC 和 JPA 如何选择?
    A:JDBC 适合高性能 ETL,JPA 适合复杂实体关系,MyBatis 介于两者,灵活性高。

  • Q:分页读取重复数据怎么办?
    A:确保 sortKeys 唯一(如 ID),避免动态数据变更。

  • Q:事务回滚影响性能吗?
    A:大 Chunk 回滚开销高,建议优化 Chunk 大小和错误处理。

11. 下一步

本文详细讲解了 Spring Batch 与数据库的集成,包括 JDBC、JPA 和 MyBatis 的读写方式,以及事务管理和性能优化实践。通过示例和 Mermaid 图表,你学会了如何高效操作数据库。下一篇文章将聚焦 Spring Batch 高级主题:扩展与定制,内容包括:

  • 自定义 Reader、Processor、Writer。
  • 使用监听器和拦截器扩展功能。
  • 配置动态 Job 和 Step。
updatedupdated2025-04-172025-04-17