From 36721f14ce85c47bde4615163d1ffc73ec1690a6 Mon Sep 17 00:00:00 2001 From: LJ Date: Sat, 10 May 2025 13:49:14 -0700 Subject: [PATCH] feat(neo4j): `__self_contained` field for tighter existence control --- src/ops/storages/neo4j.rs | 78 +++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 35 deletions(-) diff --git a/src/ops/storages/neo4j.rs b/src/ops/storages/neo4j.rs index bad7fc09..4084aed8 100644 --- a/src/ops/storages/neo4j.rs +++ b/src/ops/storages/neo4j.rs @@ -326,6 +326,7 @@ const SRC_PROPS_PARAM: &str = "source_props"; const TGT_KEY_PARAM_PREFIX: &str = "target_key"; const TGT_PROPS_PARAM: &str = "target_props"; const CORE_ELEMENT_MATCHER_VAR: &str = "e"; +const SELF_CONTAINED_TAG_FIELD_NAME: &str = "__self_contained"; impl ExportContext { fn build_key_field_params_n_literal<'a>( @@ -360,6 +361,8 @@ impl ExportContext { let delete_cypher = formatdoc! {" OPTIONAL MATCH (old_node:{label} {key_fields_literal}) WITH old_node + SET old_node.{SELF_CONTAINED_TAG_FIELD_NAME} = NULL + WITH old_node WHERE NOT (old_node)--() DELETE old_node FINISH @@ -369,14 +372,14 @@ impl ExportContext { let insert_cypher = formatdoc! {" MERGE (new_node:{label} {key_fields_literal}) - {optional_set_props} + SET new_node.{SELF_CONTAINED_TAG_FIELD_NAME} = TRUE{optional_set_props} FINISH ", label = node_spec.label, optional_set_props = if value_fields.is_empty() { "".to_string() } else { - format!("SET new_node += ${CORE_PROPS_PARAM}\n") + format!(", new_node += ${CORE_PROPS_PARAM}\n") }, }; @@ -402,24 +405,12 @@ impl ExportContext { DELETE old_rel - WITH old_src, old_tgt - CALL {{ - WITH old_src - OPTIONAL MATCH (old_src)-[r]-() - WITH old_src, count(r) AS rels - WHERE rels = 0 - DELETE old_src - RETURN 0 AS _1 - }} - - CALL {{ - WITH old_tgt - OPTIONAL MATCH (old_tgt)-[r]-() - WITH old_tgt, count(r) AS rels - WHERE rels = 0 - DELETE old_tgt - RETURN 0 AS _2 - }} + WITH collect(old_src) + collect(old_tgt) AS nodes_to_check + UNWIND nodes_to_check AS node + WITH DISTINCT node + WHERE NOT COALESCE(node.{SELF_CONTAINED_TAG_FIELD_NAME}, FALSE) + AND COUNT{{ (node)--() }} = 0 + DELETE node FINISH ", @@ -478,7 +469,7 @@ impl ExportContext { create_order: 1, delete_cypher, insert_cypher, - delete_before_upsert: false, // true + delete_before_upsert: true, key_field_params, key_fields, value_fields, @@ -936,20 +927,37 @@ impl ResourceSetupStatus for GraphElementDataSetupStatus { async fn clear_graph_element_data( graph: &Graph, key: &GraphElement, - is_dependent: bool, + is_self_contained: bool, ) -> Result<()> { let var_name = CORE_ELEMENT_MATCHER_VAR; - let optional_orphan_condition = if is_dependent { - format!("WHERE NOT ({var_name})--()") - } else { - "".to_string() + let matcher = key.typ.matcher(var_name); + let query_string = match key.typ { + ElementType::Node(_) => { + let optional_reset_self_contained = if is_self_contained { + formatdoc! {" + WITH {var_name} + SET {var_name}.{SELF_CONTAINED_TAG_FIELD_NAME} = NULL + "} + } else { + "".to_string() + }; + formatdoc! {" + CALL {{ + MATCH {matcher} + {optional_reset_self_contained} + WITH {var_name} WHERE NOT ({var_name})--() DELETE {var_name} + }} IN TRANSACTIONS + "} + } + ElementType::Relationship(_) => { + formatdoc! {" + CALL {{ + MATCH {matcher} WITH {var_name} DELETE {var_name} + }} IN TRANSACTIONS + "} + } }; - let matcher = key.typ.matcher(CORE_ELEMENT_MATCHER_VAR); - let delete_query = neo4rs::query(&formatdoc! {" - CALL {{ - MATCH {matcher} WITH {var_name} {optional_orphan_condition} DELETE {var_name} - }} IN TRANSACTIONS - "}); + let delete_query = neo4rs::query(&query_string); graph.run(delete_query).await?; Ok(()) } @@ -1348,18 +1356,18 @@ impl StorageFactoryBase for Factory { // Relationships have no dependency, so can be cleared first. for (rel_type, conn_spec) in relationship_types.iter() { let graph = self.graph_pool.get_graph(conn_spec).await?; - clear_graph_element_data(&graph, rel_type, false).await?; + clear_graph_element_data(&graph, rel_type, true).await?; } // Clear standalone nodes, which is simpler than dependent nodes. for (node_label, conn_spec) in node_labels.iter() { let graph = self.graph_pool.get_graph(conn_spec).await?; - clear_graph_element_data(&graph, node_label, false).await?; + clear_graph_element_data(&graph, node_label, true).await?; } // Clear dependent nodes if they're not covered by standalone nodes. for (node_label, conn_spec) in dependent_node_labels.iter() { if !node_labels.contains_key(node_label) { let graph = self.graph_pool.get_graph(conn_spec).await?; - clear_graph_element_data(&graph, node_label, true).await?; + clear_graph_element_data(&graph, node_label, false).await?; } }