Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IndexReader API for consistent view of the index over multiple operat… #41

Merged
merged 2 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 11 additions & 101 deletions src/Index.zig
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const FileSegment = @import("FileSegment.zig");
const FileSegmentList = SegmentList(FileSegment);
const FileSegmentNode = FileSegmentList.Node;

const IndexReader = @import("IndexReader.zig");
const SharedPtr = @import("utils/shared_ptr.zig").SharedPtr;

const SegmentMerger = @import("segment_merger.zig").SegmentMerger;
Expand Down Expand Up @@ -180,8 +181,8 @@ fn loadSegments(self: *Self, create: bool) !u64 {
}

fn doCheckpoint(self: *Self) !bool {
var snapshot = self.acquireSegments();
defer self.releaseSegments(&snapshot);
var snapshot = self.acquireReader();
defer self.releaseReader(&snapshot);

const source = snapshot.memory_segments.value.getFirst() orelse return false;
if (source.value.getSize() < self.options.min_segment_size) {
Expand Down Expand Up @@ -432,117 +433,26 @@ pub fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !vo
}
}

const SegmentsSnapshot = struct {
file_segments: SharedPtr(FileSegmentList),
memory_segments: SharedPtr(MemorySegmentList),
};

// Get the current segments lists and make sure they won't get deleted.
fn acquireSegments(self: *Self) SegmentsSnapshot {
pub fn acquireReader(self: *Self) IndexReader {
self.segments_lock.lockShared();
defer self.segments_lock.unlockShared();

return .{
return IndexReader{
.file_segments = self.file_segments.segments.acquire(),
.memory_segments = self.memory_segments.segments.acquire(),
};
}

// Release the previously acquired segments lists, they will get deleted if no longer needed.
fn releaseSegments(self: *Self, segments: *SegmentsSnapshot) void {
MemorySegmentList.destroySegments(self.allocator, &segments.memory_segments);
FileSegmentList.destroySegments(self.allocator, &segments.file_segments);
/// Releases a reader previously returned by `acquireReader`.
///
/// Both segment-list pointers held by `reader` are handed to the matching
/// `destroySegments` function together with this index's allocator: the
/// memory segment list first, then the file segment list.
pub fn releaseReader(self: *Self, reader: *IndexReader) void {
MemorySegmentList.destroySegments(self.allocator, &reader.memory_segments);
FileSegmentList.destroySegments(self.allocator, &reader.file_segments);
}

const segment_lists = [_][]const u8{
"file_segments",
"memory_segments",
};

pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, deadline: Deadline) !SearchResults {
const sorted_hashes = try allocator.dupe(u32, hashes);
defer allocator.free(sorted_hashes);
std.sort.pdq(u32, sorted_hashes, {}, std.sort.asc(u32));

var results = SearchResults.init(allocator);
errdefer results.deinit();

var snapshot = self.acquireSegments();
defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread

inline for (segment_lists) |n| {
const segments = @field(snapshot, n);
try segments.value.search(sorted_hashes, &results, deadline);
}

results.sort();

return results;
}

pub fn getDocInfo(self: *Self, doc_id: u32) !?DocInfo {
var snapshot = self.acquireSegments();
defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread

var result: ?DocInfo = null;
inline for (segment_lists) |n| {
const segments = @field(snapshot, n);
if (segments.value.getDocInfo(doc_id)) |res| {
result = res;
}
}
if (result) |res| {
if (!res.deleted) {
return res;
}
}
return null;
}
var reader = self.acquireReader();
defer self.releaseReader(&reader);

pub const IndexInfo = struct {
version: u64,
attributes: std.StringHashMapUnmanaged(u64),

pub fn deinit(self: *IndexInfo, allocator: std.mem.Allocator) void {
self.attributes.deinit(allocator);
}
};

pub fn getInfo(self: *Self, allocator: std.mem.Allocator) !IndexInfo {
var snapshot = self.acquireSegments();
defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread

var attributes: std.StringHashMapUnmanaged(u64) = .{};
errdefer {
var iter = attributes.iterator();
while (iter.next()) |e| {
allocator.free(e.key_ptr.*);
}
attributes.deinit(allocator);
}

var version: u64 = 0;
inline for (segment_lists) |n| {
const segments = @field(snapshot, n);
for (segments.value.nodes.items) |node| {
var iter = node.value.attributes.iterator();
while (iter.next()) |entry| {
const result = try attributes.getOrPut(allocator, entry.key_ptr.*);
if (!result.found_existing) {
errdefer attributes.removeByPtr(entry.key_ptr);
result.key_ptr.* = try allocator.dupe(u8, entry.key_ptr.*);
}
result.value_ptr.* = entry.value_ptr.*;
}
std.debug.assert(node.value.info.version > version);
version = node.value.info.version;
}
}

return .{
.version = version,
.attributes = attributes,
};
return reader.search(hashes, allocator, deadline);
}

test {
Expand Down
86 changes: 86 additions & 0 deletions src/IndexReader.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
const std = @import("std");

const Self = @This();

const Deadline = @import("utils/Deadline.zig");
const SearchResults = @import("common.zig").SearchResults;
const SharedPtr = @import("utils/shared_ptr.zig").SharedPtr;
const DocInfo = @import("common.zig").DocInfo;

const SegmentList = @import("segment_list.zig").SegmentList;

const FileSegment = @import("FileSegment.zig");
const FileSegmentList = SegmentList(FileSegment);

const MemorySegment = @import("MemorySegment.zig");
const MemorySegmentList = SegmentList(MemorySegment);

// Names of the container fields declared below. Functions in this file walk this
// array with `inline for` and resolve each name through `@field`, so the order
// here (file segments, then memory segments) is the order in which the lists
// are visited.
const segment_lists = [_][]const u8{
"file_segments",
"memory_segments",
};

/// Shared pointer to the list of file segments seen by this reader.
file_segments: SharedPtr(FileSegmentList),
/// Shared pointer to the list of memory segments seen by this reader.
memory_segments: SharedPtr(MemorySegmentList),

/// Searches both segment lists of this reader for the given hashes.
///
/// `hashes` is left untouched: a sorted copy is made with `allocator` and freed
/// before returning. The file segment list is queried first, then the memory
/// segment list, both filling the same `SearchResults`, which is sorted once
/// at the end and returned. If any step fails, the partially filled results
/// are deinitialized and the error is propagated.
pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, deadline: Deadline) !SearchResults {
    // Work on a private, ascending copy of the query hashes.
    const query = try allocator.dupe(u32, hashes);
    defer allocator.free(query);
    std.sort.pdq(u32, query, {}, std.sort.asc(u32));

    var found = SearchResults.init(allocator);
    errdefer found.deinit();

    // Same order as the original field-name loop: file segments, then memory segments.
    try self.file_segments.value.search(query, &found, deadline);
    try self.memory_segments.value.search(query, &found, deadline);

    found.sort();
    return found;
}

/// Looks up `doc_id` in both segment lists of this reader.
///
/// The file segment list is consulted first and the memory segment list
/// second; when both report the document, the memory segment answer wins.
/// Returns `null` when neither list knows the document or when the winning
/// entry is flagged as deleted.
pub fn getDocInfo(self: *Self, doc_id: u32) !?DocInfo {
    // TODO optimize, read from the end
    var latest: ?DocInfo = null;
    if (self.file_segments.value.getDocInfo(doc_id)) |info| {
        latest = info;
    }
    if (self.memory_segments.value.getDocInfo(doc_id)) |info| {
        latest = info;
    }

    const winner = latest orelse return null;
    return if (winner.deleted) null else winner;
}

/// Returns the version stored in the last node of the memory segment list.
/// If that list has no last node, the last node of the file segment list is
/// used instead; if both lists are empty the result is 0.
pub fn getVersion(self: *Self) u64 {
    const last_memory_node = self.memory_segments.value.getLast() orelse {
        // No memory segments: fall back to the file segment list.
        const last_file_node = self.file_segments.value.getLast() orelse return 0;
        return last_file_node.value.info.version;
    };
    return last_memory_node.value.info.version;
}

/// Merges the attributes of every segment visible to this reader into one map.
///
/// Lists are visited in `segment_lists` order and nodes in list order; a value
/// stored later replaces an earlier value with the same name.
/// Keys are inserted as-is, without being copied.
/// NOTE(review): the keys therefore presumably stay valid only as long as the
/// segments they come from are alive — confirm against callers.
/// On error the partially built map is deinitialized with `allocator`.
pub fn getAttributes(self: *Self, allocator: std.mem.Allocator) !std.StringHashMapUnmanaged(u64) {
    var merged: std.StringHashMapUnmanaged(u64) = .{};
    errdefer merged.deinit(allocator);

    inline for (segment_lists) |list_name| {
        const list = @field(self, list_name);
        for (list.value.nodes.items) |node| {
            var it = node.value.attributes.iterator();
            while (it.next()) |attribute| {
                const name = attribute.key_ptr.*;
                const value = attribute.value_ptr.*;
                try merged.put(allocator, name, value);
            }
        }
    }

    return merged;
}
16 changes: 10 additions & 6 deletions src/MultiIndex.zig
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,18 @@ fn removeIndex(self: *Self, name: []const u8) void {
}
}

pub fn releaseIndex(self: *Self, index_ref: *IndexRef) void {
fn releaseIndexRef(self: *Self, index_ref: *IndexRef) void {
self.lock.lock();
defer self.lock.unlock();

_ = index_ref.decRef();
}

pub fn acquireIndex(self: *Self, name: []const u8) !*IndexRef {
/// Releases an index by recovering the `IndexRef` that contains it.
///
/// `index` must point at the `index` field of an `IndexRef`; the enclosing
/// `IndexRef` is recovered with `@fieldParentPtr` and passed to
/// `releaseIndexRef`.
pub fn releaseIndex(self: *Self, index: *Index) void {
    const owner: *IndexRef = @fieldParentPtr("index", index);
    self.releaseIndexRef(owner);
}

fn acquireIndex(self: *Self, name: []const u8) !*IndexRef {
if (!isValidName(name)) {
return error.InvalidIndexName;
}
Expand Down Expand Up @@ -149,20 +153,20 @@ pub fn acquireIndex(self: *Self, name: []const u8) !*IndexRef {
return result.value_ptr;
}

pub fn getIndex(self: *Self, name: []const u8) !*IndexRef {
pub fn getIndex(self: *Self, name: []const u8) !*Index {
const index_ref = try self.acquireIndex(name);
errdefer self.releaseIndex(index_ref);
errdefer self.releaseIndexRef(index_ref);

try index_ref.ensureOpen(false);

return index_ref;
return &index_ref.index;
}

pub fn createIndex(self: *Self, name: []const u8) !void {
log.info("creating index {s}", .{name});

const index_ref = try self.acquireIndex(name);
defer self.releaseIndex(index_ref);
defer self.releaseIndexRef(index_ref);

try index_ref.ensureOpen(true);
}
Expand Down
Loading