Skip to content

Commit 4d5412b

Browse files
authored
[C# ]Add TryLock and TryPromoteSharedToExclusive (#848)
* Add Try(Promote)Lock * Add drain of writer to HashBucket.TryLockShared Clean up the latching spinCount loops and add "readonly" to "get" accessors in HashBucket and RecordInfo * remove stray SetDirty (it's set later) * Add UpsertOptions, RMWOptions, and DeleteOptions, all with KeyHash, and add overloads of these operations to take this argument. Add KeyHash to ReadOptions and add more Read overloads to include ReadOptions. * LockCode => KeyHash * Clean up more LockCode -> KeyHash; handle null pool in SectorAlignedMemory.Return * Move GetKeyHash from ILockableContext to IFasterContext and implement on all session types
1 parent 7e41e49 commit 4d5412b

37 files changed

+2777
-461
lines changed

cs/src/core/Async/DeleteAsync.cs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ public partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
1313
{
1414
internal struct DeleteAsyncOperation<Input, Output, Context> : IAsyncOperation<Input, Output, Context, DeleteAsyncResult<Input, Output, Context>>
1515
{
16+
DeleteOptions deleteOptions;
17+
18+
internal DeleteAsyncOperation(ref DeleteOptions deleteOptions)
19+
{
20+
this.deleteOptions = deleteOptions;
21+
}
22+
1623
/// <inheritdoc/>
1724
public DeleteAsyncResult<Input, Output, Context> CreateCompletedResult(Status status, Output output, RecordMetadata recordMetadata) => new DeleteAsyncResult<Input, Output, Context>(status);
1825

@@ -21,9 +28,11 @@ public Status DoFastOperation(FasterKV<Key, Value> fasterKV, ref PendingContext<
2128
out Output output)
2229
{
2330
OperationStatus internalStatus;
31+
ref var key = ref pendingContext.key.Get();
32+
var keyHash = deleteOptions.KeyHash ?? fasterKV.comparer.GetHashCode64(ref key);
2433
do
2534
{
26-
internalStatus = fasterKV.InternalDelete(ref pendingContext.key.Get(), ref pendingContext.userContext, ref pendingContext, fasterSession, pendingContext.serialNum);
35+
internalStatus = fasterKV.InternalDelete(ref key, keyHash, ref pendingContext.userContext, ref pendingContext, fasterSession, pendingContext.serialNum);
2736
} while (fasterKV.HandleImmediateRetryStatus(internalStatus, fasterSession, ref pendingContext));
2837
output = default;
2938
return TranslateStatus(internalStatus);
@@ -32,7 +41,7 @@ public Status DoFastOperation(FasterKV<Key, Value> fasterKV, ref PendingContext<
3241
/// <inheritdoc/>
3342
public ValueTask<DeleteAsyncResult<Input, Output, Context>> DoSlowOperation(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Value, Input, Output, Context> fasterSession,
3443
PendingContext<Input, Output, Context> pendingContext, CancellationToken token)
35-
=> SlowDeleteAsync(fasterKV, fasterSession, pendingContext, token);
44+
=> SlowDeleteAsync(fasterKV, fasterSession, pendingContext, deleteOptions, token);
3645

3746
/// <inheritdoc/>
3847
public bool HasPendingIO => false;
@@ -55,11 +64,11 @@ internal DeleteAsyncResult(Status status)
5564
}
5665

5766
internal DeleteAsyncResult(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Value, Input, Output, Context> fasterSession,
58-
PendingContext<Input, Output, Context> pendingContext, ExceptionDispatchInfo exceptionDispatchInfo)
67+
PendingContext<Input, Output, Context> pendingContext, ref DeleteOptions deleteOptions, ExceptionDispatchInfo exceptionDispatchInfo)
5968
{
6069
this.Status = new(StatusCode.Pending);
6170
updateAsyncInternal = new AsyncOperationInternal<Input, Output, Context, DeleteAsyncOperation<Input, Output, Context>, DeleteAsyncResult<Input, Output, Context>>(
62-
fasterKV, fasterSession, pendingContext, exceptionDispatchInfo, new ());
71+
fasterKV, fasterSession, pendingContext, exceptionDispatchInfo, new (ref deleteOptions));
6372
}
6473

6574
/// <summary>Complete the Delete operation, issuing additional allocation asynchronously if needed. It is usually preferable to use Complete() instead of this.</summary>
@@ -76,7 +85,7 @@ public ValueTask<DeleteAsyncResult<Input, Output, Context>> CompleteAsync(Cancel
7685

7786
[MethodImpl(MethodImplOptions.AggressiveInlining)]
7887
internal ValueTask<DeleteAsyncResult<Input, Output, Context>> DeleteAsync<Input, Output, Context, FasterSession>(FasterSession fasterSession,
79-
ref Key key, Context userContext, long serialNo, CancellationToken token = default)
88+
ref Key key, ref DeleteOptions deleteOptions, Context userContext, long serialNo, CancellationToken token = default)
8089
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
8190
{
8291
var pcontext = new PendingContext<Input, Output, Context> { IsAsync = true };
@@ -85,9 +94,10 @@ internal ValueTask<DeleteAsyncResult<Input, Output, Context>> DeleteAsync<Input,
8594
try
8695
{
8796
OperationStatus internalStatus;
97+
var keyHash = deleteOptions.KeyHash ?? comparer.GetHashCode64(ref key);
8898
do
8999
{
90-
internalStatus = InternalDelete(ref key, ref userContext, ref pcontext, fasterSession, serialNo);
100+
internalStatus = InternalDelete(ref key, keyHash, ref userContext, ref pcontext, fasterSession, serialNo);
91101
} while (HandleImmediateRetryStatus(internalStatus, fasterSession, ref pcontext));
92102

93103
if (OperationStatusUtils.TryConvertToCompletedStatusCode(internalStatus, out Status status))
@@ -101,17 +111,17 @@ internal ValueTask<DeleteAsyncResult<Input, Output, Context>> DeleteAsync<Input,
101111
fasterSession.UnsafeSuspendThread();
102112
}
103113

104-
return SlowDeleteAsync(this, fasterSession, pcontext, token);
114+
return SlowDeleteAsync(this, fasterSession, pcontext, deleteOptions, token);
105115
}
106116

107117
private static async ValueTask<DeleteAsyncResult<Input, Output, Context>> SlowDeleteAsync<Input, Output, Context>(
108118
FasterKV<Key, Value> @this,
109119
IFasterSession<Key, Value, Input, Output, Context> fasterSession,
110-
PendingContext<Input, Output, Context> pcontext, CancellationToken token = default)
120+
PendingContext<Input, Output, Context> pcontext, DeleteOptions deleteOptions, CancellationToken token = default)
111121
{
112122
ExceptionDispatchInfo exceptionDispatchInfo = await WaitForFlushCompletionAsync(@this, pcontext.flushEvent, token).ConfigureAwait(false);
113123
pcontext.flushEvent = default;
114-
return new DeleteAsyncResult<Input, Output, Context>(@this, fasterSession, pcontext, exceptionDispatchInfo);
124+
return new DeleteAsyncResult<Input, Output, Context>(@this, fasterSession, pcontext, ref deleteOptions, exceptionDispatchInfo);
115125
}
116126
}
117127
}

cs/src/core/Async/RMWAsync.cs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,13 @@ public partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
1414
internal struct RmwAsyncOperation<Input, Output, Context> : IAsyncOperation<Input, Output, Context, RmwAsyncResult<Input, Output, Context>>
1515
{
1616
AsyncIOContext<Key, Value> diskRequest;
17-
internal RmwAsyncOperation(AsyncIOContext<Key, Value> diskRequest) => this.diskRequest = diskRequest;
17+
RMWOptions rmwOptions;
18+
19+
internal RmwAsyncOperation(AsyncIOContext<Key, Value> diskRequest, ref RMWOptions rmwOptions)
20+
{
21+
this.diskRequest = diskRequest;
22+
this.rmwOptions = rmwOptions;
23+
}
1824

1925
/// <inheritdoc/>
2026
public RmwAsyncResult<Input, Output, Context> CreateCompletedResult(Status status, Output output, RecordMetadata recordMetadata) => new(status, output, recordMetadata);
@@ -25,8 +31,8 @@ public Status DoFastOperation(FasterKV<Key, Value> fasterKV, ref PendingContext<
2531
{
2632
Status status = !this.diskRequest.IsDefault()
2733
? fasterKV.InternalCompletePendingRequestFromContext(fasterSession, this.diskRequest, ref pendingContext, out AsyncIOContext<Key, Value> newDiskRequest)
28-
: fasterKV.CallInternalRMW(fasterSession, ref pendingContext, ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.output, pendingContext.userContext,
29-
pendingContext.serialNum, out newDiskRequest);
34+
: fasterKV.CallInternalRMW(fasterSession, ref pendingContext, ref pendingContext.key.Get(), ref pendingContext.input.Get(), ref pendingContext.output, ref this.rmwOptions,
35+
pendingContext.userContext, pendingContext.serialNum, out newDiskRequest);
3036
output = pendingContext.output;
3137
this.diskRequest = newDiskRequest;
3238
return status;
@@ -35,7 +41,7 @@ public Status DoFastOperation(FasterKV<Key, Value> fasterKV, ref PendingContext<
3541
/// <inheritdoc/>
3642
public ValueTask<RmwAsyncResult<Input, Output, Context>> DoSlowOperation(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Value, Input, Output, Context> fasterSession,
3743
PendingContext<Input, Output, Context> pendingContext, CancellationToken token)
38-
=> SlowRmwAsync(fasterKV, fasterSession, pendingContext, diskRequest, token);
44+
=> SlowRmwAsync(fasterKV, fasterSession, pendingContext, rmwOptions, diskRequest, token);
3945

4046
/// <inheritdoc/>
4147
public bool HasPendingIO => !this.diskRequest.IsDefault();
@@ -67,13 +73,13 @@ internal RmwAsyncResult(Status status, TOutput output, RecordMetadata recordMeta
6773
}
6874

6975
internal RmwAsyncResult(FasterKV<Key, Value> fasterKV, IFasterSession<Key, Value, Input, TOutput, Context> fasterSession,
70-
PendingContext<Input, TOutput, Context> pendingContext, AsyncIOContext<Key, Value> diskRequest, ExceptionDispatchInfo exceptionDispatchInfo)
76+
PendingContext<Input, TOutput, Context> pendingContext, ref RMWOptions rmwOptions, AsyncIOContext<Key, Value> diskRequest, ExceptionDispatchInfo exceptionDispatchInfo)
7177
{
7278
Status = new(StatusCode.Pending);
7379
this.Output = default;
7480
this.RecordMetadata = default;
7581
updateAsyncInternal = new AsyncOperationInternal<Input, TOutput, Context, RmwAsyncOperation<Input, TOutput, Context>, RmwAsyncResult<Input, TOutput, Context>>(
76-
fasterKV, fasterSession, pendingContext, exceptionDispatchInfo, new (diskRequest));
82+
fasterKV, fasterSession, pendingContext, exceptionDispatchInfo, new (diskRequest, ref rmwOptions));
7783
}
7884

7985
/// <summary>Complete the RMW operation, issuing additional (rare) I/O asynchronously if needed. It is usually preferable to use Complete() instead of this.</summary>
@@ -105,7 +111,7 @@ public ValueTask<RmwAsyncResult<Input, TOutput, Context>> CompleteAsync(Cancella
105111

106112
[MethodImpl(MethodImplOptions.AggressiveInlining)]
107113
internal ValueTask<RmwAsyncResult<Input, Output, Context>> RmwAsync<Input, Output, Context, FasterSession>(FasterSession fasterSession,
108-
ref Key key, ref Input input, Context context, long serialNo, CancellationToken token = default)
114+
ref Key key, ref Input input, ref RMWOptions rmwOptions, Context context, long serialNo, CancellationToken token = default)
109115
where FasterSession : IFasterSession<Key, Value, Input, Output, Context>
110116
{
111117
var pcontext = new PendingContext<Input, Output, Context> { IsAsync = true };
@@ -115,7 +121,7 @@ internal ValueTask<RmwAsyncResult<Input, Output, Context>> RmwAsync<Input, Outpu
115121
try
116122
{
117123
Output output = default;
118-
var status = CallInternalRMW(fasterSession, ref pcontext, ref key, ref input, ref output, context, serialNo, out diskRequest);
124+
var status = CallInternalRMW(fasterSession, ref pcontext, ref key, ref input, ref output, ref rmwOptions, context, serialNo, out diskRequest);
119125
if (!status.IsPending)
120126
return new ValueTask<RmwAsyncResult<Input, Output, Context>>(new RmwAsyncResult<Input, Output, Context>(status, output, new RecordMetadata(pcontext.recordInfo, pcontext.logicalAddress)));
121127
}
@@ -126,29 +132,30 @@ internal ValueTask<RmwAsyncResult<Input, Output, Context>> RmwAsync<Input, Outpu
126132
fasterSession.UnsafeSuspendThread();
127133
}
128134

129-
return SlowRmwAsync(this, fasterSession, pcontext, diskRequest, token);
135+
return SlowRmwAsync(this, fasterSession, pcontext, rmwOptions, diskRequest, token);
130136
}
131137

132138
[MethodImpl(MethodImplOptions.AggressiveInlining)]
133139
private Status CallInternalRMW<Input, Output, Context>(IFasterSession<Key, Value, Input, Output, Context> fasterSession, ref PendingContext<Input, Output, Context> pcontext,
134-
ref Key key, ref Input input, ref Output output, Context context, long serialNo, out AsyncIOContext<Key, Value> diskRequest)
140+
ref Key key, ref Input input, ref Output output, ref RMWOptions rmwOptions, Context context, long serialNo, out AsyncIOContext<Key, Value> diskRequest)
135141
{
136142
OperationStatus internalStatus;
143+
var keyHash = rmwOptions.KeyHash ?? comparer.GetHashCode64(ref key);
137144
do
138-
internalStatus = InternalRMW(ref key, ref input, ref output, ref context, ref pcontext, fasterSession, serialNo);
145+
internalStatus = InternalRMW(ref key, keyHash, ref input, ref output, ref context, ref pcontext, fasterSession, serialNo);
139146
while (HandleImmediateRetryStatus(internalStatus, fasterSession, ref pcontext));
140147

141148
return HandleOperationStatus(fasterSession.Ctx, ref pcontext, internalStatus, out diskRequest);
142149
}
143150

144151
private static async ValueTask<RmwAsyncResult<Input, Output, Context>> SlowRmwAsync<Input, Output, Context>(
145152
FasterKV<Key, Value> @this, IFasterSession<Key, Value, Input, Output, Context> fasterSession,
146-
PendingContext<Input, Output, Context> pcontext, AsyncIOContext<Key, Value> diskRequest, CancellationToken token = default)
153+
PendingContext<Input, Output, Context> pcontext, RMWOptions rmwOptions, AsyncIOContext<Key, Value> diskRequest, CancellationToken token = default)
147154
{
148155
ExceptionDispatchInfo exceptionDispatchInfo;
149156
(diskRequest, exceptionDispatchInfo) = await WaitForFlushOrIOCompletionAsync(@this, fasterSession.Ctx, pcontext.flushEvent, diskRequest, token);
150157
pcontext.flushEvent = default;
151-
return new RmwAsyncResult<Input, Output, Context>(@this, fasterSession, pcontext, diskRequest, exceptionDispatchInfo);
158+
return new RmwAsyncResult<Input, Output, Context>(@this, fasterSession, pcontext, ref rmwOptions, diskRequest, exceptionDispatchInfo);
152159
}
153160
}
154161
}

cs/src/core/Async/ReadAsync.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,9 @@ private Status CallInternalRead<Input, Output, Context>(IFasterSession<Key, Valu
132132
out AsyncIOContext<Key, Value> diskRequest)
133133
{
134134
OperationStatus internalStatus;
135+
var keyHash = readOptions.KeyHash ?? comparer.GetHashCode64(ref key);
135136
do
136-
internalStatus = InternalRead(ref key, ref input, ref output, readOptions.StartAddress, ref context, ref pcontext, fasterSession, serialNo);
137+
internalStatus = InternalRead(ref key, keyHash, ref input, ref output, readOptions.StartAddress, ref context, ref pcontext, fasterSession, serialNo);
137138
while (HandleImmediateRetryStatus(internalStatus, fasterSession, ref pcontext));
138139

139140
return HandleOperationStatus(fasterSession.Ctx, ref pcontext, internalStatus, out diskRequest);

0 commit comments

Comments
 (0)