Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert data structures to *Unmanaged #36

Merged
merged 5 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions src/FileSegment.zig
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ allocator: std.mem.Allocator,
dir: std.fs.Dir,
info: SegmentInfo = .{},
attributes: std.AutoHashMapUnmanaged(u64, u64) = .{},
docs: std.AutoHashMap(u32, bool),
index: std.ArrayList(u32),
docs: std.AutoHashMapUnmanaged(u32, bool) = .{},
index: std.ArrayListUnmanaged(u32) = .{},
block_size: usize = 0,
blocks: []const u8,
merged: u32 = 0,
Expand All @@ -35,16 +35,14 @@ pub fn init(allocator: std.mem.Allocator, options: Options) Self {
return Self{
.allocator = allocator,
.dir = options.dir,
.docs = std.AutoHashMap(u32, bool).init(allocator),
.index = std.ArrayList(u32).init(allocator),
.blocks = undefined,
};
}

pub fn deinit(self: *Self, delete_file: KeepOrDelete) void {
self.attributes.deinit(self.allocator);
self.docs.deinit();
self.index.deinit();
self.docs.deinit(self.allocator);
self.index.deinit(self.allocator);

if (self.raw_data) |data| {
std.posix.munmap(data);
Expand Down Expand Up @@ -138,9 +136,9 @@ test "build" {

source.info = .{ .version = 1 };
source.frozen = true;
try source.docs.put(1, true);
try source.items.append(.{ .id = 1, .hash = 1 });
try source.items.append(.{ .id = 1, .hash = 2 });
try source.docs.put(source.allocator, 1, true);
try source.items.append(source.allocator, .{ .id = 1, .hash = 1 });
try source.items.append(source.allocator, .{ .id = 1, .hash = 2 });

var source_reader = source.reader();
defer source_reader.close();
Expand Down
20 changes: 9 additions & 11 deletions src/MemorySegment.zig
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,23 @@ pub const Options = struct {};
allocator: std.mem.Allocator,
info: SegmentInfo = .{},
attributes: std.AutoHashMapUnmanaged(u64, u64) = .{},
docs: std.AutoHashMap(u32, bool),
items: std.ArrayList(Item),
docs: std.AutoHashMapUnmanaged(u32, bool) = .{},
items: std.ArrayListUnmanaged(Item) = .{},
frozen: bool = false,

pub fn init(allocator: std.mem.Allocator, opts: Options) Self {
_ = opts;
return .{
.allocator = allocator,
.docs = std.AutoHashMap(u32, bool).init(allocator),
.items = std.ArrayList(Item).init(allocator),
};
}

pub fn deinit(self: *Self, delete_file: KeepOrDelete) void {
_ = delete_file;

self.attributes.deinit(self.allocator);
self.docs.deinit();
self.items.deinit();
self.docs.deinit(self.allocator);
self.items.deinit(self.allocator);
}

pub fn search(self: Self, sorted_hashes: []const u32, results: *SearchResults) !void {
Expand Down Expand Up @@ -76,8 +74,8 @@ pub fn build(self: *Self, changes: []const Change) !void {
}

try self.attributes.ensureTotalCapacity(self.allocator, num_attributes);
try self.docs.ensureTotalCapacity(num_docs);
try self.items.ensureTotalCapacity(num_items);
try self.docs.ensureTotalCapacity(self.allocator, num_docs);
try self.items.ensureTotalCapacity(self.allocator, num_items);

var i = changes.len;
while (i > 0) {
Expand Down Expand Up @@ -124,14 +122,14 @@ pub fn merge(self: *Self, merger: *SegmentMerger(Self)) !void {
self.attributes.deinit(self.allocator);
self.attributes = merger.segment.attributes.move();

self.docs.deinit();
self.docs.deinit(self.allocator);
self.docs = merger.segment.docs.move();

self.items.clearRetainingCapacity();
try self.items.ensureTotalCapacity(merger.estimated_size);
try self.items.ensureTotalCapacity(self.allocator, merger.estimated_size);
while (true) {
const item = try merger.read() orelse break;
try self.items.append(item);
try self.items.append(self.allocator, item);
merger.advance();
}
}
Expand Down
24 changes: 15 additions & 9 deletions src/filefmt.zig
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,12 @@ test "writeBlock/readBlock/readFirstItemFromBlock" {
var segment = MemorySegment.init(std.testing.allocator, .{});
defer segment.deinit(.delete);

try segment.items.append(.{ .hash = 1, .id = 1 });
try segment.items.append(.{ .hash = 2, .id = 1 });
try segment.items.append(.{ .hash = 3, .id = 1 });
try segment.items.append(.{ .hash = 3, .id = 2 });
try segment.items.append(.{ .hash = 4, .id = 1 });
try segment.items.ensureTotalCapacity(std.testing.allocator, 5);
segment.items.appendAssumeCapacity(.{ .hash = 1, .id = 1 });
segment.items.appendAssumeCapacity(.{ .hash = 2, .id = 1 });
segment.items.appendAssumeCapacity(.{ .hash = 3, .id = 1 });
segment.items.appendAssumeCapacity(.{ .hash = 3, .id = 2 });
segment.items.appendAssumeCapacity(.{ .hash = 4, .id = 1 });

const block_size = 1024;
var block_data: [block_size]u8 = undefined;
Expand Down Expand Up @@ -428,7 +429,12 @@ pub fn readSegmentFile(dir: fs.Dir, info: SegmentInfo, segment: *FileSegment) !v
}

if (header.has_docs) {
try unpacker.readMapInto(&segment.docs);
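// FIXME: msgpack.zig needs a nicer API for filling an unmanaged map directly;
// until then, read into a temporary managed map and move its storage into segment.docs.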
var docs = std.AutoHashMap(u32, bool).init(segment.allocator);
defer docs.deinit();
try unpacker.readMapInto(&docs);
segment.docs.deinit(segment.allocator);
segment.docs = docs.unmanaged.move();
}

const block_size = header.block_size;
Expand All @@ -437,8 +443,8 @@ pub fn readSegmentFile(dir: fs.Dir, info: SegmentInfo, segment: *FileSegment) !v

const blocks_data_start = fixed_buffer_stream.pos;

const estimated_block_count = (raw_data.len - fixed_buffer_stream.pos) / block_size;
try segment.index.ensureTotalCapacity(estimated_block_count);
const max_possible_block_count = (raw_data.len - fixed_buffer_stream.pos) / block_size;
try segment.index.ensureTotalCapacity(segment.allocator, max_possible_block_count);

var num_items: u32 = 0;
var num_blocks: u32 = 0;
Expand All @@ -452,7 +458,7 @@ pub fn readSegmentFile(dir: fs.Dir, info: SegmentInfo, segment: *FileSegment) !v
if (block_header.num_items == 0) {
break;
}
try segment.index.append(block_header.first_item.hash);
segment.index.appendAssumeCapacity(block_header.first_item.hash);
num_items += block_header.num_items;
num_blocks += 1;
crc.update(block_data[0..]);
Expand Down
4 changes: 2 additions & 2 deletions src/segment_list.zig
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,11 @@ pub fn SegmentListManager(Segment: type) type {
var target = try List.createSegment(allocator, self.options);
defer List.destroySegment(allocator, &target);

var merger = SegmentMerger(Segment).init(allocator, segments.value);
var merger = try SegmentMerger(Segment).init(allocator, segments.value, candidate.end - candidate.start);
defer merger.deinit();

for (segments.value.nodes.items[candidate.start..candidate.end]) |segment| {
try merger.addSource(segment.value);
merger.addSource(segment.value);
}
try merger.prepare();

Expand Down
51 changes: 28 additions & 23 deletions src/segment_merger.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ const SharedPtr = @import("utils/shared_ptr.zig").SharedPtr;
pub const MergedSegmentInfo = struct {
info: SegmentInfo = .{},
attributes: std.AutoHashMapUnmanaged(u64, u64) = .{},
docs: std.AutoHashMap(u32, bool),
docs: std.AutoHashMapUnmanaged(u32, bool) = .{},

/// Releases the storage held by the `attributes` and `docs` maps.
///
/// Both maps are unmanaged, so the allocator is supplied here rather than
/// stored in the struct. NOTE(review): it should be the same allocator that
/// was used to populate the maps — confirm against callers.
pub fn deinit(self: *MergedSegmentInfo, allocator: std.mem.Allocator) void {
self.attributes.deinit(allocator);
self.docs.deinit(allocator);
}
};

pub fn SegmentMerger(comptime Segment: type) type {
Expand All @@ -17,7 +22,12 @@ pub fn SegmentMerger(comptime Segment: type) type {

const Source = struct {
reader: Segment.Reader,
skip_docs: std.AutoHashMap(u32, void),
skip_docs: std.AutoHashMapUnmanaged(u32, void) = .{},

/// Tears down one merge source: closes its segment reader, then frees the
/// `skip_docs` set using `allocator`.
pub fn deinit(self: *Source, allocator: std.mem.Allocator) void {
self.reader.close();
// `skip_docs` is unmanaged, so the allocator has to be passed in explicitly.
self.skip_docs.deinit(allocator);
}

pub fn read(self: *Source) !?Item {
while (true) {
Expand All @@ -36,38 +46,33 @@ pub fn SegmentMerger(comptime Segment: type) type {
};

allocator: std.mem.Allocator,
sources: std.ArrayList(Source),
collection: *SegmentList(Segment),
segment: MergedSegmentInfo,
sources: std.ArrayListUnmanaged(Source) = .{},
segment: MergedSegmentInfo = .{},
estimated_size: usize = 0,

current_item: ?Item = null,

pub fn init(allocator: std.mem.Allocator, collection: *SegmentList(Segment)) Self {
pub fn init(allocator: std.mem.Allocator, collection: *SegmentList(Segment), num_sources: usize) !Self {
return .{
.allocator = allocator,
.sources = std.ArrayList(Source).init(allocator),
.collection = collection,
.segment = .{
.docs = std.AutoHashMap(u32, bool).init(allocator),
},
.sources = try std.ArrayListUnmanaged(Source).initCapacity(allocator, num_sources),
};
}

pub fn deinit(self: *Self) void {
for (self.sources.items) |*source| {
source.reader.close();
source.skip_docs.deinit();
source.deinit(self.allocator);
}
self.sources.deinit();
self.segment.docs.deinit();
self.sources.deinit(self.allocator);
self.segment.deinit(self.allocator);
self.* = undefined;
}

pub fn addSource(self: *Self, source: *Segment) !void {
try self.sources.append(.{
pub fn addSource(self: *Self, source: *Segment) void {
self.sources.appendAssumeCapacity(.{
.reader = source.reader(),
.skip_docs = std.AutoHashMap(u32, void).init(self.allocator),
});
}

Expand Down Expand Up @@ -100,7 +105,7 @@ pub fn SegmentMerger(comptime Segment: type) type {
}
}

try self.segment.docs.ensureTotalCapacity(total_docs);
try self.segment.docs.ensureTotalCapacity(self.allocator, total_docs);
for (sources) |*source| {
const segment = source.reader.segment;
var docs_added: usize = 0;
Expand All @@ -111,10 +116,10 @@ pub fn SegmentMerger(comptime Segment: type) type {
const doc_id = entry.key_ptr.*;
const doc_status = entry.value_ptr.*;
if (!self.collection.hasNewerVersion(doc_id, segment.info.version)) {
try self.segment.docs.put(doc_id, doc_status);
try self.segment.docs.put(self.allocator, doc_id, doc_status);
docs_added += 1;
} else {
try source.skip_docs.put(doc_id, {});
try source.skip_docs.put(self.allocator, doc_id, {});
}
}
if (docs_found > 0) {
Expand Down Expand Up @@ -158,7 +163,7 @@ test "merge segments" {
var collection = try SegmentList(MemorySegment).init(std.testing.allocator, 3);
defer collection.deinit(std.testing.allocator, .delete);

var merger = SegmentMerger(MemorySegment).init(std.testing.allocator, &collection);
var merger = try SegmentMerger(MemorySegment).init(std.testing.allocator, &collection, 3);
defer merger.deinit();

var node1 = try SegmentList(MemorySegment).createSegment(std.testing.allocator, .{});
Expand All @@ -174,9 +179,9 @@ test "merge segments" {
node2.value.info = .{ .version = 12, .merges = 0 };
node3.value.info = .{ .version = 13, .merges = 0 };

try merger.addSource(node1.value);
try merger.addSource(node2.value);
try merger.addSource(node3.value);
merger.addSource(node1.value);
merger.addSource(node2.value);
merger.addSource(node3.value);

try merger.prepare();

Expand Down