Skip to content

Commit 2d0883b

Browse files
authored
feat(setup): batch applying setup changes for the same target type (#461)
1 parent cbc1998 commit 2d0883b

File tree

10 files changed

+221
-93
lines changed

10 files changed

+221
-93
lines changed

src/execution/db_tracking_setup.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ impl TrackingTableSetupStatus {
102102
}
103103
}
104104

105-
#[async_trait]
106105
impl ResourceSetupStatus for TrackingTableSetupStatus {
107106
fn describe_changes(&self) -> Vec<String> {
108107
let mut changes: Vec<String> = vec![];
@@ -157,7 +156,13 @@ impl ResourceSetupStatus for TrackingTableSetupStatus {
157156
}
158157
}
159158

160-
async fn apply_change(&self) -> Result<()> {
159+
fn as_any(&self) -> &dyn Any {
160+
self as &dyn Any
161+
}
162+
}
163+
164+
impl TrackingTableSetupStatus {
165+
pub async fn apply_change(&self) -> Result<()> {
161166
let pool = &get_lib_context()?.builtin_db_pool;
162167
if let Some(desired) = &self.desired_state {
163168
for lagacy_name in self.legacy_table_names.iter() {

src/ops/factory_bases.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::prelude::*;
2+
use crate::setup::ResourceSetupStatus;
23
use std::fmt::Debug;
34
use std::hash::Hash;
45

@@ -288,6 +289,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
288289
type DeclarationSpec: DeserializeOwned + Send + Sync;
289290
type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
290291
type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
292+
type SetupStatus: ResourceSetupStatus;
291293
type ExportContext: Send + Sync + 'static;
292294

293295
fn name(&self) -> &str;
@@ -310,7 +312,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
310312
desired_state: Option<Self::SetupState>,
311313
existing_states: setup::CombinedState<Self::SetupState>,
312314
auth_registry: &Arc<AuthRegistry>,
313-
) -> Result<impl setup::ResourceSetupStatus + 'static>;
315+
) -> Result<Self::SetupStatus>;
314316

315317
fn check_state_compatibility(
316318
&self,
@@ -334,6 +336,11 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
334336
&self,
335337
mutations: Vec<ExportTargetMutationWithContext<'async_trait, Self::ExportContext>>,
336338
) -> Result<()>;
339+
340+
async fn apply_setup_changes(
341+
&self,
342+
setup_status: Vec<&'async_trait Self::SetupStatus>,
343+
) -> Result<()>;
337344
}
338345

339346
#[async_trait]
@@ -456,8 +463,25 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
456463
.collect::<Result<_>>()?;
457464
StorageFactoryBase::apply_mutation(self, mutations).await
458465
}
459-
}
460466

467+
async fn apply_setup_changes(
468+
&self,
469+
setup_status: Vec<&'async_trait dyn ResourceSetupStatus>,
470+
) -> Result<()> {
471+
StorageFactoryBase::apply_setup_changes(
472+
self,
473+
setup_status
474+
.into_iter()
475+
.map(|s| -> anyhow::Result<_> {
476+
Ok(s.as_any()
477+
.downcast_ref::<T::SetupStatus>()
478+
.ok_or_else(|| anyhow!("Unexpected setup status type"))?)
479+
})
480+
.collect::<Result<Vec<_>>>()?,
481+
)
482+
.await
483+
}
484+
}
461485
fn from_json_combined_state<T: Debug + Clone + Serialize + DeserializeOwned>(
462486
existing_states: setup::CombinedState<serde_json::Value>,
463487
) -> Result<setup::CombinedState<T>> {

src/ops/interface.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,11 @@ pub trait ExportTargetFactory: Send + Sync {
214214
&self,
215215
mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
216216
) -> Result<()>;
217+
218+
async fn apply_setup_changes(
219+
&self,
220+
setup_status: Vec<&'async_trait dyn setup::ResourceSetupStatus>,
221+
) -> Result<()>;
217222
}
218223

219224
#[derive(Clone)]

src/ops/storages/neo4j.rs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::prelude::*;
22

33
use super::spec::{GraphDeclaration, GraphElementMapping, NodeFromFieldsSpec, TargetFieldMapping};
4-
use crate::setup::components::{self, State};
4+
use crate::setup::components::{self, apply_component_changes, State};
55
use crate::setup::{ResourceSetupStatus, SetupChangeType};
66
use crate::{ops::sdk::*, setup::CombinedState};
77

@@ -688,7 +688,7 @@ impl ComponentKind {
688688
}
689689
}
690690
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
691-
struct ComponentKey {
691+
pub struct ComponentKey {
692692
kind: ComponentKind,
693693
name: String,
694694
}
@@ -754,13 +754,13 @@ impl components::State<ComponentKey> for ComponentState {
754754
}
755755
}
756756

757-
struct SetupComponentOperator {
757+
pub struct SetupComponentOperator {
758758
graph_pool: Arc<GraphPool>,
759759
conn_spec: ConnectionSpec,
760760
}
761761

762762
#[async_trait]
763-
impl components::Operator for SetupComponentOperator {
763+
impl components::SetupOperator for SetupComponentOperator {
764764
type Key = ComponentKey;
765765
type State = ComponentState;
766766
type SetupState = SetupState;
@@ -855,7 +855,7 @@ fn build_composite_field_names(qualifier: &str, field_names: &[String]) -> Strin
855855
}
856856
#[derive(Derivative)]
857857
#[derivative(Debug)]
858-
struct SetupStatus {
858+
pub struct GraphElementDataSetupStatus {
859859
key: GraphElement,
860860
#[derivative(Debug = "ignore")]
861861
graph_pool: Arc<GraphPool>,
@@ -864,7 +864,7 @@ struct SetupStatus {
864864
change_type: SetupChangeType,
865865
}
866866

867-
impl SetupStatus {
867+
impl GraphElementDataSetupStatus {
868868
fn new(
869869
key: GraphElement,
870870
graph_pool: Arc<GraphPool>,
@@ -907,8 +907,7 @@ impl SetupStatus {
907907
}
908908
}
909909

910-
#[async_trait]
911-
impl ResourceSetupStatus for SetupStatus {
910+
impl ResourceSetupStatus for GraphElementDataSetupStatus {
912911
fn describe_changes(&self) -> Vec<String> {
913912
let mut result = vec![];
914913
if let Some(data_clear) = &self.data_clear {
@@ -934,6 +933,12 @@ impl ResourceSetupStatus for SetupStatus {
934933
self.change_type
935934
}
936935

936+
fn as_any(&self) -> &dyn Any {
937+
self
938+
}
939+
}
940+
941+
impl GraphElementDataSetupStatus {
937942
async fn apply_change(&self) -> Result<()> {
938943
let graph = self.graph_pool.get_graph(&self.conn_spec).await?;
939944
if let Some(data_clear) = &self.data_clear {
@@ -1077,6 +1082,10 @@ impl StorageFactoryBase for Factory {
10771082
type Spec = Spec;
10781083
type DeclarationSpec = Declaration;
10791084
type SetupState = SetupState;
1085+
type SetupStatus = (
1086+
GraphElementDataSetupStatus,
1087+
components::SetupStatus<SetupComponentOperator>,
1088+
);
10801089
type Key = GraphElement;
10811090
type ExportContext = ExportContext;
10821091

@@ -1255,24 +1264,24 @@ impl StorageFactoryBase for Factory {
12551264
desired: Option<SetupState>,
12561265
existing: CombinedState<SetupState>,
12571266
auth_registry: &Arc<AuthRegistry>,
1258-
) -> Result<impl ResourceSetupStatus + 'static> {
1267+
) -> Result<Self::SetupStatus> {
12591268
let conn_spec = auth_registry.get::<ConnectionSpec>(&key.connection)?;
1260-
let base = SetupStatus::new(
1269+
let data_status = GraphElementDataSetupStatus::new(
12611270
key,
12621271
self.graph_pool.clone(),
12631272
conn_spec.clone(),
12641273
desired.as_ref(),
12651274
&existing,
12661275
);
1267-
let comp = components::Status::create(
1276+
let components = components::SetupStatus::create(
12681277
SetupComponentOperator {
12691278
graph_pool: self.graph_pool.clone(),
12701279
conn_spec: conn_spec.clone(),
12711280
},
12721281
desired,
12731282
existing,
12741283
)?;
1275-
Ok(components::combine_setup_statuss(base, comp))
1284+
Ok((data_status, components))
12761285
}
12771286

12781287
fn check_state_compatibility(
@@ -1328,4 +1337,15 @@ impl StorageFactoryBase for Factory {
13281337
}
13291338
Ok(())
13301339
}
1340+
1341+
async fn apply_setup_changes(
1342+
&self,
1343+
changes: Vec<&'async_trait Self::SetupStatus>,
1344+
) -> Result<()> {
1345+
for change in changes.iter() {
1346+
change.0.apply_change().await?;
1347+
}
1348+
apply_component_changes(changes.iter().map(|c| &c.1).collect()).await?;
1349+
Ok(())
1350+
}
13311351
}

src/ops/storages/postgres.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -738,7 +738,6 @@ fn describe_index_spec(index_name: &str, index_spec: &VectorIndexDef) -> String
738738
format!("{} {}", index_name, to_index_spec_sql(index_spec))
739739
}
740740

741-
#[async_trait]
742741
impl setup::ResourceSetupStatus for SetupStatus {
743742
fn describe_changes(&self) -> Vec<String> {
744743
let mut descriptions = vec![];
@@ -821,6 +820,12 @@ impl setup::ResourceSetupStatus for SetupStatus {
821820
}
822821
}
823822

823+
fn as_any(&self) -> &dyn Any {
824+
self
825+
}
826+
}
827+
828+
impl SetupStatus {
824829
async fn apply_change(&self) -> Result<()> {
825830
let table_name = &self.table_name;
826831
if self.drop_existing {
@@ -907,6 +912,7 @@ impl StorageFactoryBase for Factory {
907912
type Spec = Spec;
908913
type DeclarationSpec = ();
909914
type SetupState = SetupState;
915+
type SetupStatus = SetupStatus;
910916
type Key = TableId;
911917
type ExportContext = ExportContext;
912918

@@ -976,7 +982,7 @@ impl StorageFactoryBase for Factory {
976982
desired: Option<SetupState>,
977983
existing: setup::CombinedState<SetupState>,
978984
auth_registry: &Arc<AuthRegistry>,
979-
) -> Result<impl setup::ResourceSetupStatus + 'static> {
985+
) -> Result<SetupStatus> {
980986
Ok(SetupStatus::new(
981987
get_db_pool(key.database.as_ref(), auth_registry).await?,
982988
key.table_name,
@@ -1049,4 +1055,14 @@ impl StorageFactoryBase for Factory {
10491055
}
10501056
Ok(())
10511057
}
1058+
1059+
async fn apply_setup_changes(
1060+
&self,
1061+
setup_status: Vec<&'async_trait Self::SetupStatus>,
1062+
) -> Result<()> {
1063+
for setup_status in setup_status.iter() {
1064+
setup_status.apply_change().await?;
1065+
}
1066+
Ok(())
1067+
}
10521068
}

src/ops/storages/qdrant.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ impl StorageFactoryBase for Arc<Factory> {
332332
type Spec = Spec;
333333
type DeclarationSpec = ();
334334
type SetupState = ();
335+
type SetupStatus = Infallible;
335336
type Key = String;
336337
type ExportContext = ExportContext;
337338

@@ -390,7 +391,7 @@ impl StorageFactoryBase for Arc<Factory> {
390391
_desired: Option<()>,
391392
_existing: setup::CombinedState<()>,
392393
_auth_registry: &Arc<AuthRegistry>,
393-
) -> Result<impl setup::ResourceSetupStatus + 'static> {
394+
) -> Result<Self::SetupStatus> {
394395
Err(anyhow!("Set `setup_by_user` to `true` to export to Qdrant")) as Result<Infallible, _>
395396
}
396397

@@ -418,4 +419,11 @@ impl StorageFactoryBase for Arc<Factory> {
418419
}
419420
Ok(())
420421
}
422+
423+
async fn apply_setup_changes(
424+
&self,
425+
_setup_status: Vec<&'async_trait Self::SetupStatus>,
426+
) -> Result<()> {
427+
Err(anyhow!("Qdrant does not support setup changes"))
428+
}
421429
}

0 commit comments

Comments
 (0)