Skip to content

Commit 13da173

Browse files
authored
feat(neo4j): clear relationship before node during drop (#463)
1 parent 2d0883b commit 13da173

File tree

1 file changed

+69
-51
lines changed

1 file changed

+69
-51
lines changed

src/ops/storages/neo4j.rs

Lines changed: 69 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -853,12 +853,9 @@ fn build_composite_field_names(qualifier: &str, field_names: &[String]) -> Strin
853853
format!("({})", strs)
854854
}
855855
}
856-
#[derive(Derivative)]
857-
#[derivative(Debug)]
856+
#[derive(Debug)]
858857
pub struct GraphElementDataSetupStatus {
859858
key: GraphElement,
860-
#[derivative(Debug = "ignore")]
861-
graph_pool: Arc<GraphPool>,
862859
conn_spec: ConnectionSpec,
863860
data_clear: Option<DataClearAction>,
864861
change_type: SetupChangeType,
@@ -867,7 +864,6 @@ pub struct GraphElementDataSetupStatus {
867864
impl GraphElementDataSetupStatus {
868865
fn new(
869866
key: GraphElement,
870-
graph_pool: Arc<GraphPool>,
871867
conn_spec: ConnectionSpec,
872868
desired_state: Option<&SetupState>,
873869
existing: &CombinedState<SetupState>,
@@ -899,7 +895,6 @@ impl GraphElementDataSetupStatus {
899895

900896
Self {
901897
key,
902-
graph_pool,
903898
conn_spec,
904899
data_clear,
905900
change_type,
@@ -938,43 +933,27 @@ impl ResourceSetupStatus for GraphElementDataSetupStatus {
938933
}
939934
}
940935

941-
impl GraphElementDataSetupStatus {
942-
async fn apply_change(&self) -> Result<()> {
943-
let graph = self.graph_pool.get_graph(&self.conn_spec).await?;
944-
if let Some(data_clear) = &self.data_clear {
945-
let delete_query = neo4rs::query(&formatdoc! {"
936+
async fn clear_graph_element_data(
937+
graph: &Graph,
938+
key: &GraphElement,
939+
is_dependent: bool,
940+
) -> Result<()> {
941+
let var_name = CORE_ELEMENT_MATCHER_VAR;
942+
let optional_orphan_condition = if is_dependent {
943+
format!("WHERE NOT ({var_name})--()")
944+
} else {
945+
"".to_string()
946+
};
947+
let matcher = key.typ.matcher(CORE_ELEMENT_MATCHER_VAR);
948+
let delete_query = neo4rs::query(&formatdoc! {"
946949
CALL {{
947-
MATCH {matcher}
948-
WITH {var_name}
949-
{optional_orphan_condition}
950-
DELETE {var_name}
950+
MATCH {matcher} WITH {var_name} {optional_orphan_condition} DELETE {var_name}
951951
}} IN TRANSACTIONS
952-
",
953-
matcher = self.key.typ.matcher(CORE_ELEMENT_MATCHER_VAR),
954-
var_name = CORE_ELEMENT_MATCHER_VAR,
955-
optional_orphan_condition = match self.key.typ {
956-
ElementType::Node(_) => format!("WHERE NOT ({CORE_ELEMENT_MATCHER_VAR})--()"),
957-
_ => "".to_string(),
958-
},
959-
});
960-
graph.run(delete_query).await?;
961-
962-
for node_label in &data_clear.dependent_node_labels {
963-
let delete_node_query = neo4rs::query(&formatdoc! {"
964-
CALL {{
965-
MATCH (n:{node_label})
966-
WHERE NOT (n)--()
967-
DELETE n
968-
}} IN TRANSACTIONS
969-
",
970-
node_label = node_label
971-
});
972-
graph.run(delete_node_query).await?;
973-
}
974-
}
975-
Ok(())
976-
}
952+
"});
953+
graph.run(delete_query).await?;
954+
Ok(())
977955
}
956+
978957
/// Factory for Neo4j relationships
979958
pub struct Factory {
980959
graph_pool: Arc<GraphPool>,
@@ -1266,17 +1245,12 @@ impl StorageFactoryBase for Factory {
12661245
auth_registry: &Arc<AuthRegistry>,
12671246
) -> Result<Self::SetupStatus> {
12681247
let conn_spec = auth_registry.get::<ConnectionSpec>(&key.connection)?;
1269-
let data_status = GraphElementDataSetupStatus::new(
1270-
key,
1271-
self.graph_pool.clone(),
1272-
conn_spec.clone(),
1273-
desired.as_ref(),
1274-
&existing,
1275-
);
1248+
let data_status =
1249+
GraphElementDataSetupStatus::new(key, conn_spec.clone(), desired.as_ref(), &existing);
12761250
let components = components::SetupStatus::create(
12771251
SetupComponentOperator {
12781252
graph_pool: self.graph_pool.clone(),
1279-
conn_spec: conn_spec.clone(),
1253+
conn_spec,
12801254
},
12811255
desired,
12821256
existing,
@@ -1342,10 +1316,54 @@ impl StorageFactoryBase for Factory {
13421316
&self,
13431317
changes: Vec<&'async_trait Self::SetupStatus>,
13441318
) -> Result<()> {
1345-
for change in changes.iter() {
1346-
change.0.apply_change().await?;
1319+
let (data_statuses, components): (Vec<_>, Vec<_>) =
1320+
changes.into_iter().map(|c| (&c.0, &c.1)).unzip();
1321+
1322+
// Relationships first, then nodes, as relationships need to be deleted before nodes they referenced.
1323+
let mut relationship_types = IndexMap::<&GraphElement, &ConnectionSpec>::new();
1324+
let mut node_labels = IndexMap::<&GraphElement, &ConnectionSpec>::new();
1325+
let mut dependent_node_labels = IndexMap::<GraphElement, &ConnectionSpec>::new();
1326+
for data_status in data_statuses.iter() {
1327+
if let Some(data_clear) = &data_status.data_clear {
1328+
match &data_status.key.typ {
1329+
ElementType::Relationship(_) => {
1330+
relationship_types.insert(&data_status.key, &data_status.conn_spec);
1331+
for label in &data_clear.dependent_node_labels {
1332+
dependent_node_labels.insert(
1333+
GraphElement {
1334+
connection: data_status.key.connection.clone(),
1335+
typ: ElementType::Node(label.clone()),
1336+
},
1337+
&data_status.conn_spec,
1338+
);
1339+
}
1340+
}
1341+
ElementType::Node(_) => {
1342+
node_labels.insert(&data_status.key, &data_status.conn_spec);
1343+
}
1344+
}
1345+
}
13471346
}
1348-
apply_component_changes(changes.iter().map(|c| &c.1).collect()).await?;
1347+
1348+
// Relationships have no dependency, so can be cleared first.
1349+
for (rel_type, conn_spec) in relationship_types.iter() {
1350+
let graph = self.graph_pool.get_graph(conn_spec).await?;
1351+
clear_graph_element_data(&graph, rel_type, false).await?;
1352+
}
1353+
// Clear standalone nodes, which is simpler than dependent nodes.
1354+
for (node_label, conn_spec) in node_labels.iter() {
1355+
let graph = self.graph_pool.get_graph(conn_spec).await?;
1356+
clear_graph_element_data(&graph, node_label, false).await?;
1357+
}
1358+
// Clear dependent nodes if they're not covered by standalone nodes.
1359+
for (node_label, conn_spec) in dependent_node_labels.iter() {
1360+
if !node_labels.contains_key(node_label) {
1361+
let graph = self.graph_pool.get_graph(conn_spec).await?;
1362+
clear_graph_element_data(&graph, node_label, true).await?;
1363+
}
1364+
}
1365+
1366+
apply_component_changes(components).await?;
13491367
Ok(())
13501368
}
13511369
}

0 commit comments

Comments
 (0)