Skip to content

Commit abfdd54

Browse files
authored
feat(row-status): expose an API to get row indexing status (#472)
* refactor: pack `SourceEvalContext` for common args for processing/eval
* feat(row-status): expose an API to get row indexing status
1 parent b0909af commit abfdd54

12 files changed

+263
-132
lines changed

src/execution/db_tracking.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,3 +211,28 @@ impl ListTrackedSourceKeyMetadataState {
211211
sqlx::query_as(&self.query_str).bind(source_id).fetch(pool)
212212
}
213213
}
214+
215+
/// Bookkeeping columns read back from the tracking table for a single
/// source row, describing the state of its last completed processing.
#[derive(sqlx::FromRow, Debug)]
pub struct SourceLastProcessedInfo {
    /// Ordinal of the source version that was last processed, if any.
    pub processed_source_ordinal: Option<i64>,
    /// Fingerprint of the flow logic in effect when the row was processed;
    /// compared against the current plan's fingerprint to detect staleness.
    pub process_logic_fingerprint: Option<Vec<u8>>,
    /// Time the row was processed, in microseconds since the Unix epoch.
    pub process_time_micros: Option<i64>,
}
221+
222+
pub async fn read_source_last_processed_info(
223+
source_id: i32,
224+
source_key_json: &serde_json::Value,
225+
db_setup: &TrackingTableSetupState,
226+
pool: &PgPool,
227+
) -> Result<Option<SourceLastProcessedInfo>> {
228+
let query_str = format!(
229+
"SELECT processed_source_ordinal, process_logic_fingerprint, process_time_micros FROM {} WHERE source_id = $1 AND source_key = $2",
230+
db_setup.table_name
231+
);
232+
let last_processed_info = sqlx::query_as(&query_str)
233+
.bind(source_id)
234+
.bind(source_key_json)
235+
.fetch_optional(pool)
236+
.await?;
237+
Ok(last_processed_info)
238+
}

src/execution/dumper.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::collections::BTreeMap;
1111
use std::path::{Path, PathBuf};
1212
use yaml_rust2::YamlEmitter;
1313

14+
use super::evaluator::SourceRowEvaluationContext;
1415
use super::memoization::EvaluationMemoryOptions;
1516
use super::row_indexer;
1617
use crate::base::{schema, value};
@@ -77,10 +78,12 @@ impl<'a> Dumper<'a> {
7778
'a: 'b,
7879
{
7980
let data_builder = row_indexer::evaluate_source_entry_with_memory(
80-
self.plan,
81-
import_op,
82-
self.schema,
83-
key,
81+
&SourceRowEvaluationContext {
82+
plan: self.plan,
83+
import_op,
84+
schema: self.schema,
85+
key,
86+
},
8487
EvaluationMemoryOptions {
8588
enable_cache: self.options.use_cache,
8689
evaluation_only: true,

src/execution/evaluator.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -446,30 +446,34 @@ async fn evaluate_op_scope(
446446
Ok(())
447447
}
448448

449+
/// Bundles the common borrowed arguments needed to evaluate a single source
/// row, so they can be passed around as one unit instead of four parameters.
pub struct SourceRowEvaluationContext<'a> {
    /// Compiled execution plan for the flow.
    pub plan: &'a ExecutionPlan,
    /// Import operation the row originates from.
    pub import_op: &'a AnalyzedImportOp,
    /// Schema of the flow being evaluated.
    pub schema: &'a schema::FlowSchema,
    /// Key identifying this row within the source.
    pub key: &'a value::KeyValue,
}
455+
449456
/// Output of evaluating one source entry against the execution plan.
#[derive(Debug)]
pub struct EvaluateSourceEntryOutput {
    /// Builder holding the values produced for the root data scope.
    pub data_scope: ScopeValueBuilder,
    /// Values gathered during evaluation.
    // NOTE(review): the outer Vec appears to be grouped per collector — confirm.
    pub collected_values: Vec<Vec<value::FieldValues>>,
}
454461

455462
pub async fn evaluate_source_entry(
456-
plan: &ExecutionPlan,
457-
import_op: &AnalyzedImportOp,
458-
schema: &schema::FlowSchema,
459-
key: &value::KeyValue,
463+
src_eval_ctx: &SourceRowEvaluationContext<'_>,
460464
source_value: value::FieldValues,
461465
memory: &EvaluationMemory,
462466
) -> Result<EvaluateSourceEntryOutput> {
463-
let root_schema = &schema.schema;
467+
let root_schema = &src_eval_ctx.schema.schema;
464468
let root_scope_value = ScopeValueBuilder::new(root_schema.fields.len());
465469
let root_scope_entry = ScopeEntry::new(
466470
ScopeKey::None,
467471
&root_scope_value,
468472
root_schema,
469-
&plan.op_scope,
473+
&src_eval_ctx.plan.op_scope,
470474
);
471475

472-
let table_schema = match &root_schema.fields[import_op.output.field_idx as usize]
476+
let table_schema = match &root_schema.fields[src_eval_ctx.import_op.output.field_idx as usize]
473477
.value_type
474478
.typ
475479
{
@@ -482,12 +486,12 @@ pub async fn evaluate_source_entry(
482486
let scope_value =
483487
ScopeValueBuilder::augmented_from(&value::ScopeValue(source_value), table_schema)?;
484488
root_scope_entry.define_field_w_builder(
485-
&import_op.output,
486-
value::Value::KTable(BTreeMap::from([(key.clone(), scope_value)])),
489+
&src_eval_ctx.import_op.output,
490+
value::Value::KTable(BTreeMap::from([(src_eval_ctx.key.clone(), scope_value)])),
487491
);
488492

489493
evaluate_op_scope(
490-
&plan.op_scope,
494+
&src_eval_ctx.plan.op_scope,
491495
RefList::Nil.prepend(&root_scope_entry),
492496
memory,
493497
)

src/execution/indexing_status.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use crate::prelude::*;
2+
3+
use super::db_tracking;
4+
use super::evaluator;
5+
use futures::try_join;
6+
7+
/// API-facing view of the last completed processing of a source row.
#[derive(Debug, Serialize)]
pub struct SourceRowLastProcessedInfo {
    /// Ordinal of the source version that was processed, if recorded.
    pub source_ordinal: Option<interface::Ordinal>,
    /// When the row was processed (UTC), if recorded.
    pub processing_time: Option<chrono::DateTime<chrono::Utc>>,
    /// True when the logic fingerprint recorded at processing time matches
    /// the current plan's fingerprint (i.e. the result is not stale).
    pub is_logic_current: bool,
}
13+
14+
/// API-facing view of the row as it currently exists in the source.
#[derive(Debug, Serialize)]
pub struct SourceRowInfo {
    /// Current ordinal reported by the source executor, if available.
    pub ordinal: Option<interface::Ordinal>,
}
18+
19+
/// Indexing status of one source row: what was last processed versus what
/// the source currently holds. Either side may be absent (never processed /
/// no longer present in the source).
#[derive(Debug, Serialize)]
pub struct SourceRowIndexingStatus {
    /// Last-processed state from the tracking table, if the row was tracked.
    pub last_processed: Option<SourceRowLastProcessedInfo>,
    /// Current state fetched from the source, if the row still exists.
    pub current: Option<SourceRowInfo>,
}
24+
25+
pub async fn get_source_row_indexing_status(
26+
src_eval_ctx: &evaluator::SourceRowEvaluationContext<'_>,
27+
pool: &sqlx::PgPool,
28+
) -> Result<SourceRowIndexingStatus> {
29+
let source_key_json = serde_json::to_value(src_eval_ctx.key)?;
30+
let last_processed_fut = db_tracking::read_source_last_processed_info(
31+
src_eval_ctx.import_op.source_id,
32+
&source_key_json,
33+
&src_eval_ctx.plan.tracking_table_setup,
34+
pool,
35+
);
36+
let current_fut = src_eval_ctx.import_op.executor.get_value(
37+
&src_eval_ctx.key,
38+
&interface::SourceExecutorGetOptions {
39+
include_value: false,
40+
include_ordinal: true,
41+
},
42+
);
43+
let (last_processed, current) = try_join!(last_processed_fut, current_fut)?;
44+
45+
let last_processed = last_processed.map(|l| SourceRowLastProcessedInfo {
46+
source_ordinal: l.processed_source_ordinal.map(interface::Ordinal),
47+
processing_time: l
48+
.process_time_micros
49+
.map(chrono::DateTime::<chrono::Utc>::from_timestamp_micros)
50+
.flatten(),
51+
is_logic_current: Some(src_eval_ctx.plan.logic_fingerprint.0.as_slice())
52+
== l.process_logic_fingerprint.as_ref().map(|b| b.as_slice()),
53+
});
54+
let current = current.map(|c| SourceRowInfo { ordinal: c.ordinal });
55+
Ok(SourceRowIndexingStatus {
56+
last_processed,
57+
current,
58+
})
59+
}

src/execution/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub(crate) mod db_tracking_setup;
22
pub(crate) mod dumper;
33
pub(crate) mod evaluator;
4+
pub(crate) mod indexing_status;
45
pub(crate) mod memoization;
56
pub(crate) mod query;
67
pub(crate) mod row_indexer;

0 commit comments

Comments
 (0)