基于Java的高性能轻量化数据库设计与实现 第四篇:存储引擎实现

1. 引言

在前三篇中,我们完成了系统架构设计和JDBC驱动实现,并集成了日志管理器以支持WAL和调试日志。本篇将聚焦存储引擎(storage-engine模块)的实现,设计内存和文件存储机制,支持B+树索引和JSON数据类型,确保高性能和事务一致性。存储引擎将与执行引擎、事务管理器和元数据管理模块协作,完成数据操作。

2. 存储引擎的目标与功能
2.1 目标
  • 提供高效的数据存储和检索,支持内存和文件两种模式。
  • 实现B+树索引,优化查询性能。
  • 原生支持JSON数据类型,满足现代应用需求。
  • 与MVCC事务和日志管理器集成,确保数据一致性和持久性。
2.2 功能
  • 数据存储:支持表数据和索引的内存/文件存储。
  • 索引管理:实现B+树索引,支持主键和二级索引。
  • JSON支持:存储和查询JSON数据。
  • 事务支持:配合MVCC管理多版本数据。
3. 存储引擎的核心设计
3.1 数据结构
  • 表数据

    • 使用Map<String, List<Row>>存储表,键为表名,值为行列表。
    • 每行Row包含字段值和MVCC元数据。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    public class Row {
        private Map<String, Object> columns; // 支持JSON类型
        private long version; // MVCC版本号
        private long txId; // 事务ID
    
        public Row(Map<String, Object> columns, long version, long txId) {
            this.columns = columns;
            this.version = version;
            this.txId = txId;
        }
        // getter和setter
    }
    
  • 索引

    • 使用B+树实现,支持高效范围查询。
    • 键为索引字段值,值为行引用或主键。
3.2 存储模式
  • 内存模式:所有数据存储在HashMap中,适合小规模快速操作。
  • 文件模式:使用Java NIO的内存映射文件(MappedByteBuffer)存储数据,结合WAL日志确保持久性。
3.3 JSON支持
  • 使用javax.json解析和存储JSON数据。
  • 支持JSON查询,如JSON_EXTRACT(data, '$.name')
4. 存储引擎实现

以下是StorageEngine类的核心实现:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
package com.yinlongfei.lightweight.database.storage;

import com.yinlongfei.lightweight.database.metadata.MetadataManager;
import com.yinlongfei.lightweight.database.transaction.TransactionManager;
import org.roaringbitmap.RoaringBitmap;

import javax.json.Json;
import javax.json.JsonObject;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Collectors;

public class StorageEngine {
    private final Map<String, List<Row>> memoryTables = new HashMap<>(); // 内存表
    private final Map<String, BPlusTree> indexes = new HashMap<>(); // 索引
    private final MetadataManager metadataManager;
    private final Path dataDir; // 文件存储目录
    private boolean isMemoryMode;

    public StorageEngine(String url, MetadataManager metadataManager) {
        this.metadataManager = metadataManager;
        this.dataDir = Path.of(url.replace("jdbc:lightweight:", ""));
        this.isMemoryMode = url.contains("memory");
        if (!isMemoryMode && !Files.exists(dataDir)) {
            try {
                Files.createDirectories(dataDir);
            } catch (Exception e) {
                throw new RuntimeException("Failed to create data directory", e);
            }
        }
    }

    public synchronized void executeInsert(InsertStatement stmt, long txId, LoggingManager logger) {
        String tableName = stmt.getTableName();
        List<Row> table = memoryTables.computeIfAbsent(tableName, k -> new ArrayList<>());
        Map<String, Object> rowData = new HashMap<>();
        for (int i = 0; i < stmt.getColumns().size(); i++) {
            rowData.put(stmt.getColumns().get(i), stmt.getValues().get(i));
        }
        Row row = new Row(rowData, 1, txId);
        table.add(row);

        // 更新索引
        updateIndexes(tableName, row);
        // 记录WAL日志
        if (!isMemoryMode) logger.logWAL("INSERT INTO " + tableName + " VALUES " + rowData);
    }

    public List<Map<String, Object>> executeSelect(SelectStatement stmt, long txId) {
        String tableName = stmt.getTableName();
        List<Row> table = memoryTables.getOrDefault(tableName, Collections.emptyList());
        return table.stream()
                .filter(row -> isVisible(row, txId)) // MVCC可见性检查
                .filter(row -> evaluateCondition(row, stmt.getWhereClause()))
                .map(row -> new HashMap<>(row.getColumns()))
                .collect(Collectors.toList());
    }

    private void updateIndexes(String tableName, Row row) {
        List<String> indexedColumns = metadataManager.getIndexedColumns(tableName);
        for (String column : indexedColumns) {
            Object key = row.getColumns().get(column);
            BPlusTree index = indexes.computeIfAbsent(tableName + "_" + column, k -> new BPlusTree());
            index.insert(key, row);
        }
    }

    private boolean isVisible(Row row, long txId) {
        return row.getTxId() <= txId && row.getVersion() > 0; // 简化的MVCC检查
    }

    private boolean evaluateCondition(Row row, Condition cond) {
        if (cond == null) return true;
        Object value = row.getColumns().get(cond.getColumn());
        switch (cond.getOperator()) {
            case ">": return ((Comparable) value).compareTo(cond.getValue()) > 0;
            case "=": return value.equals(cond.getValue());
            default: return false;
        }
    }

    // 文件存储示例(简化为追加写入)
    public void flushToDisk() {
        if (isMemoryMode) return;
        try (FileChannel channel = FileChannel.open(dataDir.resolve("data.db"), 
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024);
            for (Map.Entry<String, List<Row>> entry : memoryTables.entrySet()) {
                String tableName = entry.getKey();
                for (Row row : entry.getValue()) {
                    String data = tableName + "|" + row.getColumns().toString() + "\n";
                    buffer.put(data.getBytes());
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to flush to disk", e);
        }
    }
}

// 简化的B+树实现(实际需完整实现)
class BPlusTree {
    public void insert(Object key, Row row) {
        // 实现B+树插入逻辑
    }
    public List<Row> search(Object key) {
        return Collections.emptyList(); // 实现搜索逻辑
    }
}
5. 与其他模块的集成
  • 元数据管理:获取表结构和索引信息。
  • 事务管理器:通过txIdversion实现MVCC。
  • 日志管理器:记录WAL日志确保持久性。
  • 执行引擎:通过executeInsertexecuteSelect接口提供服务。
6. 执行流程图

存储引擎的执行流程:

sequenceDiagram
    participant E as 执行引擎
    participant S as 存储引擎
    participant M as 元数据管理
    participant T as 事务管理器
    participant L as 日志管理器

    E->>S: executeInsert(stmt, txId)
    S->>M: getIndexedColumns(table)
    M-->>S: indexedColumns
    S->>S: updateIndexes()
    S->>L: logWAL("INSERT ...")
    E->>S: executeSelect(stmt, txId)
    S->>T: check visibility(txId)
    T-->>S: visible rows
    S-->>E: List<Map>
  • 流程说明
    1. 插入时更新索引并记录WAL日志。
    2. 查询时检查MVCC可见性并过滤数据。
7. 测试示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
StorageEngine storage = new StorageEngine("jdbc:lightweight:memory", new MetadataManager());
LoggingManager logger = new LoggingManager("memory");
TransactionManager tx = new TransactionManager();

InsertStatement insert = new InsertStatement("users", 
    Arrays.asList("id", "data"), Arrays.asList(1, "{\"name\": \"Alice\"}"));
storage.executeInsert(insert, tx.begin(), logger);

SelectStatement select = new SelectStatement("users", null, new Condition("id", "=", 1));
List<Map<String, Object>> result = storage.executeSelect(select, tx.getCurrentTxId());
System.out.println(result); // 输出 [{id=1, data={"name": "Alice"}}]
8. 下一步展望

下一篇文章将实现“事务管理器”,详细设计MVCC机制,确保并发一致性,并与存储引擎和日志管理器深度集成。


总结

第四篇实现了存储引擎,支持内存和文件存储、B+树索引和JSON数据类型,通过MVCC和WAL日志确保一致性和持久性。Mermaid图展示了与相关模块的交互流程,为后续事务管理奠定了基础。

updatedupdated2025-03-312025-03-31