Skip to content

Commit 6fb0f65

Browse files
authored
feat(neo4j): __self_contained field for tighter existence control (#464)
1 parent 711de4a commit 6fb0f65

File tree

1 file changed

+43
-35
lines changed

1 file changed

+43
-35
lines changed

src/ops/storages/neo4j.rs

Lines changed: 43 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ const SRC_PROPS_PARAM: &str = "source_props";
326326
const TGT_KEY_PARAM_PREFIX: &str = "target_key";
327327
const TGT_PROPS_PARAM: &str = "target_props";
328328
const CORE_ELEMENT_MATCHER_VAR: &str = "e";
329+
const SELF_CONTAINED_TAG_FIELD_NAME: &str = "__self_contained";
329330

330331
impl ExportContext {
331332
fn build_key_field_params_n_literal<'a>(
@@ -360,6 +361,8 @@ impl ExportContext {
360361
let delete_cypher = formatdoc! {"
361362
OPTIONAL MATCH (old_node:{label} {key_fields_literal})
362363
WITH old_node
364+
SET old_node.{SELF_CONTAINED_TAG_FIELD_NAME} = NULL
365+
WITH old_node
363366
WHERE NOT (old_node)--()
364367
DELETE old_node
365368
FINISH
@@ -369,14 +372,14 @@ impl ExportContext {
369372

370373
let insert_cypher = formatdoc! {"
371374
MERGE (new_node:{label} {key_fields_literal})
372-
{optional_set_props}
375+
SET new_node.{SELF_CONTAINED_TAG_FIELD_NAME} = TRUE{optional_set_props}
373376
FINISH
374377
",
375378
label = node_spec.label,
376379
optional_set_props = if value_fields.is_empty() {
377380
"".to_string()
378381
} else {
379-
format!("SET new_node += ${CORE_PROPS_PARAM}\n")
382+
format!(", new_node += ${CORE_PROPS_PARAM}\n")
380383
},
381384
};
382385

@@ -402,24 +405,12 @@ impl ExportContext {
402405
403406
DELETE old_rel
404407
405-
WITH old_src, old_tgt
406-
CALL {{
407-
WITH old_src
408-
OPTIONAL MATCH (old_src)-[r]-()
409-
WITH old_src, count(r) AS rels
410-
WHERE rels = 0
411-
DELETE old_src
412-
RETURN 0 AS _1
413-
}}
414-
415-
CALL {{
416-
WITH old_tgt
417-
OPTIONAL MATCH (old_tgt)-[r]-()
418-
WITH old_tgt, count(r) AS rels
419-
WHERE rels = 0
420-
DELETE old_tgt
421-
RETURN 0 AS _2
422-
}}
408+
WITH collect(old_src) + collect(old_tgt) AS nodes_to_check
409+
UNWIND nodes_to_check AS node
410+
WITH DISTINCT node
411+
WHERE NOT COALESCE(node.{SELF_CONTAINED_TAG_FIELD_NAME}, FALSE)
412+
AND COUNT{{ (node)--() }} = 0
413+
DELETE node
423414
424415
FINISH
425416
",
@@ -478,7 +469,7 @@ impl ExportContext {
478469
create_order: 1,
479470
delete_cypher,
480471
insert_cypher,
481-
delete_before_upsert: false, // true
472+
delete_before_upsert: true,
482473
key_field_params,
483474
key_fields,
484475
value_fields,
@@ -936,20 +927,37 @@ impl ResourceSetupStatus for GraphElementDataSetupStatus {
936927
async fn clear_graph_element_data(
937928
graph: &Graph,
938929
key: &GraphElement,
939-
is_dependent: bool,
930+
is_self_contained: bool,
940931
) -> Result<()> {
941932
let var_name = CORE_ELEMENT_MATCHER_VAR;
942-
let optional_orphan_condition = if is_dependent {
943-
format!("WHERE NOT ({var_name})--()")
944-
} else {
945-
"".to_string()
933+
let matcher = key.typ.matcher(var_name);
934+
let query_string = match key.typ {
935+
ElementType::Node(_) => {
936+
let optional_reset_self_contained = if is_self_contained {
937+
formatdoc! {"
938+
WITH {var_name}
939+
SET {var_name}.{SELF_CONTAINED_TAG_FIELD_NAME} = NULL
940+
"}
941+
} else {
942+
"".to_string()
943+
};
944+
formatdoc! {"
945+
CALL {{
946+
MATCH {matcher}
947+
{optional_reset_self_contained}
948+
WITH {var_name} WHERE NOT ({var_name})--() DELETE {var_name}
949+
}} IN TRANSACTIONS
950+
"}
951+
}
952+
ElementType::Relationship(_) => {
953+
formatdoc! {"
954+
CALL {{
955+
MATCH {matcher} WITH {var_name} DELETE {var_name}
956+
}} IN TRANSACTIONS
957+
"}
958+
}
946959
};
947-
let matcher = key.typ.matcher(CORE_ELEMENT_MATCHER_VAR);
948-
let delete_query = neo4rs::query(&formatdoc! {"
949-
CALL {{
950-
MATCH {matcher} WITH {var_name} {optional_orphan_condition} DELETE {var_name}
951-
}} IN TRANSACTIONS
952-
"});
960+
let delete_query = neo4rs::query(&query_string);
953961
graph.run(delete_query).await?;
954962
Ok(())
955963
}
@@ -1348,18 +1356,18 @@ impl StorageFactoryBase for Factory {
13481356
// Relationships have no dependency, so can be cleared first.
13491357
for (rel_type, conn_spec) in relationship_types.iter() {
13501358
let graph = self.graph_pool.get_graph(conn_spec).await?;
1351-
clear_graph_element_data(&graph, rel_type, false).await?;
1359+
clear_graph_element_data(&graph, rel_type, true).await?;
13521360
}
13531361
// Clear standalone nodes, which is simpler than dependent nodes.
13541362
for (node_label, conn_spec) in node_labels.iter() {
13551363
let graph = self.graph_pool.get_graph(conn_spec).await?;
1356-
clear_graph_element_data(&graph, node_label, false).await?;
1364+
clear_graph_element_data(&graph, node_label, true).await?;
13571365
}
13581366
// Clear dependent nodes if they're not covered by standalone nodes.
13591367
for (node_label, conn_spec) in dependent_node_labels.iter() {
13601368
if !node_labels.contains_key(node_label) {
13611369
let graph = self.graph_pool.get_graph(conn_spec).await?;
1362-
clear_graph_element_data(&graph, node_label, true).await?;
1370+
clear_graph_element_data(&graph, node_label, false).await?;
13631371
}
13641372
}
13651373

0 commit comments

Comments
 (0)