Skip to content

Commit bd35ea1

Browse files
committed
Fully supported msgpack in the HTTP API
1 parent 189a439 commit bd35ea1

File tree

3 files changed

+89
-60
lines changed

3 files changed

+89
-60
lines changed

src/Index.zig

+20-8
Original file line numberDiff line numberDiff line change
@@ -472,27 +472,39 @@ pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, de
472472
return results;
473473
}
474474

475-
pub fn getAttributes(self: *Self, allocator: std.mem.Allocator) !std.AutoHashMapUnmanaged(u64, u64) {
476-
var result: std.AutoHashMapUnmanaged(u64, u64) = .{};
477-
errdefer result.deinit(allocator);
475+
pub const IndexInfo = struct {
476+
version: u64,
477+
attributes: std.AutoHashMapUnmanaged(u64, u64),
478478

479+
pub fn deinit(self: *IndexInfo, allocator: std.mem.Allocator) void {
480+
self.attributes.deinit(allocator);
481+
}
482+
};
483+
484+
pub fn getInfo(self: *Self, allocator: std.mem.Allocator) !IndexInfo {
479485
var snapshot = self.acquireSegments();
480486
defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread
481487

482-
var last_version: u64 = 0;
488+
var attributes: std.AutoHashMapUnmanaged(u64, u64) = .{};
489+
errdefer attributes.deinit(allocator);
490+
491+
var version: u64 = 0;
483492
inline for (segment_lists) |n| {
484493
const segments = @field(snapshot, n);
485494
for (segments.value.nodes.items) |node| {
486495
var iter = node.value.attributes.iterator();
487496
while (iter.next()) |entry| {
488-
try result.put(allocator, entry.key_ptr.*, entry.value_ptr.*);
497+
try attributes.put(allocator, entry.key_ptr.*, entry.value_ptr.*);
489498
}
490-
std.debug.assert(node.value.info.version > last_version);
491-
last_version = node.value.info.version;
499+
std.debug.assert(node.value.info.version > version);
500+
version = node.value.info.version;
492501
}
493502
}
494503

495-
return result;
504+
return .{
505+
.version = version,
506+
.attributes = attributes,
507+
};
496508
}
497509

498510
test {

src/MultiIndex.zig

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ pub fn init(allocator: std.mem.Allocator, dir: std.fs.Dir) Self {
5656
pub fn deinit(self: *Self) void {
5757
var iter = self.indexes.iterator();
5858
while (iter.next()) |entry| {
59+
self.allocator.free(entry.key_ptr.*);
5960
entry.value_ptr.index.deinit();
6061
}
6162
self.indexes.deinit();

src/server.zig

+68-52
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ pub fn run(allocator: std.mem.Allocator, indexes: *MultiIndex, address: []const
6464
var server = try Server.init(allocator, config, &ctx);
6565
defer server.deinit();
6666

67+
server.errorHandler(errorHandler);
68+
6769
try installSignalHandlers(&server);
6870

6971
var router = server.router();
@@ -113,9 +115,10 @@ fn getIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response, send_body:
113115
const index_name = req.param("index") orelse return null;
114116
const index = ctx.indexes.getIndex(index_name) catch |err| {
115117
if (err == error.IndexNotFound) {
116-
res.status = 404;
117118
if (send_body) {
118-
try res.json(.{ .status = "index not found" }, .{});
119+
try writeErrorResponse(400, err, req, res);
120+
} else {
121+
res.status = 404;
119122
}
120123
return null;
121124
}
@@ -133,13 +136,16 @@ const ContentType = enum {
133136
msgpack,
134137
};
135138

136-
fn parseContentTypeHeader(content_type: []const u8) !ContentType {
137-
if (std.mem.eql(u8, content_type, "application/json")) {
138-
return .json;
139-
} else if (std.mem.eql(u8, content_type, "application/vnd.msgpack")) {
140-
return .msgpack;
139+
fn parseContentTypeHeader(req: *httpz.Request) !ContentType {
140+
if (req.header("content-type")) |content_type| {
141+
if (std.mem.eql(u8, content_type, "application/json")) {
142+
return .json;
143+
} else if (std.mem.eql(u8, content_type, "application/vnd.msgpack")) {
144+
return .msgpack;
145+
}
146+
return error.InvalidContentType;
141147
}
142-
return error.InvalidContentType;
148+
return .json;
143149
}
144150

145151
fn parseAcceptHeader(req: *httpz.Request) ContentType {
@@ -165,36 +171,48 @@ fn writeResponse(value: anytype, req: *httpz.Request, res: *httpz.Response) !voi
165171
}
166172
}
167173

174+
const ErrorResponse = struct {
175+
@"error": []const u8,
176+
177+
pub fn msgpackFormat() msgpack.StructFormat {
178+
return .{ .as_map = .{ .key = .{ .field_name_prefix = 1 } } };
179+
}
180+
};
181+
182+
fn errorHandler(_: *Context, req: *httpz.Request, res: *httpz.Response, err: anyerror) void {
183+
log.err("unhandled error in {s}: {any}", .{ req.url.raw, err });
184+
writeErrorResponse(500, err, req, res) catch {
185+
res.status = 500;
186+
res.body = "internal error";
187+
};
188+
}
189+
190+
fn writeErrorResponse(status: u16, err: anyerror, req: *httpz.Request, res: *httpz.Response) !void {
191+
res.status = status;
192+
try writeResponse(ErrorResponse{ .@"error" = @errorName(err) }, req, res);
193+
}
194+
168195
fn getRequestBody(comptime T: type, req: *httpz.Request, res: *httpz.Response) !?T {
169196
const content = req.body() orelse {
170-
res.status = 400;
171-
try writeResponse(.{ .status = "no content" }, req, res);
197+
try writeErrorResponse(400, error.NoContent, req, res);
172198
return null;
173199
};
174200

175-
const content_type_name = req.header("content-type") orelse {
176-
res.status = 415;
177-
try writeResponse(.{ .status = "missing content type header" }, req, res);
178-
return null;
179-
};
180-
const content_type = parseContentTypeHeader(content_type_name) catch {
181-
res.status = 415;
182-
try writeResponse(.{ .status = "unsupported content type" }, req, res);
201+
const content_type = parseContentTypeHeader(req) catch {
202+
try writeErrorResponse(415, error.UnsupportedContentType, req, res);
183203
return null;
184204
};
185205

186206
switch (content_type) {
187207
.json => {
188208
return json.parseFromSliceLeaky(T, req.arena, content, .{}) catch {
189-
res.status = 400;
190-
try writeResponse(.{ .status = "invalid body" }, req, res);
209+
try writeErrorResponse(400, error.InvalidContent, req, res);
191210
return null;
192211
};
193212
},
194213
.msgpack => {
195214
return msgpack.decodeFromSliceLeaky(T, req.arena, content) catch {
196-
res.status = 400;
197-
try writeResponse(.{ .status = "invalid body" }, req, res);
215+
try writeErrorResponse(400, error.InvalidContent, req, res);
198216
return null;
199217
};
200218
},
@@ -221,11 +239,7 @@ fn handleSearch(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void
221239

222240
metrics.search();
223241

224-
const results = index.search(body.query, req.arena, deadline) catch |err| {
225-
log.err("index search error: {}", .{err});
226-
res.status = 500;
227-
return writeResponse(.{ .status = "internal error" }, req, res);
228-
};
242+
const results = try index.search(body.query, req.arena, deadline);
229243

230244
var results_json = SearchResultsJSON{ .results = try req.arena.alloc(SearchResultJSON, results.count()) };
231245
for (results.values(), 0..) |r, i| {
@@ -253,21 +267,14 @@ fn handleUpdate(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void
253267

254268
metrics.update(body.changes.len);
255269

256-
index.update(body.changes) catch |err| {
257-
log.err("index search error: {}", .{err});
258-
res.status = 500;
259-
return writeResponse(.{ .status = "internal error" }, req, res);
260-
};
270+
try index.update(body.changes);
261271

262272
return writeResponse(.{ .status = "ok" }, req, res);
263273
}
264274

265275
fn handleHeadIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void {
266276
const index_ref = try getIndex(ctx, req, res, false) orelse return;
267277
defer releaseIndex(ctx, index_ref);
268-
269-
res.status = 200;
270-
return;
271278
}
272279

273280
const Attributes = struct {
@@ -284,47 +291,56 @@ const Attributes = struct {
284291
}
285292
try jws.endArray();
286293
}
294+
295+
pub fn msgpackWrite(self: Attributes, packer: anytype) !void {
296+
try packer.writeMapHeader(self.attributes.count());
297+
var iter = self.attributes.iterator();
298+
while (iter.next()) |entry| {
299+
try packer.write(@TypeOf(entry.key_ptr.*), entry.key_ptr.*);
300+
try packer.write(@TypeOf(entry.value_ptr.*), entry.value_ptr.*);
301+
}
302+
}
287303
};
288304

289305
const GetIndexResponse = struct {
290-
status: []const u8,
306+
version: u64,
291307
attributes: Attributes,
308+
309+
pub fn msgpackFormat() msgpack.StructFormat {
310+
return .{ .as_map = .{ .key = .{ .field_name_prefix = 1 } } };
311+
}
292312
};
293313

294314
fn handleGetIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void {
295315
const index_ref = try getIndex(ctx, req, res, true) orelse return;
296316
defer releaseIndex(ctx, index_ref);
297317

298-
const attributes = try index_ref.index.getAttributes(req.arena);
318+
const info = try index_ref.index.getInfo(req.arena);
299319
const response = GetIndexResponse{
300-
.status = "ok",
301-
.attributes = .{ .attributes = attributes },
320+
.version = info.version,
321+
.attributes = .{
322+
.attributes = info.attributes,
323+
},
302324
};
303-
return res.json(&response, .{});
325+
return writeResponse(response, req, res);
304326
}
305327

328+
const EmptyResponse = struct {};
329+
306330
fn handlePutIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void {
307331
const index_name = req.param("index") orelse return;
308332

309-
ctx.indexes.createIndex(index_name) catch |err| {
310-
log.err("index create error: {}", .{err});
311-
res.status = 500;
312-
return res.json(.{ .status = "internal error" }, .{});
313-
};
333+
try ctx.indexes.createIndex(index_name);
314334

315-
return res.json(.{ .status = "ok" }, .{});
335+
return writeResponse(EmptyResponse{}, req, res);
316336
}
317337

318338
fn handleDeleteIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void {
319339
const index_name = req.param("index") orelse return;
320340

321-
ctx.indexes.deleteIndex(index_name) catch |err| {
322-
log.err("index delete error: {}", .{err});
323-
res.status = 500;
324-
return res.json(.{ .status = "internal error" }, .{});
325-
};
341+
try ctx.indexes.deleteIndex(index_name);
326342

327-
return res.json(.{ .status = "ok" }, .{});
343+
return writeResponse(EmptyResponse{}, req, res);
328344
}
329345

330346
fn handlePing(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void {

0 commit comments

Comments
 (0)