Skip to content

Commit a947302

Browse files
authored
Merge pull request #37 from acoustid/http-msgpack-in-all-endpoints
Support msgpack in all HTTP endpoints
2 parents 189a439 + a2d245f commit a947302

File tree

3 files changed

+94
-60
lines changed

3 files changed

+94
-60
lines changed

src/Index.zig

+20-8
Original file line numberDiff line numberDiff line change
@@ -472,27 +472,39 @@ pub fn search(self: *Self, hashes: []const u32, allocator: std.mem.Allocator, de
472472
return results;
473473
}
474474

475-
pub fn getAttributes(self: *Self, allocator: std.mem.Allocator) !std.AutoHashMapUnmanaged(u64, u64) {
476-
var result: std.AutoHashMapUnmanaged(u64, u64) = .{};
477-
errdefer result.deinit(allocator);
475+
pub const IndexInfo = struct {
476+
version: u64,
477+
attributes: std.AutoHashMapUnmanaged(u64, u64),
478478

479+
pub fn deinit(self: *IndexInfo, allocator: std.mem.Allocator) void {
480+
self.attributes.deinit(allocator);
481+
}
482+
};
483+
484+
pub fn getInfo(self: *Self, allocator: std.mem.Allocator) !IndexInfo {
479485
var snapshot = self.acquireSegments();
480486
defer self.releaseSegments(&snapshot); // FIXME this possibly deletes orphaned segments, do it in a separate thread
481487

482-
var last_version: u64 = 0;
488+
var attributes: std.AutoHashMapUnmanaged(u64, u64) = .{};
489+
errdefer attributes.deinit(allocator);
490+
491+
var version: u64 = 0;
483492
inline for (segment_lists) |n| {
484493
const segments = @field(snapshot, n);
485494
for (segments.value.nodes.items) |node| {
486495
var iter = node.value.attributes.iterator();
487496
while (iter.next()) |entry| {
488-
try result.put(allocator, entry.key_ptr.*, entry.value_ptr.*);
497+
try attributes.put(allocator, entry.key_ptr.*, entry.value_ptr.*);
489498
}
490-
std.debug.assert(node.value.info.version > last_version);
491-
last_version = node.value.info.version;
499+
std.debug.assert(node.value.info.version > version);
500+
version = node.value.info.version;
492501
}
493502
}
494503

495-
return result;
504+
return .{
505+
.version = version,
506+
.attributes = attributes,
507+
};
496508
}
497509

498510
test {

src/MultiIndex.zig

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ pub fn init(allocator: std.mem.Allocator, dir: std.fs.Dir) Self {
5656
pub fn deinit(self: *Self) void {
5757
var iter = self.indexes.iterator();
5858
while (iter.next()) |entry| {
59+
self.allocator.free(entry.key_ptr.*);
5960
entry.value_ptr.index.deinit();
6061
}
6162
self.indexes.deinit();

src/server.zig

+73-52
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ pub fn run(allocator: std.mem.Allocator, indexes: *MultiIndex, address: []const
6464
var server = try Server.init(allocator, config, &ctx);
6565
defer server.deinit();
6666

67+
server.errorHandler(handleError);
68+
server.notFound(handleNotFound);
69+
6770
try installSignalHandlers(&server);
6871

6972
var router = server.router();
@@ -113,9 +116,10 @@ fn getIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response, send_body:
113116
const index_name = req.param("index") orelse return null;
114117
const index = ctx.indexes.getIndex(index_name) catch |err| {
115118
if (err == error.IndexNotFound) {
116-
res.status = 404;
117119
if (send_body) {
118-
try res.json(.{ .status = "index not found" }, .{});
120+
try writeErrorResponse(400, err, req, res);
121+
} else {
122+
res.status = 404;
119123
}
120124
return null;
121125
}
@@ -133,13 +137,16 @@ const ContentType = enum {
133137
msgpack,
134138
};
135139

136-
fn parseContentTypeHeader(content_type: []const u8) !ContentType {
137-
if (std.mem.eql(u8, content_type, "application/json")) {
138-
return .json;
139-
} else if (std.mem.eql(u8, content_type, "application/vnd.msgpack")) {
140-
return .msgpack;
140+
fn parseContentTypeHeader(req: *httpz.Request) !ContentType {
141+
if (req.header("content-type")) |content_type| {
142+
if (std.mem.eql(u8, content_type, "application/json")) {
143+
return .json;
144+
} else if (std.mem.eql(u8, content_type, "application/vnd.msgpack")) {
145+
return .msgpack;
146+
}
147+
return error.InvalidContentType;
141148
}
142-
return error.InvalidContentType;
149+
return .json;
143150
}
144151

145152
fn parseAcceptHeader(req: *httpz.Request) ContentType {
@@ -165,36 +172,52 @@ fn writeResponse(value: anytype, req: *httpz.Request, res: *httpz.Response) !voi
165172
}
166173
}
167174

175+
const ErrorResponse = struct {
176+
@"error": []const u8,
177+
178+
pub fn msgpackFormat() msgpack.StructFormat {
179+
return .{ .as_map = .{ .key = .{ .field_name_prefix = 1 } } };
180+
}
181+
};
182+
183+
fn handleNotFound(_: *Context, req: *httpz.Request, res: *httpz.Response) !void {
184+
try writeErrorResponse(404, error.NotFound, req, res);
185+
}
186+
187+
fn handleError(_: *Context, req: *httpz.Request, res: *httpz.Response, err: anyerror) void {
188+
log.err("unhandled error in {s}: {any}", .{ req.url.raw, err });
189+
writeErrorResponse(500, err, req, res) catch {
190+
res.status = 500;
191+
res.body = "internal error";
192+
};
193+
}
194+
195+
fn writeErrorResponse(status: u16, err: anyerror, req: *httpz.Request, res: *httpz.Response) !void {
196+
res.status = status;
197+
try writeResponse(ErrorResponse{ .@"error" = @errorName(err) }, req, res);
198+
}
199+
168200
fn getRequestBody(comptime T: type, req: *httpz.Request, res: *httpz.Response) !?T {
169201
const content = req.body() orelse {
170-
res.status = 400;
171-
try writeResponse(.{ .status = "no content" }, req, res);
202+
try writeErrorResponse(400, error.NoContent, req, res);
172203
return null;
173204
};
174205

175-
const content_type_name = req.header("content-type") orelse {
176-
res.status = 415;
177-
try writeResponse(.{ .status = "missing content type header" }, req, res);
178-
return null;
179-
};
180-
const content_type = parseContentTypeHeader(content_type_name) catch {
181-
res.status = 415;
182-
try writeResponse(.{ .status = "unsupported content type" }, req, res);
206+
const content_type = parseContentTypeHeader(req) catch {
207+
try writeErrorResponse(415, error.UnsupportedContentType, req, res);
183208
return null;
184209
};
185210

186211
switch (content_type) {
187212
.json => {
188213
return json.parseFromSliceLeaky(T, req.arena, content, .{}) catch {
189-
res.status = 400;
190-
try writeResponse(.{ .status = "invalid body" }, req, res);
214+
try writeErrorResponse(400, error.InvalidContent, req, res);
191215
return null;
192216
};
193217
},
194218
.msgpack => {
195219
return msgpack.decodeFromSliceLeaky(T, req.arena, content) catch {
196-
res.status = 400;
197-
try writeResponse(.{ .status = "invalid body" }, req, res);
220+
try writeErrorResponse(400, error.InvalidContent, req, res);
198221
return null;
199222
};
200223
},
@@ -221,11 +244,7 @@ fn handleSearch(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void
221244

222245
metrics.search();
223246

224-
const results = index.search(body.query, req.arena, deadline) catch |err| {
225-
log.err("index search error: {}", .{err});
226-
res.status = 500;
227-
return writeResponse(.{ .status = "internal error" }, req, res);
228-
};
247+
const results = try index.search(body.query, req.arena, deadline);
229248

230249
var results_json = SearchResultsJSON{ .results = try req.arena.alloc(SearchResultJSON, results.count()) };
231250
for (results.values(), 0..) |r, i| {
@@ -253,21 +272,14 @@ fn handleUpdate(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void
253272

254273
metrics.update(body.changes.len);
255274

256-
index.update(body.changes) catch |err| {
257-
log.err("index search error: {}", .{err});
258-
res.status = 500;
259-
return writeResponse(.{ .status = "internal error" }, req, res);
260-
};
275+
try index.update(body.changes);
261276

262277
return writeResponse(.{ .status = "ok" }, req, res);
263278
}
264279

265280
fn handleHeadIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void {
266281
const index_ref = try getIndex(ctx, req, res, false) orelse return;
267282
defer releaseIndex(ctx, index_ref);
268-
269-
res.status = 200;
270-
return;
271283
}
272284

273285
const Attributes = struct {
@@ -284,47 +296,56 @@ const Attributes = struct {
284296
}
285297
try jws.endArray();
286298
}
299+
300+
pub fn msgpackWrite(self: Attributes, packer: anytype) !void {
301+
try packer.writeMapHeader(self.attributes.count());
302+
var iter = self.attributes.iterator();
303+
while (iter.next()) |entry| {
304+
try packer.write(@TypeOf(entry.key_ptr.*), entry.key_ptr.*);
305+
try packer.write(@TypeOf(entry.value_ptr.*), entry.value_ptr.*);
306+
}
307+
}
287308
};
288309

289310
const GetIndexResponse = struct {
290-
status: []const u8,
311+
version: u64,
291312
attributes: Attributes,
313+
314+
pub fn msgpackFormat() msgpack.StructFormat {
315+
return .{ .as_map = .{ .key = .{ .field_name_prefix = 1 } } };
316+
}
292317
};
293318

294319
fn handleGetIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void {
295320
const index_ref = try getIndex(ctx, req, res, true) orelse return;
296321
defer releaseIndex(ctx, index_ref);
297322

298-
const attributes = try index_ref.index.getAttributes(req.arena);
323+
const info = try index_ref.index.getInfo(req.arena);
299324
const response = GetIndexResponse{
300-
.status = "ok",
301-
.attributes = .{ .attributes = attributes },
325+
.version = info.version,
326+
.attributes = .{
327+
.attributes = info.attributes,
328+
},
302329
};
303-
return res.json(&response, .{});
330+
return writeResponse(response, req, res);
304331
}
305332

333+
const EmptyResponse = struct {};
334+
306335
fn handlePutIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void {
307336
const index_name = req.param("index") orelse return;
308337

309-
ctx.indexes.createIndex(index_name) catch |err| {
310-
log.err("index create error: {}", .{err});
311-
res.status = 500;
312-
return res.json(.{ .status = "internal error" }, .{});
313-
};
338+
try ctx.indexes.createIndex(index_name);
314339

315-
return res.json(.{ .status = "ok" }, .{});
340+
return writeResponse(EmptyResponse{}, req, res);
316341
}
317342

318343
fn handleDeleteIndex(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void {
319344
const index_name = req.param("index") orelse return;
320345

321-
ctx.indexes.deleteIndex(index_name) catch |err| {
322-
log.err("index delete error: {}", .{err});
323-
res.status = 500;
324-
return res.json(.{ .status = "internal error" }, .{});
325-
};
346+
try ctx.indexes.deleteIndex(index_name);
326347

327-
return res.json(.{ .status = "ok" }, .{});
348+
return writeResponse(EmptyResponse{}, req, res);
328349
}
329350

330351
fn handlePing(ctx: *Context, req: *httpz.Request, res: *httpz.Response) !void {

0 commit comments

Comments
 (0)