通用系统批量比对工具设计(第四篇:模块设计与实现)

系列专题:通用系统批量比对工具设计(第四篇:模块设计与实现)

一、设计目标

  • 高性能:亿级数据单表处理 ≤ 4分钟(目标 ≤ 2分钟)。
  • 智能转换:支持XML、TXT等数据源及动态规则。
  • 分布式调度:支持自研系统接入与任务分发。
  • 可视化规则管理:支持编辑、权限和版本控制。
  • 模块化:易扩展,支持新数据源和规则。

二、核心模块设计

  1. 数据加载与转换模块(Smart Data Loader)

    • 功能
      • 从多种数据源加载数据(DB、CSV、Excel、JSON、XML、TXT、API)。
      • 执行智能转换(字段映射、清洗、类型转换)。
    • 技术:Spring Boot + Drools(规则引擎)+ Jackson(JSON/XML)+ Netty(API)。
    • 实现
      • 流式加载:使用Stream处理大数据。
      • 动态规则:Drools解析JSON配置。
      • XML支持:XPATH解析嵌套结构。
      • TXT支持:正则表达式或分隔符解析。
    • 代码示例
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      
      public interface SmartDataLoader {
          Stream<Record> loadData(DataSourceConfig config) throws Exception;
          Stream<Record> transform(Stream<Record> data, TransformRule rule);
      }
      
      @Service
      public class FileDataLoader implements SmartDataLoader {
          @Autowired
          private KieContainer kieContainer; // Drools规则容器
      
          @Override
          public Stream<Record> loadData(DataSourceConfig config) throws Exception {
              String type = config.getType();
              Path path = Paths.get(config.getPath());
              if ("XML".equals(type)) {
                  return parseXml(path);
              } else if ("TXT".equals(type)) {
                  return parseTxt(path, config.getDelimiter());
              }
              // 其他类型略
              return Stream.empty();
          }
      
          private Stream<Record> parseXml(Path path) throws Exception {
              Document doc = DocumentBuilderFactory.newInstance()
                      .newDocumentBuilder().parse(path.toFile());
              XPath xpath = XPathFactory.newInstance().newXPath();
              NodeList nodes = (NodeList) xpath.evaluate("//record", doc, XPathConstants.NODESET);
              return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
                      new Iterator<Record>() {
                          int index = 0;
                          @Override public boolean hasNext() { return index < nodes.getLength(); }
                          @Override public Record next() {
                              Node node = nodes.item(index++);
                              return new Record(/* 解析逻辑 */);
                          }
                      }, Spliterator.ORDERED), false);
          }
      
          private Stream<Record> parseTxt(Path path, String delimiter) throws Exception {
              return Files.lines(path).map(line -> {
                  String[] fields = line.split(delimiter);
                  return new Record(fields);
              });
          }
      
          @Override
          public Stream<Record> transform(Stream<Record> data, TransformRule rule) {
              KieSession session = kieContainer.newKieSession();
              session.insert(rule);
              return data.map(record -> {
                  session.insert(record);
                  session.fireAllRules();
                  return record;
              });
          }
      }
      
    • 规则示例(Drools)
      rule "MapField"
          when
              $record: Record()
              $rule: TransformRule(sourceField == "id", targetField == "userId")
          then
              $record.setField("userId", $record.getField("id"));
      end
      
  2. 比对引擎模块(Compare Engine)

    • 功能
      • 执行流式比对(新增、删除、修改)。
      • 支持亿级数据分片处理。
    • 技术:Spring Batch + Disruptor。
    • 实现
      • 分片:每1000万条一shard。
      • 并行:16线程单节点。
      • 异步结果:Disruptor队列处理差异输出。
    • 代码示例
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      
      @Service
      public class StreamComparator {
          private final Disruptor<DiffResult> disruptor;
      
          public StreamComparator() {
              this.disruptor = new Disruptor<>(DiffResult::new, 1024, Executors.defaultThreadFactory());
              disruptor.handleEventsWith((result, sequence, endOfBatch) -> {/* 存储结果 */});
              disruptor.start();
          }
      
          public void compare(Stream<Record> sourceA, Stream<Record> sourceB, CompareRule rule) {
              Iterator<Record> iterA = sourceA.iterator();
              Iterator<Record> iterB = sourceB.iterator();
              while (iterA.hasNext() && iterB.hasNext()) {
                  Record a = iterA.next(), b = iterB.next();
                  DiffResult result = rule.apply(a, b);
                  disruptor.publishEvent((event, sequence) -> event.setResult(result));
              }
          }
      }
      
      @Configuration
      public class BatchConfig {
          @Bean
          public Job compareJob(JobBuilderFactory jobBuilderFactory, Step step) {
              return jobBuilderFactory.get("compareJob").start(step).build();
          }
      
          @Bean
          public Step compareStep(StepBuilderFactory stepBuilderFactory, StreamComparator comparator) {
              return stepBuilderFactory.get("compareStep")
                      .<Record, DiffResult>chunk(100_000)
                      .reader(/* 自定义Reader */)
                      .processor(comparator::compare)
                      .writer(/* 自定义Writer */)
                      .taskExecutor(new SimpleAsyncTaskExecutor())
                      .build();
          }
      }
      
  3. 分布式调度模块(Custom Scheduler)

    • 功能
      • 支持自研系统接入(API或消息队列)。
      • 任务分片与分布式执行。
    • 技术:Kafka + Hazelcast。
    • 实现
      • Kafka发布任务分片。
      • Hazelcast竞争执行权与状态同步。
    • 代码示例
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      
      @Service
      public class CustomScheduler {
          @Autowired
          private KafkaTemplate<String, Task> kafkaTemplate;
          @Autowired
          private HazelcastInstance hazelcast;
      
          public void scheduleTask(Task task) {
              IMap<String, TaskStatus> statusMap = hazelcast.getMap("taskStatus");
              List<TaskShard> shards = splitTask(task, 10); // 分10片
              shards.forEach(shard -> kafkaTemplate.send("compare-tasks", shard));
          }
      
          @KafkaListener(topics = "compare-tasks")
          public void executeShard(TaskShard shard) {
              ILock lock = hazelcast.getLock("shard-" + shard.getId());
              if (lock.tryLock()) {
                  try {
                      // 执行任务分片
                  } finally {
                      lock.unlock();
                  }
              }
          }
      
          // 自研系统接入API
          @PostMapping("/api/tasks")
          public ResponseEntity<String> submitTask(@RequestBody TaskRequest request) {
              Task task = convertRequest(request);
              scheduleTask(task);
              return ResponseEntity.ok("Task submitted: " + task.getId());
          }
      }
      
  4. 可视化规则管理模块(Rule Manager)

    • 功能
      • 编辑转换与比对规则。
      • 权限控制与版本管理。
    • 技术:Spring Boot + Angular + PostgreSQL。
    • 实现
      • 前端:拖拽界面,支持规则预览。
      • 后端:存储规则版本,集成JHipster权限。
    • 代码示例(后端)
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      
      @Entity
      public class RuleVersion {
          @Id private Long id;
          private String name;
          private String content; // JSON格式规则
          private int version;
          private String creator;
          @ManyToOne private Role role; // 权限关联
      }
      
      @RestController
      @RequestMapping("/api/rules")
      public class RuleController {
          @Autowired
          private RuleRepository ruleRepository;
      
          @PostMapping
          @PreAuthorize("hasRole('ADMIN')")
          public RuleVersion createRule(@RequestBody RuleVersion rule) {
              rule.setVersion(1);
              rule.setCreator(SecurityUtils.getCurrentUserLogin().orElse("unknown"));
              return ruleRepository.save(rule);
          }
      
          @GetMapping("/{id}/versions")
          public List<RuleVersion> getVersions(@PathVariable Long id) {
              return ruleRepository.findByNameOrderByVersionDesc(id);
          }
      }
      
    • 前端(Angular伪代码)
       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      
      @Component
      export class RuleEditorComponent {
        rule: Rule = { name: '', content: '' };
        previewData: any;
      
        onDragDrop(event: any) {
          this.rule.content = JSON.stringify(event.config);
        }
      
        preview() {
          this.http.post('/api/rules/preview', this.rule).subscribe(data => {
            this.previewData = data;
          });
        }
      }
      

三、亿级数据优化

  • 分片:1亿条分10个shard,每shard 1000万条。
  • 并行:16线程单节点,约160万条/秒(假设100MB/s I/O)。
  • 流式处理:XML/TXT逐行解析,避免内存溢出。
  • 缓存:Hazelcast缓存热点规则。

四、下一步规划

下一篇文章将聚焦于扩展与优化,探讨如何添加新数据源、新规则,以及性能调优和分布式部署方案。


总结

本篇设计了数据加载、比对引擎、分布式调度和规则管理模块,支持XML/TXT、自研系统接入和可视化规则编辑。代码示例展示了关键实现,满足亿级数据和高性能需求。

updatedupdated2025-03-312025-03-31