Skip to content

Commit 63ba949

Browse files
authored
[C#] BumpCurrentEpoch calling threads must be protected (#871)
* - Ensure threads calling BumpCurrentEpoch are protected - Back out change for shared bucket lock to wait for writer * Move try/finally block for better inlining
1 parent 32d04e6 commit 63ba949

File tree

5 files changed

+57
-42
lines changed

5 files changed

+57
-42
lines changed

cs/src/core/Epochs/EpochProtectedVersionScheme.cs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -274,8 +274,8 @@ public EpochProtectedVersionScheme(LightEpoch epoch)
274274
[MethodImpl(MethodImplOptions.AggressiveInlining)]
275275
private bool MakeTransition(VersionSchemeState expectedState, VersionSchemeState nextState)
276276
{
277-
if (Interlocked.CompareExchange(ref state.Word, nextState.Word, expectedState.Word) !=
278-
expectedState.Word) return false;
277+
if (Interlocked.CompareExchange(ref state.Word, nextState.Word, expectedState.Word) != expectedState.Word)
278+
return false;
279279
Debug.WriteLine("Moved to {0}, {1}", nextState.Phase, nextState.Version);
280280
return true;
281281
}
@@ -358,28 +358,35 @@ internal void TryStepStateMachine(VersionSchemeStateMachine expectedMachine = nu
358358

359359
var intermediate = VersionSchemeState.MakeIntermediate(oldState);
360360
if (!MakeTransition(oldState, intermediate)) return;
361+
361362
// Avoid upfront memory allocation by going to a function
362363
StepMachineHeavy(machineLocal, oldState, nextState);
363-
364-
// Ensure that state machine is able to make progress if this thread is the only active thread
365-
if (!epoch.ThisInstanceProtected())
366-
{
367-
epoch.Resume();
368-
epoch.Suspend();
369-
}
370364
}
371365

372366
[MethodImpl(MethodImplOptions.AggressiveInlining)]
373367
private void StepMachineHeavy(VersionSchemeStateMachine machineLocal, VersionSchemeState old, VersionSchemeState next)
374368
{
375-
epoch.BumpCurrentEpoch(() =>
369+
// // Resume epoch to ensure that state machine is able to make progress
370+
// if this thread is the only active thread. Also, StepMachineHeavy calls BumpCurrentEpoch, which requires a protected thread.
371+
bool isProtected = epoch.ThisInstanceProtected();
372+
if (!isProtected)
373+
epoch.Resume();
374+
try
376375
{
377-
machineLocal.OnEnteringState(old, next);
378-
var success = MakeTransition(VersionSchemeState.MakeIntermediate(old), next);
379-
machineLocal.AfterEnteringState(next);
380-
Debug.Assert(success);
381-
TryStepStateMachine(machineLocal);
382-
});
376+
epoch.BumpCurrentEpoch(() =>
377+
{
378+
machineLocal.OnEnteringState(old, next);
379+
var success = MakeTransition(VersionSchemeState.MakeIntermediate(old), next);
380+
machineLocal.AfterEnteringState(next);
381+
Debug.Assert(success);
382+
TryStepStateMachine(machineLocal);
383+
});
384+
}
385+
finally
386+
{
387+
if (!isProtected)
388+
epoch.Suspend();
389+
}
383390
}
384391

385392
/// <summary>

cs/src/core/Epochs/LightEpoch.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ public void Resume()
225225
/// <returns></returns>
226226
long BumpCurrentEpoch()
227227
{
228+
Debug.Assert(this.ThisInstanceProtected(), "BumpCurrentEpoch must be called on a protected thread");
228229
long nextEpoch = Interlocked.Increment(ref CurrentEpoch);
229230

230231
if (drainCount > 0)

cs/src/core/FasterLog/FasterLog.cs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -322,18 +322,22 @@ public void Dispose()
322322
/// <param name="spinWait"> whether to spin until log completion becomes committed </param>
323323
public void CompleteLog(bool spinWait = false)
324324
{
325-
326-
// Ensure all currently started entries will enqueue before we declare log closed
327-
epoch.BumpCurrentEpoch(() =>
325+
// Ensure progress even if there is no thread in epoch table. Also, BumpCurrentEpoch must be done on a protected thread.
326+
bool isProtected = epoch.ThisInstanceProtected();
327+
if (!isProtected)
328+
epoch.Resume();
329+
try
328330
{
329-
CommitInternal(out _, out _, false, Array.Empty<byte>(), long.MaxValue, null);
330-
});
331-
332-
// Ensure progress even if there is no thread in epoch table
333-
if (!epoch.ThisInstanceProtected())
331+
// Ensure all currently started entries will enqueue before we declare log closed
332+
epoch.BumpCurrentEpoch(() =>
333+
{
334+
CommitInternal(out _, out _, false, Array.Empty<byte>(), long.MaxValue, null);
335+
});
336+
}
337+
finally
334338
{
335-
epoch.Resume();
336-
epoch.Suspend();
339+
if (!isProtected)
340+
epoch.Suspend();
337341
}
338342

339343
if (spinWait)

cs/src/core/Index/FASTER/FASTERBase.cs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -116,27 +116,19 @@ public static bool TryAcquireSharedLatch(HashBucket* bucket)
116116

117117
for (int spinCount = Constants.kMaxLockSpins; ; Thread.Yield())
118118
{
119+
// Note: If reader starvation is encountered, consider rotating priority between reader and writer locks.
119120
long expected_word = entry_word;
120-
if ((expected_word & kSharedLatchBitMask) != kSharedLatchBitMask) // shared lock is not full
121+
if ((expected_word & kExclusiveLatchBitMask) == 0) // not exclusively locked
121122
{
122-
if (expected_word == Interlocked.CompareExchange(ref entry_word, expected_word + kSharedLatchIncrement, expected_word))
123-
break;
123+
if ((expected_word & kSharedLatchBitMask) != kSharedLatchBitMask) // shared lock is not full
124+
{
125+
if (expected_word == Interlocked.CompareExchange(ref entry_word, expected_word + kSharedLatchIncrement, expected_word))
126+
return true;
127+
}
124128
}
125129
if (--spinCount <= 0)
126130
return false;
127131
}
128-
129-
// Wait for any writer to drain. Another session may hold the XLock on this bucket and need an epoch refresh to unlock, so limit this to avoid deadlock.
130-
for (var ii = 0; ii < Constants.kMaxWriterLockDrainSpins; ++ii)
131-
{
132-
if ((entry_word & kExclusiveLatchBitMask) == 0)
133-
return true;
134-
Thread.Yield();
135-
}
136-
137-
// Release the shared-latch increment we added and return false so the caller will retry the operation.
138-
Interlocked.Add(ref entry_word, -kSharedLatchIncrement);
139-
return false;
140132
}
141133

142134
[MethodImpl(MethodImplOptions.AggressiveInlining)]

cs/src/core/Index/Synchronization/IndexResizeStateMachine.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,18 @@ public void GlobalAfterEnteringState<Key, Value>(
5353
switch (next.Phase)
5454
{
5555
case Phase.PREPARE_GROW:
56-
faster.epoch.BumpCurrentEpoch(() => allThreadsInPrepareGrow = true);
56+
bool isProtected = faster.epoch.ThisInstanceProtected();
57+
if (!isProtected)
58+
faster.epoch.Resume();
59+
try
60+
{
61+
faster.epoch.BumpCurrentEpoch(() => allThreadsInPrepareGrow = true);
62+
}
63+
finally
64+
{
65+
if (!isProtected)
66+
faster.epoch.Suspend();
67+
}
5768
break;
5869
case Phase.IN_PROGRESS_GROW:
5970
case Phase.REST:

0 commit comments

Comments
 (0)