diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index 53fc054f..25738c52 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -102,7 +102,6 @@ impl TrackingTableSetupStatus { } } -#[async_trait] impl ResourceSetupStatus for TrackingTableSetupStatus { fn describe_changes(&self) -> Vec { let mut changes: Vec = vec![]; @@ -157,7 +156,13 @@ impl ResourceSetupStatus for TrackingTableSetupStatus { } } - async fn apply_change(&self) -> Result<()> { + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } +} + +impl TrackingTableSetupStatus { + pub async fn apply_change(&self) -> Result<()> { let pool = &get_lib_context()?.builtin_db_pool; if let Some(desired) = &self.desired_state { for lagacy_name in self.legacy_table_names.iter() { diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index 36aba809..6a9aaf79 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -1,4 +1,5 @@ use crate::prelude::*; +use crate::setup::ResourceSetupStatus; use std::fmt::Debug; use std::hash::Hash; @@ -288,6 +289,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { type DeclarationSpec: DeserializeOwned + Send + Sync; type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync; type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync; + type SetupStatus: ResourceSetupStatus; type ExportContext: Send + Sync + 'static; fn name(&self) -> &str; @@ -310,7 +312,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { desired_state: Option, existing_states: setup::CombinedState, auth_registry: &Arc, - ) -> Result; + ) -> Result; fn check_state_compatibility( &self, @@ -334,6 +336,11 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { &self, mutations: Vec>, ) -> Result<()>; + + async fn apply_setup_changes( + &self, + setup_status: Vec<&'async_trait Self::SetupStatus>, + ) -> Result<()>; } #[async_trait] @@ -456,8 +463,25 @@ impl ExportTargetFactory for T { .collect::>()?; StorageFactoryBase::apply_mutation(self, mutations).await } -} + async fn apply_setup_changes( + &self, + setup_status: Vec<&'async_trait dyn ResourceSetupStatus>, + ) -> Result<()> { + StorageFactoryBase::apply_setup_changes( + self, + setup_status + .into_iter() + .map(|s| -> anyhow::Result<_> { + Ok(s.as_any() + .downcast_ref::() + .ok_or_else(|| anyhow!("Unexpected setup status type"))?) + }) + .collect::>>()?, + ) + .await + } +} fn from_json_combined_state( existing_states: setup::CombinedState, ) -> Result> { diff --git a/src/ops/interface.rs b/src/ops/interface.rs index 9cec7d7e..6da61adf 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -214,6 +214,11 @@ pub trait ExportTargetFactory: Send + Sync { &self, mutations: Vec>, ) -> Result<()>; + + async fn apply_setup_changes( + &self, + setup_status: Vec<&'async_trait dyn setup::ResourceSetupStatus>, + ) -> Result<()>; } #[derive(Clone)] diff --git a/src/ops/storages/neo4j.rs b/src/ops/storages/neo4j.rs index 98bd5585..757de3f3 100644 --- a/src/ops/storages/neo4j.rs +++ b/src/ops/storages/neo4j.rs @@ -1,7 +1,7 @@ use crate::prelude::*; use super::spec::{GraphDeclaration, GraphElementMapping, NodeFromFieldsSpec, TargetFieldMapping}; -use crate::setup::components::{self, State}; +use crate::setup::components::{self, apply_component_changes, State}; use crate::setup::{ResourceSetupStatus, SetupChangeType}; use crate::{ops::sdk::*, setup::CombinedState}; @@ -688,7 +688,7 @@ impl ComponentKind { } } #[derive(Debug, Clone, PartialEq, Eq, Hash)] -struct ComponentKey { +pub struct ComponentKey { kind: ComponentKind, name: String, } @@ -754,13 +754,13 @@ impl components::State for ComponentState { } } -struct SetupComponentOperator { +pub struct SetupComponentOperator { graph_pool: Arc, conn_spec: ConnectionSpec, } #[async_trait] -impl components::Operator for SetupComponentOperator { +impl components::SetupOperator for SetupComponentOperator { type Key = ComponentKey; type State = ComponentState; type SetupState = SetupState; @@ -855,7 +855,7 @@ fn build_composite_field_names(qualifier: &str, field_names: &[String]) -> Strin } #[derive(Derivative)] #[derivative(Debug)] -struct SetupStatus { +pub struct GraphElementDataSetupStatus { key: GraphElement, #[derivative(Debug = "ignore")] graph_pool: Arc, @@ -864,7 +864,7 @@ struct SetupStatus { change_type: SetupChangeType, } -impl SetupStatus { +impl GraphElementDataSetupStatus { fn new( key: GraphElement, graph_pool: Arc, @@ -907,8 +907,7 @@ impl SetupStatus { } } -#[async_trait] -impl ResourceSetupStatus for SetupStatus { +impl ResourceSetupStatus for GraphElementDataSetupStatus { fn describe_changes(&self) -> Vec { let mut result = vec![]; if let Some(data_clear) = &self.data_clear { @@ -934,6 +933,12 @@ impl ResourceSetupStatus for SetupStatus { self.change_type } + fn as_any(&self) -> &dyn Any { + self + } +} + +impl GraphElementDataSetupStatus { async fn apply_change(&self) -> Result<()> { let graph = self.graph_pool.get_graph(&self.conn_spec).await?; if let Some(data_clear) = &self.data_clear { @@ -1077,6 +1082,10 @@ impl StorageFactoryBase for Factory { type Spec = Spec; type DeclarationSpec = Declaration; type SetupState = SetupState; + type SetupStatus = ( + GraphElementDataSetupStatus, + components::SetupStatus, + ); type Key = GraphElement; type ExportContext = ExportContext; @@ -1255,16 +1264,16 @@ impl StorageFactoryBase for Factory { desired: Option, existing: CombinedState, auth_registry: &Arc, - ) -> Result { + ) -> Result { let conn_spec = auth_registry.get::(&key.connection)?; - let base = SetupStatus::new( + let data_status = GraphElementDataSetupStatus::new( key, self.graph_pool.clone(), conn_spec.clone(), desired.as_ref(), &existing, ); - let comp = components::Status::create( + let components = components::SetupStatus::create( SetupComponentOperator { graph_pool: self.graph_pool.clone(), conn_spec: conn_spec.clone(), @@ -1272,7 +1281,7 @@ impl StorageFactoryBase for Factory { desired, existing, )?; - Ok(components::combine_setup_statuss(base, comp)) + Ok((data_status, components)) } fn check_state_compatibility( @@ -1328,4 +1337,15 @@ impl StorageFactoryBase for Factory { } Ok(()) } + + async fn apply_setup_changes( + &self, + changes: Vec<&'async_trait Self::SetupStatus>, + ) -> Result<()> { + for change in changes.iter() { + change.0.apply_change().await?; + } + apply_component_changes(changes.iter().map(|c| &c.1).collect()).await?; + Ok(()) + } } diff --git a/src/ops/storages/postgres.rs b/src/ops/storages/postgres.rs index 10b2af69..6e630a20 100644 --- a/src/ops/storages/postgres.rs +++ b/src/ops/storages/postgres.rs @@ -738,7 +738,6 @@ fn describe_index_spec(index_name: &str, index_spec: &VectorIndexDef) -> String format!("{} {}", index_name, to_index_spec_sql(index_spec)) } -#[async_trait] impl setup::ResourceSetupStatus for SetupStatus { fn describe_changes(&self) -> Vec { let mut descriptions = vec![]; @@ -821,6 +820,12 @@ impl setup::ResourceSetupStatus for SetupStatus { } } + fn as_any(&self) -> &dyn Any { + self + } +} + +impl SetupStatus { async fn apply_change(&self) -> Result<()> { let table_name = &self.table_name; if self.drop_existing { @@ -907,6 +912,7 @@ impl StorageFactoryBase for Factory { type Spec = Spec; type DeclarationSpec = (); type SetupState = SetupState; + type SetupStatus = SetupStatus; type Key = TableId; type ExportContext = ExportContext; @@ -976,7 +982,7 @@ impl StorageFactoryBase for Factory { desired: Option, existing: setup::CombinedState, auth_registry: &Arc, - ) -> Result { + ) -> Result { Ok(SetupStatus::new( get_db_pool(key.database.as_ref(), auth_registry).await?, key.table_name, @@ -1049,4 +1055,14 @@ impl StorageFactoryBase for Factory { } Ok(()) } + + async fn apply_setup_changes( + &self, + setup_status: Vec<&'async_trait Self::SetupStatus>, + ) -> Result<()> { + for setup_status in setup_status.iter() { + setup_status.apply_change().await?; + } + Ok(()) + } } diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index cac6b8eb..3b1e7894 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -332,6 +332,7 @@ impl StorageFactoryBase for Arc { type Spec = Spec; type DeclarationSpec = (); type SetupState = (); + type SetupStatus = Infallible; type Key = String; type ExportContext = ExportContext; @@ -390,7 +391,7 @@ impl StorageFactoryBase for Arc { _desired: Option<()>, _existing: setup::CombinedState<()>, _auth_registry: &Arc, - ) -> Result { + ) -> Result { Err(anyhow!("Set `setup_by_user` to `true` to export to Qdrant")) as Result } @@ -418,4 +419,11 @@ impl StorageFactoryBase for Arc { } Ok(()) } + + async fn apply_setup_changes( + &self, + _setup_status: Vec<&'async_trait Self::SetupStatus>, + ) -> Result<()> { + Err(anyhow!("Qdrant does not support setup changes")) + } } diff --git a/src/setup/components.rs b/src/setup/components.rs index 7036ce30..415f616e 100644 --- a/src/setup/components.rs +++ b/src/setup/components.rs @@ -7,7 +7,7 @@ pub trait State: Debug + Send + Sync { } #[async_trait] -pub trait Operator { +pub trait SetupOperator: 'static + Send + Sync { type Key: Debug + Hash + Eq + Clone + Send + Sync; type State: State; type SetupState: Send + Sync + IntoIterator; @@ -36,14 +36,14 @@ struct CompositeStateUpsert { #[derive(Derivative)] #[derivative(Debug)] -pub struct Status { +pub struct SetupStatus { #[derivative(Debug = "ignore")] desc: D, keys_to_delete: IndexSet, states_to_upsert: Vec>, } -impl Status { +impl SetupStatus { pub fn create( desc: D, desired: Option, @@ -108,8 +108,7 @@ impl Status { } } -#[async_trait] -impl ResourceSetupStatus for Status { +impl ResourceSetupStatus for SetupStatus { fn describe_changes(&self) -> Vec { let mut result = vec![]; @@ -144,42 +143,45 @@ impl ResourceSetupStatus for Status { } } - async fn apply_change(&self) -> Result<()> { - // First delete components that need to be removed - for key in &self.keys_to_delete { - self.desc.delete(key).await?; + fn as_any(&self) -> &dyn Any { + self + } +} + +pub async fn apply_component_changes( + changes: Vec<&SetupStatus>, +) -> Result<()> { + // First delete components that need to be removed + for change in changes.iter() { + for key in &change.keys_to_delete { + change.desc.delete(key).await?; } + } - // Then upsert components that need to be updated - for state in &self.states_to_upsert { + // Then upsert components that need to be updated + for change in changes.iter() { + for state in &change.states_to_upsert { if state.already_exists { - self.desc.update(&state.state).await?; + change.desc.update(&state.state).await?; } else { - self.desc.create(&state.state).await?; + change.desc.create(&state.state).await?; } } - - Ok(()) } -} -#[derive(Debug)] -struct CombinedStatus { - a: A, - b: B, + Ok(()) } -#[async_trait] -impl ResourceSetupStatus for CombinedStatus { +impl ResourceSetupStatus for (A, B) { fn describe_changes(&self) -> Vec { let mut result = vec![]; - result.extend(self.a.describe_changes()); - result.extend(self.b.describe_changes()); + result.extend(self.0.describe_changes()); + result.extend(self.1.describe_changes()); result } fn change_type(&self) -> SetupChangeType { - match (self.a.change_type(), self.b.change_type()) { + match (self.0.change_type(), self.1.change_type()) { (SetupChangeType::Invalid, _) | (_, SetupChangeType::Invalid) => { SetupChangeType::Invalid } @@ -188,15 +190,7 @@ impl ResourceSetupStatus for Com } } - async fn apply_change(&self) -> Result<()> { - self.a.apply_change().await?; - self.b.apply_change().await + fn as_any(&self) -> &dyn Any { + self } } - -pub fn combine_setup_statuss( - a: A, - b: B, -) -> impl ResourceSetupStatus { - CombinedStatus { a, b } -} diff --git a/src/setup/db_metadata.rs b/src/setup/db_metadata.rs index 39e4976f..64f67c8d 100644 --- a/src/setup/db_metadata.rs +++ b/src/setup/db_metadata.rs @@ -330,7 +330,6 @@ impl MetadataTableSetup { } } -#[async_trait] impl ResourceSetupStatus for MetadataTableSetup { fn describe_changes(&self) -> Vec { if self.metadata_table_missing { @@ -350,7 +349,13 @@ impl ResourceSetupStatus for MetadataTableSetup { } } - async fn apply_change(&self) -> Result<()> { + fn as_any(&self) -> &dyn Any { + self as &dyn Any + } +} + +impl MetadataTableSetup { + pub async fn apply_change(&self) -> Result<()> { if !self.metadata_table_missing { return Ok(()); } diff --git a/src/setup/driver.rs b/src/setup/driver.rs index b26cbf2f..2fd50529 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -1,4 +1,4 @@ -use crate::{lib_context::get_auth_registry, prelude::*}; +use crate::{lib_context::get_auth_registry, ops::interface::ExportTargetFactory, prelude::*}; use sqlx::PgPool; use std::{ @@ -8,8 +8,8 @@ use std::{ use super::{ db_metadata, CombinedState, DesiredMode, ExistingMode, FlowSetupState, FlowSetupStatus, - ObjectSetupStatus, ObjectStatus, ResourceIdentifier, ResourceSetupInfo, - ResourceSetupStatus, SetupChangeType, StateChange, TargetSetupState, + ObjectSetupStatus, ObjectStatus, ResourceIdentifier, ResourceSetupInfo, ResourceSetupStatus, + SetupChangeType, StateChange, TargetSetupState, }; use super::{AllSetupState, AllSetupStatus}; use crate::execution::db_tracking_setup; @@ -75,6 +75,16 @@ fn from_metadata_record( }) } +fn get_export_target_factory( + target_type: &str, +) -> Option> { + let registry = executor_factory_registry(); + match registry.get(&target_type) { + Some(ExecutorFactory::ExportTarget(factory)) => Some(factory.clone()), + _ => None, + } +} + pub async fn get_existing_setup_state(pool: &PgPool) -> Result> { let setup_metadata_records = db_metadata::read_setup_metadata(pool).await?; @@ -116,12 +126,10 @@ pub async fn get_existing_setup_state(pool: &PgPool) -> Result { let normalized_key = { - let registry = executor_factory_registry(); - match registry.get(&target_type) { - Some(ExecutorFactory::ExportTarget(factory)) => { - factory.normalize_setup_key(&metadata_record.key)? - } - _ => metadata_record.key.clone(), + if let Some(factory) = get_export_target_factory(&target_type) { + factory.normalize_setup_key(&metadata_record.key)? + } else { + metadata_record.key.clone() } }; let combined_state = from_metadata_record( @@ -280,19 +288,14 @@ pub async fn check_flow_setup_status( desired_state.iter().flat_map(|d| d.targets.iter()), existing_state.iter().flat_map(|e| e.targets.iter()), )?; - let registry = executor_factory_registry(); for (resource_id, v) in grouped_target_resources.into_iter() { - let factory = match registry.get(&resource_id.target_kind) { + let factory = match get_export_target_factory(&resource_id.target_kind) { Some(factory) => factory, None => { unknown_resources.push(resource_id.clone()); continue; } }; - let factory = match factory { - ExecutorFactory::ExportTarget(factory) => factory, - _ => bail!("Unexpected factory type for {}", resource_id.target_kind), - }; let state = v.desired.clone(); let target_state = v .desired @@ -402,21 +405,35 @@ pub async fn drop_setup( }) } -async fn maybe_update_resource_setup( +async fn maybe_update_resource_setup< + 'a, + K: 'a, + S: 'a, + C: ResourceSetupStatus, + ChangeApplierResultFut: Future>, +>( + resource_kind: &str, write: &mut impl std::io::Write, - resource: &ResourceSetupInfo, + resources: impl Iterator>, + apply_change: impl FnOnce(Vec<&'a C>) -> ChangeApplierResultFut, ) -> Result<()> { - if let Some(setup_status) = &resource.setup_status { - if setup_status.change_type() != SetupChangeType::NoChange { - writeln!(write, "{}:", resource.description)?; - for change in setup_status.describe_changes() { - writeln!(write, " - {}", change)?; + let mut changes = Vec::new(); + for resource in resources { + if let Some(setup_status) = &resource.setup_status { + if setup_status.change_type() != SetupChangeType::NoChange { + changes.push(setup_status); + writeln!(write, "{}:", resource.description)?; + for change in setup_status.describe_changes() { + writeln!(write, " - {}", change)?; + } } - write!(write, "Pushing...")?; - setup_status.apply_change().await?; - writeln!(write, "DONE")?; } } + if !changes.is_empty() { + write!(write, "Pushing change for {resource_kind}...")?; + apply_change(changes).await?; + writeln!(write, "DONE")?; + } Ok(()) } @@ -425,7 +442,13 @@ pub async fn apply_changes( setup_status: &AllSetupStatus, pool: &PgPool, ) -> Result<()> { - maybe_update_resource_setup(write, &setup_status.metadata_table).await?; + maybe_update_resource_setup( + "metadata table", + write, + std::iter::once(&setup_status.metadata_table), + |setup_status| setup_status[0].apply_change(), + ) + .await?; for (flow_name, flow_status) in &setup_status.flows { if flow_status.is_up_to_date() { @@ -494,10 +517,38 @@ pub async fn apply_changes( .await?; if let Some(tracking_table) = &flow_status.tracking_table { - maybe_update_resource_setup(write, tracking_table).await?; + maybe_update_resource_setup( + "tracking table", + write, + std::iter::once(tracking_table), + |setup_status| setup_status[0].apply_change(), + ) + .await?; } + + let mut setup_status_by_target_kind = IndexMap::<&str, Vec<_>>::new(); for target_resource in &flow_status.target_resources { - maybe_update_resource_setup(write, target_resource).await?; + setup_status_by_target_kind + .entry(target_resource.key.target_kind.as_str()) + .or_default() + .push(target_resource); + } + for (target_kind, resources) in setup_status_by_target_kind.into_iter() { + maybe_update_resource_setup( + target_kind, + write, + resources.into_iter(), + |setup_status| async move { + let factory = get_export_target_factory(target_kind).ok_or_else(|| { + anyhow::anyhow!("No factory found for target kind: {}", target_kind) + })?; + factory + .apply_setup_changes(setup_status.into_iter().map(|s| s.as_ref()).collect()) + .await?; + Ok(()) + }, + ) + .await?; } let is_deletion = flow_status.status == ObjectStatus::Deleted; diff --git a/src/setup/states.rs b/src/setup/states.rs index ad508d35..9821930c 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -218,16 +218,17 @@ pub enum SetupChangeType { Invalid, } -#[async_trait] -pub trait ResourceSetupStatus: Send + Sync + Debug { +pub trait ResourceSetupStatus: Send + Sync + Debug + 'static { fn describe_changes(&self) -> Vec; fn change_type(&self) -> SetupChangeType; - async fn apply_change(&self) -> Result<()>; + // Workaround as Rust doesn't support dyn upcasting before 1.86. + // (https://github.com/rust-lang/rust/issues/65991) + // Can be replaced by a `Any` bound when we require Rust 1.86 or newer. + fn as_any(&self) -> &dyn Any; } -#[async_trait] impl ResourceSetupStatus for Box { fn describe_changes(&self) -> Vec { self.as_ref().describe_changes() @@ -237,12 +238,11 @@ impl ResourceSetupStatus for Box { self.as_ref().change_type() } - async fn apply_change(&self) -> Result<()> { - self.as_ref().apply_change().await + fn as_any(&self) -> &dyn Any { + self as &dyn Any } } -#[async_trait] impl ResourceSetupStatus for std::convert::Infallible { fn describe_changes(&self) -> Vec { unreachable!() @@ -252,8 +252,8 @@ impl ResourceSetupStatus for std::convert::Infallible { unreachable!() } - async fn apply_change(&self) -> Result<()> { - unreachable!() + fn as_any(&self) -> &dyn Any { + self as &dyn Any } }