基于Java的高性能轻量化数据库设计与实现 扩展篇:执行引擎实现

1. 引言

在前三篇中,我们完成了系统架构设计和JDBC驱动的实现,支持了SQL语句的接收和初步解析。本篇将聚焦执行引擎(execution-engine模块)的实现,细化其设计与功能。执行引擎是数据库的核心组件,负责协调SQL操作的执行,桥接SQL解析器、查询优化器、存储引擎和事务管理器,确保高效完成DDL、DML和事务操作。

2. 执行引擎的目标与功能
2.1 目标
  • 高效执行SQL语句,支持SQL ANSI 92标准。
  • 协调各模块(如查询优化器、存储引擎),完成复杂查询。
  • 支持事务上下文,确保操作一致性。
  • 优化资源使用,提供并行执行能力。
2.2 功能
  • DDL执行:处理CREATE TABLEDROP TABLE等语句。
  • DML执行:执行INSERTSELECTUPDATEDELETE
  • 事务支持:管理事务开始、提交和回滚。
  • 计划执行:运行查询优化器生成的执行计划。
3. 执行引擎的核心设计
3.1 数据结构
  • 执行上下文(ExecutionContext)

    • 包含事务ID和其他运行时信息。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    
    public class ExecutionContext {
        private final long txId;
        private final TransactionManager txManager;
    
        public ExecutionContext(long txId, TransactionManager txManager) {
            this.txId = txId;
            this.txManager = txManager;
        }
        // getter
    }
    
  • 执行计划(QueryPlan)

    • 由查询优化器生成,执行引擎直接运行(参考第六篇)。
3.2 执行模型
  • 操作符模型:将查询分解为操作符(如扫描、过滤、连接)。
  • 流水线执行:支持操作符间的流式处理。
  • 并行执行:使用线程池处理大查询。
3.3 与其他模块的交互
  • SQL解析器:接收AST。
  • 查询优化器:获取优化后的计划。
  • 存储引擎:执行数据操作。
  • 事务管理器:管理事务状态。
4. 执行引擎实现

以下是ExecutionEngine类的核心实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.yinlongfei.lightweight.database.execution;

import com.yinlongfei.lightweight.database.optimizer.QueryOptimizer;
import com.yinlongfei.lightweight.database.optimizer.QueryPlan;
import com.yinlongfei.lightweight.database.parser.*;
import com.yinlongfei.lightweight.database.storage.StorageEngine;
import com.yinlongfei.lightweight.database.transaction.TransactionManager;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutionEngine {
    private final StorageEngine storage;
    private final TransactionManager txManager;
    private final QueryOptimizer optimizer;
    private final ExecutorService executorService = Executors.newFixedThreadPool(4); // 并行执行线程池

    public ExecutionEngine(StorageEngine storage, TransactionManager txManager, QueryOptimizer optimizer) {
        this.storage = storage;
        this.txManager = txManager;
        this.optimizer = optimizer;
    }

    public Object execute(Statement stmt, ExecutionContext context, LoggingManager logger) {
        if (stmt instanceof CreateTableStatement) {
            return executeCreateTable((CreateTableStatement) stmt, logger);
        } else if (stmt instanceof InsertStatement) {
            return executeInsert((InsertStatement) stmt, context, logger);
        } else if (stmt instanceof SelectStatement) {
            return executeSelect((SelectStatement) stmt, context, logger);
        } else if (stmt instanceof UpdateStatement) {
            return executeUpdate((UpdateStatement) stmt, context, logger);
        } else if (stmt instanceof DeleteStatement) {
            return executeDelete((DeleteStatement) stmt, context, logger);
        }
        throw new UnsupportedOperationException("Unsupported statement: " + stmt.getClass());
    }

    private void executeCreateTable(CreateTableStatement stmt, LoggingManager logger) {
        storage.getMetadataManager().addTable(stmt.getTableName(), stmt.getColumnNames(), 
                stmt.getColumnTypes(), stmt.getIndexedColumns());
        logger.info("Table created: " + stmt.getTableName());
    }

    private Integer executeInsert(InsertStatement stmt, ExecutionContext context, LoggingManager logger) {
        storage.executeInsert(stmt, context.getTxId(), logger);
        return 1; // 返回受影响行数
    }

    private List<Map<String, Object>> executeSelect(SelectStatement stmt, ExecutionContext context, LoggingManager logger) {
        QueryPlan plan = optimizer.optimize(stmt, context.getTxId());
        logger.info("Executing optimized plan for: " + stmt);
        return executorService.submit(() -> plan.execute(storage, context.getTxId())).get(); // 并行执行
    }

    private Integer executeUpdate(UpdateStatement stmt, ExecutionContext context, LoggingManager logger) {
        List<Map<String, Object>> rows = storage.executeSelect(
                new SelectStatement(stmt.getTableName(), null, stmt.getWhereClause()), 
                context.getTxId());
        for (Map<String, Object> row : rows) {
            stmt.getSetClauses().forEach((col, val) -> row.put(col, val));
            storage.updateRow(stmt.getTableName(), row, context.getTxId(), logger);
        }
        return rows.size();
    }

    private Integer executeDelete(DeleteStatement stmt, ExecutionContext context, LoggingManager logger) {
        List<Map<String, Object>> rows = storage.executeSelect(
                new SelectStatement(stmt.getTableName(), null, stmt.getWhereClause()), 
                context.getTxId());
        for (Map<String, Object> row : rows) {
            storage.deleteRow(stmt.getTableName(), row, context.getTxId(), logger);
        }
        return rows.size();
    }

    public void shutdown() {
        executorService.shutdown();
    }
}
5. 调整存储引擎以支持更新和删除

为支持UPDATEDELETE,调整StorageEngine

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class StorageEngine {
    // ... 其他代码 ...

    public void updateRow(String tableName, Map<String, Object> rowData, long txId, LoggingManager logger) {
        List<Row> table = memoryTables.get(tableName);
        Row oldRow = table.stream()
                .filter(r -> r.getColumns().get("id").equals(rowData.get("id")))
                .findFirst().orElse(null);
        if (oldRow != null) {
            txManager.addToUndoLog(oldRow, txId); // 记录旧版本
            table.remove(oldRow);
            Row newRow = new Row(rowData, oldRow.getVersion() + 1, txId);
            table.add(newRow);
            updateIndexes(tableName, newRow);
            logger.logWAL("UPDATE " + tableName + " SET " + rowData);
        }
    }

    public void deleteRow(String tableName, Map<String, Object> rowData, long txId, LoggingManager logger) {
        List<Row> table = memoryTables.get(tableName);
        Row oldRow = table.stream()
                .filter(r -> r.getColumns().get("id").equals(rowData.get("id")))
                .findFirst().orElse(null);
        if (oldRow != null) {
            txManager.addToUndoLog(oldRow, txId);
            table.remove(oldRow);
            logger.logWAL("DELETE FROM " + tableName + " WHERE id = " + rowData.get("id"));
        }
    }
}
6. 执行流程图

执行引擎的执行流程:

sequenceDiagram
    participant J as JDBC驱动
    participant E as 执行引擎
    participant O as 查询优化器
    participant S as 存储引擎
    participant T as 事务管理器
    participant L as 日志管理器

    J->>E: execute(stmt, context)
    alt DDL
        E->>S: addTable()
        S->>L: info("Table created")
    else DML (SELECT)
        E->>O: optimize(stmt, txId)
        O-->>E: QueryPlan
        E->>S: execute(plan, txId)
        S-->>E: List<Map>
    else DML (INSERT/UPDATE/DELETE)
        E->>S: executeInsert/Update/Delete(stmt, txId)
        S->>T: addToUndoLog()
        S->>L: logWAL()
    end
    E-->>J: Result
  • 流程说明
    1. JDBC驱动调用执行引擎处理语句。
    2. DDL直接操作元数据,DML根据类型调用优化器或存储引擎。
    3. 写操作记录WAL日志和事务回滚信息。
7. 测试示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
MetadataManager metadata = new MetadataManager("jdbc:lightweight:memory");
LoggingManager logger = new LoggingManager("memory");
TransactionManager txManager = new TransactionManager(storage, logger);
StorageEngine storage = new StorageEngine("jdbc:lightweight:memory", metadata, txManager);
QueryOptimizer optimizer = new QueryOptimizer(metadata, storage);
ExecutionEngine engine = new ExecutionEngine(storage, txManager, optimizer);

long txId = txManager.begin();
ExecutionContext context = new ExecutionContext(txId, txManager);

engine.execute(new CreateTableStatement("users", Arrays.asList("id", "name"), 
        Arrays.asList("INT", "VARCHAR"), Collections.singletonList("id")), context, logger);
engine.execute(new InsertStatement("users", Arrays.asList("id", "name"), 
        Arrays.asList(1, "Alice")), context, logger);
List<Map<String, Object>> result = (List<Map<String, Object>>) engine.execute(
        new SelectStatement("users", null, new Condition("id", "=", 1)), context, logger);
System.out.println(result); // 输出 [{id=1, name=Alice}]

txManager.commit(txId);
engine.shutdown();
8. 下一步展望

下一篇文章将实现“事务管理器”,设计MVCC机制,与执行引擎和存储引擎集成,确保并发一致性。


总结

新调整的第四篇细化了执行引擎的设计与实现,支持DDL和DML的执行,引入并行处理和操作符模型。Mermaid图展示了执行流程,与其他模块的协作增强了系统的完整性。后续篇章编号顺延,第五篇将变为事务管理器实现。

updatedupdated2025-03-312025-03-31