Skip to content

Commit 0f490f2

Browse files
authored
Merge pull request #41 from acoustid/indexreader
IndexReader API for consistent view of the index over multiple operat…
2 parents ef6290f + 16942c9 commit 0f490f2

File tree

4 files changed

+141
-139
lines changed

4 files changed

+141
-139
lines changed

src/Index.zig

+11-101
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const FileSegment = @import("FileSegment.zig");
2424
const FileSegmentList = SegmentList(FileSegment);
2525
const FileSegmentNode = FileSegmentList.Node;
2626

27+
const IndexReader = @import("IndexReader.zig");
2728
const SharedPtr = @import("utils/shared_ptr.zig").SharedPtr;
2829

2930
const SegmentMerger = @import("segment_merger.zig").SegmentMerger;
@@ -180,8 +181,8 @@ fn loadSegments(self: *Self, create: bool) !u64 {
180181
}
181182

182183
fn doCheckpoint(self: *Self) !bool {
183-
var snapshot = self.acquireSegments();
184-
defer self.releaseSegments(&snapshot);
184+
var snapshot = self.acquireReader();
185+
defer self.releaseReader(&snapshot);
185186

186187
const source = snapshot.memory_segments.value.getFirst() orelse return false;
187188
if (source.value.getSize() < self.options.min_segment_size) {
@@ -432,117 +433,26 @@ pub fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !vo
432433
}
433434
}
434435

435-
const SegmentsSnapshot = struct {
436-
file_segments: SharedPtr(FileSegmentList),
437-
memory_segments: SharedPtr(MemorySegmentList),
438-
};
439-
440-
// Get the current segments lists and make sure they won't get deleted.
441-
fn acquireSegments(self: *Self) SegmentsSnapshot {
436+
pub fn acquireReader(self: *Self) IndexReader {
442437
self.segments_lock.lockShared();
443438
defer self.segments_lock.unlockShared();
444439

445-
return .{
440+
return IndexReader{
446441
.file_segments = self.file_segments.segments.acquire(),
447442
.memory_segments = self.memory_segments.segments.acquire(),
448443
};
449444
}
450445

451-
// Release the previously acquired segments lists, they will get deleted if no longer needed.
452-
fn releaseSegments(self: *Self, segments: *SegmentsSnapshot) void {
453-
MemorySegmentList.destroySegments(self.allocator, &segments.memory_segments);
454-
FileSegmentList.destroySegments(self.allocator, &segments.file_segments);
446+
pub fn releaseReader(self: *Self, reader: *IndexReader) void {
447+
MemorySegmentList.destroySegments(self.allocator, &reader.memory_segments);
448+
FileSegmentList.destroySegments(self.allocator, &reader.file_segments);
455449
}
456450

457-
const segment_lists = [_][]const u8{
458-
"file_segments",
459-
"memory_segments",
460-
};
461-
462451
pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, deadline: Deadline) !SearchResults {
463-
const sorted_hashes = try allocator.dupe(u32, hashes);
464-
defer allocator.free(sorted_hashes);
465-
std.sort.pdq(u32, sorted_hashes, {}, std.sort.asc(u32));
466-
467-
var results = SearchResults.init(allocator);
468-
errdefer results.deinit();
469-
470-
var snapshot = self.acquireSegments();
471-
defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread
472-
473-
inline for (segment_lists) |n| {
474-
const segments = @field(snapshot, n);
475-
try segments.value.search(sorted_hashes, &results, deadline);
476-
}
477-
478-
results.sort();
479-
480-
return results;
481-
}
482-
483-
pub fn getDocInfo(self: *Self, doc_id: u32) !?DocInfo {
484-
var snapshot = self.acquireSegments();
485-
defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread
486-
487-
var result: ?DocInfo = null;
488-
inline for (segment_lists) |n| {
489-
const segments = @field(snapshot, n);
490-
if (segments.value.getDocInfo(doc_id)) |res| {
491-
result = res;
492-
}
493-
}
494-
if (result) |res| {
495-
if (!res.deleted) {
496-
return res;
497-
}
498-
}
499-
return null;
500-
}
452+
var reader = self.acquireReader();
453+
defer self.releaseReader(&reader);
501454

502-
pub const IndexInfo = struct {
503-
version: u64,
504-
attributes: std.StringHashMapUnmanaged(u64),
505-
506-
pub fn deinit(self: *IndexInfo, allocator: std.mem.Allocator) void {
507-
self.attributes.deinit(allocator);
508-
}
509-
};
510-
511-
pub fn getInfo(self: *Self, allocator: std.mem.Allocator) !IndexInfo {
512-
var snapshot = self.acquireSegments();
513-
defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread
514-
515-
var attributes: std.StringHashMapUnmanaged(u64) = .{};
516-
errdefer {
517-
var iter = attributes.iterator();
518-
while (iter.next()) |e| {
519-
allocator.free(e.key_ptr.*);
520-
}
521-
attributes.deinit(allocator);
522-
}
523-
524-
var version: u64 = 0;
525-
inline for (segment_lists) |n| {
526-
const segments = @field(snapshot, n);
527-
for (segments.value.nodes.items) |node| {
528-
var iter = node.value.attributes.iterator();
529-
while (iter.next()) |entry| {
530-
const result = try attributes.getOrPut(allocator, entry.key_ptr.*);
531-
if (!result.found_existing) {
532-
errdefer attributes.removeByPtr(entry.key_ptr);
533-
result.key_ptr.* = try allocator.dupe(u8, entry.key_ptr.*);
534-
}
535-
result.value_ptr.* = entry.value_ptr.*;
536-
}
537-
std.debug.assert(node.value.info.version > version);
538-
version = node.value.info.version;
539-
}
540-
}
541-
542-
return .{
543-
.version = version,
544-
.attributes = attributes,
545-
};
455+
return reader.search(hashes, allocator, deadline);
546456
}
547457

548458
test {

src/IndexReader.zig

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
const std = @import("std");
2+
3+
const Self = @This();
4+
5+
const Deadline = @import("utils/Deadline.zig");
6+
const SearchResults = @import("common.zig").SearchResults;
7+
const SharedPtr = @import("utils/shared_ptr.zig").SharedPtr;
8+
const DocInfo = @import("common.zig").DocInfo;
9+
10+
const SegmentList = @import("segment_list.zig").SegmentList;
11+
12+
const FileSegment = @import("FileSegment.zig");
13+
const FileSegmentList = SegmentList(FileSegment);
14+
15+
const MemorySegment = @import("MemorySegment.zig");
16+
const MemorySegmentList = SegmentList(MemorySegment);
17+
18+
const segment_lists = [_][]const u8{
19+
"file_segments",
20+
"memory_segments",
21+
};
22+
23+
file_segments: SharedPtr(FileSegmentList),
24+
memory_segments: SharedPtr(MemorySegmentList),
25+
26+
pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, deadline: Deadline) !SearchResults {
27+
const sorted_hashes = try allocator.dupe(u32, hashes);
28+
defer allocator.free(sorted_hashes);
29+
std.sort.pdq(u32, sorted_hashes, {}, std.sort.asc(u32));
30+
31+
var results = SearchResults.init(allocator);
32+
errdefer results.deinit();
33+
34+
inline for (segment_lists) |n| {
35+
const segments = @field(self, n);
36+
try segments.value.search(sorted_hashes, &results, deadline);
37+
}
38+
39+
results.sort();
40+
41+
return results;
42+
}
43+
44+
pub fn getDocInfo(self: *Self, doc_id: u32) !?DocInfo {
45+
// TODO optimize, read from the end
46+
var result: ?DocInfo = null;
47+
inline for (segment_lists) |n| {
48+
const segments = @field(self, n);
49+
if (segments.value.getDocInfo(doc_id)) |res| {
50+
result = res;
51+
}
52+
}
53+
if (result) |res| {
54+
if (!res.deleted) {
55+
return res;
56+
}
57+
}
58+
return null;
59+
}
60+
61+
pub fn getVersion(self: *Self) u64 {
62+
if (self.memory_segments.value.getLast()) |node| {
63+
return node.value.info.version;
64+
}
65+
if (self.file_segments.value.getLast()) |node| {
66+
return node.value.info.version;
67+
}
68+
return 0;
69+
}
70+
71+
pub fn getAttributes(self: *Self, allocator: std.mem.Allocator) !std.StringHashMapUnmanaged(u64) {
72+
var attributes: std.StringHashMapUnmanaged(u64) = .{};
73+
errdefer attributes.deinit(allocator);
74+
75+
inline for (segment_lists) |n| {
76+
const segments = @field(self, n);
77+
for (segments.value.nodes.items) |node| {
78+
var iter = node.value.attributes.iterator();
79+
while (iter.next()) |entry| {
80+
try attributes.put(allocator, entry.key_ptr.*, entry.value_ptr.*);
81+
}
82+
}
83+
}
84+
85+
return attributes;
86+
}

src/MultiIndex.zig

+10-6
Original file line numberDiff line numberDiff line change
@@ -114,14 +114,18 @@ fn removeIndex(self: *Self, name: []const u8) void {
114114
}
115115
}
116116

117-
pub fn releaseIndex(self: *Self, index_ref: *IndexRef) void {
117+
fn releaseIndexRef(self: *Self, index_ref: *IndexRef) void {
118118
self.lock.lock();
119119
defer self.lock.unlock();
120120

121121
_ = index_ref.decRef();
122122
}
123123

124-
pub fn acquireIndex(self: *Self, name: []const u8) !*IndexRef {
124+
pub fn releaseIndex(self: *Self, index: *Index) void {
125+
self.releaseIndexRef(@fieldParentPtr("index", index));
126+
}
127+
128+
fn acquireIndex(self: *Self, name: []const u8) !*IndexRef {
125129
if (!isValidName(name)) {
126130
return error.InvalidIndexName;
127131
}
@@ -149,20 +153,20 @@ pub fn acquireIndex(self: *Self, name: []const u8) !*IndexRef {
149153
return result.value_ptr;
150154
}
151155

152-
pub fn getIndex(self: *Self, name: []const u8) !*IndexRef {
156+
pub fn getIndex(self: *Self, name: []const u8) !*Index {
153157
const index_ref = try self.acquireIndex(name);
154-
errdefer self.releaseIndex(index_ref);
158+
errdefer self.releaseIndexRef(index_ref);
155159

156160
try index_ref.ensureOpen(false);
157161

158-
return index_ref;
162+
return &index_ref.index;
159163
}
160164

161165
pub fn createIndex(self: *Self, name: []const u8) !void {
162166
log.info("creating index {s}", .{name});
163167

164168
const index_ref = try self.acquireIndex(name);
165-
defer self.releaseIndex(index_ref);
169+
defer self.releaseIndexRef(index_ref);
166170

167171
try index_ref.ensureOpen(true);
168172
}

0 commit comments

Comments
 (0)