Skip to content

feat(neo4j): __self_contained field for tighter existence control #464

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 10, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 43 additions & 35 deletions src/ops/storages/neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ const SRC_PROPS_PARAM: &str = "source_props";
const TGT_KEY_PARAM_PREFIX: &str = "target_key";
const TGT_PROPS_PARAM: &str = "target_props";
const CORE_ELEMENT_MATCHER_VAR: &str = "e";
const SELF_CONTAINED_TAG_FIELD_NAME: &str = "__self_contained";

impl ExportContext {
fn build_key_field_params_n_literal<'a>(
Expand Down Expand Up @@ -360,6 +361,8 @@ impl ExportContext {
let delete_cypher = formatdoc! {"
OPTIONAL MATCH (old_node:{label} {key_fields_literal})
WITH old_node
SET old_node.{SELF_CONTAINED_TAG_FIELD_NAME} = NULL
WITH old_node
WHERE NOT (old_node)--()
DELETE old_node
FINISH
Expand All @@ -369,14 +372,14 @@ impl ExportContext {

let insert_cypher = formatdoc! {"
MERGE (new_node:{label} {key_fields_literal})
{optional_set_props}
SET new_node.{SELF_CONTAINED_TAG_FIELD_NAME} = TRUE{optional_set_props}
FINISH
",
label = node_spec.label,
optional_set_props = if value_fields.is_empty() {
"".to_string()
} else {
format!("SET new_node += ${CORE_PROPS_PARAM}\n")
format!(", new_node += ${CORE_PROPS_PARAM}\n")
},
};

Expand All @@ -402,24 +405,12 @@ impl ExportContext {

DELETE old_rel

WITH old_src, old_tgt
CALL {{
WITH old_src
OPTIONAL MATCH (old_src)-[r]-()
WITH old_src, count(r) AS rels
WHERE rels = 0
DELETE old_src
RETURN 0 AS _1
}}

CALL {{
WITH old_tgt
OPTIONAL MATCH (old_tgt)-[r]-()
WITH old_tgt, count(r) AS rels
WHERE rels = 0
DELETE old_tgt
RETURN 0 AS _2
}}
WITH collect(old_src) + collect(old_tgt) AS nodes_to_check
UNWIND nodes_to_check AS node
WITH DISTINCT node
WHERE NOT COALESCE(node.{SELF_CONTAINED_TAG_FIELD_NAME}, FALSE)
AND COUNT{{ (node)--() }} = 0
DELETE node

FINISH
",
Expand Down Expand Up @@ -478,7 +469,7 @@ impl ExportContext {
create_order: 1,
delete_cypher,
insert_cypher,
delete_before_upsert: false, // true
delete_before_upsert: true,
key_field_params,
key_fields,
value_fields,
Expand Down Expand Up @@ -936,20 +927,37 @@ impl ResourceSetupStatus for GraphElementDataSetupStatus {
async fn clear_graph_element_data(
graph: &Graph,
key: &GraphElement,
is_dependent: bool,
is_self_contained: bool,
) -> Result<()> {
let var_name = CORE_ELEMENT_MATCHER_VAR;
let optional_orphan_condition = if is_dependent {
format!("WHERE NOT ({var_name})--()")
} else {
"".to_string()
let matcher = key.typ.matcher(var_name);
let query_string = match key.typ {
ElementType::Node(_) => {
let optional_reset_self_contained = if is_self_contained {
formatdoc! {"
WITH {var_name}
SET {var_name}.{SELF_CONTAINED_TAG_FIELD_NAME} = NULL
"}
} else {
"".to_string()
};
formatdoc! {"
CALL {{
MATCH {matcher}
{optional_reset_self_contained}
WITH {var_name} WHERE NOT ({var_name})--() DELETE {var_name}
}} IN TRANSACTIONS
"}
}
ElementType::Relationship(_) => {
formatdoc! {"
CALL {{
MATCH {matcher} WITH {var_name} DELETE {var_name}
}} IN TRANSACTIONS
"}
}
};
let matcher = key.typ.matcher(CORE_ELEMENT_MATCHER_VAR);
let delete_query = neo4rs::query(&formatdoc! {"
CALL {{
MATCH {matcher} WITH {var_name} {optional_orphan_condition} DELETE {var_name}
}} IN TRANSACTIONS
"});
let delete_query = neo4rs::query(&query_string);
graph.run(delete_query).await?;
Ok(())
}
Expand Down Expand Up @@ -1348,18 +1356,18 @@ impl StorageFactoryBase for Factory {
// Relationships have no dependencies, so they can be cleared first.
for (rel_type, conn_spec) in relationship_types.iter() {
let graph = self.graph_pool.get_graph(conn_spec).await?;
clear_graph_element_data(&graph, rel_type, false).await?;
clear_graph_element_data(&graph, rel_type, true).await?;
}
// Clear standalone nodes next; this is simpler than clearing dependent nodes.
for (node_label, conn_spec) in node_labels.iter() {
let graph = self.graph_pool.get_graph(conn_spec).await?;
clear_graph_element_data(&graph, node_label, false).await?;
clear_graph_element_data(&graph, node_label, true).await?;
}
// Clear dependent nodes if they're not covered by standalone nodes.
for (node_label, conn_spec) in dependent_node_labels.iter() {
if !node_labels.contains_key(node_label) {
let graph = self.graph_pool.get_graph(conn_spec).await?;
clear_graph_element_data(&graph, node_label, true).await?;
clear_graph_element_data(&graph, node_label, false).await?;
}
}

Expand Down