@@ -306,14 +306,17 @@ fn doCheckpoint(self: *Self) !bool {
     self.memory_segments.commitUpdate(&memory_segments_update);
     self.file_segments.commitUpdate(&file_segments_update);
 
+    if (self.file_segments.needsMerge()) {
+        self.file_segment_merge_event.set();
+    }
+
     return true;
 }
 
 fn checkpointThreadFn(self: *Self) void {
     while (!self.stopping.load(.acquire)) {
         if (self.doCheckpoint()) |successful| {
             if (successful) {
-                self.scheduleFileSegmentMerge();
                 continue;
             }
             self.checkpoint_event.reset();
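For orientation only, not part of the diff: the thread functions above all follow the same event-driven worker loop. Below is a minimal standalone sketch of that pattern, assuming the *_event fields are std.Thread.ResetEvent and stopping is a std.atomic.Value(bool); both are assumptions inferred from the set()/reset() and load(.acquire) calls in the hunks, not confirmed by the diff.

const std = @import("std");

const Worker = struct {
    stopping: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
    work_event: std.Thread.ResetEvent = .{},

    // Stand-in for doCheckpoint() / maybeMerge*(); returns true if it did work.
    fn doWork(self: *Worker) bool {
        _ = self;
        return false;
    }

    // Same overall shape as checkpointThreadFn / memorySegmentMergeThreadFn above.
    fn threadFn(self: *Worker) void {
        while (!self.stopping.load(.acquire)) {
            if (self.doWork()) continue; // more work may be pending, loop again

            // Nothing to do: clear the event, re-check once so a wake-up that
            // raced with the check is not lost, then block until set() again.
            self.work_event.reset();
            if (self.doWork()) continue;
            self.work_event.wait();
        }
    }

    // Producers call this after committing new work (cf. the set() calls above).
    fn notify(self: *Worker) void {
        self.work_event.set();
    }
};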
@@ -357,6 +360,7 @@ fn maybeMergeFileSegments(self: *Self) !bool {
     defer self.segments_lock.unlock();
 
     self.file_segments.commitUpdate(&upd);
+
     return true;
 }
 
@@ -398,14 +402,16 @@ fn maybeMergeMemorySegments(self: *Self) !bool {
     defer self.segments_lock.unlock();
 
     self.memory_segments.commitUpdate(&upd);
+
+    self.maybeScheduleCheckpoint();
+
     return true;
 }
 
 fn memorySegmentMergeThreadFn(self: *Self) void {
     while (!self.stopping.load(.acquire)) {
         if (self.maybeMergeMemorySegments()) |successful| {
             if (successful) {
-                self.checkpoint_event.set();
                 continue;
             }
             self.memory_segment_merge_event.reset();
@@ -445,6 +451,14 @@ const Checkpoint = struct {
     dest: ?*FileSegmentNode = null,
 };
 
+fn maybeScheduleCheckpoint(self: *Self) void {
+    if (self.memory_segments.segments.value.getFirst()) |first_node| {
+        if (first_node.value.getSize() >= self.options.min_segment_size) {
+            self.checkpoint_event.set();
+        }
+    }
+}
+
 fn readyForCheckpoint(self: *Self) ?MemorySegmentNode {
     self.segments_lock.lockShared();
     defer self.segments_lock.unlockShared();
@@ -457,32 +471,29 @@ fn readyForCheckpoint(self: *Self) ?MemorySegmentNode {
     return null;
 }
 
-fn scheduleCheckpoint(self: *Self) void {
-    self.checkpoint_event.set();
-}
+pub fn update(self: *Self, changes: []const Change) !void {
+    log.debug("update with {} changes", .{changes.len});
 
-fn scheduleMemorySegmentMerge(self: *Self) void {
-    self.segments_lock.lockShared();
-    defer self.segments_lock.unlockShared();
+    var target = try MemorySegmentList.createSegment(self.allocator, .{});
+    defer MemorySegmentList.destroySegment(self.allocator, &target);
 
-    if (self.memory_segments.needsMerge()) {
-        self.memory_segment_merge_event.set();
-    }
-}
+    try target.value.build(changes);
 
-fn scheduleFileSegmentMerge(self: *Self) void {
-    self.segments_lock.lockShared();
-    defer self.segments_lock.unlockShared();
+    var upd = try self.memory_segments.beginUpdate();
+    defer self.memory_segments.cleanupAfterUpdate(&upd);
 
-    if (self.file_segments.needsMerge()) {
-        self.file_segment_merge_event.set();
-    }
-}
+    upd.appendSegment(target);
 
-pub fn update(self: *Self, changes: []const Change) !void {
-    log.debug("update with {} changes", .{changes.len});
-    try self.oplog.write(changes, Updater{ .index = self });
-    self.scheduleMemorySegmentMerge();
+    try self.oplog.write(changes);
+
+    self.segments_lock.lock();
+    defer self.segments_lock.unlock();
+
+    self.memory_segments.commitUpdate(&upd);
+
+    if (self.memory_segments.needsMerge()) {
+        self.memory_segment_merge_event.set();
+    }
 }
 
 const SegmentsSnapshot = struct {
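Reading of the overall change, not code from the repo: the explicit schedule* helpers are removed, and each background thread is instead woken at the point where the relevant update is committed. A hypothetical caller-side sketch of the resulting flow; Index, Change, and indexChanges are illustrative stand-ins for whatever the surrounding module actually exposes.

// Illustrative sketch only; types and the helper name are hypothetical.
fn indexChanges(index: *Index, changes: []const Change) !void {
    // 1. update() builds a new in-memory segment from the changes, writes the
    //    oplog, commits the segment under segments_lock, and sets
    //    memory_segment_merge_event if the in-memory list now needs merging.
    try index.update(changes);

    // 2. memorySegmentMergeThreadFn() wakes up, merges memory segments, and via
    //    maybeScheduleCheckpoint() sets checkpoint_event once the first memory
    //    segment has reached options.min_segment_size.
    //
    // 3. checkpointThreadFn() moves that segment into a file segment and, after
    //    committing, sets file_segment_merge_event when the file segments need
    //    merging; the file-segment merge thread (not shown in this diff) then
    //    runs maybeMergeFileSegments().
}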