在前四篇中,我们完成了JDBC驱动和存储引擎的实现,支持了SQL执行、数据存储和索引管理。本篇将聚焦事务管理器(transaction-manager
模块)的实现,设计MVCC机制,确保事务的隔离性、一致性和持久性。事务管理器将与存储引擎协作管理数据版本,并通过日志管理器记录WAL日志,实现高并发和故障恢复能力。
- 实现ACID事务支持,符合SQL ANSI 92标准。
- 使用MVCC提供高效的并发控制,避免锁竞争。
- 确保数据一致性和持久性,与存储引擎和日志管理器集成。
- 轻量化设计,适合嵌入式场景。
- 事务开始:分配事务ID,初始化事务状态。
- 版本管理:通过MVCC维护数据的多版本。
- 提交与回滚:处理事务的提交或回滚,更新可见版本。
- 并发控制:支持读写并发,确保隔离性。
事务状态:
1
2
3
4
5
6
7
8
9
10
11
12
| public class Transaction {
private final long txId;
private boolean active;
private List<Row> undoLog; // 用于回滚
public Transaction(long txId) {
this.txId = txId;
this.active = true;
this.undoLog = new ArrayList<>();
}
// getter和setter
}
|
全局状态:
- 每行数据(
Row
)包含version
和txId
:version
:表示当前版本号,提交后递增。txId
:创建或修改该版本的事务ID。
- 可见性规则:
- 读操作:只读取
txId <= 当前事务ID
且version > 0
的行。 - 写操作:创建新版本,保留旧版本供回滚。
以下是TransactionManager
类的核心实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
| package com.yinlongfei.lightweight.database.transaction;
import com.yinlongfei.lightweight.database.storage.Row;
import com.yinlongfei.lightweight.database.storage.StorageEngine;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
public class TransactionManager {
private final AtomicLong txIdGenerator = new AtomicLong(0);
private final Map<Long, Transaction> activeTransactions = new HashMap<>();
private final StorageEngine storage;
private final LoggingManager logger;
public TransactionManager(StorageEngine storage, LoggingManager logger) {
this.storage = storage;
this.logger = logger;
}
public long begin() {
long txId = txIdGenerator.incrementAndGet();
activeTransactions.put(txId, new Transaction(txId));
logger.info("Transaction " + txId + " begun");
return txId;
}
public void commit(long txId) {
Transaction tx = activeTransactions.get(txId);
if (tx == null || !tx.isActive()) {
throw new IllegalStateException("Transaction " + txId + " not active");
}
tx.setActive(false);
activeTransactions.remove(txId);
logger.info("Transaction " + txId + " committed");
// 存储引擎持久化(WAL已记录,无需额外操作)
if (!storage.isMemoryMode()) storage.flushToDisk();
}
public void rollback(long txId) {
Transaction tx = activeTransactions.get(txId);
if (tx == null || !tx.isActive()) {
throw new IllegalStateException("Transaction " + txId + " not active");
}
for (Row oldRow : tx.getUndoLog()) {
storage.restoreRow(oldRow); // 恢复旧版本
}
tx.setActive(false);
activeTransactions.remove(txId);
logger.info("Transaction " + txId + " rolled back");
}
public long getCurrentTxId() {
// 返回最新的已提交事务ID,用于快照隔离
return txIdGenerator.get();
}
public void addToUndoLog(Row oldRow, long txId) {
Transaction tx = activeTransactions.get(txId);
if (tx != null && tx.isActive()) {
tx.getUndoLog().add(oldRow);
}
}
public boolean isVisible(Row row, long txId) {
return row.getTxId() <= txId && row.getVersion() > 0;
}
}
|
调整StorageEngine
以支持MVCC和事务管理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
| package com.yinlongfei.lightweight.database.storage;
public class StorageEngine {
// ... 其他字段和方法保持不变 ...
private final TransactionManager txManager;
public StorageEngine(String url, MetadataManager metadataManager, TransactionManager txManager) {
this.metadataManager = metadataManager;
this.dataDir = Path.of(url.replace("jdbc:lightweight:", ""));
this.isMemoryMode = url.contains("memory");
this.txManager = txManager;
// 初始化代码
}
public synchronized void executeInsert(InsertStatement stmt, long txId, LoggingManager logger) {
String tableName = stmt.getTableName();
List<Row> table = memoryTables.computeIfAbsent(tableName, k -> new ArrayList<>());
Map<String, Object> rowData = new HashMap<>();
for (int i = 0; i < stmt.getColumns().size(); i++) {
rowData.put(stmt.getColumns().get(i), stmt.getValues().get(i));
}
Row row = new Row(rowData, 1, txId);
table.add(row);
updateIndexes(tableName, row);
if (!isMemoryMode) logger.logWAL("INSERT INTO " + tableName + " VALUES " + rowData);
}
public List<Map<String, Object>> executeSelect(SelectStatement stmt, long txId) {
String tableName = stmt.getTableName();
List<Row> table = memoryTables.getOrDefault(tableName, Collections.emptyList());
return table.stream()
.filter(row -> txManager.isVisible(row, txId)) // MVCC可见性检查
.filter(row -> evaluateCondition(row, stmt.getWhereClause()))
.map(row -> new HashMap<>(row.getColumns()))
.collect(Collectors.toList());
}
public void restoreRow(Row oldRow) {
String tableName = metadataManager.getTableNameForRow(oldRow);
List<Row> table = memoryTables.get(tableName);
table.removeIf(row -> row.getTxId() == oldRow.getTxId());
table.add(oldRow);
}
// ... 其他方法 ...
}
|
事务管理器的执行流程:
sequenceDiagram
participant C as 客户端
participant J as JDBC驱动
participant S as 存储引擎
participant T as 事务管理器
participant L as 日志管理器
C->>J: begin transaction
J->>T: begin()
T-->>J: txId
C->>J: executeUpdate("INSERT ...")
J->>S: executeInsert(stmt, txId)
S->>L: logWAL("INSERT ...")
C->>J: commit
J->>T: commit(txId)
T->>S: flushToDisk()
T->>L: info("Transaction committed")
- 流程说明:
- 客户端开始事务,获取
txId
。 - 执行插入操作,记录WAL日志。
- 提交事务,持久化数据并记录日志。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| StorageEngine storage = new StorageEngine("jdbc:lightweight:memory", new MetadataManager(), txManager);
LoggingManager logger = new LoggingManager("memory");
TransactionManager txManager = new TransactionManager(storage, logger);
long txId = txManager.begin();
InsertStatement insert = new InsertStatement("users",
Arrays.asList("id", "data"), Arrays.asList(1, "{\"name\": \"Alice\"}"));
storage.executeInsert(insert, txId, logger);
SelectStatement select = new SelectStatement("users", null, new Condition("id", "=", 1));
List<Map<String, Object>> result = storage.executeSelect(select, txManager.getCurrentTxId());
System.out.println(result); // 输出 [{id=1, data={"name": "Alice"}}]
txManager.commit(txId);
|
下一篇文章将实现“查询优化器”,设计基于成本的优化策略,提升复杂查询性能,并与执行引擎和存储引擎集成。
第五篇实现了事务管理器,通过MVCC机制支持并发事务,确保ACID属性。Mermaid图展示了事务执行流程,与存储引擎和日志管理器的集成增强了一致性和持久性。后续将优化查询性能。