From 1eb0a63d3cc6e1ee29d1e77bee9e788038fd691d Mon Sep 17 00:00:00 2001 From: Lukas Lalinsky Date: Thu, 5 Dec 2024 22:15:07 +0100 Subject: [PATCH 1/2] IndexReader API for consistent view of the index over multiple operations --- src/Index.zig | 112 +++++--------------------------------------- src/IndexReader.zig | 86 ++++++++++++++++++++++++++++++++++ src/server.zig | 19 ++++++-- 3 files changed, 111 insertions(+), 106 deletions(-) create mode 100644 src/IndexReader.zig diff --git a/src/Index.zig b/src/Index.zig index 13ea873..b1c8b19 100644 --- a/src/Index.zig +++ b/src/Index.zig @@ -24,6 +24,7 @@ const FileSegment = @import("FileSegment.zig"); const FileSegmentList = SegmentList(FileSegment); const FileSegmentNode = FileSegmentList.Node; +const IndexReader = @import("IndexReader.zig"); const SharedPtr = @import("utils/shared_ptr.zig").SharedPtr; const SegmentMerger = @import("segment_merger.zig").SegmentMerger; @@ -180,8 +181,8 @@ fn loadSegments(self: *Self, create: bool) !u64 { } fn doCheckpoint(self: *Self) !bool { - var snapshot = self.acquireSegments(); - defer self.releaseSegments(&snapshot); + var snapshot = self.acquireReader(); + defer self.releaseReader(&snapshot); const source = snapshot.memory_segments.value.getFirst() orelse return false; if (source.value.getSize() < self.options.min_segment_size) { @@ -432,117 +433,26 @@ pub fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !vo } } -const SegmentsSnapshot = struct { - file_segments: SharedPtr(FileSegmentList), - memory_segments: SharedPtr(MemorySegmentList), -}; - -// Get the current segments lists and make sure they won't get deleted. -fn acquireSegments(self: *Self) SegmentsSnapshot { +pub fn acquireReader(self: *Self) IndexReader { self.segments_lock.lockShared(); defer self.segments_lock.unlockShared(); - return .{ + return IndexReader{ .file_segments = self.file_segments.segments.acquire(), .memory_segments = self.memory_segments.segments.acquire(), }; } -// Release the previously acquired segments lists, they will get deleted if no longer needed. -fn releaseSegments(self: *Self, segments: *SegmentsSnapshot) void { - MemorySegmentList.destroySegments(self.allocator, &segments.memory_segments); - FileSegmentList.destroySegments(self.allocator, &segments.file_segments); +pub fn releaseReader(self: *Self, reader: *IndexReader) void { + MemorySegmentList.destroySegments(self.allocator, &reader.memory_segments); + FileSegmentList.destroySegments(self.allocator, &reader.file_segments); } -const segment_lists = [_][]const u8{ - "file_segments", - "memory_segments", -}; - pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, deadline: Deadline) !SearchResults { - const sorted_hashes = try allocator.dupe(u32, hashes); - defer allocator.free(sorted_hashes); - std.sort.pdq(u32, sorted_hashes, {}, std.sort.asc(u32)); - - var results = SearchResults.init(allocator); - errdefer results.deinit(); - - var snapshot = self.acquireSegments(); - defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread - - inline for (segment_lists) |n| { - const segments = @field(snapshot, n); - try segments.value.search(sorted_hashes, &results, deadline); - } - - results.sort(); - - return results; -} - -pub fn getDocInfo(self: *Self, doc_id: u32) !?DocInfo { - var snapshot = self.acquireSegments(); - defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread - - var result: ?DocInfo = null; - inline for (segment_lists) |n| { - const segments = @field(snapshot, n); - if (segments.value.getDocInfo(doc_id)) |res| { - result = res; - } - } - if (result) |res| { - if (!res.deleted) { - return res; - } - } - return null; -} + var reader = self.acquireReader(); + defer self.releaseReader(&reader); -pub const IndexInfo = struct { - version: u64, - attributes: std.StringHashMapUnmanaged(u64), - - pub fn deinit(self: *IndexInfo, allocator: std.mem.Allocator) void { - self.attributes.deinit(allocator); - } -}; - -pub fn getInfo(self: *Self, allocator: std.mem.Allocator) !IndexInfo { - var snapshot = self.acquireSegments(); - defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread - - var attributes: std.StringHashMapUnmanaged(u64) = .{}; - errdefer { - var iter = attributes.iterator(); - while (iter.next()) |e| { - allocator.free(e.key_ptr.*); - } - attributes.deinit(allocator); - } - - var version: u64 = 0; - inline for (segment_lists) |n| { - const segments = @field(snapshot, n); - for (segments.value.nodes.items) |node| { - var iter = node.value.attributes.iterator(); - while (iter.next()) |entry| { - const result = try attributes.getOrPut(allocator, entry.key_ptr.*); - if (!result.found_existing) { - errdefer attributes.removeByPtr(entry.key_ptr); - result.key_ptr.* = try allocator.dupe(u8, entry.key_ptr.*); - } - result.value_ptr.* = entry.value_ptr.*; - } - std.debug.assert(node.value.info.version > version); - version = node.value.info.version; - } - } - - return .{ - .version = version, - .attributes = attributes, - }; + return reader.search(hashes, allocator, deadline); } test { diff --git a/src/IndexReader.zig b/src/IndexReader.zig new file mode 100644 index 0000000..190fe34 --- /dev/null +++ b/src/IndexReader.zig @@ -0,0 +1,86 @@ +const std = @import("std"); + +const Self = @This(); + +const Deadline = @import("utils/Deadline.zig"); +const SearchResults = @import("common.zig").SearchResults; +const SharedPtr = @import("utils/shared_ptr.zig").SharedPtr; +const DocInfo = @import("common.zig").DocInfo; + +const SegmentList = @import("segment_list.zig").SegmentList; + +const FileSegment = @import("FileSegment.zig"); +const FileSegmentList = SegmentList(FileSegment); + +const MemorySegment = @import("MemorySegment.zig"); +const MemorySegmentList = SegmentList(MemorySegment); + +const segment_lists = [_][]const u8{ + "file_segments", + "memory_segments", +}; + +file_segments: SharedPtr(FileSegmentList), +memory_segments: SharedPtr(MemorySegmentList), + +pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, deadline: Deadline) !SearchResults { + const sorted_hashes = try allocator.dupe(u32, hashes); + defer allocator.free(sorted_hashes); + std.sort.pdq(u32, sorted_hashes, {}, std.sort.asc(u32)); + + var results = SearchResults.init(allocator); + errdefer results.deinit(); + + inline for (segment_lists) |n| { + const segments = @field(self, n); + try segments.value.search(sorted_hashes, &results, deadline); + } + + results.sort(); + + return results; +} + +pub fn getDocInfo(self: *Self, doc_id: u32) !?DocInfo { + // TODO optimize, read from the end + var result: ?DocInfo = null; + inline for (segment_lists) |n| { + const segments = @field(self, n); + if (segments.value.getDocInfo(doc_id)) |res| { + result = res; + } + } + if (result) |res| { + if (!res.deleted) { + return res; + } + } + return null; +} + +pub fn getVersion(self: *Self) u64 { + if (self.memory_segments.value.getLast()) |node| { + return node.value.info.version; + } + if (self.file_segments.value.getLast()) |node| { + return node.value.info.version; + } + return 0; +} + +pub fn getAttributes(self: *Self, allocator: std.mem.Allocator) !std.StringHashMapUnmanaged(u64) { + var attributes: std.StringHashMapUnmanaged(u64) = .{}; + errdefer attributes.deinit(allocator); + + inline for (segment_lists) |n| { + const segments = @field(self, n); + for (segments.value.nodes.items) |node| { + var iter = node.value.attributes.iterator(); + while (iter.next()) |entry| { + try attributes.put(allocator, entry.key_ptr.*, entry.value_ptr.*); + } + } + } + + return attributes; +} diff --git a/src/server.zig b/src/server.zig index 8c1fe5b..fdbd8dd 100644 --- a/src/server.zig +++ b/src/server.zig @@ -315,8 +315,11 @@ fn handleHeadFingerprint(ctx: *Context, req: *httpz.Request, res: *httpz.Respons const index = &index_ref.index; defer releaseIndex(ctx, index_ref); + var index_reader = index.acquireReader(); + defer index.releaseReader(&index_reader); + const id = try getId(req, res, false) orelse return; - const info = try index.getDocInfo(id); + const info = try index_reader.getDocInfo(id); res.status = if (info == null) 404 else 200; } @@ -334,8 +337,11 @@ fn handleGetFingerprint(ctx: *Context, req: *httpz.Request, res: *httpz.Response const index = &index_ref.index; defer releaseIndex(ctx, index_ref); + var index_reader = index.acquireReader(); + defer index.releaseReader(&index_reader); + const id = try getId(req, res, true) orelse return; - const info = try index.getDocInfo(id) orelse { + const info = try index_reader.getDocInfo(id) orelse { return writeErrorResponse(404, error.FingerprintNotFound, req, res); }; @@ -421,13 +427,16 @@ const GetIndexResponse = struct { fn handleGetIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void { const index_ref = try getIndex(ctx, req, res, true) orelse return; + const index = &index_ref.index; defer releaseIndex(ctx, index_ref); - const info = try index_ref.index.getInfo(req.arena); + var index_reader = index.acquireReader(); + defer index.releaseReader(&index_reader); + const response = GetIndexResponse{ - .version = info.version, + .version = index_reader.getVersion(), .attributes = .{ - .attributes = info.attributes, + .attributes = try index_reader.getAttributes(req.arena), }, }; return writeResponse(response, req, res); From 16942c9955774176a960e000ff7b763b75f2f13b Mon Sep 17 00:00:00 2001 From: Lukas Lalinsky Date: Thu, 5 Dec 2024 22:24:23 +0100 Subject: [PATCH 2/2] Simpler index getting API for the server --- src/MultiIndex.zig | 16 +++++++++------ src/server.zig | 49 ++++++++++++++++++++-------------------------- 2 files changed, 31 insertions(+), 34 deletions(-) diff --git a/src/MultiIndex.zig b/src/MultiIndex.zig index 4b5e708..e52fa65 100644 --- a/src/MultiIndex.zig +++ b/src/MultiIndex.zig @@ -114,14 +114,18 @@ fn removeIndex(self: *Self, name: []const u8) void { } } -pub fn releaseIndex(self: *Self, index_ref: *IndexRef) void { +fn releaseIndexRef(self: *Self, index_ref: *IndexRef) void { self.lock.lock(); defer self.lock.unlock(); _ = index_ref.decRef(); } -pub fn acquireIndex(self: *Self, name: []const u8) !*IndexRef { +pub fn releaseIndex(self: *Self, index: *Index) void { + self.releaseIndexRef(@fieldParentPtr("index", index)); +} + +fn acquireIndex(self: *Self, name: []const u8) !*IndexRef { if (!isValidName(name)) { return error.InvalidIndexName; } @@ -149,20 +153,20 @@ pub fn acquireIndex(self: *Self, name: []const u8) !*IndexRef { return result.value_ptr; } -pub fn getIndex(self: *Self, name: []const u8) !*IndexRef { +pub fn getIndex(self: *Self, name: []const u8) !*Index { const index_ref = try self.acquireIndex(name); - errdefer self.releaseIndex(index_ref); + errdefer self.releaseIndexRef(index_ref); try index_ref.ensureOpen(false); - return index_ref; + return &index_ref.index; } pub fn createIndex(self: *Self, name: []const u8) !void { log.info("creating index {s}", .{name}); const index_ref = try self.acquireIndex(name); - defer self.releaseIndex(index_ref); + defer self.releaseIndexRef(index_ref); try index_ref.ensureOpen(true); } diff --git a/src/server.zig b/src/server.zig index fdbd8dd..0b81acc 100644 --- a/src/server.zig +++ b/src/server.zig @@ -6,7 +6,7 @@ const log = std.log.scoped(.server); const msgpack = @import("msgpack"); const MultiIndex = @import("MultiIndex.zig"); -const IndexData = MultiIndex.IndexRef; +const Index = @import("Index.zig"); const common = @import("common.zig"); const SearchResults = common.SearchResults; const Change = @import("change.zig").Change; @@ -138,7 +138,7 @@ fn getId(req: *httpz.Request, res: *httpz.Response, send_body: bool) !?u32 { }; } -fn getIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response, send_body: bool) !?*IndexData { +fn getIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response, send_body: bool) !?*Index { const index_name = req.param("index") orelse { if (send_body) { try writeErrorResponse(400, error.MissingIndexName, req, res); @@ -161,7 +161,7 @@ fn getIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response, send_body: return index; } -fn releaseIndex(ctx: *Context, index: *IndexData) void { +fn releaseIndex(ctx: *Context, index: *Index) void { ctx.indexes.releaseIndex(index); } @@ -262,9 +262,8 @@ fn getRequestBody(comptime T: type, req: *httpz.Request, res: *httpz.Response) ! fn handleSearch(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void { const body = try getRequestBody(SearchRequestJSON, req, res) orelse return; - const index_ref = try getIndex(ctx, req, res, true) orelse return; - const index = &index_ref.index; - defer releaseIndex(ctx, index_ref); + const index = try getIndex(ctx, req, res, true) orelse return; + defer releaseIndex(ctx, index); var timeout = body.timeout; if (timeout == 0) { @@ -299,9 +298,8 @@ const UpdateRequestJSON = struct { fn handleUpdate(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void { const body = try getRequestBody(UpdateRequestJSON, req, res) orelse return; - const index_ref = try getIndex(ctx, req, res, true) orelse return; - const index = &index_ref.index; - defer releaseIndex(ctx, index_ref); + const index = try getIndex(ctx, req, res, true) orelse return; + defer releaseIndex(ctx, index); metrics.update(body.changes.len); @@ -311,9 +309,8 @@ fn handleUpdate(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void } fn handleHeadFingerprint(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void { - const index_ref = try getIndex(ctx, req, res, false) orelse return; - const index = &index_ref.index; - defer releaseIndex(ctx, index_ref); + const index = try getIndex(ctx, req, res, false) orelse return; + defer releaseIndex(ctx, index); var index_reader = index.acquireReader(); defer index.releaseReader(&index_reader); @@ -333,9 +330,8 @@ const GetFingerprintResponse = struct { }; fn handleGetFingerprint(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void { - const index_ref = try getIndex(ctx, req, res, true) orelse return; - const index = &index_ref.index; - defer releaseIndex(ctx, index_ref); + const index = try getIndex(ctx, req, res, true) orelse return; + defer releaseIndex(ctx, index); var index_reader = index.acquireReader(); defer index.releaseReader(&index_reader); @@ -359,9 +355,8 @@ const PutFingerprintRequest = struct { fn handlePutFingerprint(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void { const body = try getRequestBody(PutFingerprintRequest, req, res) orelse return; - const index_ref = try getIndex(ctx, req, res, true) orelse return; - const index = &index_ref.index; - defer releaseIndex(ctx, index_ref); + const index = try getIndex(ctx, req, res, true) orelse return; + defer releaseIndex(ctx, index); const id = try getId(req, res, true) orelse return; const change: Change = .{ .insert = .{ @@ -377,9 +372,8 @@ fn handlePutFingerprint(ctx: *Context, req: *httpz.Request, res: *httpz.Response } fn handleDeleteFingerprint(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void { - const index_ref = try getIndex(ctx, req, res, true) orelse return; - const index = &index_ref.index; - defer releaseIndex(ctx, index_ref); + const index = try getIndex(ctx, req, res, true) orelse return; + defer releaseIndex(ctx, index); const id = try getId(req, res, true) orelse return; const change: Change = .{ .delete = .{ @@ -426,9 +420,8 @@ const GetIndexResponse = struct { }; fn handleGetIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void { - const index_ref = try getIndex(ctx, req, res, true) orelse return; - const index = &index_ref.index; - defer releaseIndex(ctx, index_ref); + const index = try getIndex(ctx, req, res, true) orelse return; + defer releaseIndex(ctx, index); var index_reader = index.acquireReader(); defer index.releaseReader(&index_reader); @@ -461,13 +454,13 @@ fn handleDeleteIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) ! } fn handleHeadIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void { - const index_ref = try getIndex(ctx, req, res, false) orelse return; - defer releaseIndex(ctx, index_ref); + const index = try getIndex(ctx, req, res, false) orelse return; + defer releaseIndex(ctx, index); } fn handlePingIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void { - const index_ref = try getIndex(ctx, req, res, false) orelse return; - defer releaseIndex(ctx, index_ref); + const index = try getIndex(ctx, req, res, false) orelse return; + defer releaseIndex(ctx, index); try handlePing(ctx, req, res); }