Skip to content

Commit 60e162f

Browse files
authored
- Set both Sealed and Invalid on new records to ensure consistency both during normal operations and after recovery, which clears the Sealed bit. (#864)
- Seal ConcurrentUpdater and InPlaceUpdater source records on successful RCU even when not doing Standard locking
1 parent b15e1cd commit 60e162f

File tree

10 files changed

+115
-37
lines changed

10 files changed

+115
-37
lines changed

cs/src/core/Index/Common/RecordInfo.cs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,14 @@ public struct RecordInfo
6666

6767
public void WriteInfo(bool inNewVersion, bool tombstone, long previousAddress)
6868
{
69+
// For Recovery reasons, we need to have the record both Sealed and Invalid:
70+
// - Recovery removes the Sealed bit, so we need Invalid to survive from this point on to successful CAS.
71+
// Otherwise, Scan could return partial records (e.g. a checkpoint was taken that flushed midway through the record update).
72+
// - Revivification sets Sealed; we need to preserve it here.
73+
// We'll clear both on successful CAS.
6974
this.word = default;
7075
this.Tombstone = tombstone;
71-
this.SetValid();
76+
this.SealAndInvalidate();
7277
this.PreviousAddress = previousAddress;
7378
this.IsInNewVersion = inNewVersion;
7479
}
@@ -112,7 +117,9 @@ public void UnlockExclusive()
112117
{
113118
Debug.Assert(!IsLockedShared, "Trying to X unlock an S locked record");
114119
Debug.Assert(IsLockedExclusive, "Trying to X unlock an unlocked record");
115-
Debug.Assert(!IsSealed, "Trying to X unlock a Sealed record");
120+
121+
// Because we seal the source of an RCU and that source is likely locked, we cannot assert !IsSealed.
122+
// Debug.Assert(!IsSealed, "Trying to X unlock a Sealed record");
116123
word &= ~kExclusiveLockBitMask; // Safe because there should be no other threads (e.g., readers) updating the word at this point
117124
}
118125

@@ -123,8 +130,13 @@ public void UnlockExclusive()
123130
public void UnlockExclusiveAndSeal()
124131
{
125132
Debug.Assert(!IsLockedShared, "Trying to X unlock an S locked record");
126-
Debug.Assert(IsLockedExclusive, "Trying to X unlock an unlocked record");
127-
Debug.Assert(!IsSealed, "Trying to X unlock a Sealed record");
133+
134+
// Because we seal the source of an RCU and that source is likely locked, we cannot assert !IsSealed.
135+
// Debug.Assert(!IsSealed, "Trying to X unlock a Sealed record");
136+
137+
// For this we are Unlocking and Sealing without the cost of an "if EphemeralLocking", so do not assert this.
138+
// Debug.Assert(IsLockedExclusive, "Trying to X unlock an unlocked record");
139+
128140
word = (word & ~kExclusiveLockBitMask) | kSealedBitMask; // Safe because there should be no other threads (e.g., readers) updating the word at this point
129141
}
130142

@@ -322,6 +334,8 @@ public bool IsInNewVersion
322334
public void SetTombstone() => word |= kTombstoneBitMask;
323335
public void SetValid() => word |= kValidBitMask;
324336
public void SetInvalid() => word &= ~(kValidBitMask | kExclusiveLockBitMask);
337+
public void SealAndInvalidate() => word &= (word & ~kValidBitMask) | kSealedBitMask;
338+
public void UnsealAndValidate() => word = (word & ~kSealedBitMask) | kValidBitMask;
325339

326340
[MethodImpl(MethodImplOptions.AggressiveInlining)]
327341
public void SetInvalidAtomic()

cs/src/core/Index/FASTER/Implementation/ConditionalCopyToTail.cs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,22 @@ namespace FASTER.core
88
{
99
public unsafe partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
1010
{
11+
/// <summary>
12+
/// Copy a record to the tail of the log after caller has verifyied it does not exist within a specified range.
13+
/// </summary>
14+
/// <param name="fasterSession">Callback functions.</param>
15+
/// <param name="pendingContext">pending context created when the operation goes pending.</param>
16+
/// <param name="key">key of the record.</param>
17+
/// <param name="input">input passed through.</param>
18+
/// <param name="value">the value to insert</param>
19+
/// <param name="output">Location to store output computed from input and value.</param>
20+
/// <param name="userContext">user context corresponding to operation used during completion callback.</param>
21+
/// <param name="lsn">Operation serial number</param>
22+
/// <param name="stackCtx">Contains information about the call context, record metadata, and so on</param>
23+
/// <param name="writeReason">The reason the CopyToTail is being done</param>
24+
/// <param name="wantIO">Whether to do IO if the search must go below HeadAddress. ReadFromImmutable, for example,
25+
/// is just an optimization to avoid future IOs, so if we need an IO here we just defer them to the next Read().</param>
26+
/// <returns></returns>
1127
[MethodImpl(MethodImplOptions.AggressiveInlining)]
1228
private OperationStatus ConditionalCopyToTail<Input, Output, Context, FasterSession>(FasterSession fasterSession,
1329
ref PendingContext<Input, Output, Context> pendingContext,
@@ -17,9 +33,11 @@ private OperationStatus ConditionalCopyToTail<Input, Output, Context, FasterSess
1733
{
1834
bool callerHasLock = stackCtx.recSrc.HasTransientLock;
1935

20-
// We are called by one of ReadFromImmutable, CompactionConditionalCopyToTail, or ContinueConditionalCopyToTail, and stackCtx is set up for the first try.
21-
// minAddress is the stackCtx.recSrc.LatestLogicalAddress; by the time we get here, any IO below that has been done due to PrepareConditionalCopyToTailIO,
22-
// which then went to ContinueConditionalCopyToTail, which evaluated whether the record was found at that level.
36+
// We are called by one of ReadFromImmutable, CompactionConditionalCopyToTail, or ContinuePendingConditionalCopyToTail;
37+
// these have already searched to see if the record is present above minAddress, and stackCtx is set up for the first try.
38+
// minAddress is the stackCtx.recSrc.LatestLogicalAddress; by the time we get here, any IO below that has been done due to
39+
// PrepareConditionalCopyToTailIO, which then went to ContinuePendingConditionalCopyToTail, which evaluated whether the
40+
// record was found at that level.
2341
while (true)
2442
{
2543
// ConditionalCopyToTail is different in regard to locking from the usual procedures, in that if we find a source record we don't lock--we exit with success.

cs/src/core/Index/FASTER/Implementation/Helpers.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,12 @@ private bool CASRecordIntoChain(ref Key key, ref OperationStackContext<Key, Valu
9595
if (DoEphemeralLocking)
9696
newRecordInfo.InitializeLockExclusive();
9797

98-
return stackCtx.recSrc.LowestReadCachePhysicalAddress == Constants.kInvalidAddress
98+
var result = stackCtx.recSrc.LowestReadCachePhysicalAddress == Constants.kInvalidAddress
9999
? stackCtx.hei.TryCAS(newLogicalAddress)
100100
: SpliceIntoHashChainAtReadCacheBoundary(ref key, ref stackCtx, newLogicalAddress);
101+
if (result)
102+
newRecordInfo.UnsealAndValidate();
103+
return result;
101104
}
102105

103106
[MethodImpl(MethodImplOptions.AggressiveInlining)]

cs/src/core/Index/FASTER/Implementation/InternalRMW.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,9 @@ private OperationStatus CreateNewRecordRMW<Input, Output, Context, FasterSession
483483
{
484484
// Else it was a CopyUpdater so call PCU
485485
fasterSession.PostCopyUpdater(ref key, ref input, ref value, ref hlog.GetValue(newPhysicalAddress), ref output, ref newRecordInfo, ref rmwInfo);
486-
if (stackCtx.recSrc.ephemeralLockResult == EphemeralLockResult.HoldForSeal)
486+
487+
// Success should always Seal the old record if it's in mutable.
488+
if (stackCtx.recSrc.HasMainLogSrc && stackCtx.recSrc.LogicalAddress >= hlog.ReadOnlyAddress)
487489
srcRecordInfo.UnlockExclusiveAndSeal();
488490
}
489491
stackCtx.ClearNewRecord();

cs/src/core/Index/FASTER/Implementation/InternalRead.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ private OperationStatus ReadFromMutableRegion<Input, Output, Context, FasterSess
233233

234234
try
235235
{
236+
if (srcRecordInfo.IsClosed && !useStartAddress)
237+
return OperationStatus.RETRY_LATER;
236238
if (srcRecordInfo.Tombstone)
237239
return OperationStatus.NOTFOUND;
238240

@@ -277,8 +279,11 @@ private OperationStatus ReadFromImmutableRegion<Input, Output, Context, FasterSe
277279

278280
try
279281
{
282+
if (srcRecordInfo.IsClosed && !useStartAddress)
283+
return OperationStatus.RETRY_LATER;
280284
if (srcRecordInfo.Tombstone)
281285
return OperationStatus.NOTFOUND;
286+
282287
ref Value recordValue = ref stackCtx.recSrc.GetValue();
283288

284289
if (fasterSession.SingleReader(ref key, ref input, ref recordValue, ref output, ref srcRecordInfo, ref readInfo))

cs/src/core/Index/FASTER/Implementation/InternalUpsert.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,8 +346,11 @@ private OperationStatus CreateNewRecordUpsert<Input, Output, Context, FasterSess
346346
PostCopyToTail(ref key, ref stackCtx, ref srcRecordInfo);
347347

348348
fasterSession.PostSingleWriter(ref key, ref input, ref value, ref newValue, ref output, ref newRecordInfo, ref upsertInfo, WriteReason.Upsert);
349-
if (stackCtx.recSrc.ephemeralLockResult == EphemeralLockResult.HoldForSeal)
349+
350+
// Success should always Seal the old record if it's in mutable.
351+
if (stackCtx.recSrc.HasMainLogSrc && stackCtx.recSrc.LogicalAddress >= hlog.ReadOnlyAddress)
350352
srcRecordInfo.UnlockExclusiveAndSeal();
353+
351354
stackCtx.ClearNewRecord();
352355
pendingContext.recordInfo = newRecordInfo;
353356
pendingContext.logicalAddress = newLogicalAddress;

cs/src/core/Index/FASTER/Implementation/TryCopyToReadCache.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT license.
33

4-
using System.Threading;
5-
64
namespace FASTER.core
75
{
86
public unsafe partial class FasterKV<Key, Value> : FasterBase, IFasterKV<Key, Value>
@@ -58,6 +56,8 @@ internal bool TryCopyToReadCache<Input, Output, Context, FasterSession>(FasterSe
5856

5957
// ReadCache entries are CAS'd in as the first entry in the hash chain.
6058
var success = stackCtx.hei.TryCAS(newLogicalAddress | Constants.kReadCacheBitMask);
59+
if (success)
60+
newRecordInfo.UnsealAndValidate();
6161
var casSuccess = success;
6262

6363
if (success && stackCtx.recSrc.LowestReadCacheLogicalAddress != Constants.kInvalidAddress)

cs/src/core/Index/FASTER/Implementation/TryCopyToTail.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,10 @@ internal OperationStatus TryCopyToTail<Input, Output, Context, FasterSession>(re
7878
}
7979

8080
if (success)
81+
{
82+
newRecordInfo.UnsealAndValidate();
8183
PostCopyToTail(ref key, ref stackCtx, ref srcRecordInfo, pendingContext.InitialEntryAddress);
84+
}
8285
else
8386
{
8487
stackCtx.SetNewRecordInvalid(ref newRecordInfo);

cs/test/MiscFASTERTests.cs

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using FASTER.core;
55
using NUnit.Framework;
6+
using System;
67
using static FASTER.test.TestUtils;
78

89
namespace FASTER.test
@@ -102,18 +103,25 @@ public void MixedTest2()
102103

103104
[Test]
104105
[Category("FasterKV")]
105-
public void ShouldCreateNewRecordIfConcurrentWriterReturnsFalse()
106+
public void ForceRCUAndRecover([Values(UpdateOp.Upsert, UpdateOp.Delete)] UpdateOp updateOp)
106107
{
107108
var copyOnWrite = new FunctionsCopyOnWrite();
108109

109110
// FunctionsCopyOnWrite
110111
var log = default(IDevice);
112+
FasterKV<KeyStruct, ValueStruct> fht = default;
113+
ClientSession<KeyStruct, ValueStruct, InputStruct, OutputStruct, Empty, IFunctions<KeyStruct, ValueStruct, InputStruct, OutputStruct, Empty>> session = default;
114+
111115
try
112116
{
113-
log = Devices.CreateLogDevice(TestUtils.MethodTestDir + "/hlog1.log", deleteOnClose: true);
114-
using var fht = new FasterKV<KeyStruct, ValueStruct>
115-
(128, new LogSettings { LogDevice = log, MemorySizeBits = 29 }, lockingMode: LockingMode.None);
116-
using var session = fht.NewSession(copyOnWrite);
117+
var checkpointDir = MethodTestDir + $"/checkpoints";
118+
log = Devices.CreateLogDevice(MethodTestDir + "/hlog1.log", deleteOnClose: true);
119+
fht = new FasterKV<KeyStruct, ValueStruct>
120+
(128, new LogSettings { LogDevice = log, MemorySizeBits = 29 },
121+
checkpointSettings: new CheckpointSettings { CheckpointDir = checkpointDir },
122+
lockingMode: LockingMode.None);
123+
124+
session = fht.NewSession(copyOnWrite);
117125

118126
var key = default(KeyStruct);
119127
var value = default(ValueStruct);
@@ -126,28 +134,58 @@ public void ShouldCreateNewRecordIfConcurrentWriterReturnsFalse()
126134
var status = session.Upsert(ref key, ref input, ref value, ref output, out RecordMetadata recordMetadata1);
127135
Assert.IsTrue(!status.Found && status.Record.Created, status.ToString());
128136

129-
// ConcurrentWriter returns false, so we create a new record.
137+
// ConcurrentWriter and InPlaceUpater return false, so we create a new record.
138+
RecordMetadata recordMetadata2;
130139
value = new ValueStruct() { vfield1 = 1001, vfield2 = 2002 };
131-
status = session.Upsert(ref key, ref input, ref value, ref output, out RecordMetadata recordMetadata2);
132-
Assert.IsTrue(!status.Found && status.Record.Created, status.ToString());
133-
140+
if (updateOp == UpdateOp.Upsert)
141+
{
142+
status = session.Upsert(ref key, ref input, ref value, ref output, out recordMetadata2);
143+
Assert.AreEqual(1, copyOnWrite.ConcurrentWriterCallCount);
144+
Assert.IsTrue(!status.Found && status.Record.Created, status.ToString());
145+
}
146+
else
147+
{
148+
status = session.RMW(ref key, ref input, ref output, out recordMetadata2);
149+
Assert.AreEqual(1, copyOnWrite.InPlaceUpdaterCallCount);
150+
Assert.IsTrue(status.Found && status.Record.CopyUpdated, status.ToString());
151+
}
134152
Assert.Greater(recordMetadata2.Address, recordMetadata1.Address);
135153

136-
var recordCount = 0;
137154
using (var iterator = fht.Log.Scan(fht.Log.BeginAddress, fht.Log.TailAddress))
138155
{
139-
// We should get both the old and the new records.
140-
while (iterator.GetNext(out var info))
141-
recordCount++;
156+
Assert.True(iterator.GetNext(out var info)); // We should only get the new record...
157+
Assert.False(iterator.GetNext(out info)); // ... the old record was Sealed.
142158
}
159+
status = session.Read(ref key, ref output);
160+
Assert.IsTrue(status.Found, status.ToString());
161+
162+
fht.TryInitiateFullCheckpoint(out Guid token, CheckpointType.Snapshot);
163+
fht.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult();
143164

144-
Assert.AreEqual(1, copyOnWrite.ConcurrentWriterCallCount);
145-
Assert.AreEqual(2, recordCount);
165+
session.Dispose();
166+
fht.Dispose();
167+
168+
fht = new FasterKV<KeyStruct, ValueStruct>
169+
(128, new LogSettings { LogDevice = log, MemorySizeBits = 29 },
170+
checkpointSettings: new CheckpointSettings { CheckpointDir = checkpointDir },
171+
lockingMode: LockingMode.None);
172+
173+
fht.Recover(token);
174+
session = fht.NewSession(copyOnWrite);
175+
176+
using (var iterator = fht.Log.Scan(fht.Log.BeginAddress, fht.Log.TailAddress))
177+
{
178+
Assert.True(iterator.GetNext(out var info)); // We should get both records...
179+
Assert.True(iterator.GetNext(out info)); // ... the old record was Unsealed by Recovery.
180+
}
181+
status = session.Read(ref key, ref output);
182+
Assert.IsTrue(status.Found, status.ToString());
146183
}
147184
finally
148185
{
149-
if (log != null)
150-
log.Dispose();
186+
session?.Dispose();
187+
fht?.Dispose();
188+
log?.Dispose();
151189
}
152190
}
153191
}

cs/test/StateMachineTests.cs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -682,14 +682,6 @@ void createSessions(out SimpleFunctions f,
682682
fht1.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult();
683683
}
684684

685-
686-
bool tryStartLUC(
687-
ref LockableUnsafeContext<AdId, NumClicks, NumClicks, NumClicks, Empty, SimpleFunctions> luContext,
688-
ClientSession<AdId, NumClicks, NumClicks, NumClicks, Empty, SimpleFunctions> session)
689-
{
690-
luContext = session.LockableUnsafeContext;
691-
return !session.IsInPreparePhase();
692-
}
693685
void RecoverAndTest(IDevice log)
694686
{
695687
NumClicks inputArg = default;

0 commit comments

Comments
 (0)