Skip to content

Commit 2b51887

Browse files
authored
fix: multithreaded nested fixpoint iteration (#882)
* Set `validate_final` in `execute` after removing the last cycle head * Add runaway query repro * Add tracing * Fix part 1 * Fix `cycle_head_kinds` to always return provisional for memos that aren't verified final (They should be validated by `validate_same_iteration` or wait for the cycle head * Fix cycle error * Documentation * Fix await for queries depending on initial value * correctly initialize queued * Cleanup * Short circuit if entire query runs on single thread * Move parallel code into its own method * Rename method, add self_key to queued * Revert self-key changes * Move check *after* `deep_verify_memo` * Add a test for a cycle with changing cycle heads * Short circuit more often * Consider iteration in `validate_provisional` * Only yield if all heads result in a cycle. Retry if even just one inner cycle made progress (in which case there's a probably a new memo) * Fix hangs * Cargo fmt * clippy * Fix hang if cycle initial panics * Rename `cycle_head_kind` enable `cycle_a_t1_b_t2_fallback` shuttle test * Cleanup * Docs
1 parent 80fb79e commit 2b51887

25 files changed

+878
-224
lines changed

src/active_query.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{fmt, mem, ops};
33
use crate::accumulator::accumulated_map::{
44
AccumulatedMap, AtomicInputAccumulatedValues, InputAccumulatedValues,
55
};
6-
use crate::cycle::CycleHeads;
6+
use crate::cycle::{CycleHeads, IterationCount};
77
use crate::durability::Durability;
88
use crate::hash::FxIndexSet;
99
use crate::key::DatabaseKeyIndex;
@@ -61,7 +61,7 @@ pub(crate) struct ActiveQuery {
6161
cycle_heads: CycleHeads,
6262

6363
/// If this query is a cycle head, iteration count of that cycle.
64-
iteration_count: u32,
64+
iteration_count: IterationCount,
6565
}
6666

6767
impl ActiveQuery {
@@ -147,7 +147,7 @@ impl ActiveQuery {
147147
}
148148
}
149149

150-
pub(super) fn iteration_count(&self) -> u32 {
150+
pub(super) fn iteration_count(&self) -> IterationCount {
151151
self.iteration_count
152152
}
153153

@@ -161,7 +161,7 @@ impl ActiveQuery {
161161
}
162162

163163
impl ActiveQuery {
164-
fn new(database_key_index: DatabaseKeyIndex, iteration_count: u32) -> Self {
164+
fn new(database_key_index: DatabaseKeyIndex, iteration_count: IterationCount) -> Self {
165165
ActiveQuery {
166166
database_key_index,
167167
durability: Durability::MAX,
@@ -189,7 +189,7 @@ impl ActiveQuery {
189189
ref mut accumulated,
190190
accumulated_inputs,
191191
ref mut cycle_heads,
192-
iteration_count: _,
192+
iteration_count,
193193
} = self;
194194

195195
let origin = if untracked_read {
@@ -204,6 +204,7 @@ impl ActiveQuery {
204204
mem::take(accumulated),
205205
mem::take(tracked_struct_ids),
206206
mem::take(cycle_heads),
207+
iteration_count,
207208
);
208209
let accumulated_inputs = AtomicInputAccumulatedValues::new(accumulated_inputs);
209210

@@ -236,10 +237,14 @@ impl ActiveQuery {
236237
tracked_struct_ids.clear();
237238
accumulated.clear();
238239
*cycle_heads = Default::default();
239-
*iteration_count = 0;
240+
*iteration_count = IterationCount::initial();
240241
}
241242

242-
fn reset_for(&mut self, new_database_key_index: DatabaseKeyIndex, new_iteration_count: u32) {
243+
fn reset_for(
244+
&mut self,
245+
new_database_key_index: DatabaseKeyIndex,
246+
new_iteration_count: IterationCount,
247+
) {
243248
let Self {
244249
database_key_index,
245250
durability,
@@ -323,7 +328,7 @@ impl QueryStack {
323328
pub(crate) fn push_new_query(
324329
&mut self,
325330
database_key_index: DatabaseKeyIndex,
326-
iteration_count: u32,
331+
iteration_count: IterationCount,
327332
) {
328333
if self.len < self.stack.len() {
329334
self.stack[self.len].reset_for(database_key_index, iteration_count);
@@ -373,7 +378,7 @@ struct CapturedQuery {
373378
durability: Durability,
374379
changed_at: Revision,
375380
cycle_heads: CycleHeads,
376-
iteration_count: u32,
381+
iteration_count: IterationCount,
377382
}
378383

379384
impl fmt::Debug for CapturedQuery {
@@ -449,7 +454,7 @@ impl fmt::Display for Backtrace {
449454
write!(fmt, "{idx:>4}: {database_key_index:?}")?;
450455
if full {
451456
write!(fmt, " -> ({changed_at:?}, {durability:#?}")?;
452-
if !cycle_heads.is_empty() || iteration_count > 0 {
457+
if !cycle_heads.is_empty() || !iteration_count.is_initial() {
453458
write!(fmt, ", iteration = {iteration_count:?}")?;
454459
}
455460
write!(fmt, ")")?;

src/cycle.rs

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ use crate::sync::OnceLock;
6060
/// The maximum number of times we'll fixpoint-iterate before panicking.
6161
///
6262
/// Should only be relevant in case of a badly configured cycle recovery.
63-
pub const MAX_ITERATIONS: u32 = 200;
63+
pub const MAX_ITERATIONS: IterationCount = IterationCount(200);
6464

6565
pub struct UnexpectedCycle(Option<crate::Backtrace>);
6666

@@ -147,7 +147,33 @@ pub enum CycleRecoveryStrategy {
147147
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
148148
pub struct CycleHead {
149149
pub(crate) database_key_index: DatabaseKeyIndex,
150-
pub(crate) iteration_count: u32,
150+
pub(crate) iteration_count: IterationCount,
151+
}
152+
153+
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Default)]
154+
pub struct IterationCount(u8);
155+
156+
impl IterationCount {
157+
pub(crate) const fn initial() -> Self {
158+
Self(0)
159+
}
160+
161+
pub(crate) const fn is_initial(self) -> bool {
162+
self.0 == 0
163+
}
164+
165+
pub(crate) const fn increment(self) -> Option<Self> {
166+
let next = Self(self.0 + 1);
167+
if next.0 <= MAX_ITERATIONS.0 {
168+
Some(next)
169+
} else {
170+
None
171+
}
172+
}
173+
174+
pub(crate) const fn as_u32(self) -> u32 {
175+
self.0 as u32
176+
}
151177
}
152178

153179
/// Any provisional value generated by any query in a cycle will track the cycle head(s) (can be
@@ -164,7 +190,7 @@ impl CycleHeads {
164190
pub(crate) fn initial(database_key_index: DatabaseKeyIndex) -> Self {
165191
Self(thin_vec![CycleHead {
166192
database_key_index,
167-
iteration_count: 0,
193+
iteration_count: IterationCount::initial(),
168194
}])
169195
}
170196

@@ -190,7 +216,7 @@ impl CycleHeads {
190216
pub(crate) fn update_iteration_count(
191217
&mut self,
192218
cycle_head_index: DatabaseKeyIndex,
193-
new_iteration_count: u32,
219+
new_iteration_count: IterationCount,
194220
) {
195221
if let Some(cycle_head) = self
196222
.0
@@ -208,11 +234,11 @@ impl CycleHeads {
208234
.iter()
209235
.find(|candidate| candidate.database_key_index == database_key_index)
210236
{
211-
assert_eq!(existing.iteration_count, 0);
237+
assert_eq!(existing.iteration_count, IterationCount::initial());
212238
} else {
213239
self.0.push(CycleHead {
214240
database_key_index,
215-
iteration_count: 0,
241+
iteration_count: IterationCount::initial(),
216242
});
217243
}
218244
}
@@ -266,8 +292,18 @@ pub(crate) fn empty_cycle_heads() -> &'static CycleHeads {
266292
}
267293

268294
#[derive(Debug, PartialEq, Eq)]
269-
pub enum CycleHeadKind {
270-
Provisional,
271-
NotProvisional,
295+
pub enum ProvisionalStatus {
296+
Provisional { iteration: IterationCount },
297+
Final { iteration: IterationCount },
272298
FallbackImmediate,
273299
}
300+
301+
impl ProvisionalStatus {
302+
pub(crate) const fn iteration(&self) -> Option<IterationCount> {
303+
match self {
304+
ProvisionalStatus::Provisional { iteration } => Some(*iteration),
305+
ProvisionalStatus::Final { iteration } => Some(*iteration),
306+
ProvisionalStatus::FallbackImmediate => None,
307+
}
308+
}
309+
}

src/event.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::cycle::IterationCount;
12
use crate::key::DatabaseKeyIndex;
23
use crate::sync::thread::{self, ThreadId};
34
use crate::Revision;
@@ -61,7 +62,7 @@ pub enum EventKind {
6162
WillIterateCycle {
6263
/// The database-key for the cycle head. Implements `Debug`.
6364
database_key: DatabaseKeyIndex,
64-
iteration_count: u32,
65+
iteration_count: IterationCount,
6566
fell_back: bool,
6667
},
6768

src/function.rs

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
1+
pub(crate) use maybe_changed_after::VerifyResult;
12
use std::any::Any;
23
use std::fmt;
34
use std::ptr::NonNull;
4-
5-
pub(crate) use maybe_changed_after::VerifyResult;
5+
use std::sync::atomic::Ordering;
6+
pub(crate) use sync::SyncGuard;
67

78
use crate::accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues};
8-
use crate::cycle::{CycleHeadKind, CycleHeads, CycleRecoveryAction, CycleRecoveryStrategy};
9+
use crate::cycle::{
10+
empty_cycle_heads, CycleHeads, CycleRecoveryAction, CycleRecoveryStrategy, ProvisionalStatus,
11+
};
912
use crate::function::delete::DeletedEntries;
1013
use crate::function::sync::{ClaimResult, SyncTable};
11-
use crate::ingredient::Ingredient;
14+
use crate::ingredient::{Ingredient, WaitForResult};
1215
use crate::key::DatabaseKeyIndex;
1316
use crate::plumbing::MemoIngredientMap;
1417
use crate::salsa_struct::SalsaStructInDb;
@@ -244,30 +247,46 @@ where
244247
self.maybe_changed_after(db, input, revision, cycle_heads)
245248
}
246249

247-
/// True if the input `input` contains a memo that cites itself as a cycle head.
248-
/// This indicates an intermediate value for a cycle that has not yet reached a fixed point.
249-
fn cycle_head_kind(&self, zalsa: &Zalsa, input: Id) -> CycleHeadKind {
250-
let is_provisional = self
251-
.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input))
252-
.is_some_and(|memo| {
253-
memo.cycle_heads()
254-
.into_iter()
255-
.any(|head| head.database_key_index == self.database_key_index(input))
256-
});
257-
if is_provisional {
258-
CycleHeadKind::Provisional
259-
} else if C::CYCLE_STRATEGY == CycleRecoveryStrategy::FallbackImmediate {
260-
CycleHeadKind::FallbackImmediate
250+
/// Returns `final` only if the memo has the `verified_final` flag set and the cycle recovery strategy is not `FallbackImmediate`.
251+
///
252+
/// Otherwise, the value is still provisional. For both final and provisional, it also
253+
/// returns the iteration in which this memo was created (always 0 except for cycle heads).
254+
fn provisional_status(&self, zalsa: &Zalsa, input: Id) -> Option<ProvisionalStatus> {
255+
let memo =
256+
self.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input))?;
257+
258+
let iteration = memo.revisions.iteration();
259+
let verified_final = memo.revisions.verified_final.load(Ordering::Relaxed);
260+
261+
Some(if verified_final {
262+
if C::CYCLE_STRATEGY == CycleRecoveryStrategy::FallbackImmediate {
263+
ProvisionalStatus::FallbackImmediate
264+
} else {
265+
ProvisionalStatus::Final { iteration }
266+
}
261267
} else {
262-
CycleHeadKind::NotProvisional
263-
}
268+
ProvisionalStatus::Provisional { iteration }
269+
})
270+
}
271+
272+
fn cycle_heads<'db>(&self, zalsa: &'db Zalsa, input: Id) -> &'db CycleHeads {
273+
self.get_memo_from_table_for(zalsa, input, self.memo_ingredient_index(zalsa, input))
274+
.map(|memo| memo.cycle_heads())
275+
.unwrap_or(empty_cycle_heads())
264276
}
265277

266-
/// Attempts to claim `key_index`, returning `false` if a cycle occurs.
267-
fn wait_for(&self, zalsa: &Zalsa, key_index: Id) -> bool {
278+
/// Attempts to claim `key_index` without blocking.
279+
///
280+
/// * [`WaitForResult::Running`] if the `key_index` is running on another thread. It's up to the caller to block on the other thread
281+
/// to wait until the result becomes available.
282+
/// * [`WaitForResult::Available`] It is (or at least was) possible to claim the `key_index`
283+
/// * [`WaitResult::Cycle`] Claiming the `key_index` results in a cycle because it's on the current's thread query stack or
284+
/// running on another thread that is blocked on this thread.
285+
fn wait_for<'me>(&'me self, zalsa: &'me Zalsa, key_index: Id) -> WaitForResult<'me> {
268286
match self.sync_table.try_claim(zalsa, key_index) {
269-
ClaimResult::Retry | ClaimResult::Claimed(_) => true,
270-
ClaimResult::Cycle => false,
287+
ClaimResult::Running(blocked_on) => WaitForResult::Running(blocked_on),
288+
ClaimResult::Cycle { same_thread } => WaitForResult::Cycle { same_thread },
289+
ClaimResult::Claimed(_) => WaitForResult::Available,
271290
}
272291
}
273292

src/function/accumulated.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,9 @@ where
9696
db: &'db C::DbView,
9797
key: Id,
9898
) -> (Option<&'db AccumulatedMap>, InputAccumulatedValues) {
99+
let (zalsa, zalsa_local) = db.zalsas();
99100
// NEXT STEP: stash and refactor `fetch` to return an `&Memo` so we can make this work
100-
let memo = self.refresh_memo(db, db.zalsa(), key);
101+
let memo = self.refresh_memo(db, zalsa, zalsa_local, key);
101102
(
102103
memo.revisions.accumulated(),
103104
memo.revisions.accumulated_inputs.load(),

src/function/execute.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::cycle::{CycleRecoveryStrategy, MAX_ITERATIONS};
1+
use crate::cycle::{CycleRecoveryStrategy, IterationCount};
22
use crate::function::memo::Memo;
33
use crate::function::{Configuration, IngredientImpl};
44
use crate::sync::atomic::{AtomicBool, Ordering};
@@ -74,7 +74,9 @@ where
7474
// Cycle participants that don't have a fallback will be discarded in
7575
// `validate_provisional()`.
7676
let cycle_heads = std::mem::take(cycle_heads);
77-
let active_query = db.zalsa_local().push_query(database_key_index, 0);
77+
let active_query = db
78+
.zalsa_local()
79+
.push_query(database_key_index, IterationCount::initial());
7880
new_value = C::cycle_initial(db, C::id_to_input(db, id));
7981
revisions = active_query.pop();
8082
// We need to set `cycle_heads` and `verified_final` because it needs to propagate to the callers.
@@ -125,7 +127,7 @@ where
125127
memo_ingredient_index: MemoIngredientIndex,
126128
) -> (C::Output<'db>, QueryRevisions) {
127129
let database_key_index = active_query.database_key_index;
128-
let mut iteration_count: u32 = 0;
130+
let mut iteration_count = IterationCount::initial();
129131
let mut fell_back = false;
130132

131133
// Our provisional value from the previous iteration, when doing fixpoint iteration.
@@ -189,12 +191,10 @@ where
189191
match C::recover_from_cycle(
190192
db,
191193
&new_value,
192-
iteration_count,
194+
iteration_count.as_u32(),
193195
C::id_to_input(db, id),
194196
) {
195-
crate::CycleRecoveryAction::Iterate => {
196-
tracing::debug!("{database_key_index:?}: execute: iterate again");
197-
}
197+
crate::CycleRecoveryAction::Iterate => {}
198198
crate::CycleRecoveryAction::Fallback(fallback_value) => {
199199
tracing::debug!(
200200
"{database_key_index:?}: execute: user cycle_fn says to fall back"
@@ -208,10 +208,9 @@ where
208208
}
209209
// `iteration_count` can't overflow as we check it against `MAX_ITERATIONS`
210210
// which is less than `u32::MAX`.
211-
iteration_count += 1;
212-
if iteration_count > MAX_ITERATIONS {
213-
panic!("{database_key_index:?}: execute: too many cycle iterations");
214-
}
211+
iteration_count = iteration_count.increment().unwrap_or_else(|| {
212+
panic!("{database_key_index:?}: execute: too many cycle iterations")
213+
});
215214
zalsa.event(&|| {
216215
Event::new(EventKind::WillIterateCycle {
217216
database_key: database_key_index,
@@ -220,6 +219,10 @@ where
220219
})
221220
});
222221
cycle_heads.update_iteration_count(database_key_index, iteration_count);
222+
revisions.update_iteration_count(iteration_count);
223+
tracing::debug!(
224+
"{database_key_index:?}: execute: iterate again, revisions: {revisions:#?}"
225+
);
223226
opt_last_provisional = Some(self.insert_memo(
224227
zalsa,
225228
id,

0 commit comments

Comments
 (0)