基于Java的高性能轻量化数据库设计与实现 第五篇:事务管理器实现

1. 引言

在前四篇中,我们完成了JDBC驱动和存储引擎的实现,支持了SQL执行、数据存储和索引管理。本篇将聚焦事务管理器(transaction-manager模块)的实现,设计MVCC机制,确保事务的隔离性、一致性和持久性。事务管理器将与存储引擎协作管理数据版本,并通过日志管理器记录WAL日志,实现高并发和故障恢复能力。

2. 事务管理器的目标与功能
2.1 目标
  • 实现ACID事务支持,符合SQL ANSI 92标准。
  • 使用MVCC提供高效的并发控制,避免锁竞争。
  • 确保数据一致性和持久性,与存储引擎和日志管理器集成。
  • 轻量化设计,适合嵌入式场景。
2.2 功能
  • 事务开始:分配事务ID,初始化事务状态。
  • 版本管理:通过MVCC维护数据的多版本。
  • 提交与回滚:处理事务的提交或回滚,更新可见版本。
  • 并发控制:支持读写并发,确保隔离性。
3. 事务管理器的核心设计
3.1 数据结构
  • 事务状态

    • 使用Transaction类记录事务信息。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    public class Transaction {
        private final long txId;
        private boolean active;
        private List<Row> undoLog; // 用于回滚
    
        public Transaction(long txId) {
            this.txId = txId;
            this.active = true;
            this.undoLog = new ArrayList<>();
        }
        // getter和setter
    }
    
  • 全局状态

    • 使用Map跟踪活动事务和全局版本号。
3.2 MVCC机制
  • 每行数据(Row)包含versiontxId
    • version:表示当前版本号,提交后递增。
    • txId:创建或修改该版本的事务ID。
  • 可见性规则:
    • 读操作:只读取txId <= 当前事务IDversion > 0的行。
    • 写操作:创建新版本,保留旧版本供回滚。
3.3 WAL日志
  • 在写操作前记录WAL日志,确保故障后可恢复。
4. 事务管理器实现

以下是TransactionManager类的核心实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.yinlongfei.lightweight.database.transaction;

import com.yinlongfei.lightweight.database.storage.Row;
import com.yinlongfei.lightweight.database.storage.StorageEngine;

import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

public class TransactionManager {
    private final AtomicLong txIdGenerator = new AtomicLong(0);
    private final Map<Long, Transaction> activeTransactions = new HashMap<>();
    private final StorageEngine storage;
    private final LoggingManager logger;

    public TransactionManager(StorageEngine storage, LoggingManager logger) {
        this.storage = storage;
        this.logger = logger;
    }

    public long begin() {
        long txId = txIdGenerator.incrementAndGet();
        activeTransactions.put(txId, new Transaction(txId));
        logger.info("Transaction " + txId + " begun");
        return txId;
    }

    public void commit(long txId) {
        Transaction tx = activeTransactions.get(txId);
        if (tx == null || !tx.isActive()) {
            throw new IllegalStateException("Transaction " + txId + " not active");
        }
        tx.setActive(false);
        activeTransactions.remove(txId);
        logger.info("Transaction " + txId + " committed");
        // 存储引擎持久化(WAL已记录,无需额外操作)
        if (!storage.isMemoryMode()) storage.flushToDisk();
    }

    public void rollback(long txId) {
        Transaction tx = activeTransactions.get(txId);
        if (tx == null || !tx.isActive()) {
            throw new IllegalStateException("Transaction " + txId + " not active");
        }
        for (Row oldRow : tx.getUndoLog()) {
            storage.restoreRow(oldRow); // 恢复旧版本
        }
        tx.setActive(false);
        activeTransactions.remove(txId);
        logger.info("Transaction " + txId + " rolled back");
    }

    public long getCurrentTxId() {
        // 返回最新的已提交事务ID,用于快照隔离
        return txIdGenerator.get();
    }

    public void addToUndoLog(Row oldRow, long txId) {
        Transaction tx = activeTransactions.get(txId);
        if (tx != null && tx.isActive()) {
            tx.getUndoLog().add(oldRow);
        }
    }

    public boolean isVisible(Row row, long txId) {
        return row.getTxId() <= txId && row.getVersion() > 0;
    }
}
5. 与存储引擎的集成

调整StorageEngine以支持MVCC和事务管理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.yinlongfei.lightweight.database.storage;

public class StorageEngine {
    // ... 其他字段和方法保持不变 ...

    private final TransactionManager txManager;

    public StorageEngine(String url, MetadataManager metadataManager, TransactionManager txManager) {
        this.metadataManager = metadataManager;
        this.dataDir = Path.of(url.replace("jdbc:lightweight:", ""));
        this.isMemoryMode = url.contains("memory");
        this.txManager = txManager;
        // 初始化代码
    }

    public synchronized void executeInsert(InsertStatement stmt, long txId, LoggingManager logger) {
        String tableName = stmt.getTableName();
        List<Row> table = memoryTables.computeIfAbsent(tableName, k -> new ArrayList<>());
        Map<String, Object> rowData = new HashMap<>();
        for (int i = 0; i < stmt.getColumns().size(); i++) {
            rowData.put(stmt.getColumns().get(i), stmt.getValues().get(i));
        }
        Row row = new Row(rowData, 1, txId);
        table.add(row);
        updateIndexes(tableName, row);
        if (!isMemoryMode) logger.logWAL("INSERT INTO " + tableName + " VALUES " + rowData);
    }

    public List<Map<String, Object>> executeSelect(SelectStatement stmt, long txId) {
        String tableName = stmt.getTableName();
        List<Row> table = memoryTables.getOrDefault(tableName, Collections.emptyList());
        return table.stream()
                .filter(row -> txManager.isVisible(row, txId)) // MVCC可见性检查
                .filter(row -> evaluateCondition(row, stmt.getWhereClause()))
                .map(row -> new HashMap<>(row.getColumns()))
                .collect(Collectors.toList());
    }

    public void restoreRow(Row oldRow) {
        String tableName = metadataManager.getTableNameForRow(oldRow);
        List<Row> table = memoryTables.get(tableName);
        table.removeIf(row -> row.getTxId() == oldRow.getTxId());
        table.add(oldRow);
    }

    // ... 其他方法 ...
}
6. 执行流程图

事务管理器的执行流程:

sequenceDiagram
    participant C as 客户端
    participant J as JDBC驱动
    participant S as 存储引擎
    participant T as 事务管理器
    participant L as 日志管理器

    C->>J: begin transaction
    J->>T: begin()
    T-->>J: txId
    C->>J: executeUpdate("INSERT ...")
    J->>S: executeInsert(stmt, txId)
    S->>L: logWAL("INSERT ...")
    C->>J: commit
    J->>T: commit(txId)
    T->>S: flushToDisk()
    T->>L: info("Transaction committed")
  • 流程说明
    1. 客户端开始事务,获取txId
    2. 执行插入操作,记录WAL日志。
    3. 提交事务,持久化数据并记录日志。
7. 测试示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
StorageEngine storage = new StorageEngine("jdbc:lightweight:memory", new MetadataManager(), txManager);
LoggingManager logger = new LoggingManager("memory");
TransactionManager txManager = new TransactionManager(storage, logger);

long txId = txManager.begin();
InsertStatement insert = new InsertStatement("users", 
    Arrays.asList("id", "data"), Arrays.asList(1, "{\"name\": \"Alice\"}"));
storage.executeInsert(insert, txId, logger);

SelectStatement select = new SelectStatement("users", null, new Condition("id", "=", 1));
List<Map<String, Object>> result = storage.executeSelect(select, txManager.getCurrentTxId());
System.out.println(result); // 输出 [{id=1, data={"name": "Alice"}}]

txManager.commit(txId);
8. 下一步展望

下一篇文章将实现“查询优化器”,设计基于成本的优化策略,提升复杂查询性能,并与执行引擎和存储引擎集成。


总结

第五篇实现了事务管理器,通过MVCC机制支持并发事务,确保ACID属性。Mermaid图展示了事务执行流程,与存储引擎和日志管理器的集成增强了一致性和持久性。后续将优化查询性能。

updatedupdated2025-03-312025-03-31