1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
| package com.yinlongfei.lightweight.database.storage;
import com.yinlongfei.lightweight.database.metadata.MetadataManager;
import com.yinlongfei.lightweight.database.transaction.TransactionManager;
import org.roaringbitmap.RoaringBitmap;
import javax.json.Json;
import javax.json.JsonObject;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Collectors;
public class StorageEngine {
private final Map<String, List<Row>> memoryTables = new HashMap<>(); // 内存表
private final Map<String, BPlusTree> indexes = new HashMap<>(); // 索引
private final MetadataManager metadataManager;
private final Path dataDir; // 文件存储目录
private boolean isMemoryMode;
public StorageEngine(String url, MetadataManager metadataManager) {
this.metadataManager = metadataManager;
this.dataDir = Path.of(url.replace("jdbc:lightweight:", ""));
this.isMemoryMode = url.contains("memory");
if (!isMemoryMode && !Files.exists(dataDir)) {
try {
Files.createDirectories(dataDir);
} catch (Exception e) {
throw new RuntimeException("Failed to create data directory", e);
}
}
}
public synchronized void executeInsert(InsertStatement stmt, long txId, LoggingManager logger) {
String tableName = stmt.getTableName();
List<Row> table = memoryTables.computeIfAbsent(tableName, k -> new ArrayList<>());
Map<String, Object> rowData = new HashMap<>();
for (int i = 0; i < stmt.getColumns().size(); i++) {
rowData.put(stmt.getColumns().get(i), stmt.getValues().get(i));
}
Row row = new Row(rowData, 1, txId);
table.add(row);
// 更新索引
updateIndexes(tableName, row);
// 记录WAL日志
if (!isMemoryMode) logger.logWAL("INSERT INTO " + tableName + " VALUES " + rowData);
}
public List<Map<String, Object>> executeSelect(SelectStatement stmt, long txId) {
String tableName = stmt.getTableName();
List<Row> table = memoryTables.getOrDefault(tableName, Collections.emptyList());
return table.stream()
.filter(row -> isVisible(row, txId)) // MVCC可见性检查
.filter(row -> evaluateCondition(row, stmt.getWhereClause()))
.map(row -> new HashMap<>(row.getColumns()))
.collect(Collectors.toList());
}
private void updateIndexes(String tableName, Row row) {
List<String> indexedColumns = metadataManager.getIndexedColumns(tableName);
for (String column : indexedColumns) {
Object key = row.getColumns().get(column);
BPlusTree index = indexes.computeIfAbsent(tableName + "_" + column, k -> new BPlusTree());
index.insert(key, row);
}
}
private boolean isVisible(Row row, long txId) {
return row.getTxId() <= txId && row.getVersion() > 0; // 简化的MVCC检查
}
private boolean evaluateCondition(Row row, Condition cond) {
if (cond == null) return true;
Object value = row.getColumns().get(cond.getColumn());
switch (cond.getOperator()) {
case ">": return ((Comparable) value).compareTo(cond.getValue()) > 0;
case "=": return value.equals(cond.getValue());
default: return false;
}
}
// 文件存储示例(简化为追加写入)
public void flushToDisk() {
if (isMemoryMode) return;
try (FileChannel channel = FileChannel.open(dataDir.resolve("data.db"),
StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024);
for (Map.Entry<String, List<Row>> entry : memoryTables.entrySet()) {
String tableName = entry.getKey();
for (Row row : entry.getValue()) {
String data = tableName + "|" + row.getColumns().toString() + "\n";
buffer.put(data.getBytes());
}
}
} catch (Exception e) {
throw new RuntimeException("Failed to flush to disk", e);
}
}
}
// 简化的B+树实现(实际需完整实现)
class BPlusTree {
public void insert(Object key, Row row) {
// 实现B+树插入逻辑
}
public List<Row> search(Object key) {
return Collections.emptyList(); // 实现搜索逻辑
}
}
|