Skip to content

Commit 1c6dd81

Browse files
committed
Plubming EvaluatorCache through the evaluator.
1 parent 96d6629 commit 1c6dd81

File tree

3 files changed

+33
-29
lines changed

3 files changed

+33
-29
lines changed

src/execution/evaluator.rs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
utils::immutable::RefList,
1212
};
1313

14-
use super::memoization::{EvaluationCache, MemoizationInfo};
14+
use super::memoization::EvaluationCache;
1515

1616
#[derive(Debug)]
1717
pub struct ScopeValueBuilder {
@@ -285,22 +285,19 @@ fn assemble_input_values(
285285
.collect()
286286
}
287287

288-
struct CacheEntry {
289-
value: serde_json::Value,
290-
used: bool,
291-
}
292-
293288
async fn evaluate_child_op_scope(
294289
op_scope: &AnalyzedOpScope,
295290
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
296291
child_scope_entry: ScopeEntry<'_>,
292+
cache: Option<&EvaluationCache>,
297293
) -> Result<()> {
298-
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry)).await
294+
evaluate_op_scope(op_scope, scoped_entries.prepend(&child_scope_entry), cache).await
299295
}
300296

301297
async fn evaluate_op_scope(
302298
op_scope: &AnalyzedOpScope,
303299
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
300+
cache: Option<&EvaluationCache>,
304301
) -> Result<()> {
305302
let head_scope = *scoped_entries.head().unwrap();
306303
for reactive_op in op_scope.reactive_ops.iter() {
@@ -331,6 +328,7 @@ async fn evaluate_op_scope(
331328
value: &item,
332329
schema: &collection_schema.row,
333330
},
331+
cache,
334332
)
335333
})
336334
.collect::<Vec<_>>(),
@@ -345,6 +343,7 @@ async fn evaluate_op_scope(
345343
value: v,
346344
schema: &collection_schema.row,
347345
},
346+
cache,
348347
)
349348
})
350349
.collect::<Vec<_>>(),
@@ -360,6 +359,7 @@ async fn evaluate_op_scope(
360359
value: item,
361360
schema: &collection_schema.row,
362361
},
362+
cache,
363363
)
364364
})
365365
.collect::<Vec<_>>(),
@@ -395,8 +395,8 @@ pub async fn evaluate_source_entry<'a>(
395395
source_op_idx: u32,
396396
schema: &schema::DataSchema,
397397
key: &value::KeyValue,
398-
memoization_info: Option<MemoizationInfo>,
399-
) -> Result<Option<(ScopeValueBuilder, MemoizationInfo)>> {
398+
cache: Option<&EvaluationCache>,
399+
) -> Result<Option<ScopeValueBuilder>> {
400400
let root_schema = &schema.schema;
401401
let root_scope_value =
402402
ScopeValueBuilder::new(root_schema.fields.len(), schema.collectors.len());
@@ -419,23 +419,20 @@ pub async fn evaluate_source_entry<'a>(
419419

420420
let result = match source_op.executor.get_value(&key).await? {
421421
Some(val) => {
422-
let cache = memoization_info
423-
.map(|info| EvaluationCache::from_stored(info.cache))
424-
.unwrap_or_default();
425422
let scope_value =
426423
ScopeValueBuilder::augmented_from(value::ScopeValue(val), &collection_schema)?;
427424
root_scope_entry.define_field_w_builder(
428425
&source_op.output,
429426
value::Value::Table(BTreeMap::from([(key.clone(), scope_value)])),
430427
);
431428

432-
evaluate_op_scope(&plan.op_scope, RefList::Nil.prepend(&root_scope_entry)).await?;
433-
Some((
434-
root_scope_value,
435-
MemoizationInfo {
436-
cache: cache.into_stored()?,
437-
},
438-
))
429+
evaluate_op_scope(
430+
&plan.op_scope,
431+
RefList::Nil.prepend(&root_scope_entry),
432+
cache,
433+
)
434+
.await?;
435+
Some(root_scope_value)
439436
}
440437
None => None,
441438
};
@@ -468,6 +465,7 @@ pub async fn evaluate_transient_flow(
468465
evaluate_op_scope(
469466
&flow.execution_plan.op_scope,
470467
RefList::Nil.prepend(&root_scope_entry),
468+
None,
471469
)
472470
.await?;
473471
let output_value = assemble_value(

src/execution/indexer.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use sqlx::PgPool;
1010

1111
use super::db_tracking::{self, read_source_tracking_info};
1212
use super::db_tracking_setup;
13-
use super::memoization::MemoizationInfo;
13+
use super::memoization::{EvaluationCache, MemoizationInfo};
1414
use crate::base::schema;
1515
use crate::base::spec::FlowInstanceSpec;
1616
use crate::base::value::{self, FieldValues, KeyValue};
@@ -446,28 +446,34 @@ pub async fn update_source_entry<'a>(
446446
let memoization_info = existing_tracking_info
447447
.map(|info| info.memoization_info.map(|info| info.0))
448448
.flatten();
449-
let evaluate_output =
450-
evaluate_source_entry(plan, source_op_idx, schema, key, memoization_info).await?;
449+
let evaluation_cache = memoization_info
450+
.map(|info| EvaluationCache::from_stored(info.cache))
451+
.unwrap_or_default();
452+
let value_builder =
453+
evaluate_source_entry(plan, source_op_idx, schema, key, Some(&evaluation_cache)).await?;
451454

452455
// Didn't exist and still doesn't exist. No need to apply any changes.
453-
if !already_exists && evaluate_output.is_none() {
456+
if !already_exists && value_builder.is_none() {
454457
return Ok(());
455458
}
456459

457-
let (source_ordinal, precommit_data) = match &evaluate_output {
458-
Some((scope_value, memoization_info)) => {
459-
// TODO: Generate the actual source ordinal.
460+
let memoization_info = MemoizationInfo {
461+
cache: evaluation_cache.into_stored()?,
462+
};
463+
let (source_ordinal, precommit_data) = match &value_builder {
464+
Some(scope_value) => {
460465
(
466+
// TODO: Generate the actual source ordinal.
461467
Some(1),
462468
Some(PrecommitData {
463469
scope_value,
464-
memoization_info,
470+
memoization_info: &memoization_info,
465471
}),
466472
)
467473
}
468474
None => (None, None),
469475
};
470-
if evaluate_output.is_some() {
476+
if value_builder.is_some() {
471477
Some(1)
472478
} else {
473479
None

src/service/flows.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ pub async fn evaluate_data(
144144
.ok_or_else(|| api_error!("field {} does not have a key", query.field))?;
145145
let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?;
146146

147-
let (data_builder, _) =
147+
let data_builder =
148148
evaluator::evaluate_source_entry(&execution_plan, field_idx as u32, &schema, &key, None)
149149
.await?
150150
.ok_or_else(|| {

0 commit comments

Comments
 (0)