Skip to content

Commit 978eb71

Browse files
authored
Batch mutations for same type of export target. (#312)
1 parent eaa9cc3 commit 978eb71

File tree

10 files changed

+377
-208
lines changed

10 files changed

+377
-208
lines changed

src/builder/analyzer.rs

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -816,20 +816,11 @@ impl AnalyzerContext<'_> {
816816
&self,
817817
scope: &mut DataScopeBuilder,
818818
export_op: NamedSpec<ExportOpSpec>,
819+
export_factory: Arc<dyn ExportTargetFactory>,
819820
setup_state: Option<&mut FlowSetupState<DesiredMode>>,
820821
existing_target_states: &HashMap<&ResourceIdentifier, Vec<&TargetSetupState>>,
821822
) -> Result<impl Future<Output = Result<AnalyzedExportOp>> + Send> {
822823
let export_target = export_op.spec.target;
823-
let export_factory = match self.registry.get(&export_target.kind) {
824-
Some(ExecutorFactory::ExportTarget(export_executor)) => export_executor,
825-
_ => {
826-
return Err(anyhow::anyhow!(
827-
"Export target kind not found: {}",
828-
export_target.kind
829-
))
830-
}
831-
};
832-
833824
let spec = serde_json::Value::Object(export_target.spec.clone());
834825
let (local_collector_ref, collector_schema) =
835826
scope.consume_collector(&export_op.spec.collector_name)?;
@@ -986,8 +977,8 @@ impl AnalyzerContext<'_> {
986977
.unwrap_or(false);
987978
Ok(async move {
988979
trace!("Start building executor for export op `{}`", export_op.name);
989-
let (executor, query_target) = setup_output
990-
.executor
980+
let executors = setup_output
981+
.executors
991982
.await
992983
.with_context(|| format!("Analyzing export op: {}", export_op.name))?;
993984
trace!(
@@ -999,8 +990,8 @@ impl AnalyzerContext<'_> {
999990
name,
1000991
target_id: target_id.unwrap_or_default(),
1001992
input: local_collector_ref,
1002-
executor,
1003-
query_target,
993+
export_context: executors.export_context,
994+
query_target: executors.query_target,
1004995
primary_key_def,
1005996
primary_key_type,
1006997
value_fields: value_fields_idx,
@@ -1127,18 +1118,36 @@ pub fn analyze_flow(
11271118
&flow_inst.reactive_ops,
11281119
RefList::Nil,
11291120
)?;
1130-
let export_ops_futs = flow_inst
1131-
.export_ops
1132-
.iter()
1133-
.map(|export_op| {
1134-
analyzer_ctx.analyze_export_op(
1135-
root_exec_scope.data,
1136-
export_op.clone(),
1137-
Some(&mut setup_state),
1138-
&target_states_by_name_type,
1139-
)
1140-
})
1141-
.collect::<Result<Vec<_>>>()?;
1121+
1122+
let mut target_groups = IndexMap::<String, AnalyzedExportTargetOpGroup>::new();
1123+
let mut export_ops_futs = vec![];
1124+
for (idx, export_op) in flow_inst.export_ops.iter().enumerate() {
1125+
let target_kind = export_op.spec.target.kind.clone();
1126+
let export_factory = match registry.get(&target_kind) {
1127+
Some(ExecutorFactory::ExportTarget(export_executor)) => export_executor,
1128+
_ => {
1129+
return Err(anyhow::anyhow!(
1130+
"Export target kind not found: {}",
1131+
export_op.spec.target.kind
1132+
))
1133+
}
1134+
};
1135+
export_ops_futs.push(analyzer_ctx.analyze_export_op(
1136+
root_exec_scope.data,
1137+
export_op.clone(),
1138+
export_factory.clone(),
1139+
Some(&mut setup_state),
1140+
&target_states_by_name_type,
1141+
)?);
1142+
target_groups
1143+
.entry(target_kind)
1144+
.or_insert_with(|| AnalyzedExportTargetOpGroup {
1145+
target_factory: export_factory.clone(),
1146+
op_idx: vec![],
1147+
})
1148+
.op_idx
1149+
.push(idx);
1150+
}
11421151

11431152
let tracking_table_setup = setup_state.tracking_table.clone();
11441153
let data_schema = root_data_scope.into_data_schema()?;
@@ -1160,6 +1169,7 @@ pub fn analyze_flow(
11601169
import_ops,
11611170
op_scope,
11621171
export_ops,
1172+
export_op_groups: target_groups.into_values().collect(),
11631173
})
11641174
};
11651175

src/builder/plan.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub struct AnalyzedExportOp {
100100
pub name: String,
101101
pub target_id: i32,
102102
pub input: AnalyzedLocalCollectorReference,
103-
pub executor: Arc<dyn ExportTargetExecutor>,
103+
pub export_context: Arc<dyn Any + Send + Sync>,
104104
pub query_target: Option<Arc<dyn QueryTarget>>,
105105
pub primary_key_def: AnalyzedPrimaryKeyDef,
106106
pub primary_key_type: schema::ValueType,
@@ -111,6 +111,11 @@ pub struct AnalyzedExportOp {
111111
pub value_stable: bool,
112112
}
113113

114+
pub struct AnalyzedExportTargetOpGroup {
115+
pub target_factory: Arc<dyn ExportTargetFactory + Send + Sync>,
116+
pub op_idx: Vec<usize>,
117+
}
118+
114119
pub enum AnalyzedReactiveOp {
115120
Transform(AnalyzedTransformOp),
116121
ForEach(AnalyzedForEachOp),
@@ -128,6 +133,7 @@ pub struct ExecutionPlan {
128133
pub import_ops: Vec<AnalyzedImportOp>,
129134
pub op_scope: AnalyzedOpScope,
130135
pub export_ops: Vec<AnalyzedExportOp>,
136+
pub export_op_groups: Vec<AnalyzedExportTargetOpGroup>,
131137
}
132138

133139
pub struct TransientExecutionPlan {

src/execution/row_indexer.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -554,16 +554,26 @@ pub async fn update_source_row(
554554

555555
// Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones.
556556
let mut target_mutations = precommit_output.target_mutations;
557-
let apply_futs = plan.export_ops.iter().filter_map(|export_op| {
558-
target_mutations
559-
.remove(&export_op.target_id)
560-
.and_then(|mutation| {
561-
if !mutation.is_empty() {
562-
Some(export_op.executor.apply_mutation(mutation))
563-
} else {
564-
None
565-
}
557+
let apply_futs = plan.export_op_groups.iter().filter_map(|export_op_group| {
558+
let mutations_w_ctx: Vec<_> = export_op_group
559+
.op_idx
560+
.iter()
561+
.filter_map(|export_op_idx| {
562+
let export_op = &plan.export_ops[*export_op_idx];
563+
target_mutations
564+
.remove(&export_op.target_id)
565+
.filter(|m| !m.is_empty())
566+
.map(|mutation| interface::ExportTargetMutationWithContext {
567+
mutation,
568+
export_context: export_op.export_context.as_ref(),
569+
})
566570
})
571+
.collect();
572+
(!mutations_w_ctx.is_empty()).then(|| {
573+
export_op_group
574+
.target_factory
575+
.apply_mutation(mutations_w_ctx)
576+
})
567577
});
568578

569579
// TODO: Handle errors.

src/ops/factory_bases.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -264,17 +264,23 @@ impl<T: SimpleFunctionFactoryBase> SimpleFunctionFactory for T {
264264
}
265265
}
266266

267-
pub struct ExportTargetBuildOutput<F: StorageFactoryBase + ?Sized> {
268-
pub executor:
269-
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
267+
pub struct TypedExportTargetExecutors<F: StorageFactoryBase + ?Sized> {
268+
pub export_context: Arc<F::ExportContext>,
269+
pub query_target: Option<Arc<dyn QueryTarget>>,
270+
}
271+
272+
pub struct TypedExportTargetBuildOutput<F: StorageFactoryBase + ?Sized> {
273+
pub executors: BoxFuture<'static, Result<TypedExportTargetExecutors<F>>>,
270274
pub setup_key: F::Key,
271275
pub desired_setup_state: F::SetupState,
272276
}
273277

278+
#[async_trait]
274279
pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
275280
type Spec: DeserializeOwned + Send + Sync;
276281
type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
277282
type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
283+
type ExportContext: Send + Sync + 'static;
278284

279285
fn name(&self) -> &str;
280286

@@ -286,7 +292,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
286292
value_fields_schema: Vec<FieldSchema>,
287293
storage_options: IndexOptions,
288294
context: Arc<FlowInstanceContext>,
289-
) -> Result<ExportTargetBuildOutput<Self>>;
295+
) -> Result<TypedExportTargetBuildOutput<Self>>;
290296

291297
/// Will not be called if it's setup by user.
292298
/// It returns an error if the target only supports setup by user.
@@ -315,8 +321,14 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
315321
ExecutorFactory::ExportTarget(Arc::new(self)),
316322
)
317323
}
324+
325+
async fn apply_mutation(
326+
&self,
327+
mutations: Vec<ExportTargetMutationWithContext<'async_trait, Self::ExportContext>>,
328+
) -> Result<()>;
318329
}
319330

331+
#[async_trait]
320332
impl<T: StorageFactoryBase> ExportTargetFactory for T {
321333
fn build(
322334
self: Arc<Self>,
@@ -337,10 +349,17 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
337349
storage_options,
338350
context,
339351
)?;
352+
let executors = async move {
353+
let executors = build_output.executors.await?;
354+
Ok(interface::ExportTargetExecutors {
355+
export_context: executors.export_context,
356+
query_target: executors.query_target,
357+
})
358+
};
340359
Ok(interface::ExportTargetBuildOutput {
341-
executor: build_output.executor,
342360
setup_key: serde_json::to_value(build_output.setup_key)?,
343361
desired_setup_state: serde_json::to_value(build_output.desired_setup_state)?,
362+
executors: executors.boxed(),
344363
})
345364
}
346365

@@ -383,6 +402,25 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
383402
)?;
384403
Ok(result)
385404
}
405+
406+
async fn apply_mutation(
407+
&self,
408+
mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
409+
) -> Result<()> {
410+
let mutations = mutations
411+
.into_iter()
412+
.map(|m| {
413+
anyhow::Ok(ExportTargetMutationWithContext {
414+
mutation: m.mutation,
415+
export_context: m
416+
.export_context
417+
.downcast_ref::<T::ExportContext>()
418+
.ok_or_else(|| anyhow!("Unexpected export context type"))?,
419+
})
420+
})
421+
.collect::<Result<_>>()?;
422+
StorageFactoryBase::apply_mutation(self, mutations).await
423+
}
386424
}
387425

388426
fn from_json_combined_state<T: Debug + Clone + Serialize + DeserializeOwned>(

src/ops/interface.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,10 @@ impl ExportTargetMutation {
138138
}
139139
}
140140

141-
#[async_trait]
142-
pub trait ExportTargetExecutor: Send + Sync {
143-
async fn apply_mutation(&self, mutation: ExportTargetMutation) -> Result<()>;
141+
#[derive(Debug)]
142+
pub struct ExportTargetMutationWithContext<'ctx, T: ?Sized + Send + Sync> {
143+
pub mutation: ExportTargetMutation,
144+
pub export_context: &'ctx T,
144145
}
145146

146147
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -156,14 +157,18 @@ pub enum SetupStateCompatibility {
156157
NotCompatible,
157158
}
158159

160+
pub struct ExportTargetExecutors {
161+
pub export_context: Arc<dyn Any + Send + Sync>,
162+
pub query_target: Option<Arc<dyn QueryTarget>>,
163+
}
159164
pub struct ExportTargetBuildOutput {
160-
pub executor:
161-
BoxFuture<'static, Result<(Arc<dyn ExportTargetExecutor>, Option<Arc<dyn QueryTarget>>)>>,
165+
pub executors: BoxFuture<'static, Result<ExportTargetExecutors>>,
162166
pub setup_key: serde_json::Value,
163167
pub desired_setup_state: serde_json::Value,
164168
}
165169

166-
pub trait ExportTargetFactory {
170+
#[async_trait]
171+
pub trait ExportTargetFactory: Send + Sync {
167172
fn build(
168173
self: Arc<Self>,
169174
name: String,
@@ -191,6 +196,11 @@ pub trait ExportTargetFactory {
191196
) -> Result<SetupStateCompatibility>;
192197

193198
fn describe_resource(&self, key: &serde_json::Value) -> Result<String>;
199+
200+
async fn apply_mutation(
201+
&self,
202+
mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
203+
) -> Result<()>;
194204
}
195205

196206
#[derive(Clone)]

src/ops/sdk.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub use crate::base::spec::*;
1111
pub use crate::base::value::*;
1212

1313
// Disambiguate the ExportTargetBuildOutput type.
14-
pub use super::factory_bases::ExportTargetBuildOutput;
14+
pub use super::factory_bases::TypedExportTargetBuildOutput;
1515
/// Defined for all types convertible to ValueType, to ease creation for ValueType in various operation factories.
1616
pub trait TypeCore {
1717
fn into_type(self) -> ValueType;

0 commit comments

Comments
 (0)