Skip to content

Commit 950bd04

Browse files
authored
0.4.0 (#2)
* Custom node key support added * Relationship upload process updated to use tuple data and unwind. * Dedupe Nodes and relationship dictionary input payloads added * Upload now returns seconds to complete, nodes created, relationships created, and total properties set * Params added to query generation * Dedupe modified to CREATE new nodes if enabled * Target neo4j database name option added * Subprocessing functions moved to a utils file + nodes also using WITH + UNWIND statement * Batch processing added * Batch configuration from main upload function * version updated 0.4.0
1 parent 9870c60 commit 950bd04

9 files changed

+1084
-313
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ upload(credentials, data, node_key="name)
3535
* `node_key` - The name of the key in the nodes dictionary that contains the node name. Defaults to "name"
3636
* `dedupe_nodes` - If True, will not upload duplicate Nodes. Only updates existing with specified properties. Defaults to True
3737
* `dedupe_relationships` - If True, will not created duplicate Relationships. Only updates existing with specified properties. Defaults to True
38-
* `should_overwrite` - If True, will overwrite existing nodes and relationships. Defaults to False
38+
* `should_overwrite` - If True, will clear existing constraints, and will overwrite any existing Nodes and Relationships. Defaults to False.
3939

4040

4141
## Data Argument Format

neo4j_uploader/__init__.py

+35-302
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from timeit import default_timer as timer
22
from neo4j_uploader.n4j import execute_query, reset
3+
from neo4j_uploader.upload_utils import upload_nodes, upload_relationships
34
from neo4j_uploader.logger import ModuleLogger
45
import json
56

@@ -18,303 +19,16 @@ def stop_logging():
1819
ModuleLogger().info(f'Discontinuing logging')
1920
ModuleLogger().is_enabled = False
2021

21-
def prop_subquery(record: dict, exclude_keys: list[str] = [])-> str:
22-
23-
# Embed any prop data within brackets { }
24-
query = " {"
25-
26-
filtered_keys = [key for key in record.keys() if key not in exclude_keys]
27-
sorted_keys = sorted(list(filtered_keys))
28-
29-
for idx, a_key in enumerate(sorted_keys):
30-
value = record[a_key]
31-
32-
# Do not set properties with a None/Null/Empty value
33-
if value is None:
34-
continue
35-
if isinstance(value, str):
36-
if value.lower() == "none":
37-
continue
38-
if value.lower() == "null":
39-
continue
40-
if value.lower() == "empty":
41-
continue
42-
43-
if idx!= 0:
44-
query += ", "
45-
46-
# Using params better but requires creating a separate dictionary be created and passed up to the aggregate function call
47-
if isinstance(value, str):
48-
escaped_value = value.replace('"','\\"')
49-
query += f'`{a_key}`:"{escaped_value}"'
50-
else:
51-
query += f'`{a_key}`:{value}'
52-
53-
# Close out query
54-
query += "}"
55-
56-
return query
57-
58-
class HashableDict:
59-
def __init__(self, dictionary):
60-
self.dictionary = dictionary
61-
62-
def __hash__(self):
63-
return hash(frozenset(self.dictionary.items()))
64-
65-
def upload_node_records_query(
66-
label: str,
67-
nodes: list[dict],
68-
dedupe : bool = True
69-
):
70-
"""
71-
Generate Cypher Node update query.
72-
73-
Args:
74-
label: Label of Nodes to generate
75-
76-
nodes: A list of dictionaries representing Node properties
77-
78-
dedupe: Remove duplicate entries
79-
80-
Raises:
81-
Exceptions if data is not in the correct format or if the upload fails.
82-
"""
83-
84-
if nodes is None:
85-
return None
86-
if len(nodes) == 0:
87-
return None
88-
89-
query = ""
90-
count = 0 # Using count to distinguish node variables
91-
92-
if dedupe == True:
93-
nodes = [dict(t) for t in {tuple(n.items()) for n in nodes}]
94-
95-
for node_record in nodes:
96-
count += 1
97-
98-
# Add a newline if this is not the first node
99-
if count != 1:
100-
query += "\n"
101-
102-
# Convert contents into a subquery specifying node properties
103-
subquery = prop_subquery(node_record)
104-
105-
# Cypher does not support labels with whitespaces
106-
query += f"""MERGE (`{label}{count}`:`{label}`{subquery})"""
107-
108-
return query
109-
110-
def upload_nodes(
111-
neo4j_creds:(str, str, str),
112-
nodes: dict,
113-
dedupe : bool = True
114-
)-> (int, int):
115-
"""
116-
Uploads a list of dictionary objects as nodes.
117-
118-
Args:
119-
neo4j_credits: Tuple containing the hostname, username, and password of the target Neo4j instance
120-
121-
nodes: A dictionary of objects to upload. Each key is a unique node label and contains a list of records as dictionary objects.
122-
123-
dedupe: Remove duplicate entries. Default True
124-
125-
Returns:
126-
A tuple containing the number of nodes created and properties set.
127-
128-
Raises:
129-
Exceptions if data is not in the correct format or if the upload fails.
130-
"""
131-
if nodes is None:
132-
return None
133-
if len(nodes) == 0:
134-
return None
135-
136-
# For reporting
137-
nodes_created = 0
138-
props_set = 0
139-
140-
query = """"""
141-
expected_count = 0
142-
ModuleLogger().debug(f'Uploading node records: {nodes}')
143-
for node_label, nodes_list in nodes.items():
144-
# Process all similar labeled nodes together
145-
query += upload_node_records_query(node_label, nodes_list, dedupe=dedupe)
146-
expected_count += len(nodes_list)
147-
148-
ModuleLogger().debug(f'upload nodes query: {query}')
149-
150-
records, summary, keys = execute_query(neo4j_creds, query)
151-
152-
# Sample summary
153-
# {'metadata': {'query': '<query>', 'parameters': {}, 'query_type': 'w', 'plan': None, 'profile': None, 'notifications': None, 'counters': {'_contains_updates': True, 'labels_added': 17, 'nodes_created': 17, 'properties_set': 78}, 'result_available_after': 73, 'result_consumed_after': 0}
154-
nodes_created += summary.counters.nodes_created
155-
props_set += summary.counters.properties_set
156-
157-
ModuleLogger().info(f'Results from upload nodes: \n\tRecords: {records}\n\tSummary: {summary.__dict__}\n\tKeys: {keys}')
158-
159-
return (nodes_created, props_set)
160-
161-
162-
def with_relationship_elements(
163-
relationships: list[dict],
164-
nodes_key: str = "_uid",
165-
dedupe: bool = False
166-
) -> str:
167-
"""
168-
Returns elements to be added into a batch relationship creation query.
169-
170-
Args:
171-
relationships: A list of dictionary records for each relationship property.
172-
173-
nodes_key: The property key that uniquely identifies Nodes.
174-
175-
Raises:
176-
Exceptions if data is not in the correct format or if the upload ungracefully fails.
177-
"""
178-
# TODO: Possible cypher injection entry point?
179-
180-
result = []
181-
182-
if dedupe == True:
183-
relationships = [dict(t) for t in {tuple(r.items()) for r in relationships}]
184-
185-
for rel in relationships:
186-
187-
# Find from and to node key identifiers
188-
from_key = f"_from_{nodes_key}"
189-
to_key = f"_to_{nodes_key}"
190-
191-
# Get unique key of from and to nodes
192-
from_node = rel.get(from_key, None)
193-
if isinstance(from_node, str):
194-
from_node = f"'{from_node}'"
195-
196-
to_node = rel.get(to_key, None)
197-
if isinstance(to_node, str):
198-
to_node = f"'{to_node}'"
199-
200-
# Validate we from and to nodes to work with
201-
if from_node is None:
202-
ModuleLogger().warning(f'{type} Relationship missing {from_key} property. Skipping relationship {rel}')
203-
continue
204-
if to_node is None:
205-
ModuleLogger().warning(f'{type} Relationship missing {to_key} property. Skipping relationship {rel}')
206-
continue
207-
208-
# string for props
209-
props_string = prop_subquery(rel, exclude_keys=[from_key, to_key])
210-
211-
with_element = f"[{from_node},{to_node},{props_string}]"
212-
result.append(with_element)
213-
214-
return result
215-
216-
217-
218-
def upload_relationships(
219-
neo4j_creds:(str, str, str),
220-
relationships: dict,
221-
nodes_key: str = "_uid",
222-
dedupe : bool = True
223-
)-> (int, int):
224-
"""
225-
Uploads a list of dictionary objects as relationships.
226-
227-
Args:
228-
neo4j_credits: Tuple containing the hostname, username, and password of the target Neo4j instance
229-
230-
nodes: A dictionary of objects to upload. Each key is a unique relationship type and contains a list of records as dictionary objects.
231-
232-
nodes_key: The property key that uniquely identifies Nodes.
233-
234-
dedupe: False means a new relationship will always be created for the from and to nodes. True if existing relationships should only be updated. Note that if several relationships already exist, all matching relationships will get their properties updated. Default True.
235-
236-
Returns:
237-
A tuple of relationships created, properties set
238-
239-
Raises:
240-
Exceptions if data is not in the correct format or if the upload ungracefully fails.
241-
"""
242-
243-
# Final query needs to look something like this
244-
# WITH [['1202109692044412','1204806322568817', {name:"fish"}],['1202109692044411','test', {}]] AS from_to_id_pairs
245-
246-
# UNWIND from_to_id_pairs as pair
247-
248-
# MATCH (fromNode { `gid`: pair[0]})
249-
# MATCH (toNode { `gid`: pair[1]})
250-
251-
# CREATE (fromNode)-[r:`WITHIN`]->(toNode)
252-
# SET r = pair[2]
253-
254-
255-
# Validate
256-
if relationships is None:
257-
return None
258-
if len(relationships) ==0:
259-
return None
260-
261-
262-
ModuleLogger().debug(f'upload relationships source data: {relationships}')
263-
264-
# Upload counts
265-
relationships_created = 0
266-
props_set = 0
267-
268-
# Sort so we get a consistent output
269-
filtered_keys = [key for key in relationships.keys()]
270-
sorted_keys = sorted(list(filtered_keys))
271-
272-
# NOTE: Need to process each relationship type separately as batching this fails with no warning
273-
for idx, rel_type in enumerate(sorted_keys):
274-
275-
rel_list = relationships[rel_type]
276-
ModuleLogger().debug(f'Starting to process relationships type: {rel_type} ...')
277-
278-
# Process all similar labeled nodes together
279-
with_elements = with_relationship_elements(rel_list, nodes_key, dedupe=dedupe)
280-
281-
if len(with_elements) is None:
282-
ModuleLogger().warning(f'Could not process relationships type {rel_type}. Check if data exsists and matches expected schema')
283-
continue
284-
285-
with_elements_str = ",".join(with_elements)
286-
287-
# Assemble final query
288-
rel_upload_query = f"""WITH [{with_elements_str}] AS from_to_data\nUNWIND from_to_data AS tuple\nMATCH (fromNode {{`{nodes_key}`:tuple[0]}})\nMATCH (toNode {{`{nodes_key}`:tuple[1]}})"""
289-
290-
if dedupe == True:
291-
rel_upload_query += f"\nMERGE (fromNode)-[r:`{rel_type}`]->(toNode)"
292-
else:
293-
rel_upload_query += f"\nCREATE (fromNode)-[r:`{rel_type}`]->(toNode)"
294-
rel_upload_query +=f"\nSET r = tuple[2]"
295-
296-
records, summary, keys = execute_query(neo4j_creds, rel_upload_query)
297-
298-
# Sample summary result
299-
# {'metadata': {'query': "<rel_upload_query>", 'parameters': {}, 'query_type': 'w', 'plan': None, 'profile': None, 'notifications': None, 'counters': {'_contains_updates': True, 'relationships_created': 1, 'properties_set': 2}, 'result_available_after': 209, 'result_consumed_after': 0}
300-
301-
# Can we have optionals yet?
302-
relationships_created += summary.counters.relationships_created
303-
props_set += summary.counters.properties_set
304-
305-
ModuleLogger().info(f'Results from uploading relationships type: {rel_type}: \n\tRecords: {records}\n\tSummary: {summary.__dict__}\n\tKeys: {keys}')
306-
307-
return (relationships_created, props_set)
308-
309-
31022
def upload(
311-
neo4j_creds:(str, str, str),
312-
data: str | dict,
313-
node_key : str = "_uid",
314-
dedupe_nodes : bool = True,
315-
dedupe_relationships : bool = True,
316-
should_overwrite: bool = False
317-
)-> (float, int, int, int):
23+
neo4j_creds:(str, str, str),
24+
data: str | dict,
25+
node_key : str = "_uid",
26+
dedupe_nodes : bool = True,
27+
dedupe_relationships : bool = True,
28+
should_overwrite: bool = False,
29+
database_name: str = 'neo4j',
30+
max_batch_size: int = 500,
31+
)-> (float, int, int, int):
31832
"""
31933
Uploads a dictionary of records to a target Neo4j instance.
32034
@@ -344,6 +58,9 @@ def upload(
34458
except Exception as e:
34559
raise Exception(f'Input data string not a valid JSON format: {e}')
34660

61+
if node_key is None or node_key == "":
62+
raise Exception(f'node_key cannot be None or an empty string')
63+
34764
# Start clock
34865
start = timer()
34966

@@ -355,19 +72,35 @@ def upload(
35572
if should_overwrite is True:
35673
reset(neo4j_creds)
35774

358-
nodes_created, node_props_set = upload_nodes(neo4j_creds, nodes, dedupe=dedupe_nodes)
75+
nodes_created, node_props_set = upload_nodes(
76+
neo4j_creds,
77+
nodes,
78+
node_key= node_key,
79+
dedupe=dedupe_nodes,
80+
database= database_name,
81+
max_batch_size= max_batch_size)
82+
83+
all_props_set = node_props_set
35984
relationships_created = 0,
360-
relationship_props_set = 0
36185

36286
# Upload relationship data next
36387
rels = data.get('relationships', None)
36488
if rels is not None and len(rels) > 0:
36589
ModuleLogger().info(f'Begin processing relationships: {rels}')
366-
relationships_created, relationship_props_set = upload_relationships(neo4j_creds, rels, node_key, dedupe = dedupe_relationships)
90+
relationships_created, relationship_props_set = upload_relationships(
91+
neo4j_creds,
92+
rels,
93+
node_key,
94+
dedupe = dedupe_relationships,
95+
database=database_name,
96+
max_batch_size=max_batch_size)
97+
98+
all_props_set += relationship_props_set
36799

368-
# TODO: Verify uploads successful
369100
stop = timer()
370101
time_to_complete = round((stop - start), 4)
371-
all_props_set = node_props_set + relationship_props_set
372102

373-
return (time_to_complete, nodes_created, relationships_created, all_props_set)
103+
return time_to_complete, nodes_created, relationships_created, all_props_set
104+
105+
def clear_db(creds: (str, str, str), database: str):
106+
return reset(creds, database)
Binary file not shown.
1.58 KB
Binary file not shown.

0 commit comments

Comments
 (0)