Skip to content

Commit dab8a6f

Browse files
authored
Enable evaluation cache in the readonly evaluation API. (#42)
* Cleanup unnecessary casts. * Enable evaluation cache in the readonly evaluation API.
1 parent 3f726e2 commit dab8a6f

File tree

2 files changed

+45
-8
lines changed

2 files changed

+45
-8
lines changed

src/execution/indexer.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,14 +413,41 @@ async fn commit_source_tracking_info(
413413
Ok(WithApplyStatus::Normal(()))
414414
}
415415

416+
/// Built an evaluation cache on the existing data.
417+
pub async fn evaluation_cache_on_existing_data(
418+
plan: &ExecutionPlan,
419+
source_op_idx: usize,
420+
key: &value::KeyValue,
421+
pool: &PgPool,
422+
) -> Result<EvaluationCache> {
423+
let source_id = plan.source_ops[source_op_idx].source_id;
424+
let source_key_json = serde_json::to_value(key)?;
425+
let existing_tracking_info = read_source_tracking_info(
426+
source_id,
427+
&source_key_json,
428+
&plan.tracking_table_setup,
429+
pool,
430+
)
431+
.await?;
432+
let process_timestamp = chrono::Utc::now();
433+
let memoization_info = existing_tracking_info
434+
.map(|info| info.memoization_info.map(|info| info.0))
435+
.flatten()
436+
.flatten();
437+
Ok(EvaluationCache::new(
438+
process_timestamp,
439+
memoization_info.map(|info| info.cache),
440+
))
441+
}
442+
416443
pub async fn update_source_entry<'a>(
417444
plan: &ExecutionPlan,
418445
source_op_idx: usize,
419446
schema: &schema::DataSchema,
420447
key: &value::KeyValue,
421448
pool: &PgPool,
422449
) -> Result<()> {
423-
let source_id = plan.source_ops[source_op_idx as usize].source_id;
450+
let source_id = plan.source_ops[source_op_idx].source_id;
424451
let source_key_json = serde_json::to_value(key)?;
425452
let process_timestamp = chrono::Utc::now();
426453

@@ -532,7 +559,7 @@ async fn update_source(
532559
schema: &schema::DataSchema,
533560
pool: &PgPool,
534561
) -> Result<SourceUpdateInfo> {
535-
let source_op = &plan.source_ops[source_op_idx as usize];
562+
let source_op = &plan.source_ops[source_op_idx];
536563
let (keys, existing_keys_json) = try_join(
537564
source_op.executor.list_keys(),
538565
db_tracking::list_source_tracking_keys(

src/service/flows.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,22 @@ pub async fn evaluate_data(
145145
.ok_or_else(|| api_error!("field {} does not have a key", query.field))?;
146146
let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?;
147147

148-
let data_builder =
149-
evaluator::evaluate_source_entry(&execution_plan, source_op_idx, &schema, &key, None)
150-
.await?
151-
.ok_or_else(|| {
152-
api_error!("value not found for source at the specified key: {key:?}")
153-
})?;
148+
let evaluation_cache = indexer::evaluation_cache_on_existing_data(
149+
&execution_plan,
150+
source_op_idx,
151+
&key,
152+
&lib_context.pool,
153+
)
154+
.await?;
155+
let data_builder = evaluator::evaluate_source_entry(
156+
&execution_plan,
157+
source_op_idx,
158+
&schema,
159+
&key,
160+
Some(&evaluation_cache),
161+
)
162+
.await?
163+
.ok_or_else(|| api_error!("value not found for source at the specified key: {key:?}"))?;
154164

155165
Ok(Json(EvaluateDataResponse {
156166
schema: schema.clone(),

0 commit comments

Comments
 (0)