Skip to content

Commit 1d2dd80

Browse files
authored
feat(target-factory): support declaration in the framework (#367)
1 parent 6bc9ae5 commit 1d2dd80

File tree

5 files changed

+58
-22
lines changed

5 files changed

+58
-22
lines changed

python/cocoindex/flow.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -290,6 +290,12 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *,
290290
name, _spec_kind(target_spec), dump_engine_object(target_spec),
291291
dump_engine_object(index_options), self._engine_data_collector, setup_by_user)
292292

293+
def declare(self, spec: op.DeclarationSpec):
294+
"""
295+
Add a declaration to the flow.
296+
"""
297+
self._flow_builder_state.engine_flow_builder.declare(dump_engine_object(spec))
298+
293299

294300
_flow_name_builder = _NameBuilder()
295301

python/cocoindex/op.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@ class OpCategory(Enum):
1818
FUNCTION = "function"
1919
SOURCE = "source"
2020
STORAGE = "storage"
21-
21+
DECLARATION = "declaration"
2222
@dataclass_transform()
2323
class SpecMeta(type):
2424
"""Meta class for spec classes."""
@@ -41,6 +41,10 @@ class FunctionSpec(metaclass=SpecMeta, category=OpCategory.FUNCTION): # pylint:
4141
class StorageSpec(metaclass=SpecMeta, category=OpCategory.STORAGE): # pylint: disable=too-few-public-methods
4242
"""A storage spec. All its subclass can be instantiated similar to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""
4343

44+
class DeclarationSpec(metaclass=SpecMeta, category=OpCategory.DECLARATION): # pylint: disable=too-few-public-methods
45+
"""A declaration spec. All its subclass can be instantiated similar to a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""
46+
kind: str
47+
4448
class Executor(Protocol):
4549
"""An executor for an operation."""
4650
op_category: OpCategory

src/base/spec.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -274,6 +274,9 @@ pub struct FlowInstanceSpec {
274274

275275
#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
276276
pub export_ops: Vec<NamedSpec<ExportOpSpec>>,
277+
278+
#[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
279+
pub declarations: Vec<OpSpec>,
277280
}
278281

279282
#[derive(Debug, Clone, Serialize, Deserialize)]

src/builder/analyzer.rs

Lines changed: 34 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -930,6 +930,7 @@ impl AnalyzerContext<'_> {
930930
scope: &mut DataScopeBuilder,
931931
flow_inst: &FlowInstanceSpec,
932932
export_op_group: &AnalyzedExportTargetOpGroup,
933+
declarations: Vec<serde_json::Value>,
933934
flow_setup_state: &mut FlowSetupState<DesiredMode>,
934935
existing_target_states: &HashMap<&ResourceIdentifier, Vec<&TargetSetupState>>,
935936
) -> Result<Vec<impl Future<Output = Result<AnalyzedExportOp>> + Send>> {
@@ -1007,7 +1008,7 @@ impl AnalyzerContext<'_> {
10071008
}
10081009
let (data_collections_output, _) = export_op_group.target_factory.clone().build(
10091010
collection_specs,
1010-
vec![],
1011+
declarations,
10111012
self.flow_ctx.clone(),
10121013
)?;
10131014
if data_collections_output.len() != data_fields_infos.len() {
@@ -1162,37 +1163,49 @@ pub fn analyze_flow(
11621163
RefList::Nil,
11631164
)?;
11641165

1165-
let mut target_groups = IndexMap::<String, AnalyzedExportTargetOpGroup>::new();
1166+
#[derive(Default)]
1167+
struct TargetOpGroup {
1168+
export_op_ids: Vec<usize>,
1169+
declarations: Vec<serde_json::Value>,
1170+
}
1171+
let mut target_op_group = IndexMap::<String, TargetOpGroup>::new();
11661172
for (idx, export_op) in flow_inst.export_ops.iter().enumerate() {
1167-
let target_kind = export_op.spec.target.kind.clone();
1173+
target_op_group
1174+
.entry(export_op.spec.target.kind.clone())
1175+
.or_default()
1176+
.export_op_ids
1177+
.push(idx);
1178+
}
1179+
for declaration in flow_inst.declarations.iter() {
1180+
target_op_group
1181+
.entry(declaration.kind.clone())
1182+
.or_default()
1183+
.declarations
1184+
.push(serde_json::Value::Object(declaration.spec.clone()));
1185+
}
1186+
1187+
let mut export_ops_futs = vec![];
1188+
let mut analyzed_target_op_groups = vec![];
1189+
for (target_kind, op_ids) in target_op_group.into_iter() {
11681190
let export_factory = match registry.get(&target_kind) {
11691191
Some(ExecutorFactory::ExportTarget(export_executor)) => export_executor,
11701192
_ => {
1171-
return Err(anyhow::anyhow!(
1172-
"Export target kind not found: {}",
1173-
export_op.spec.target.kind
1174-
))
1193+
bail!("Export target kind not found: {target_kind}");
11751194
}
11761195
};
1177-
target_groups
1178-
.entry(target_kind)
1179-
.or_insert_with(|| AnalyzedExportTargetOpGroup {
1180-
target_factory: export_factory.clone(),
1181-
op_idx: vec![],
1182-
})
1183-
.op_idx
1184-
.push(idx);
1185-
}
1186-
1187-
let mut export_ops_futs = vec![];
1188-
for group in target_groups.values() {
1196+
let analyzed_target_op_group = AnalyzedExportTargetOpGroup {
1197+
target_factory: export_factory.clone(),
1198+
op_idx: op_ids.export_op_ids,
1199+
};
11891200
export_ops_futs.extend(analyzer_ctx.analyze_export_op_group(
11901201
root_exec_scope.data,
11911202
flow_inst,
1192-
group,
1203+
&analyzed_target_op_group,
1204+
op_ids.declarations,
11931205
&mut setup_state,
11941206
&target_states_by_name_type,
11951207
)?);
1208+
analyzed_target_op_groups.push(analyzed_target_op_group);
11961209
}
11971210

11981211
let tracking_table_setup = setup_state.tracking_table.clone();
@@ -1215,7 +1228,7 @@ pub fn analyze_flow(
12151228
import_ops,
12161229
op_scope,
12171230
export_ops,
1218-
export_op_groups: target_groups.into_values().collect(),
1231+
export_op_groups: analyzed_target_op_groups,
12191232
})
12201233
};
12211234

src/builder/flow_builder.rs

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -331,6 +331,8 @@ pub struct FlowBuilder {
331331
import_ops: Vec<NamedSpec<spec::ImportOpSpec>>,
332332
export_ops: Vec<NamedSpec<spec::ExportOpSpec>>,
333333

334+
declarations: Vec<spec::OpSpec>,
335+
334336
next_generated_op_id: usize,
335337
}
336338

@@ -370,6 +372,8 @@ impl FlowBuilder {
370372
direct_input_fields: vec![],
371373
direct_output_value: None,
372374

375+
declarations: vec![],
376+
373377
next_generated_op_id: 0,
374378
};
375379
Ok(result)
@@ -612,6 +616,11 @@ impl FlowBuilder {
612616
Ok(())
613617
}
614618

619+
pub fn declare(&mut self, op_spec: py::Pythonized<spec::OpSpec>) -> PyResult<()> {
620+
self.declarations.push(op_spec.into_inner());
621+
Ok(())
622+
}
623+
615624
pub fn scope_field(
616625
&self,
617626
scope: DataScopeRef,
@@ -642,6 +651,7 @@ impl FlowBuilder {
642651
import_ops: self.import_ops.clone(),
643652
reactive_ops: self.reactive_ops.clone(),
644653
export_ops: self.export_ops.clone(),
654+
declarations: self.declarations.clone(),
645655
};
646656
let flow_instance_ctx = build_flow_instance_context(
647657
&self.flow_instance_name,

0 commit comments

Comments
 (0)