系列专题:通用系统批量比对工具设计
第一部分:需求分析与功能规划
核心功能
- 数据输入与智能转换
- 支持多种数据源:数据库(MySQL、PostgreSQL、Oracle)、文件(CSV、Excel、JSON)、API。
- 任意选择两种数据源进行比对。
- 可视化配置转换规则(如字段映射、类型转换、清洗规则)。
- 标签标记系统
- 支持对数据记录添加标签(如“异常”、“待核查”)。
- 标签可动态配置,支持批量标记。
- 差异化结果与统计指标
- 生成差异化报告(新增、删除、修改)。
- 提供统计指标(匹配率、差异率、记录总数等)。
- 无限大数据量支持
- 单次单表亿级数据处理,性能目标:不超过4分钟(甚至更高要求,如2分钟内)。
- 支持流式处理和分片机制。
- 人性化设计
- 超长时间任务提醒(WebSocket推送或邮件通知)。
- 任务进度可视化(实时进度条)。
- 支持任务中断与恢复。
- 高性能与分布式调度
- 单节点亿级数据处理不超过4分钟。
- 支持分布式调度:集成XXL-JOB、Elastic-Job,或自研调度框架。
非功能性需求
- 高性能:亿级数据单表处理 ≤ 4分钟(目标 ≤ 2分钟)。
- 可扩展性:支持新数据源、新规则、新调度器。
- 高可用性:分布式环境下任务容错与负载均衡。
第二部分:技术选型与JHipster集成
技术栈调整
- 后端
- Spring Boot(JHipster核心)。
- Spring Batch(批量处理与分片)。
- Spring Data Flow(流式处理与分布式任务管理)。
- Apache Kafka(分布式消息队列)。
- Hazelcast/Redis(分布式缓存与锁)。
- 前端
- Angular(Web界面)。
- WebSocket(实时进度推送)。
- Chart.js/D3.js(统计指标可视化)。
- 数据库与存储
- PostgreSQL(主数据库,支持分区表)。
- Elasticsearch(大数据索引与搜索)。
- MinIO(分布式文件存储,存储大文件)。
- 分布式调度
- XXL-JOB(轻量级调度器)。
- Elastic-Job(分布式任务调度)。
- 自研调度(基于Kafka和Spring Boot实现)。
- 高性能组件
- Netty(异步I/O处理API数据)。
- Disruptor(高性能队列,内存计算)。
JHipster配置
- 应用类型:Microservices(分布式架构)。
- 数据库:PostgreSQL + Elasticsearch。
- 异步支持:启用Kafka和WebSocket。
- 额外依赖:Spring Batch、Spring Data Flow、Hazelcast。
第三部分:系统架构设计
架构图(Mermaid)
graph TD
A[用户界面<br>Web UI / API] --> B[任务调度层<br>Distributed Scheduler]
B --> C[控制器层<br>REST Controller]
C --> D[服务层<br>Compare Service / Transform Service]
D --> E[数据访问层<br>Smart Data Loader]
D --> F[批量处理层<br>Spring Batch / Flow]
E --> G[数据源<br>DB / File / API]
F --> H[分布式计算节点<br>Kafka / Hazelcast]
D --> I[标签管理<br>Tag Service]
D --> J[结果生成<br>Report Generator]
J --> K[输出与统计<br>HTML / Excel / Charts]
A --> L[实时通知<br>WebSocket / Email]
架构说明
- 任务调度层
- 集成XXL-JOB/Elastic-Job,或自研调度器,负责任务分片与分配。
- 数据访问层(Smart Data Loader)
- 支持智能数据加载与转换,基于规则引擎(如Drools)动态解析转换规则。
- 服务层
Transform Service
:处理数据转换。Compare Service
:执行比对逻辑。Tag Service
:管理标签。
- 批量处理层
- Spring Batch负责分片与并行处理。
- Spring Data Flow支持流式处理。
- 分布式计算节点
- 通过Kafka分发任务,Hazelcast实现分布式锁与缓存。
- 实时通知
- WebSocket推送任务进度与超长时间提醒。
第四部分:模块设计与实现
核心模块
智能数据加载与转换
- 接口:
SmartDataLoader
- 实现:
DatabaseLoader
、FileLoader
、APILoader
- 转换规则:基于JSON配置(如
{"sourceField": "id", "targetField": "userId", "transform": "toString"}
) - 规则引擎:Drools解析动态规则。
1 2 3 4
public interface SmartDataLoader { Stream<Record> loadData(DataSourceConfig config); Stream<Record> transform(Stream<Record> data, TransformRule rule); }
- 接口:
比对引擎
- 接口:
Comparator
- 实现:支持流式比对,内存占用低。
- 示例:
StreamComparator
1 2 3 4 5
public class StreamComparator implements Comparator { public Stream<DiffResult> compare(Stream<Record> sourceA, Stream<Record> sourceB, CompareRule rule) { // 流式比对逻辑 } }
- 接口:
标签管理
- 服务:
TagService
- 支持动态标签规则(如“差异率>10%标记为异常”)。
- 服务:
分布式调度
- 自研调度器示例:
- 使用Kafka发布任务分片。
- 节点通过Hazelcast竞争任务执行权。
1 2 3 4 5 6 7 8 9
@Service public class CustomScheduler { @Autowired private KafkaTemplate<String, Task> kafkaTemplate; public void scheduleTask(Task task) { kafkaTemplate.send("compare-tasks", task); } }
- 自研调度器示例:
结果生成与统计
- 使用Disruptor队列异步生成报告。
- 统计指标:匹配率、差异率等通过Elasticsearch聚合计算。
高性能设计
- 亿级数据处理:
- 分片:每千万条记录一 shard。
- 并行:单节点16线程,分布式多节点扩展。
- 内存优化:流式处理,避免全量加载。
- 目标:亿级数据 ≤ 2分钟(假设16核CPU,100MB/s磁盘I/O)。
- 分布式支持:
- Kafka分发任务,Hazelcast同步状态。
第五部分:人性化与优化
人性化设计
- 超长时间提醒:任务超过5分钟未完成,通过WebSocket推送“任务仍在进行,请耐心等待”。
- 进度可视化:前端显示实时进度条(基于Spring Batch Job状态)。
- 任务恢复:支持失败任务重试与断点续传。
性能优化
- 数据分区:数据库使用分区表,文件分片读取。
- 缓存:Hazelcast缓存热点数据。
- 异步I/O:Netty处理API数据加载。
第六部分:部署与测试
部署
- 使用JHipster生成微服务架构。
- Docker Compose单机部署,Kubernetes分布式部署。
- Kafka集群+Zookeeper确保高可用。
测试
- 性能测试:模拟亿级数据,验证 ≤ 2分钟目标。
- 压力测试:分布式环境下多节点负载均衡。
总结
调整后的设计满足了亿级数据高性能处理(≤ 2分钟)、智能数据转换、分布式调度和人性化体验的要求。通过JHipster快速搭建框架,结合Spring Batch、Kafka和Hazelcast实现高性能与扩展性。