Skip to content

Commit 926c721

Browse files
committed
Count number of docs
1 parent 1fa4d4f commit 926c721

8 files changed

+103
-39
lines changed

src/Index.zig

+17
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const Options = struct {
4545
options: Options,
4646
allocator: std.mem.Allocator,
4747
scheduler: *Scheduler,
48+
name: []const u8,
4849

4950
dir: std.fs.Dir,
5051

@@ -117,6 +118,7 @@ pub fn init(allocator: std.mem.Allocator, scheduler: *Scheduler, parent_dir: std
117118
.allocator = allocator,
118119
.scheduler = scheduler,
119120
.dir = dir,
121+
.name = path,
120122
.oplog = oplog,
121123
.segments_lock = .{},
122124
.memory_segments = memory_segments,
@@ -214,6 +216,8 @@ fn doCheckpoint(self: *Self) !bool {
214216
log.warn("failed to truncate oplog: {}", .{err});
215217
};
216218

219+
defer self.updateDocsMetrics();
220+
217221
// commit updated lists
218222

219223
self.segments_lock.lock();
@@ -229,6 +233,13 @@ fn doCheckpoint(self: *Self) !bool {
229233
return true;
230234
}
231235

236+
fn updateDocsMetrics(self: *Self) void {
237+
var snapshot = self.acquireReader();
238+
defer self.releaseReader(&snapshot);
239+
240+
metrics.docs(self.name, snapshot.getNumDocs());
241+
}
242+
232243
fn checkpointTask(self: *Self) void {
233244
_ = self.doCheckpoint() catch |err| {
234245
log.err("checkpoint failed: {}", .{err});
@@ -264,6 +275,8 @@ fn maybeMergeFileSegments(self: *Self) !bool {
264275

265276
try self.updateManifestFile(upd.segments.value);
266277

278+
defer self.updateDocsMetrics();
279+
267280
self.segments_lock.lock();
268281
defer self.segments_lock.unlock();
269282

@@ -292,6 +305,8 @@ fn maybeMergeMemorySegments(self: *Self) !bool {
292305
var upd = try self.memory_segments.prepareMerge(self.allocator) orelse return false;
293306
defer self.memory_segments.cleanupAfterUpdate(self.allocator, &upd);
294307

308+
defer self.updateDocsMetrics();
309+
295310
self.segments_lock.lock();
296311
defer self.segments_lock.unlock();
297312

@@ -372,6 +387,8 @@ pub fn updateInternal(self: *Self, changes: []const Change, commit_id: ?u64) !vo
372387

373388
target.value.info.version = commit_id orelse try self.oplog.write(changes);
374389

390+
defer self.updateDocsMetrics();
391+
375392
self.segments_lock.lock();
376393
defer self.segments_lock.unlock();
377394

src/IndexReader.zig

+10-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, de
4141
return results;
4242
}
4343

44+
pub fn getNumDocs(self: *Self) u32 {
45+
var result: u32 = 0;
46+
inline for (segment_lists) |n| {
47+
const segments = @field(self, n);
48+
result += segments.value.getNumDocs();
49+
}
50+
return result;
51+
}
52+
4453
pub fn getDocInfo(self: *Self, doc_id: u32) !?DocInfo {
4554
// TODO optimize, read from the end
4655
var result: ?DocInfo = null;
@@ -92,7 +101,7 @@ pub fn getVersion(self: *Self) u64 {
92101
return 0;
93102
}
94103

95-
pub fn getSegmentCount(self: *Self) usize {
104+
pub fn getNumSegments(self: *Self) usize {
96105
return self.memory_segments.value.count() + self.file_segments.value.count();
97106
}
98107

src/MemorySegment.zig

+6
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ pub fn build(self: *Self, changes: []const Change) !void {
110110
const result = self.docs.getOrPutAssumeCapacity(op.id);
111111
if (!result.found_existing) {
112112
result.value_ptr.* = false;
113+
if (self.min_doc_id == 0 or op.id < self.min_doc_id) {
114+
self.min_doc_id = op.id;
115+
}
116+
if (self.max_doc_id == 0 or op.id > self.max_doc_id) {
117+
self.max_doc_id = op.id;
118+
}
113119
}
114120
},
115121
.set_attribute => |op| {

src/main.zig

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ pub fn main() !void {
7575
}
7676
log.info("using {} threads", .{threads});
7777

78-
try metrics.initializeMetrics(.{ .prefix = "aindex_" });
78+
try metrics.initializeMetrics(allocator, .{ .prefix = "aindex_" });
7979

8080
var scheduler = Scheduler.init(allocator);
8181
defer scheduler.deinit();

src/metrics.zig

+10-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1+
const std = @import("std");
12
const m = @import("metrics");
23

34
var metrics = m.initializeNoop(Metrics);
45

6+
const WithIndex = struct { index: []const u8 };
7+
58
const Metrics = struct {
69
searches: m.Counter(u64),
710
updates: m.Counter(u64),
811
checkpoints: m.Counter(u64),
912
memory_segment_merges: m.Counter(u64),
1013
file_segment_merges: m.Counter(u64),
14+
docs: m.GaugeVec(u32, WithIndex),
1115
};
1216

1317
pub fn search() void {
@@ -30,13 +34,18 @@ pub fn fileSegmentMerge() void {
3034
metrics.file_segment_merges.incr();
3135
}
3236

33-
pub fn initializeMetrics(comptime opts: m.RegistryOpts) !void {
37+
pub fn docs(index_name: []const u8, value: u32) void {
38+
metrics.docs.set(.{ .index = index_name }, value) catch {};
39+
}
40+
41+
pub fn initializeMetrics(allocator: std.mem.Allocator, comptime opts: m.RegistryOpts) !void {
3442
metrics = .{
3543
.searches = m.Counter(u64).init("searches_total", .{}, opts),
3644
.updates = m.Counter(u64).init("updates_total", .{}, opts),
3745
.checkpoints = m.Counter(u64).init("checkpoints_total", .{}, opts),
3846
.memory_segment_merges = m.Counter(u64).init("memory_segment_merges_total", .{}, opts),
3947
.file_segment_merges = m.Counter(u64).init("file_segment_merges_total", .{}, opts),
48+
.docs = try m.GaugeVec(u32, WithIndex).init(allocator, "docs", .{}, opts),
4049
};
4150
}
4251

src/segment_list.zig

+8
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,14 @@ pub fn SegmentList(Segment: type) type {
110110
}
111111
}
112112

113+
pub fn getNumDocs(self: Self) u32 {
114+
var result: u32 = 0;
115+
for (self.nodes.items) |node| {
116+
result += node.value.docs.count();
117+
}
118+
return result;
119+
}
120+
113121
pub fn getMinDocId(self: Self) u32 {
114122
var result: u32 = 0;
115123
for (self.nodes.items) |node| {

src/server.zig

+3-1
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ const Attributes = struct {
431431
const GetIndexResponse = struct {
432432
version: u64,
433433
segments: usize,
434+
docs: usize,
434435
attributes: Attributes,
435436

436437
pub fn msgpackFormat() msgpack.StructFormat {
@@ -447,7 +448,8 @@ fn handleGetIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !voi
447448

448449
const response = GetIndexResponse{
449450
.version = index_reader.getVersion(),
450-
.segments = index_reader.getSegmentCount(),
451+
.segments = index_reader.getNumSegments(),
452+
.docs = index_reader.getNumDocs(),
451453
.attributes = .{
452454
.attributes = try index_reader.getAttributes(req.arena),
453455
},

tests/test_index_api.py

+48-35
Original file line numberDiff line numberDiff line change
@@ -4,95 +4,108 @@
44

55

66
def test_head_index_not_found(client, index_name):
7-
req = client.head(f'/{index_name}')
7+
req = client.head(f"/{index_name}")
88
assert req.status_code == 404, req.content
9-
assert req.content == b''
9+
assert req.content == b""
1010

1111

1212
def test_head_index(client, index_name, create_index):
13-
req = client.head(f'/{index_name}')
13+
req = client.head(f"/{index_name}")
1414
assert req.status_code == 200, req.content
15-
assert req.content == b''
15+
assert req.content == b""
1616

1717

1818
def test_get_index_not_found(client, index_name):
19-
req = client.get(f'/{index_name}')
19+
req = client.get(f"/{index_name}")
2020
assert req.status_code == 404, req.content
21-
assert json.loads(req.content) == {'error': 'IndexNotFound'}
21+
assert json.loads(req.content) == {"error": "IndexNotFound"}
2222

2323

24-
@pytest.mark.parametrize('fmt', ['json', 'msgpack'])
24+
@pytest.mark.parametrize("fmt", ["json", "msgpack"])
2525
def test_get_index(client, index_name, create_index, fmt):
26-
req = client.post(f'/{index_name}/_update', json={
27-
'changes': [
28-
{'insert': {'id': 1, 'hashes': [101, 201, 301]}},
29-
{'set_attribute': {'name': 'foo', 'value': 1234}},
30-
],
31-
})
26+
req = client.post(
27+
f"/{index_name}/_update",
28+
json={
29+
"changes": [
30+
{"insert": {"id": 1, "hashes": [101, 201, 301]}},
31+
{"set_attribute": {"name": "foo", "value": 1234}},
32+
],
33+
},
34+
)
3235
assert req.status_code == 200, req.content
3336

34-
req = client.get(f'/{index_name}', headers=headers(fmt))
37+
req = client.get(f"/{index_name}", headers=headers(fmt))
3538
assert req.status_code == 200, req.content
36-
if fmt == 'json':
37-
expected = {'version': 1, 'segments': 1, 'attributes': {'foo': 1234, 'min_document_id': 1, 'max_document_id': 1}}
39+
if fmt == "json":
40+
expected = {
41+
"version": 1,
42+
"segments": 1,
43+
"docs": 1,
44+
"attributes": {"foo": 1234, "min_document_id": 1, "max_document_id": 1},
45+
}
3846
else:
39-
expected = {'v': 1, 's': 1, 'a': {'foo': 1234, 'min_document_id': 1, 'max_document_id': 1}}
47+
expected = {
48+
"v": 1,
49+
"s": 1,
50+
"d": 1,
51+
"a": {"foo": 1234, "min_document_id": 1, "max_document_id": 1},
52+
}
4053
assert decode(fmt, req.content) == expected
4154

4255

4356
def headers(fmt):
44-
if fmt == 'json':
57+
if fmt == "json":
4558
return {
46-
'Content-Type': 'application/json',
47-
'Accept': 'application/json',
59+
"Content-Type": "application/json",
60+
"Accept": "application/json",
4861
}
49-
elif fmt == 'msgpack':
62+
elif fmt == "msgpack":
5063
return {
51-
'Content-Type': 'application/vnd.msgpack',
52-
'Accept': 'application/vnd.msgpack',
64+
"Content-Type": "application/vnd.msgpack",
65+
"Accept": "application/vnd.msgpack",
5366
}
5467
else:
5568
return {}
5669

5770

5871
def decode(fmt, content):
59-
if fmt == 'json':
72+
if fmt == "json":
6073
return json.loads(content)
61-
elif fmt == 'msgpack':
74+
elif fmt == "msgpack":
6275
return msgpack.loads(content)
6376
else:
6477
assert False
6578

6679

67-
@pytest.mark.parametrize('fmt', ['json', 'msgpack'])
80+
@pytest.mark.parametrize("fmt", ["json", "msgpack"])
6881
def test_create_index(client, index_name, fmt):
69-
req = client.head(f'/{index_name}')
82+
req = client.head(f"/{index_name}")
7083
assert req.status_code == 404, req.content
7184

72-
req = client.put(f'/{index_name}', headers=headers(fmt))
85+
req = client.put(f"/{index_name}", headers=headers(fmt))
7386
assert req.status_code == 200, req.content
7487
assert decode(fmt, req.content) == {}
7588

76-
req = client.put(f'/{index_name}', headers=headers(fmt))
89+
req = client.put(f"/{index_name}", headers=headers(fmt))
7790
assert req.status_code == 200, req.content
7891
assert decode(fmt, req.content) == {}
7992

80-
req = client.head(f'/{index_name}')
93+
req = client.head(f"/{index_name}")
8194
assert req.status_code == 200, req.content
8295

8396

84-
@pytest.mark.parametrize('fmt', ['json', 'msgpack'])
97+
@pytest.mark.parametrize("fmt", ["json", "msgpack"])
8598
def test_delete_index(client, index_name, create_index, fmt):
86-
req = client.head(f'/{index_name}')
99+
req = client.head(f"/{index_name}")
87100
assert req.status_code == 200, req.content
88101

89-
req = client.delete(f'/{index_name}', headers=headers(fmt))
102+
req = client.delete(f"/{index_name}", headers=headers(fmt))
90103
assert req.status_code == 200, req.content
91104
assert decode(fmt, req.content) == {}
92105

93-
req = client.delete(f'/{index_name}', headers=headers(fmt))
106+
req = client.delete(f"/{index_name}", headers=headers(fmt))
94107
assert req.status_code == 200, req.content
95108
assert decode(fmt, req.content) == {}
96109

97-
req = client.head(f'/{index_name}')
110+
req = client.head(f"/{index_name}")
98111
assert req.status_code == 404, req.content

0 commit comments

Comments
 (0)