Skip to content

Commit 0965077

Browse files
committed
WIP
1 parent 8569ac3 commit 0965077

7 files changed

+121
-76
lines changed

src/Index.zig

+89-39
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ const SearchResult = @import("common.zig").SearchResult;
99
const SearchResults = @import("common.zig").SearchResults;
1010
const SegmentID = @import("common.zig").SegmentID;
1111

12+
const SegmentMergeOptions = @import("segment_list.zig").SegmentMergeOptions;
13+
1214
const Oplog = @import("Oplog.zig");
1315

1416
const MemorySegment = @import("MemorySegment.zig");
@@ -68,6 +70,11 @@ file_segment_merge_condition: std.Thread.Condition = .{},
6870
file_segment_merge_stop: bool = false,
6971
file_segment_merge_thread: ?std.Thread = null,
7072

73+
memory_segment_merge_mutex: std.Thread.Mutex = .{},
74+
memory_segment_merge_condition: std.Thread.Condition = .{},
75+
memory_segment_merge_stop: bool = false,
76+
memory_segment_merge_thread: ?std.Thread = null,
77+
7178
pub fn init(allocator: std.mem.Allocator, dir: std.fs.Dir, options: Options) !Self {
7279
var data_dir = try dir.makeOpenPath("data", .{ .iterate = true });
7380
errdefer data_dir.close();
@@ -90,6 +97,7 @@ pub fn init(allocator: std.mem.Allocator, dir: std.fs.Dir, options: Options) !Se
9097
pub fn deinit(self: *Self) void {
9198
self.stopCheckpointThread();
9299
self.stopFileSegmentMergeThread();
100+
// Stop and join the memory segment merge thread (started in `open`) before the
// state it uses is torn down below; the file segment merge thread is already
// stopped on the previous line.
self.stopMemorySegmentMergeThread();
93101

94102
self.oplog.deinit();
95103
self.memory_segments.deinit();
@@ -115,7 +123,11 @@ fn prepareMemorySegmentMerge(self: *Self) !?MemorySegmentList.PreparedMerge {
115123
self.segments_lock.lockShared();
116124
defer self.segments_lock.unlockShared();
117125

118-
const merge = try self.memory_segments.prepareMerge(.{ .max_segment_size = self.options.min_segment_size }) orelse return null;
126+
const options = SegmentMergeOptions{
127+
.max_segment_size = self.options.min_segment_size,
128+
};
129+
130+
const merge = try self.memory_segments.prepareMerge(options) orelse return null;
119131
errdefer self.memory_segments.destroySegment(merge.target);
120132

121133
try merge.target.data.merge(&merge.sources.node1.data, &merge.sources.node2.data, &self.memory_segments);
@@ -134,6 +146,10 @@ fn finishMemorySegmentMerge(self: *Self, merge: MemorySegmentList.PreparedMerge)
134146

135147
self.flattenMemorySegmentIds();
136148

149+
if (merge.target.data.getSize() > self.options.min_segment_size / 2) {
150+
log.info("performed big memory merge, size={}", .{merge.target.data.getSize()});
151+
}
152+
137153
return merge.target.data.getSize() >= self.options.min_segment_size;
138154
}
139155

@@ -226,9 +242,12 @@ fn loadSegment(self: *Self, segment_id: SegmentID) !void {
226242

227243
fn loadSegments(self: *Self) !void {
228244
const segment_ids = filefmt.readIndexFile(self.data_dir, self.allocator) catch |err| {
229-
if (err == error.FileNotFound and self.options.create) {
230-
try filefmt.writeIndexFile(self.data_dir, &[_]SegmentID{});
231-
return;
245+
if (err == error.FileNotFound) {
246+
if (self.options.create) {
247+
try filefmt.writeIndexFile(self.data_dir, &[_]SegmentID{});
248+
return;
249+
}
250+
return error.IndexNotFound;
232251
}
233252
return err;
234253
};
@@ -288,41 +307,30 @@ fn doCheckpoint(self: *Self) !bool {
288307
}
289308

290309
/// Body of the background checkpoint thread, shown in diff form: lines whose
/// marker ends in `-` are the removed back-off/retry bookkeeping.
///
/// Each loop iteration locks `checkpoint_mutex` (released by the `defer` when the
/// iteration ends), returns if `checkpoint_stop` is set, and then calls
/// `doCheckpoint()` while still holding the mutex.
/// - result `true`: signals `file_segment_merge_condition` and loops again at once;
/// - result `false` or a (logged) error: waits on `checkpoint_condition` for at
///   most `std.time.ns_per_min` before the next iteration.
fn checkpointThreadFn(self: *Self) void {
291-
const min_delay = std.time.ns_per_s;
292-
const max_delay = std.time.ns_per_s * 60;
293-
var delay: u64 = min_delay;
294-
var retries: u32 = 0;
295-
296310
while (true) {
297311
self.checkpoint_mutex.lock();
298312
defer self.checkpoint_mutex.unlock();
299313

300314
if (self.checkpoint_stop) return;
301315

302-
var wait: bool = true;
303-
304316
if (self.doCheckpoint()) |successful| {
305-
delay = min_delay;
306-
retries = 0;
307317
if (successful) {
308-
wait = false;
309318
self.file_segment_merge_condition.signal();
319+
continue;
310320
}
311321
} else |err| {
312-
delay = @min(delay * 110 / 100, max_delay);
313-
retries += 1;
314-
log.err("checkpoint failed: {} (retry {})", .{ err, retries });
322+
log.err("checkpoint failed: {}", .{err});
315323
}
316324

317-
if (wait) {
318-
self.checkpoint_condition.timedWait(&self.checkpoint_mutex, delay) catch {};
319-
}
325+
// Both a normal wake-up and an error from the timed wait (`catch continue`)
// lead to the next loop iteration.
self.checkpoint_condition.timedWait(&self.checkpoint_mutex, std.time.ns_per_min) catch continue;
320326
}
321327
}
322328

323329
fn startCheckpointThread(self: *Self) !void {
324330
if (self.checkpoint_thread != null) return;
325331

332+
log.info("starting checkpoint thread", .{});
333+
326334
self.checkpoint_mutex.lock();
327335
self.checkpoint_stop = false;
328336
self.checkpoint_mutex.unlock();
@@ -344,40 +352,29 @@ fn stopCheckpointThread(self: *Self) void {
344352
}
345353

346354
/// Body of the background file segment merge thread, shown in diff form: lines
/// whose marker ends in `-` are the removed back-off/retry bookkeeping.
///
/// Each loop iteration locks `file_segment_merge_mutex` (released by the `defer`
/// when the iteration ends), returns if `file_segment_merge_stop` is set, and
/// then calls `maybeMergeFileSegments()` while still holding the mutex.
/// - result `true`: loops again at once to look for more work;
/// - result `false` or a (logged) error: waits on
///   `file_segment_merge_condition` for at most `std.time.ns_per_min`.
///
/// The removed wait used `checkpoint_condition` together with
/// `file_segment_merge_mutex`; the added line waits on
/// `file_segment_merge_condition` instead.
fn fileSegmentMergeThreadFn(self: *Self) void {
347-
const min_delay = std.time.ns_per_s;
348-
const max_delay = std.time.ns_per_s * 60;
349-
var delay: u64 = min_delay;
350-
var retries: u32 = 0;
351-
352355
while (true) {
353356
self.file_segment_merge_mutex.lock();
354357
defer self.file_segment_merge_mutex.unlock();
355358

356359
if (self.file_segment_merge_stop) return;
357360

358-
var wait: bool = true;
359-
360361
if (self.maybeMergeFileSegments()) |successful| {
361-
delay = min_delay;
362-
retries = 0;
363362
if (successful) {
364-
wait = false;
363+
continue;
365364
}
366365
} else |err| {
367-
delay = @min(delay * 110 / 100, max_delay);
368-
retries += 1;
369-
log.err("file segment merge failed: {} (retry {})", .{ err, retries });
366+
log.err("file segment merge failed: {}", .{err});
370367
}
371368

372-
if (wait) {
373-
self.checkpoint_condition.timedWait(&self.file_segment_merge_mutex, delay) catch {};
374-
}
369+
// Both a normal wake-up and an error from the timed wait (`catch continue`)
// lead to the next loop iteration.
self.file_segment_merge_condition.timedWait(&self.file_segment_merge_mutex, std.time.ns_per_min) catch continue;
375370
}
376371
}
377372

378373
fn startFileSegmentMergeThread(self: *Self) !void {
379374
if (self.file_segment_merge_thread != null) return;
380375

376+
log.info("starting file segment merge thread", .{});
377+
381378
self.file_segment_merge_mutex.lock();
382379
self.file_segment_merge_stop = false;
383380
self.file_segment_merge_mutex.unlock();
@@ -402,7 +399,11 @@ fn prepareFileSegmentMerge(self: *Self) !?FileSegmentList.PreparedMerge {
402399
self.segments_lock.lockShared();
403400
defer self.segments_lock.unlockShared();
404401

405-
const merge = try self.file_segments.prepareMerge(.{ .max_segment_size = self.options.max_segment_size }) orelse return null;
402+
const options = SegmentMergeOptions{
403+
.max_segment_size = self.options.max_segment_size,
404+
};
405+
406+
const merge = try self.file_segments.prepareMerge(options) orelse return null;
406407
errdefer self.file_segments.destroySegment(merge.target);
407408

408409
var merger = SegmentMerger(FileSegment).init(self.allocator, &self.file_segments);
@@ -470,9 +471,55 @@ pub fn maybeMergeFileSegments(self: *Self) !bool {
470471
return true;
471472
}
472473

474+
/// Body of the background memory segment merge thread.
///
/// Each loop iteration locks `memory_segment_merge_mutex` (released by the
/// `defer` when the iteration ends), returns if `memory_segment_merge_stop` is
/// set, and then calls `maybeMergeMemorySegments()` while still holding the mutex.
/// - result `true`: signals `checkpoint_condition` and loops again at once;
/// - result `false` or a (logged) error: waits on
///   `memory_segment_merge_condition` for at most `std.time.ns_per_min`.
fn memorySegmentMergeThreadFn(self: *Self) void {
475+
while (true) {
476+
self.memory_segment_merge_mutex.lock();
477+
defer self.memory_segment_merge_mutex.unlock();
478+
479+
if (self.memory_segment_merge_stop) return;
480+
481+
if (self.maybeMergeMemorySegments()) |successful| {
482+
if (successful) {
483+
self.checkpoint_condition.signal();
484+
continue;
485+
}
486+
} else |err| {
487+
log.err("memory segment merge failed: {}", .{err});
488+
}
489+
490+
// Both a normal wake-up and an error from the timed wait (`catch continue`)
// lead to the next loop iteration, where the stop flag is re-checked.
self.memory_segment_merge_condition.timedWait(&self.memory_segment_merge_mutex, std.time.ns_per_min) catch continue;
491+
}
492+
}
493+
494+
/// Starts the memory segment merge thread unless a thread handle is already
/// stored in `memory_segment_merge_thread`.
///
/// Clears `memory_segment_merge_stop` under `memory_segment_merge_mutex`, then
/// spawns a thread running `memorySegmentMergeThreadFn` with `self` and stores
/// its handle. An error from `std.Thread.spawn` is returned to the caller.
fn startMemorySegmentMergeThread(self: *Self) !void {
495+
if (self.memory_segment_merge_thread != null) return;
496+
497+
log.info("starting memory segment merge thread", .{});
498+
499+
self.memory_segment_merge_mutex.lock();
500+
self.memory_segment_merge_stop = false;
501+
self.memory_segment_merge_mutex.unlock();
502+
503+
self.memory_segment_merge_thread = try std.Thread.spawn(.{}, memorySegmentMergeThreadFn, .{self});
504+
}
505+
506+
/// Asks the memory segment merge thread to stop and waits for it to finish.
///
/// Sets `memory_segment_merge_stop` and broadcasts
/// `memory_segment_merge_condition` while holding `memory_segment_merge_mutex`,
/// joins the thread if a handle is stored, and finally resets the handle to
/// `null`. With no stored handle the join is skipped.
fn stopMemorySegmentMergeThread(self: *Self) void {
507+
self.memory_segment_merge_mutex.lock();
508+
self.memory_segment_merge_stop = true;
509+
self.memory_segment_merge_condition.broadcast();
510+
self.memory_segment_merge_mutex.unlock();
511+
512+
if (self.memory_segment_merge_thread) |thread| {
513+
thread.join();
514+
}
515+
516+
self.memory_segment_merge_thread = null;
517+
}
518+
473519
/// Opens the index: starts the checkpoint, file segment merge and memory segment
/// merge threads, then loads the segments and opens the oplog, passing it
/// `getMaxCommitId()` and an `Updater` that points back at this index.
///
/// Any failing step returns its error to the caller.
/// NOTE(review): the threads are started before `loadSegments()`; nothing in this
/// function stops them when a later step fails, so presumably the caller is
/// expected to call `deinit` — confirm.
pub fn open(self: *Self) !void {
474520
try self.startCheckpointThread();
475521
try self.startFileSegmentMergeThread();
522+
try self.startMemorySegmentMergeThread();
476523
try self.loadSegments();
477524
try self.oplog.open(self.getMaxCommitId(), Updater{ .index = self });
478525
}
@@ -508,11 +555,14 @@ fn getFileSegmentIds(self: *Self) !std.ArrayList(SegmentID) {
508555
}
509556

510557
/// Applies `changes` to the index (diff form: lines whose marker ends in `-` are
/// the removed variant that used a named `start_checkpoint` local).
///
/// First calls `maybeMergeMemorySegments()`; when it returns `true`,
/// `checkpoint_condition` is signalled. Then the changes are written to the
/// oplog with an `Updater` that points back at this index. Errors from either
/// call are returned to the caller.
/// NOTE(review): `memorySegmentMergeThreadFn` calls `maybeMergeMemorySegments()`
/// from a background thread as well — confirm the two callers may run concurrently.
pub fn update(self: *Self, changes: []const Change) !void {
511-
const start_checkpoint = try self.maybeMergeMemorySegments();
512-
if (start_checkpoint) {
558+
//const t1 = std.time.milliTimestamp();
559+
if (try self.maybeMergeMemorySegments()) {
513560
self.checkpoint_condition.signal();
514561
}
562+
//const t2 = std.time.milliTimestamp();
515563
try self.oplog.write(changes, Updater{ .index = self });
564+
// const t3 = std.time.milliTimestamp();
565+
//log.info("merge: {}ms, update: {}ms", .{ t2 - t1, t3 - t2 });
516566
}
517567

518568
pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, deadline: Deadline) !SearchResults {

src/MultiIndex.zig

+2-11
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ const log = std.log.scoped(.multi_index);
33
const assert = std.debug.assert;
44

55
const Index = @import("Index.zig");
6-
const Scheduler = @import("utils/Scheduler.zig");
76

87
const Self = @This();
98

@@ -17,17 +16,15 @@ pub const IndexRef = struct {
1716
lock: std.Thread.Mutex = .{},
1817
allocator: std.mem.Allocator,
1918
dir: std.fs.Dir,
20-
scheduler: *Scheduler,
2119
indexes: std.AutoHashMap(u8, IndexRef),
2220

2321
const max_sub_dir_name_size = 10;
2422
const sub_dir_name_fmt = "{x:0>2}";
2523

26-
pub fn init(allocator: std.mem.Allocator, dir: std.fs.Dir, scheduler: *Scheduler) Self {
24+
/// Creates a `MultiIndex` that stores the given `allocator` and `dir` and starts
/// with an empty `indexes` map created with the same allocator.
/// The `.scheduler` initializer (marker ending in `-`) is removed by this commit
/// along with the `scheduler` parameter.
pub fn init(allocator: std.mem.Allocator, dir: std.fs.Dir) Self {
2725
return .{
2826
.allocator = allocator,
2927
.dir = dir,
30-
.scheduler = scheduler,
3128
.indexes = std.AutoHashMap(u8, IndexRef).init(allocator),
3229
};
3330
}
@@ -39,7 +36,6 @@ pub fn deinit(self: *Self) void {
3936
entry.value_ptr.dir.close();
4037
}
4138
self.indexes.deinit();
42-
self.* = undefined;
4339
}
4440

4541
pub fn releaseIndex(self: *Self, index_data: *IndexRef) void {
@@ -73,12 +69,7 @@ pub fn acquireIndex(self: *Self, id: u8, create: bool) !*IndexRef {
7369
result.value_ptr.index = try Index.init(self.allocator, result.value_ptr.dir, .{ .create = create });
7470
errdefer result.value_ptr.index.deinit();
7571

76-
result.value_ptr.index.open() catch |err| {
77-
if (err == error.FileNotFound) {
78-
return error.IndexNotFound;
79-
}
80-
return err;
81-
};
72+
try result.value_ptr.index.open();
8273

8374
result.value_ptr.references = 1;
8475
result.value_ptr.last_used_at = std.time.timestamp();

src/filefmt.zig

+2
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,8 @@ pub fn readSegmentFile(dir: fs.Dir, id: SegmentVersion, segment: *FileSegment) !
438438
}
439439
if (footer.num_items != num_items) {
440440
return error.InvalidSegment;
441+
} else {
442+
segment.num_items = num_items;
441443
}
442444
if (footer.num_blocks != num_blocks) {
443445
return error.InvalidSegment;

src/index_tests.zig

+1-17
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ const common = @import("common.zig");
44
const Change = @import("change.zig").Change;
55
const SearchResults = common.SearchResults;
66

7-
const Scheduler = @import("utils/Scheduler.zig");
8-
97
const Index = @import("Index.zig");
108

119
fn generateRandomHashes(buf: []u32, seed: u64) []u32 {
@@ -25,7 +23,7 @@ test "index does not exist" {
2523
defer index.deinit();
2624

2725
const result = index.open();
28-
try std.testing.expectError(error.FileNotFound, result);
26+
try std.testing.expectError(error.IndexNotFound, result);
2927
}
3028

3129
test "index create, update and search" {
@@ -104,13 +102,6 @@ test "index many updates" {
104102
var tmp_dir = std.testing.tmpDir(.{});
105103
defer tmp_dir.cleanup();
106104

107-
var data_dir = try tmp_dir.dir.makeOpenPath("data", .{});
108-
defer data_dir.close();
109-
110-
var scheduler = Scheduler.init(std.testing.allocator);
111-
defer scheduler.deinit();
112-
try scheduler.start(1);
113-
114105
var hashes: [100]u32 = undefined;
115106

116107
{
@@ -155,13 +146,6 @@ test "index, multiple fingerprints with the same hashes" {
155146
var tmp_dir = std.testing.tmpDir(.{});
156147
defer tmp_dir.cleanup();
157148

158-
var data_dir = try tmp_dir.dir.makeOpenPath("data", .{});
159-
defer data_dir.close();
160-
161-
var scheduler = Scheduler.init(std.testing.allocator);
162-
defer scheduler.deinit();
163-
try scheduler.start(1);
164-
165149
var index = try Index.init(std.testing.allocator, tmp_dir.dir, .{ .create = true });
166150
defer index.deinit();
167151

src/main.zig

+1-8
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ const std = @import("std");
22
const zul = @import("zul");
33

44
const MultiIndex = @import("MultiIndex.zig");
5-
const Scheduler = @import("utils/Scheduler.zig");
65
const server = @import("server.zig");
76
const metrics = @import("metrics.zig");
87

@@ -59,15 +58,9 @@ pub fn main() !void {
5958

6059
try metrics.initializeMetrics(.{ .prefix = "aindex_" });
6160

62-
var scheduler = Scheduler.init(allocator);
63-
defer scheduler.deinit();
64-
65-
var indexes = MultiIndex.init(allocator, dir, &scheduler);
61+
var indexes = MultiIndex.init(allocator, dir);
6662
defer indexes.deinit();
6763

68-
try scheduler.start(4);
69-
defer scheduler.stop();
70-
7164
try server.run(allocator, &indexes, address, port, threads);
7265
}
7366

0 commit comments

Comments
 (0)