Skip to content

feat(setup): batch applying setup changes for the same target type #461

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/execution/db_tracking_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ impl TrackingTableSetupStatus {
}
}

#[async_trait]
impl ResourceSetupStatus for TrackingTableSetupStatus {
fn describe_changes(&self) -> Vec<String> {
let mut changes: Vec<String> = vec![];
Expand Down Expand Up @@ -157,7 +156,13 @@ impl ResourceSetupStatus for TrackingTableSetupStatus {
}
}

async fn apply_change(&self) -> Result<()> {
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
}

impl TrackingTableSetupStatus {
pub async fn apply_change(&self) -> Result<()> {
let pool = &get_lib_context()?.builtin_db_pool;
if let Some(desired) = &self.desired_state {
for lagacy_name in self.legacy_table_names.iter() {
Expand Down
28 changes: 26 additions & 2 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::prelude::*;
use crate::setup::ResourceSetupStatus;
use std::fmt::Debug;
use std::hash::Hash;

Expand Down Expand Up @@ -288,6 +289,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
type DeclarationSpec: DeserializeOwned + Send + Sync;
type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
type SetupStatus: ResourceSetupStatus;
type ExportContext: Send + Sync + 'static;

fn name(&self) -> &str;
Expand All @@ -310,7 +312,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
desired_state: Option<Self::SetupState>,
existing_states: setup::CombinedState<Self::SetupState>,
auth_registry: &Arc<AuthRegistry>,
) -> Result<impl setup::ResourceSetupStatus + 'static>;
) -> Result<Self::SetupStatus>;

fn check_state_compatibility(
&self,
Expand All @@ -334,6 +336,11 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
&self,
mutations: Vec<ExportTargetMutationWithContext<'async_trait, Self::ExportContext>>,
) -> Result<()>;

async fn apply_setup_changes(
&self,
setup_status: Vec<&'async_trait Self::SetupStatus>,
) -> Result<()>;
}

#[async_trait]
Expand Down Expand Up @@ -456,8 +463,25 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
.collect::<Result<_>>()?;
StorageFactoryBase::apply_mutation(self, mutations).await
}
}

async fn apply_setup_changes(
&self,
setup_status: Vec<&'async_trait dyn ResourceSetupStatus>,
) -> Result<()> {
StorageFactoryBase::apply_setup_changes(
self,
setup_status
.into_iter()
.map(|s| -> anyhow::Result<_> {
Ok(s.as_any()
.downcast_ref::<T::SetupStatus>()
.ok_or_else(|| anyhow!("Unexpected setup status type"))?)
})
.collect::<Result<Vec<_>>>()?,
)
.await
}
}
fn from_json_combined_state<T: Debug + Clone + Serialize + DeserializeOwned>(
existing_states: setup::CombinedState<serde_json::Value>,
) -> Result<setup::CombinedState<T>> {
Expand Down
5 changes: 5 additions & 0 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ pub trait ExportTargetFactory: Send + Sync {
&self,
mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
) -> Result<()>;

async fn apply_setup_changes(
&self,
setup_status: Vec<&'async_trait dyn setup::ResourceSetupStatus>,
) -> Result<()>;
}

#[derive(Clone)]
Expand Down
44 changes: 32 additions & 12 deletions src/ops/storages/neo4j.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::prelude::*;

use super::spec::{GraphDeclaration, GraphElementMapping, NodeFromFieldsSpec, TargetFieldMapping};
use crate::setup::components::{self, State};
use crate::setup::components::{self, apply_component_changes, State};
use crate::setup::{ResourceSetupStatus, SetupChangeType};
use crate::{ops::sdk::*, setup::CombinedState};

Expand Down Expand Up @@ -688,7 +688,7 @@ impl ComponentKind {
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ComponentKey {
pub struct ComponentKey {
kind: ComponentKind,
name: String,
}
Expand Down Expand Up @@ -754,13 +754,13 @@ impl components::State<ComponentKey> for ComponentState {
}
}

struct SetupComponentOperator {
pub struct SetupComponentOperator {
graph_pool: Arc<GraphPool>,
conn_spec: ConnectionSpec,
}

#[async_trait]
impl components::Operator for SetupComponentOperator {
impl components::SetupOperator for SetupComponentOperator {
type Key = ComponentKey;
type State = ComponentState;
type SetupState = SetupState;
Expand Down Expand Up @@ -855,7 +855,7 @@ fn build_composite_field_names(qualifier: &str, field_names: &[String]) -> Strin
}
#[derive(Derivative)]
#[derivative(Debug)]
struct SetupStatus {
pub struct GraphElementDataSetupStatus {
key: GraphElement,
#[derivative(Debug = "ignore")]
graph_pool: Arc<GraphPool>,
Expand All @@ -864,7 +864,7 @@ struct SetupStatus {
change_type: SetupChangeType,
}

impl SetupStatus {
impl GraphElementDataSetupStatus {
fn new(
key: GraphElement,
graph_pool: Arc<GraphPool>,
Expand Down Expand Up @@ -907,8 +907,7 @@ impl SetupStatus {
}
}

#[async_trait]
impl ResourceSetupStatus for SetupStatus {
impl ResourceSetupStatus for GraphElementDataSetupStatus {
fn describe_changes(&self) -> Vec<String> {
let mut result = vec![];
if let Some(data_clear) = &self.data_clear {
Expand All @@ -934,6 +933,12 @@ impl ResourceSetupStatus for SetupStatus {
self.change_type
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl GraphElementDataSetupStatus {
async fn apply_change(&self) -> Result<()> {
let graph = self.graph_pool.get_graph(&self.conn_spec).await?;
if let Some(data_clear) = &self.data_clear {
Expand Down Expand Up @@ -1077,6 +1082,10 @@ impl StorageFactoryBase for Factory {
type Spec = Spec;
type DeclarationSpec = Declaration;
type SetupState = SetupState;
type SetupStatus = (
GraphElementDataSetupStatus,
components::SetupStatus<SetupComponentOperator>,
);
type Key = GraphElement;
type ExportContext = ExportContext;

Expand Down Expand Up @@ -1255,24 +1264,24 @@ impl StorageFactoryBase for Factory {
desired: Option<SetupState>,
existing: CombinedState<SetupState>,
auth_registry: &Arc<AuthRegistry>,
) -> Result<impl ResourceSetupStatus + 'static> {
) -> Result<Self::SetupStatus> {
let conn_spec = auth_registry.get::<ConnectionSpec>(&key.connection)?;
let base = SetupStatus::new(
let data_status = GraphElementDataSetupStatus::new(
key,
self.graph_pool.clone(),
conn_spec.clone(),
desired.as_ref(),
&existing,
);
let comp = components::Status::create(
let components = components::SetupStatus::create(
SetupComponentOperator {
graph_pool: self.graph_pool.clone(),
conn_spec: conn_spec.clone(),
},
desired,
existing,
)?;
Ok(components::combine_setup_statuss(base, comp))
Ok((data_status, components))
}

fn check_state_compatibility(
Expand Down Expand Up @@ -1328,4 +1337,15 @@ impl StorageFactoryBase for Factory {
}
Ok(())
}

async fn apply_setup_changes(
&self,
changes: Vec<&'async_trait Self::SetupStatus>,
) -> Result<()> {
for change in changes.iter() {
change.0.apply_change().await?;
}
apply_component_changes(changes.iter().map(|c| &c.1).collect()).await?;
Ok(())
}
}
20 changes: 18 additions & 2 deletions src/ops/storages/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,6 @@ fn describe_index_spec(index_name: &str, index_spec: &VectorIndexDef) -> String
format!("{} {}", index_name, to_index_spec_sql(index_spec))
}

#[async_trait]
impl setup::ResourceSetupStatus for SetupStatus {
fn describe_changes(&self) -> Vec<String> {
let mut descriptions = vec![];
Expand Down Expand Up @@ -821,6 +820,12 @@ impl setup::ResourceSetupStatus for SetupStatus {
}
}

fn as_any(&self) -> &dyn Any {
self
}
}

impl SetupStatus {
async fn apply_change(&self) -> Result<()> {
let table_name = &self.table_name;
if self.drop_existing {
Expand Down Expand Up @@ -907,6 +912,7 @@ impl StorageFactoryBase for Factory {
type Spec = Spec;
type DeclarationSpec = ();
type SetupState = SetupState;
type SetupStatus = SetupStatus;
type Key = TableId;
type ExportContext = ExportContext;

Expand Down Expand Up @@ -976,7 +982,7 @@ impl StorageFactoryBase for Factory {
desired: Option<SetupState>,
existing: setup::CombinedState<SetupState>,
auth_registry: &Arc<AuthRegistry>,
) -> Result<impl setup::ResourceSetupStatus + 'static> {
) -> Result<SetupStatus> {
Ok(SetupStatus::new(
get_db_pool(key.database.as_ref(), auth_registry).await?,
key.table_name,
Expand Down Expand Up @@ -1049,4 +1055,14 @@ impl StorageFactoryBase for Factory {
}
Ok(())
}

async fn apply_setup_changes(
&self,
setup_status: Vec<&'async_trait Self::SetupStatus>,
) -> Result<()> {
for setup_status in setup_status.iter() {
setup_status.apply_change().await?;
}
Ok(())
}
}
10 changes: 9 additions & 1 deletion src/ops/storages/qdrant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ impl StorageFactoryBase for Arc<Factory> {
type Spec = Spec;
type DeclarationSpec = ();
type SetupState = ();
type SetupStatus = Infallible;
type Key = String;
type ExportContext = ExportContext;

Expand Down Expand Up @@ -390,7 +391,7 @@ impl StorageFactoryBase for Arc<Factory> {
_desired: Option<()>,
_existing: setup::CombinedState<()>,
_auth_registry: &Arc<AuthRegistry>,
) -> Result<impl setup::ResourceSetupStatus + 'static> {
) -> Result<Self::SetupStatus> {
Err(anyhow!("Set `setup_by_user` to `true` to export to Qdrant")) as Result<Infallible, _>
}

Expand Down Expand Up @@ -418,4 +419,11 @@ impl StorageFactoryBase for Arc<Factory> {
}
Ok(())
}

async fn apply_setup_changes(
&self,
_setup_status: Vec<&'async_trait Self::SetupStatus>,
) -> Result<()> {
Err(anyhow!("Qdrant does not support setup changes"))
}
}
Loading