系列专题:通用系统批量比对工具设计(第四篇:模块设计与实现)
一、设计目标
- 高性能:亿级数据单表处理 ≤ 4分钟(目标 ≤ 2分钟)。
- 智能转换:支持XML、TXT等数据源及动态规则。
- 分布式调度:支持自研系统接入与任务分发。
- 可视化规则管理:支持编辑、权限和版本控制。
- 模块化:易扩展,支持新数据源和规则。
二、核心模块设计
数据加载与转换模块(Smart Data Loader)
- 功能:
- 从多种数据源加载数据(DB、CSV、Excel、JSON、XML、TXT、API)。
- 执行智能转换(字段映射、清洗、类型转换)。
- 技术:Spring Boot + Drools(规则引擎)+ Jackson(JSON/XML)+ Netty(API)。
- 实现:
- 流式加载:使用
Stream
处理大数据。 - 动态规则:Drools解析JSON配置。
- XML支持:XPATH解析嵌套结构。
- TXT支持:正则表达式或分隔符解析。
- 流式加载:使用
- 代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
public interface SmartDataLoader { Stream<Record> loadData(DataSourceConfig config) throws Exception; Stream<Record> transform(Stream<Record> data, TransformRule rule); } @Service public class FileDataLoader implements SmartDataLoader { @Autowired private KieContainer kieContainer; // Drools规则容器 @Override public Stream<Record> loadData(DataSourceConfig config) throws Exception { String type = config.getType(); Path path = Paths.get(config.getPath()); if ("XML".equals(type)) { return parseXml(path); } else if ("TXT".equals(type)) { return parseTxt(path, config.getDelimiter()); } // 其他类型略 return Stream.empty(); } private Stream<Record> parseXml(Path path) throws Exception { Document doc = DocumentBuilderFactory.newInstance() .newDocumentBuilder().parse(path.toFile()); XPath xpath = XPathFactory.newInstance().newXPath(); NodeList nodes = (NodeList) xpath.evaluate("//record", doc, XPathConstants.NODESET); return StreamSupport.stream(Spliterators.spliteratorUnknownSize( new Iterator<Record>() { int index = 0; @Override public boolean hasNext() { return index < nodes.getLength(); } @Override public Record next() { Node node = nodes.item(index++); return new Record(/* 解析逻辑 */); } }, Spliterator.ORDERED), false); } private Stream<Record> parseTxt(Path path, String delimiter) throws Exception { return Files.lines(path).map(line -> { String[] fields = line.split(delimiter); return new Record(fields); }); } @Override public Stream<Record> transform(Stream<Record> data, TransformRule rule) { KieSession session = kieContainer.newKieSession(); session.insert(rule); return data.map(record -> { session.insert(record); session.fireAllRules(); return record; }); } }
- 规则示例(Drools):
rule "MapField" when $record: Record() $rule: TransformRule(sourceField == "id", targetField == "userId") then $record.setField("userId", $record.getField("id")); end
- 功能:
比对引擎模块(Compare Engine)
- 功能:
- 执行流式比对(新增、删除、修改)。
- 支持亿级数据分片处理。
- 技术:Spring Batch + Disruptor。
- 实现:
- 分片:每1000万条一shard。
- 并行:16线程单节点。
- 异步结果:Disruptor队列处理差异输出。
- 代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
@Service public class StreamComparator { private final Disruptor<DiffResult> disruptor; public StreamComparator() { this.disruptor = new Disruptor<>(DiffResult::new, 1024, Executors.defaultThreadFactory()); disruptor.handleEventsWith((result, sequence, endOfBatch) -> {/* 存储结果 */}); disruptor.start(); } public void compare(Stream<Record> sourceA, Stream<Record> sourceB, CompareRule rule) { Iterator<Record> iterA = sourceA.iterator(); Iterator<Record> iterB = sourceB.iterator(); while (iterA.hasNext() && iterB.hasNext()) { Record a = iterA.next(), b = iterB.next(); DiffResult result = rule.apply(a, b); disruptor.publishEvent((event, sequence) -> event.setResult(result)); } } } @Configuration public class BatchConfig { @Bean public Job compareJob(JobBuilderFactory jobBuilderFactory, Step step) { return jobBuilderFactory.get("compareJob").start(step).build(); } @Bean public Step compareStep(StepBuilderFactory stepBuilderFactory, StreamComparator comparator) { return stepBuilderFactory.get("compareStep") .<Record, DiffResult>chunk(100_000) .reader(/* 自定义Reader */) .processor(comparator::compare) .writer(/* 自定义Writer */) .taskExecutor(new SimpleAsyncTaskExecutor()) .build(); } }
- 功能:
分布式调度模块(Custom Scheduler)
- 功能:
- 支持自研系统接入(API或消息队列)。
- 任务分片与分布式执行。
- 技术:Kafka + Hazelcast。
- 实现:
- Kafka发布任务分片。
- Hazelcast竞争执行权与状态同步。
- 代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
@Service public class CustomScheduler { @Autowired private KafkaTemplate<String, Task> kafkaTemplate; @Autowired private HazelcastInstance hazelcast; public void scheduleTask(Task task) { IMap<String, TaskStatus> statusMap = hazelcast.getMap("taskStatus"); List<TaskShard> shards = splitTask(task, 10); // 分10片 shards.forEach(shard -> kafkaTemplate.send("compare-tasks", shard)); } @KafkaListener(topics = "compare-tasks") public void executeShard(TaskShard shard) { ILock lock = hazelcast.getLock("shard-" + shard.getId()); if (lock.tryLock()) { try { // 执行任务分片 } finally { lock.unlock(); } } } // 自研系统接入API @PostMapping("/api/tasks") public ResponseEntity<String> submitTask(@RequestBody TaskRequest request) { Task task = convertRequest(request); scheduleTask(task); return ResponseEntity.ok("Task submitted: " + task.getId()); } }
- 功能:
可视化规则管理模块(Rule Manager)
- 功能:
- 编辑转换与比对规则。
- 权限控制与版本管理。
- 技术:Spring Boot + Angular + PostgreSQL。
- 实现:
- 前端:拖拽界面,支持规则预览。
- 后端:存储规则版本,集成JHipster权限。
- 代码示例(后端):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
@Entity public class RuleVersion { @Id private Long id; private String name; private String content; // JSON格式规则 private int version; private String creator; @ManyToOne private Role role; // 权限关联 } @RestController @RequestMapping("/api/rules") public class RuleController { @Autowired private RuleRepository ruleRepository; @PostMapping @PreAuthorize("hasRole('ADMIN')") public RuleVersion createRule(@RequestBody RuleVersion rule) { rule.setVersion(1); rule.setCreator(SecurityUtils.getCurrentUserLogin().orElse("unknown")); return ruleRepository.save(rule); } @GetMapping("/{id}/versions") public List<RuleVersion> getVersions(@PathVariable Long id) { return ruleRepository.findByNameOrderByVersionDesc(id); } }
- 前端(Angular伪代码):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
@Component export class RuleEditorComponent { rule: Rule = { name: '', content: '' }; previewData: any; onDragDrop(event: any) { this.rule.content = JSON.stringify(event.config); } preview() { this.http.post('/api/rules/preview', this.rule).subscribe(data => { this.previewData = data; }); } }
- 功能:
三、亿级数据优化
- 分片:1亿条分10个shard,每shard 1000万条。
- 并行:16线程单节点,约160万条/秒(假设100MB/s I/O)。
- 流式处理:XML/TXT逐行解析,避免内存溢出。
- 缓存:Hazelcast缓存热点规则。
四、下一步规划
下一篇文章将聚焦于扩展与优化,探讨如何添加新数据源、新规则,以及性能调优和分布式部署方案。
总结
本篇设计了数据加载、比对引擎、分布式调度和规则管理模块,支持XML/TXT、自研系统接入和可视化规则编辑。代码示例展示了关键实现,满足亿级数据和高性能需求。