Skip to content

Commit 8f10543

Browse files
authored
Change source_op to import_op and add a ImportOpSpec layer. (#234)
1 parent 82007fb commit 8f10543

File tree

10 files changed

+84
-77
lines changed

10 files changed

+84
-77
lines changed

src/base/spec.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,11 @@ pub struct OpSpec {
163163
pub spec: serde_json::Map<String, serde_json::Value>,
164164
}
165165

166+
#[derive(Debug, Clone, Serialize, Deserialize)]
167+
pub struct ImportOpSpec {
168+
pub source: OpSpec,
169+
}
170+
166171
/// Transform data using a given operator.
167172
#[derive(Debug, Clone, Serialize, Deserialize)]
168173
pub struct TransformOpSpec {
@@ -244,7 +249,7 @@ pub struct FlowInstanceSpec {
244249
pub name: String,
245250

246251
#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
247-
pub source_ops: Vec<NamedSpec<OpSpec>>,
252+
pub import_ops: Vec<NamedSpec<ImportOpSpec>>,
248253

249254
#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
250255
pub reactive_ops: Vec<NamedSpec<ReactiveOpSpec>>,

src/builder/analyzer.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -591,25 +591,25 @@ fn add_collector(
591591
}
592592

593593
impl AnalyzerContext<'_> {
594-
pub(super) fn analyze_source_op(
594+
pub(super) fn analyze_import_op(
595595
&self,
596596
scope: &mut DataScopeBuilder,
597-
source_op: NamedSpec<OpSpec>,
597+
import_op: NamedSpec<ImportOpSpec>,
598598
metadata: Option<&mut FlowSetupMetadata>,
599599
existing_source_states: Option<&Vec<&SourceSetupState>>,
600-
) -> Result<impl Future<Output = Result<AnalyzedSourceOp>> + Send> {
601-
let factory = self.registry.get(&source_op.spec.kind);
600+
) -> Result<impl Future<Output = Result<AnalyzedImportOp>> + Send> {
601+
let factory = self.registry.get(&import_op.spec.source.kind);
602602
let source_factory = match factory {
603603
Some(ExecutorFactory::Source(source_executor)) => source_executor.clone(),
604604
_ => {
605605
return Err(anyhow::anyhow!(
606606
"Source executor not found for kind: {}",
607-
source_op.spec.kind
607+
import_op.spec.source.kind
608608
))
609609
}
610610
};
611611
let (output_type, executor) = source_factory.build(
612-
serde_json::Value::Object(source_op.spec.spec),
612+
serde_json::Value::Object(import_op.spec.source.spec),
613613
self.flow_ctx.clone(),
614614
)?;
615615

@@ -642,7 +642,7 @@ impl AnalyzerContext<'_> {
642642
metadata.last_source_id
643643
};
644644
metadata.sources.insert(
645-
source_op.name.clone(),
645+
import_op.name.clone(),
646646
SourceSetupState {
647647
source_id,
648648
key_schema: key_schema_no_attrs,
@@ -651,13 +651,13 @@ impl AnalyzerContext<'_> {
651651
source_id
652652
});
653653

654-
let op_name = source_op.name.clone();
655-
let output = scope.add_field(source_op.name, &output_type)?;
654+
let op_name = import_op.name.clone();
655+
let output = scope.add_field(import_op.name, &output_type)?;
656656
let result_fut = async move {
657657
trace!("Start building executor for source op `{}`", op_name);
658658
let executor = executor.await?;
659659
trace!("Finished building executor for source op `{}`", op_name);
660-
Ok(AnalyzedSourceOp {
660+
Ok(AnalyzedImportOp {
661661
source_id: source_id.unwrap_or_default(),
662662
executor,
663663
output,
@@ -1100,14 +1100,14 @@ pub fn analyze_flow(
11001100
name: ROOT_SCOPE_NAME,
11011101
data: &mut root_data_scope,
11021102
};
1103-
let source_ops_futs = flow_inst
1104-
.source_ops
1103+
let import_ops_futs = flow_inst
1104+
.import_ops
11051105
.iter()
1106-
.map(|source_op| {
1107-
let existing_source_states = source_states_by_name.get(source_op.name.as_str());
1108-
analyzer_ctx.analyze_source_op(
1106+
.map(|import_op| {
1107+
let existing_source_states = source_states_by_name.get(import_op.name.as_str());
1108+
analyzer_ctx.analyze_import_op(
11091109
root_exec_scope.data,
1110-
source_op.clone(),
1110+
import_op.clone(),
11111111
Some(&mut setup_state.metadata),
11121112
existing_source_states,
11131113
)
@@ -1138,8 +1138,8 @@ pub fn analyze_flow(
11381138
.with(&data_schema)?
11391139
.into_fingerprint();
11401140
let plan_fut = async move {
1141-
let (source_ops, op_scope, export_ops) = try_join3(
1142-
try_join_all(source_ops_futs),
1141+
let (import_ops, op_scope, export_ops) = try_join3(
1142+
try_join_all(import_ops_futs),
11431143
op_scope_fut,
11441144
try_join_all(export_ops_futs),
11451145
)
@@ -1148,7 +1148,7 @@ pub fn analyze_flow(
11481148
Ok(ExecutionPlan {
11491149
tracking_table_setup,
11501150
logic_fingerprint,
1151-
source_ops,
1151+
import_ops,
11521152
op_scope,
11531153
export_ops,
11541154
})

src/builder/flow_builder.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ pub struct FlowBuilder {
329329
direct_input_fields: Vec<FieldSchema>,
330330
direct_output_value: Option<spec::ValueMapping>,
331331

332-
source_ops: Vec<NamedSpec<spec::OpSpec>>,
332+
import_ops: Vec<NamedSpec<spec::ImportOpSpec>>,
333333
export_ops: Vec<NamedSpec<spec::ExportOpSpec>>,
334334

335335
next_generated_op_id: usize,
@@ -365,7 +365,7 @@ impl FlowBuilder {
365365

366366
reactive_ops: vec![],
367367

368-
source_ops: vec![],
368+
import_ops: vec![],
369369
export_ops: vec![],
370370

371371
direct_input_fields: vec![],
@@ -395,11 +395,13 @@ impl FlowBuilder {
395395
));
396396
}
397397
}
398-
let source_op = spec::NamedSpec {
398+
let import_op = spec::NamedSpec {
399399
name,
400-
spec: spec::OpSpec {
401-
kind,
402-
spec: op_spec.into_inner(),
400+
spec: spec::ImportOpSpec {
401+
source: spec::OpSpec {
402+
kind,
403+
spec: op_spec.into_inner(),
404+
},
403405
},
404406
};
405407
let analyzer_ctx = AnalyzerContext {
@@ -409,14 +411,14 @@ impl FlowBuilder {
409411
let mut root_data_scope = self.root_data_scope.lock().unwrap();
410412

411413
let analyzed = analyzer_ctx
412-
.analyze_source_op(&mut root_data_scope, source_op.clone(), None, None)
414+
.analyze_import_op(&mut root_data_scope, import_op.clone(), None, None)
413415
.into_py_result()?;
414416
std::mem::drop(analyzed);
415417

416418
let result =
417419
Self::last_field_to_data_slice(&root_data_scope, self.root_data_scope_ref.clone())
418420
.into_py_result()?;
419-
self.source_ops.push(source_op);
421+
self.import_ops.push(import_op);
420422
Ok(result)
421423
}
422424

@@ -633,7 +635,7 @@ impl FlowBuilder {
633635
pub fn build_flow(&self, py: Python<'_>) -> PyResult<py::Flow> {
634636
let spec = spec::FlowInstanceSpec {
635637
name: self.flow_instance_name.clone(),
636-
source_ops: self.source_ops.clone(),
638+
import_ops: self.import_ops.clone(),
637639
reactive_ops: self.reactive_ops.clone(),
638640
export_ops: self.export_ops.clone(),
639641
};
@@ -705,7 +707,7 @@ impl FlowBuilder {
705707
impl std::fmt::Display for FlowBuilder {
706708
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707709
write!(f, "Flow instance name: {}\n\n", self.flow_instance_name)?;
708-
for op in self.source_ops.iter() {
710+
for op in self.import_ops.iter() {
709711
write!(
710712
f,
711713
"Source op {}\n{}\n",

src/builder/plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub struct AnalyzedOpOutput {
5555
pub field_idx: u32,
5656
}
5757

58-
pub struct AnalyzedSourceOp {
58+
pub struct AnalyzedImportOp {
5959
pub name: String,
6060
pub source_id: i32,
6161
pub executor: Box<dyn SourceExecutor>,
@@ -128,7 +128,7 @@ pub struct ExecutionPlan {
128128
pub tracking_table_setup: db_tracking_setup::TrackingTableSetupState,
129129
pub logic_fingerprint: Fingerprint,
130130

131-
pub source_ops: Vec<AnalyzedSourceOp>,
131+
pub import_ops: Vec<AnalyzedImportOp>,
132132
pub op_scope: AnalyzedOpScope,
133133
pub export_ops: Vec<AnalyzedExportOp>,
134134
}

src/execution/dumper.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use yaml_rust2::YamlEmitter;
1414
use super::memoization::EvaluationMemoryOptions;
1515
use super::row_indexer;
1616
use crate::base::{schema, value};
17-
use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
17+
use crate::builder::plan::{AnalyzedImportOp, ExecutionPlan};
1818
use crate::ops::interface::SourceExecutorListOptions;
1919
use crate::utils::yaml_ser::YamlSerializer;
2020

@@ -69,7 +69,7 @@ struct Dumper<'a> {
6969
impl<'a> Dumper<'a> {
7070
async fn evaluate_source_entry<'b>(
7171
&'a self,
72-
source_op: &'a AnalyzedSourceOp,
72+
import_op: &'a AnalyzedImportOp,
7373
key: &value::KeyValue,
7474
collected_values_buffer: &'b mut Vec<Vec<value::FieldValues>>,
7575
) -> Result<Option<IndexMap<&'b str, TargetExportData<'b>>>>
@@ -78,7 +78,7 @@ impl<'a> Dumper<'a> {
7878
{
7979
let data_builder = row_indexer::evaluate_source_entry_with_memory(
8080
self.plan,
81-
source_op,
81+
import_op,
8282
self.schema,
8383
key,
8484
EvaluationMemoryOptions {
@@ -130,13 +130,13 @@ impl<'a> Dumper<'a> {
130130

131131
async fn evaluate_and_dump_source_entry(
132132
&self,
133-
source_op: &AnalyzedSourceOp,
133+
import_op: &AnalyzedImportOp,
134134
key: value::KeyValue,
135135
file_path: PathBuf,
136136
) -> Result<()> {
137137
let mut collected_values_buffer = Vec::new();
138138
let (exports, error) = match self
139-
.evaluate_source_entry(source_op, &key, &mut collected_values_buffer)
139+
.evaluate_source_entry(import_op, &key, &mut collected_values_buffer)
140140
.await
141141
{
142142
Ok(exports) => (exports, None),
@@ -145,7 +145,7 @@ impl<'a> Dumper<'a> {
145145
let key_value = value::Value::from(key);
146146
let file_data = SourceOutputData {
147147
key: value::TypedValue {
148-
t: &source_op.primary_key_type,
148+
t: &import_op.primary_key_type,
149149
v: &key_value,
150150
},
151151
exports,
@@ -166,10 +166,10 @@ impl<'a> Dumper<'a> {
166166
Ok(())
167167
}
168168

169-
async fn evaluate_and_dump_for_source_op(&self, source_op: &AnalyzedSourceOp) -> Result<()> {
169+
async fn evaluate_and_dump_for_source(&self, import_op: &AnalyzedImportOp) -> Result<()> {
170170
let mut keys_by_filename_prefix: IndexMap<String, Vec<value::KeyValue>> = IndexMap::new();
171171

172-
let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
172+
let mut rows_stream = import_op.executor.list(SourceExecutorListOptions {
173173
include_ordinal: false,
174174
});
175175
while let Some(rows) = rows_stream.next().await {
@@ -181,7 +181,7 @@ impl<'a> Dumper<'a> {
181181
.map(|s| urlencoding::encode(&s).into_owned())
182182
.join(":");
183183
s.truncate(
184-
(0..(FILENAME_PREFIX_MAX_LENGTH - source_op.name.as_str().len()))
184+
(0..(FILENAME_PREFIX_MAX_LENGTH - import_op.name.as_str().len()))
185185
.rev()
186186
.find(|i| s.is_char_boundary(*i))
187187
.unwrap_or(0),
@@ -202,9 +202,9 @@ impl<'a> Dumper<'a> {
202202
Cow::Borrowed("")
203203
};
204204
let file_name =
205-
format!("{}@{}{}.yaml", source_op.name, filename_prefix, extra_id);
205+
format!("{}@{}{}.yaml", import_op.name, filename_prefix, extra_id);
206206
let file_path = output_dir.join(Path::new(&file_name));
207-
self.evaluate_and_dump_source_entry(source_op, key, file_path)
207+
self.evaluate_and_dump_source_entry(import_op, key, file_path)
208208
})
209209
});
210210
try_join_all(evaluate_futs).await?;
@@ -214,9 +214,9 @@ impl<'a> Dumper<'a> {
214214
async fn evaluate_and_dump(&self) -> Result<()> {
215215
try_join_all(
216216
self.plan
217-
.source_ops
217+
.import_ops
218218
.iter()
219-
.map(|source_op| self.evaluate_and_dump_for_source_op(source_op)),
219+
.map(|import_op| self.evaluate_and_dump_for_source(import_op)),
220220
)
221221
.await?;
222222
Ok(())

src/execution/evaluator.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ async fn evaluate_op_scope(
440440

441441
pub async fn evaluate_source_entry(
442442
plan: &ExecutionPlan,
443-
source_op: &AnalyzedSourceOp,
443+
import_op: &AnalyzedImportOp,
444444
schema: &schema::DataSchema,
445445
key: &value::KeyValue,
446446
source_value: value::FieldValues,
@@ -455,7 +455,7 @@ pub async fn evaluate_source_entry(
455455
schema: root_schema,
456456
};
457457

458-
let collection_schema = match &root_schema.fields[source_op.output.field_idx as usize]
458+
let collection_schema = match &root_schema.fields[import_op.output.field_idx as usize]
459459
.value_type
460460
.typ
461461
{
@@ -468,7 +468,7 @@ pub async fn evaluate_source_entry(
468468
let scope_value =
469469
ScopeValueBuilder::augmented_from(&value::ScopeValue(source_value), collection_schema)?;
470470
root_scope_entry.define_field_w_builder(
471-
&source_op.output,
471+
&import_op.output,
472472
value::Value::Table(BTreeMap::from([(key.clone(), scope_value)])),
473473
);
474474

0 commit comments

Comments
 (0)