generated from ministryofjustice/hmpps-template-kotlin
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathgenerate_schema_diagram.py
89 lines (74 loc) · 3.53 KB
/
generate_schema_diagram.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""Module to generate a schema parent-child relationship diagram, with corresponding outputs"""
import sys
import pandas as pd
from graphviz import Digraph
from constants import common
# Destination of the CSV holding the collected parent-child relations
# (written by main() via DataFrame.to_csv).
OUTPUT_FILE = common.SCHEMA_PARENT_CHILD_FILE
# Filename handed to graphviz's Digraph and to Digraph.save() in main();
# main() also passes it to common.prepare_directory before writing.
DIAGRAM_FILE = common.SCHEMA_DIAGRAM
def create_aggregate_data_frame(dict_object, schemas):
    """
    Collect the parent-schema relations of several schemas into one data frame.

    Each name in ``schemas`` is looked up with ``common.find_parent_schema``;
    every non-empty result is kept and the results are stacked row-wise.

    Parameters:
        dict_object (dictionary): Passed unchanged to
            ``common.find_parent_schema`` as the data to search.
        schemas (iterable): Schema names to look up. Iterating a dictionary
            yields its keys, so a dictionary keyed by schema name works too.

    Returns:
        aggregate_data_frame (pd.DataFrame): All non-empty lookup results
            concatenated under a fresh 0..n-1 index, or an empty data frame
            when no lookup returned any rows.
    """
    found_frames = []
    for schema in schemas:
        data_frame = common.find_parent_schema(dict_object, schema)
        if not data_frame.empty:
            found_frames.append(data_frame)

    if not found_frames:
        return pd.DataFrame()

    # One concat at the end avoids re-copying the growing frame on every
    # iteration. ignore_index=True gives every row a unique label;
    # DataFrame.reset_index() is not in-place, so calling it without using
    # the result (as was done before) left duplicated labels behind.
    return pd.concat(found_frames, axis=0, ignore_index=True)
def main(max_parent_depth=8):
    """
    Build the schema parent-child diagram and CSV; entry point of the script.

    Command line arguments are used as the schema names to search for. When
    no arguments are given, every key under ``["components"]["schemas"]`` of
    the data returned by ``common.extract_data(common.DEFAULT_URL)`` is
    searched instead.

    Parameters:
        max_parent_depth (int): Maximum number of "parents of parents"
            search rounds, i.e. how many steps away an ancestor may be and
            still be found. Defaults to 8.

    Side effects:
        Prints a per-parent row count, saves the graphviz diagram to
        DIAGRAM_FILE and writes the relations to OUTPUT_FILE as CSV. Calls
        ``sys.exit()`` when the search produced no "Parent_Schema" column.
    """
    response_dict = common.extract_data(common.DEFAULT_URL)
    common.prepare_directory(DIAGRAM_FILE)

    is_full_schema_diagram = len(sys.argv) == 1
    if is_full_schema_diagram:
        schemas = response_dict["components"]["schemas"]
    else:
        schemas = sys.argv[1:]
    aggregate_data_frame = create_aggregate_data_frame(response_dict, schemas)

    # To ensure we don't break future logic, remove all rows without a child.
    # The child is read from the third column by position. A positional
    # boolean mask is used instead of dropping index labels while iterating:
    # dropping by label removes every row sharing that label, which deletes
    # unrelated rows whenever the index is not unique.
    if not aggregate_data_frame.empty:
        has_child = (aggregate_data_frame.iloc[:, 2] != "").to_numpy()
        aggregate_data_frame = aggregate_data_frame.loc[has_child].reset_index(drop=True)

    try:
        print(aggregate_data_frame.groupby("Parent_Schema").count())
    except KeyError as k_e:
        print(f"{type(k_e)}", "- Which means no parent data was found in the search")
        sys.exit()

    # Parents of parents: repeatedly look up the schema named in the first
    # column of every row that has not been searched yet (presumably the
    # parent schema -- confirm against common.find_parent_schema).
    aggregate_data_frame["Searched_bool"] = False
    searched_names = set()
    for _ in range(max_parent_depth):
        pending = ~aggregate_data_frame["Searched_bool"].to_numpy(dtype=bool)
        if not pending.any():
            break  # nothing new was found in the previous round
        pending_names = aggregate_data_frame.iloc[pending, 0]
        # Every row is now either already searched or searched in this round.
        aggregate_data_frame["Searched_bool"] = True

        frames = [aggregate_data_frame]
        for schema_name in pending_names:
            if schema_name in searched_names:
                # A repeated lookup would only re-add rows we already have.
                continue
            searched_names.add(schema_name)
            data_frame = common.find_parent_schema(response_dict, schema_name)
            if not data_frame.empty:
                data_frame["Searched_bool"] = False
                frames.append(data_frame)
        # ignore_index keeps row labels unique for the next round.
        aggregate_data_frame = pd.concat(frames, axis=0, ignore_index=True)

    aggregate_data_frame = (
        aggregate_data_frame.drop_duplicates().dropna().reset_index(drop=True)
    )

    schema_graph = Digraph('schema_graph',
                           filename=DIAGRAM_FILE,
                           node_attr={'shape': 'record', 'height': '.1'},
                           graph_attr={'rankdir': 'LR'})
    # Plain tuples give positional access (first column -> third column)
    # without relying on the deprecated positional fallback of Series[int].
    for row in aggregate_data_frame.itertuples(index=False, name=None):
        schema_graph.node(row[0])
        schema_graph.edge(row[0], row[2])
    schema_graph.save(filename=DIAGRAM_FILE)
    print(f"Visual saved to {DIAGRAM_FILE=}")

    aggregate_data_frame.to_csv(OUTPUT_FILE)
    print(f"Child-parent data saved to {OUTPUT_FILE=}")
if __name__ == "__main__":
main()