Skip to content

Commit 3f726e2

Browse files
authored
Bug fix: for evaluate API, use position in source instead of schema. (#41)
1 parent 7ad6922 commit 3f726e2

File tree

3 files changed

+14
-20
lines changed

3 files changed

+14
-20
lines changed

src/execution/evaluator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ async fn evaluate_op_scope(
412412

413413
pub async fn evaluate_source_entry<'a>(
414414
plan: &ExecutionPlan,
415-
source_op_idx: u32,
415+
source_op_idx: usize,
416416
schema: &schema::DataSchema,
417417
key: &value::KeyValue,
418418
cache: Option<&EvaluationCache>,
@@ -426,7 +426,7 @@ pub async fn evaluate_source_entry<'a>(
426426
schema: &root_schema,
427427
};
428428

429-
let source_op = &plan.source_ops[source_op_idx as usize];
429+
let source_op = &plan.source_ops[source_op_idx];
430430
let collection_schema = match &root_schema.fields[source_op.output.field_idx as usize]
431431
.value_type
432432
.typ

src/execution/indexer.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ async fn commit_source_tracking_info(
415415

416416
pub async fn update_source_entry<'a>(
417417
plan: &ExecutionPlan,
418-
source_op_idx: u32,
418+
source_op_idx: usize,
419419
schema: &schema::DataSchema,
420420
key: &value::KeyValue,
421421
pool: &PgPool,
@@ -528,7 +528,7 @@ pub async fn update_source_entry<'a>(
528528
async fn update_source(
529529
source_name: &str,
530530
plan: &ExecutionPlan,
531-
source_op_idx: u32,
531+
source_op_idx: usize,
532532
schema: &schema::DataSchema,
533533
pool: &PgPool,
534534
) -> Result<SourceUpdateInfo> {
@@ -591,14 +591,7 @@ pub async fn update(
591591
.iter()
592592
.enumerate()
593593
.map(|(source_op_idx, source_op)| async move {
594-
update_source(
595-
source_op.name.as_str(),
596-
plan,
597-
source_op_idx as u32,
598-
schema,
599-
pool,
600-
)
601-
.await
594+
update_source(source_op.name.as_str(), plan, source_op_idx, schema, pool).await
602595
})
603596
.collect::<Vec<_>>(),
604597
)

src/service/flows.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -122,30 +122,31 @@ pub async fn evaluate_data(
122122
let fl = &lib_context.with_flow_context(&flow_name, |ctx| ctx.flow.clone())?;
123123
let schema = &fl.data_schema;
124124

125-
let field_idx = schema
126-
.fields
125+
let source_op_idx = fl
126+
.flow_instance
127+
.source_ops
127128
.iter()
128-
.position(|f| f.name == query.field)
129+
.position(|source_op| source_op.name == query.field)
129130
.ok_or_else(|| {
130131
ApiError::new(
131-
&format!("field not found: {}", query.field),
132+
&format!("source field not found: {}", query.field),
132133
StatusCode::BAD_REQUEST,
133134
)
134135
})?;
135-
136-
let field_schema = &schema.fields[field_idx];
136+
let execution_plan = fl.get_execution_plan().await?;
137+
let field_schema =
138+
&schema.fields[execution_plan.source_ops[source_op_idx].output.field_idx as usize];
137139
let collection_schema = match &field_schema.value_type.typ {
138140
schema::ValueType::Collection(collection) => collection,
139141
_ => api_bail!("field is not a table: {}", query.field),
140142
};
141-
let execution_plan = fl.get_execution_plan().await?;
142143
let key_field = collection_schema
143144
.key_field()
144145
.ok_or_else(|| api_error!("field {} does not have a key", query.field))?;
145146
let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?;
146147

147148
let data_builder =
148-
evaluator::evaluate_source_entry(&execution_plan, field_idx as u32, &schema, &key, None)
149+
evaluator::evaluate_source_entry(&execution_plan, source_op_idx, &schema, &key, None)
149150
.await?
150151
.ok_or_else(|| {
151152
api_error!("value not found for source at the specified key: {key:?}")

0 commit comments

Comments
 (0)