Spring Batch 专题系列(八):Spring Batch 高级主题:扩展与定制

Spring Batch 专题系列(八):Spring Batch 高级主题:扩展与定制

1. 引言

在上一篇文章中,我们学习了 Spring Batch 与数据库的集成(JDBC、JPA、MyBatis),掌握了如何高效读写数据、配置事务和优化性能。Spring Batch 的强大之处不仅在于其内置功能,还在于其高度可扩展性。通过自定义组件、监听器、拦截器和动态配置,你可以根据具体业务需求灵活定制批处理流程。

本文将聚焦以下内容:

  • 自定义 ItemReader、ItemProcessor 和 ItemWriter,适配非标准数据源和目标。
  • 使用监听器(Listener)和拦截器(ItemStream、StepExecutionListener 等)增强监控和控制。
  • 动态配置 Job 和 Step,支持运行时调整流程。
  • 通过代码示例和 Mermaid 图表展示扩展与定制的实现。

通过本文,你将学会如何将 Spring Batch 应用于复杂业务场景,打造高度定制化的批处理解决方案。

2. 扩展与定制的核心概念

Spring Batch 的扩展性体现在其模块化设计,允许开发者通过以下方式定制功能:

  • 自定义组件:实现 ItemReader、ItemProcessor、ItemWriter 接口,处理特殊数据源(如 API、文件系统)或复杂逻辑。
  • 监听器:通过 JobExecutionListener、StepExecutionListener、ChunkListener 等捕获生命周期事件,记录日志或干预执行。
  • 拦截器:使用 ItemStream 或自定义拦截器在读/写/处理阶段插入逻辑。
  • 动态配置:通过 JobParameters、Spring 表达式或代码动态生成 Job 和 Step,适应变化的需求。

这些机制依赖 Spring 的依赖注入和 AOP,支持无缝集成到现有项目。

扩展流程图

以下是用 Mermaid 绘制的 Spring Batch 扩展流程图,展示自定义组件和监听器的协作:

graph TD
    A[Job] --> B[Step]
    B --> C[Custom ItemReader]
    B --> D[Custom ItemProcessor]
    B --> E[Custom ItemWriter]
    B --> F[ChunkListener]
    B --> G[StepExecutionListener]
    C -->|读取| H[External Data Source]
    D -->|处理| I[Business Logic]
    E -->|写入| J[Target System]
    C --> K[ItemStream: Open/Close]
    E --> L[ItemStream: Open/Close]
    A --> M[JobExecutionListener]
    A --> N[JobRepository]
    N -->|存储元数据| O[Database]

说明

  • 自定义组件(Reader/Processor/Writer)处理非标准数据流。
  • 监听器(Chunk/Step/Job)捕获事件,记录或干预。
  • 拦截器(ItemStream)管理资源(如文件句柄、连接)。
  • JobRepository 记录状态,支持动态调整。

3. 自定义 ItemReader

ItemReader 负责读取数据,自定义 Reader 可以适配非标准数据源,如 REST API、Kafka、文件系统等。

3.1 实现 ItemReader 接口

示例:从 REST API 读取数据

假设我们需要从外部 API 读取商品数据,每次调用返回一页 JSON。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.example.springbatchdemo.reader;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemReader;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

import java.util.List;

public class ApiItemReader implements ItemReader<Product> {

    private final RestTemplate restTemplate;
    private final String apiUrl;
    private int page = 0;
    private List<Product> currentPage;
    private int currentIndex = 0;

    public ApiItemReader(String apiUrl) {
        this.restTemplate = new RestTemplate();
        this.apiUrl = apiUrl;
    }

    @Override
    public Product read() {
        if (currentPage == null || currentIndex >= currentPage.size()) {
            // 获取下一页
            ResponseEntity<ProductPage> response = restTemplate.getForEntity(
                    apiUrl + "?page=" + page, ProductPage.class);
            if (!response.getStatusCode().is2xxSuccessful() || response.getBody() == null) {
                return null; // API 错误或无数据
            }
            currentPage = response.getBody().getProducts();
            if (currentPage.isEmpty()) {
                return null; // 没有更多数据
            }
            currentIndex = 0;
            page++;
        }
        return currentPage.get(currentIndex++);
    }
}

辅助类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package com.example.springbatchdemo.reader;

import com.example.springbatchdemo.entity.Product;
import java.util.List;

public class ProductPage {
    private List<Product> products;

    public List<Product> getProducts() {
        return products;
    }

    public void setProducts(List<Product> products) {
        this.products = products;
    }
}

配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import com.example.springbatchdemo.reader.ApiItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class ApiReaderConfiguration {

    @Bean
    public ApiItemReader apiReader() {
        return new ApiItemReader("http://api.example.com/products");
    }

    @Bean
    public ProductItemProcessor processor() {
        return new ProductItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Product>()
                .sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    @Bean
    public Step apiStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("apiStep", jobRepository)
                .<Product, Product>chunk(100)
                .reader(apiReader())
                .processor(processor())
                .writer(writer(dataSource))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Job apiJob(JobRepository jobRepository, Step apiStep) {
        return new JobBuilder("apiJob", jobRepository)
                .start(apiStep)
                .build();
    }
}

Processor 实现(同前):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;

public class ProductItemProcessor implements ItemProcessor<Product, Product> {
    private static final double EXCHANGE_RATE = 0.14;

    @Override
    public Product process(Product item) {
        if (item.getPrice() <= 0) {
            return null;
        }
        item.setPrice(item.getPrice() * EXCHANGE_RATE);
        return item;
    }
}

实体类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
package com.example.springbatchdemo.entity;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.Data;

@Entity
@Data
public class Product {
    @Id
    private Long id;
    private String name;
    private Double price;
}

说明

  • ApiItemReader 每次调用 API 获取一页数据,逐条返回 Product
  • 返回 null 表示数据读取结束。
  • 使用 RestTemplate 模拟 HTTP 请求,实际项目可替换为生产级客户端(如 WebClient)。

适用场景

  • 外部 API(如 REST、GraphQL)。
  • 消息队列(如 Kafka、RabbitMQ)。
  • 非标准文件(如 Excel、XML)。

4. 自定义 ItemProcessor

ItemProcessor 负责数据转换或业务逻辑,自定义 Processor 可以实现复杂处理,如调用外部服务、数据校验等。

4.1 实现 ItemProcessor 接口

示例:调用外部服务校验数据

假设需要调用外部服务校验商品名称是否合规。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.example.springbatchdemo.processor;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

public class ValidationItemProcessor implements ItemProcessor<Product, Product> {

    private final RestTemplate restTemplate;
    private final String validationApiUrl;

    public ValidationItemProcessor(String validationApiUrl) {
        this.restTemplate = new RestTemplate();
        this.validationApiUrl = validationApiUrl;
    }

    @Override
    public Product process(Product item) {
        // 调用外部服务校验名称
        ResponseEntity<Boolean> response = restTemplate.getForEntity(
                validationApiUrl + "?name=" + item.getName(), Boolean.class);
        if (!response.getStatusCode().is2xxSuccessful() || !response.getBody()) {
            return null; // 名称不合规,跳过
        }
        return item;
    }
}

配置

1
2
3
4
@Bean
public ValidationItemProcessor validationProcessor() {
    return new ValidationItemProcessor("http://api.example.com/validate");
}

说明

  • ValidationItemProcessor 调用外部 API 校验名称,返回 null 跳过不合规记录。
  • 可扩展为复杂逻辑,如格式转换、聚合计算。

适用场景

  • 数据校验(如正则、外部服务)。
  • 复杂转换(如多源合并)。
  • 业务规则应用。

5. 自定义 ItemWriter

ItemWriter 负责写入数据,自定义 Writer 可以适配非标准目标,如文件、API、队列等。

5.1 实现 ItemWriter 接口

示例:写入 Kafka 队列

假设处理后的数据需要发送到 Kafka。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.example.springbatchdemo.writer;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaItemWriter implements ItemWriter<Product> {

    private final KafkaTemplate<String, Product> kafkaTemplate;
    private final String topic;

    public KafkaItemWriter(KafkaTemplate<String, Product> kafkaTemplate, String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @Override
    public void write(Chunk<? extends Product> chunk) {
        for (Product item : chunk.getItems()) {
            kafkaTemplate.send(topic, String.valueOf(item.getId()), item);
        }
    }
}

添加依赖pom.xml):

1
2
3
4
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置

1
2
3
4
5
6
7
8
9
@Bean
public KafkaItemWriter kafkaWriter(KafkaTemplate<String, Product> kafkaTemplate) {
    return new KafkaItemWriter(kafkaTemplate, "products-topic");
}

@Bean
public KafkaTemplate<String, Product> kafkaTemplate(ProducerFactory<String, Product> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

说明

  • KafkaItemWriter 将每个 Chunk 的数据发送到 Kafka 主题。
  • 使用 KafkaTemplate 简化消息发送。
  • 事务由 Spring Batch 管理,确保 Chunk 级别一致性。

适用场景

  • 消息队列(如 Kafka、RabbitMQ)。
  • 外部系统(如 API、FTP)。
  • 自定义文件格式。

6. 使用监听器增强功能

监听器(Listener)捕获 Job、Step 或 Chunk 的事件,用于日志记录、监控或干预执行。

6.1 自定义 StepExecutionListener

示例:记录 Step 执行时间

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.example.springbatchdemo.listener;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;

public class TimingStepListener implements StepExecutionListener {

    private long startTime;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        startTime = System.currentTimeMillis();
        System.out.println("Step started: " + stepExecution.getStepName());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        long duration = System.currentTimeMillis() - startTime;
        System.out.println("Step ended: " + stepExecution.getStepName() +
                ", status: " + stepExecution.getStatus() +
                ", duration: " + duration + "ms");
        return stepExecution.getExitStatus();
    }
}

配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Bean
public Step apiStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("apiStep", jobRepository)
            .<Product, Product>chunk(100)
            .reader(apiReader())
            .processor(processor())
            .writer(writer(dataSource))
            .transactionManager(transactionManager)
            .listener(new TimingStepListener())
            .build();
}

说明

  • beforeStep 记录开始时间。
  • afterStep 计算并打印执行时间。

6.2 自定义 ChunkListener

示例:监控 Chunk 进度

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package com.example.springbatchdemo.listener;

import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.listener.ChunkListenerSupport;

public class ProgressChunkListener extends ChunkListenerSupport {

    @Override
    public void afterChunk(ChunkContext context) {
        long writeCount = context.getStepContext().getStepExecution().getWriteCount();
        System.out.println("Chunk completed, items written: " + writeCount);
    }

    @Override
    public void afterChunkError(ChunkContext context) {
        System.out.println("Chunk failed: " + context.getStepContext().getStepName());
    }
}

配置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
@Bean
public Step apiStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new StepBuilder("apiStep", jobRepository)
            .<Product, Product>chunk(100)
            .reader(apiReader())
            .processor(processor())
            .writer(writer(dataSource))
            .transactionManager(transactionManager)
            .listener(new TimingStepListener())
            .listener(new ProgressChunkListener())
            .build();
}

适用场景

  • 日志记录(执行时间、进度)。
  • 监控和报警(失败通知)。
  • 干预执行(如动态调整参数)。

7. 使用拦截器(ItemStream)

ItemStream 是一个扩展接口,允许在读写操作前后执行初始化或清理逻辑,常用于管理资源(如文件句柄、连接)。

7.1 实现 ItemStreamReader

示例:管理 API 连接

扩展 ApiItemReader 支持 ItemStream:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.example.springbatchdemo.reader;

import com.example.springbatchdemo.entity.Product;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

import java.util.List;

public class ApiItemReader implements ItemStreamReader<Product> {

    private final RestTemplate restTemplate;
    private final String apiUrl;
    private int page = 0;
    private List<Product> currentPage;
    private int currentIndex = 0;
    private boolean initialized = false;

    public ApiItemReader(String apiUrl) {
        this.restTemplate = new RestTemplate();
        this.apiUrl = apiUrl;
    }

    @Override
    public Product read() {
        if (!initialized) {
            throw new IllegalStateException("Reader not initialized");
        }
        if (currentPage == null || currentIndex >= currentPage.size()) {
            ResponseEntity<ProductPage> response = restTemplate.getForEntity(
                    apiUrl + "?page=" + page, ProductPage.class);
            if (!response.getStatusCode().is2xxSuccessful() || response.getBody() == null) {
                return null;
            }
            currentPage = response.getBody().getProducts();
            if (currentPage.isEmpty()) {
                return null;
            }
            currentIndex = 0;
            page++;
        }
        return currentPage.get(currentIndex++);
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        page = executionContext.getInt("page", 0);
        initialized = true;
        System.out.println("Opening API connection, starting at page: " + page);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putInt("page", page);
    }

    @Override
    public void close() throws ItemStreamException {
        initialized = false;
        System.out.println("Closing API connection");
    }
}

说明

  • open:初始化连接,从 ExecutionContext 恢复页码。
  • update:保存当前页码,支持重启。
  • close:清理资源。
  • ExecutionContext 存储状态,支持作业中断后恢复。

适用场景

  • 管理外部资源(如数据库连接、文件流)。
  • 保存读取进度(如页码、偏移量)。
  • 支持重启和状态恢复。

8. 动态配置 Job 和 Step

动态配置允许在运行时根据参数或条件调整 Job 和 Step,例如根据输入选择数据源或处理逻辑。

8.1 使用 JobParameters 动态选择 Reader

示例:根据参数选择 Reader

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.example.springbatchdemo.config;

import com.example.springbatchdemo.entity.Product;
import com.example.springbatchdemo.reader.ApiItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.Map;

@Configuration
public class DynamicConfiguration {

    @Bean
    @StepScope
    public ItemReader<Product> dynamicReader(@Value("#{jobParameters['source']}") String source,
                                            DataSource dataSource) {
        if ("api".equals(source)) {
            return new ApiItemReader("http://api.example.com/products");
        } else {
            return new JdbcPagingItemReaderBuilder<Product>()
                    .name("jdbcReader")
                    .dataSource(dataSource)
                    .selectClause("SELECT id, name, price")
                    .fromClause("FROM source_product")
                    .sortKeys(Map.of("id", Order.ASCENDING))
                    .pageSize(1000)
                    .beanRowMapper(Product.class)
                    .build();
        }
    }

    @Bean
    public ProductItemProcessor processor() {
        return new ProductItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Product> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Product>()
                .sql("INSERT INTO product (id, name, price) VALUES (:id, :name, :price)")
                .dataSource(dataSource)
                .beanMapped()
                .build();
    }

    @Bean
    public Step dynamicStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("dynamicStep", jobRepository)
                .<Product, Product>chunk(100)
                .reader(dynamicReader(null, dataSource))
                .processor(processor())
                .writer(writer(dataSource))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Job dynamicJob(JobRepository jobRepository, Step dynamicStep) {
        return new JobBuilder("dynamicJob", jobRepository)
                .start(dynamicStep)
                .build();
    }
}

运行

1
2
3
4
5
JobParameters jobParameters = new JobParametersBuilder()
        .addString("source", "api") // 或 "db"
        .addLong("runTime", System.currentTimeMillis())
        .toJobParameters();
jobLauncher.run(dynamicJob, jobParameters);

说明

  • @StepScope 确保 Reader 在运行时根据 JobParameters 创建。
  • 根据 source 参数选择 ApiItemReaderJdbcPagingItemReader
  • 支持动态切换数据源。

8.2 动态生成 Step

示例:根据文件数量生成 Step

假设有多个 CSV 文件,动态创建 Step 处理每个文件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.example.springbatchdemo.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.PlatformTransactionManager;

import java.io.File;

@Configuration
public class DynamicStepConfiguration {

    @Bean
    public Step fileStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                         String filePath) {
        return new StepBuilder("fileStep-" + filePath, jobRepository)
                .<Product, Product>chunk(100)
                .reader(new FlatFileItemReaderBuilder<Product>()
                        .name("fileReader")
                        .resource(new FileSystemResource(filePath))
                        .delimited()
                        .names("id", "name", "price")
                        .targetType(Product.class)
                        .build())
                .processor(new ProductItemProcessor())
                .writer(writer(dataSource))
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Job dynamicFileJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        JobBuilder jobBuilder = new JobBuilder("dynamicFileJob", jobRepository);
        File[] files = new File("/path/to/files").listFiles((dir, name) -> name.endsWith(".csv"));
        if (files == null) {
            return jobBuilder.start(new StepBuilder("emptyStep", jobRepository)
                    .tasklet((contribution, chunkContext) -> {
                        System.out.println("No files to process");
                        return RepeatStatus.FINISHED;
                    })
                    .transactionManager(transactionManager)
                    .build())
                    .build();
        }

        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("dynamicFlow");
        for (File file : files) {
            Step step = fileStep(jobRepository, transactionManager, file.getAbsolutePath());
            flowBuilder.start(step);
        }
        return jobBuilder.start(flowBuilder.build()).end().build();
    }
}

说明

  • 动态扫描 CSV 文件,生成对应的 Step。
  • 使用 FlowBuilder 构建动态流程。
  • 支持运行时扩展(如新文件自动处理)。

适用场景

  • 动态数据源(如多文件、多表)。
  • 条件流程(如根据参数选择 Step)。
  • 可变业务逻辑。

9. 综合示例:全定制 ETL

以下是一个综合示例,结合自定义 Reader、Processor、Writer、Listener 和动态配置。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Configuration
public class CustomBatchConfiguration {

    @Bean
    @StepScope
    public ApiItemReader apiReader(@Value("#{jobParameters['apiUrl']}") String apiUrl) {
        return new ApiItemReader(apiUrl);
    }

    @Bean
    public ValidationItemProcessor validationProcessor() {
        return new ValidationItemProcessor("http://api.example.com/validate");
    }

    @Bean
    public KafkaItemWriter kafkaWriter(KafkaTemplate<String, Product> kafkaTemplate) {
        return new KafkaItemWriter(kafkaTemplate, "products-topic");
    }

    @Bean
    public TimingStepListener stepListener() {
        return new TimingStepListener();
    }

    @Bean
    public ProgressChunkListener chunkListener() {
        return new ProgressChunkListener();
    }

    @Bean
    public Step customStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return new StepBuilder("customStep", jobRepository)
                .<Product, Product>chunk(100)
                .reader(apiReader(null))
                .processor(validationProcessor())
                .writer(kafkaWriter(kafkaTemplate(null)))
                .transactionManager(transactionManager)
                .listener(stepListener())
                .listener(chunkListener())
                .build();
    }

    @Bean
    public Job customJob(JobRepository jobRepository, Step customStep) {
        return new JobBuilder("customJob", jobRepository)
                .start(customStep)
                .build();
    }
}

运行

1
2
3
4
5
JobParameters jobParameters = new JobParametersBuilder()
        .addString("apiUrl", "http://api.example.com/products")
        .addLong("runTime", System.currentTimeMillis())
        .toJobParameters();
jobLauncher.run(customJob, jobParameters);

说明

  • Reader:从动态 API URL 读取数据。
  • Processor:校验数据合规性。
  • Writer:发送到 Kafka。
  • Listener:记录时间和进度。
  • 动态参数支持灵活配置。

流程图

graph TD
    A[Job: customJob] --> B[Step: customStep]
    B --> C[ApiItemReader]
    B --> D[ValidationItemProcessor]
    B --> E[KafkaItemWriter]
    B --> F[TimingStepListener]
    B --> G[ProgressChunkListener]
    C -->|读取| H[External API]
    D -->|校验| I[Validation Service]
    E -->|写入| J[Kafka Topic]
    A --> K[JobRepository]
    K -->|存储元数据| L[Database]

10. 最佳实践

  1. 自定义组件

    • 保持单一职责(如 Reader 只负责读取)。
    • 实现 ItemStream 支持状态管理和重启。
    • 测试线程安全,适应多线程场景。
  2. 监听器

    • 使用 Listener 记录关键事件(如失败、进度)。
    • 避免复杂逻辑,防止性能瓶颈。
    • 结合监控工具(如 Prometheus)分析数据。
  3. 动态配置

    • 使用 @StepScope 注入 JobParameters。
    • 设计灵活的流程,支持扩展。
    • 测试动态场景,确保稳定性。
  4. 扩展性

    • 优先扩展现有组件(如装饰者模式)。
    • 使用 Spring 依赖注入管理自定义组件。
    • 编写单元测试验证逻辑。

11. 常见问题与解答

  • Q:自定义 Reader 如何支持重启?
    A:实现 ItemStream,在 update 方法保存进度(如偏移量),在 open 方法恢复。

  • Q:监听器影响性能怎么办?
    A:保持 Listener 轻量,异步记录日志(如使用队列),避免阻塞主线程。

  • Q:动态 Job 如何管理复杂流程?
    A:使用 FlowBuilderJobExecutionDecider 定义条件流,结合参数动态调整。

12. 下一步

本文详细讲解了 Spring Batch 的扩展与定制,包括自定义组件、监听器、拦截器和动态配置。通过示例和 Mermaid 图表,你学会了如何适配复杂业务场景。下一篇文章将聚焦 Spring Batch 生产实践,内容包括:

  • 部署和管理 Job(Spring Boot、容器化)。
  • 监控和报警(JMX、Prometheus)。
  • 常见生产问题与解决方案。
updatedupdated2025-04-172025-04-17