# Reports on Confluence workflow status including failures
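#
# Example invocation (argument values are illustrative only; adjust for your account and run):
#   python report.py \
#       -e arn:aws:states:us-west-2:123456789012:execution:confluence-workflow:example-run \
#       -t "2024-01-01T00:00:00Z to 2024-01-31T23:59:59Z" \
#       -s example-sos-bucket \
#       -r constrained \
#       -b example-report-bucket \
#       -k reports/example-run
#   Add -f/--fail when reporting on a failed workflow execution.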
import argparse
import datetime
import json
import logging
import pathlib
import sys
import tempfile
import boto3
logging.getLogger().setLevel(logging.INFO)
logging.basicConfig(
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
datefmt='%Y-%m-%dT%H:%M:%S',
level=logging.INFO
)
S3 = boto3.client("s3")
SFN = boto3.client("stepfunctions")
SNS = boto3.client("sns")
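# Names of the non-map Batch jobs whose stats are reported individually
# (matched against the Batch JobName in get_task_stats)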
TASKS = [
"init-workflow-subset",
"init-workflow-global",
"combine-data-subset",
"combine-data-global"
]
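# Substring used to locate the ARN of the SNS topic that receives report notifications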
TOPIC_STRING = "confluence-reports"
def aggregate_run_data(exe_id, name, temporal_range):
"""Aggregate stats and failure data to create and return job stats and failures."""
module_data = { "name": name, "temporal_range": temporal_range, "modules": [] }
retrieve_total_time(exe_id, module_data)
retrieve_task_data(exe_id, module_data)
failure_data = retrieve_map_data(exe_id, module_data)
return module_data, failure_data
def retrieve_total_time(exe_id, module_data):
"""Retrieve total execution time."""
exe_data = SFN.describe_execution(
executionArn=exe_id
)
    # Use the current time because the workflow is still executing and stopDate is not yet populated
module_data["total_time"] = str(datetime.datetime.now(datetime.timezone.utc) - exe_data["startDate"])
def retrieve_task_data(exe_id, module_data):
"""Retrieve execution data for specific state machine tasks."""
exe_data = SFN.get_execution_history(
executionArn=exe_id,
maxResults=1000
)
events = exe_data["events"]
if "nextToken" in exe_data.keys():
while exe_data.get("nextToken"):
exe_data = SFN.get_execution_history(
executionArn=exe_id,
maxResults=1000,
nextToken=exe_data.get("nextToken")
)
events.extend(exe_data["events"])
    for event in events:
        if event["type"] in ("TaskSucceeded", "TaskFailed"):
            task_stats = get_task_stats(event)
            # get_task_stats returns an empty dict for jobs outside of TASKS
            if task_stats:
                module_data["modules"].append(task_stats)
return module_data
def get_task_stats(event):
"""Retreive task stats."""
event_data = {}
if "taskSucceededEventDetails" in event.keys():
event_data = json.loads(event["taskSucceededEventDetails"]["output"])
if "taskFailedEventDetails" in event.keys():
event_data = json.loads(event["taskFailedEventDetails"]["output"])
task_stats = {}
if event_data:
job_name = "-".join(event_data["JobName"].split("-")[1:4])
if job_name in TASKS:
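            # CreatedAt and StoppedAt are epoch timestamps in milliseconds, so divide by 1000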
start_time = datetime.datetime.fromtimestamp(event_data["CreatedAt"] / 1000, tz=datetime.timezone.utc)
end_time = datetime.datetime.fromtimestamp(event_data["StoppedAt"] / 1000, tz=datetime.timezone.utc)
job_time = end_time - start_time
module_name = job_name.replace("-", " ").title()
failed = 1 if "taskFailedEventDetails" in event.keys() else 0
task_stats = {
"Module": module_name,
"Total Jobs": 1,
"Succeeded": 1 if "taskSucceededEventDetails" in event.keys() else 0,
"Failed": failed,
"Percentage Failed": (failed/1)*100,
"Execution Time": str(job_time)
}
return task_stats
def retrieve_map_data(exe_id, module_data):
"""Retrieve data for map runs and return failures."""
map_run_arns = SFN.list_map_runs(
executionArn=exe_id
)
map_run_arns = [ map_run['mapRunArn'] for map_run in map_run_arns['mapRuns']]
logging.info("Total modules executed: %s", len(map_run_arns))
failure_data = {}
for map_run_arn in map_run_arns:
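        # Derive a human-readable module name from the map run ARN; the segment after the
        # final "/" is expected to be the Map state label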
module_name = map_run_arn.split(":")[-2].split("/")[-1].replace("_", " ").title()
logging.info("Retrieving data for: %s.", module_name)
        map_stats = get_map_stats(map_run_arn, module_name)
module_data["modules"].append(map_stats)
if map_stats["Failed"]:
failure_data[module_name] = get_failures(map_run_arn)
return failure_data
def get_map_stats(map_run_arn, module_name):
"""Retrieve run stats for map run."""
map_run_data = SFN.describe_map_run(mapRunArn=map_run_arn)
total = map_run_data["executionCounts"]["total"]
failed = map_run_data["executionCounts"]["failed"]
return {
"Module": module_name,
"Total Jobs": total,
"Succeeded": map_run_data["executionCounts"]["succeeded"],
"Failed": failed,
"Percentage Failed": (failed/total)*100,
"Execution Time": str(map_run_data["stopDate"] - map_run_data["startDate"])
}
def get_failures(map_run_arn):
"""Retrieve failed job indexes and identifiers."""
map_run_list = SFN.list_executions(
maxResults=1000,
mapRunArn=map_run_arn
)
executions = map_run_list['executions']
map_run_name = map_run_arn.split("/")[-1].split(":")[0].replace("_", " ").title()
logging.info("Checking failures for %s.", map_run_name)
if "nextToken" in map_run_list.keys():
while map_run_list.get("nextToken"):
map_run_list = SFN.list_executions(
maxResults=1000,
mapRunArn=map_run_arn,
nextToken=map_run_list.get("nextToken")
)
executions.extend(map_run_list['executions'])
logging.debug("%s results so far: %s", map_run_name, len(executions))
logging.info("%s Total executions located: %s", map_run_name, len(executions))
failures = []
for execution in executions:
if execution["status"] == "FAILED":
exe_data = SFN.describe_execution(
executionArn=execution["executionArn"]
)
input_data = json.loads(exe_data["input"])
failures.append({
"Index": input_data["context_index"],
"Value": input_data["context_value"]
})
return failures
def write_module_data(module_data, failure_data, data_dir):
"""Write out module data JSON files to data directory."""
module_file = data_dir.joinpath("module_data_report.json")
with open(module_file, "w") as jf:
json.dump(module_data, jf, indent=2)
logging.info("Wrote module data: %s.", module_file)
failure_file = data_dir.joinpath("module_data_failures.json")
with open(failure_file, "w") as jf:
json.dump(failure_data, jf, indent=2)
logging.info("Wrote module data: %s.", failure_file)
return module_file, failure_file
def upload_module_data(module_file, failure_file, bucket, bucket_key):
    """Upload JSON reports to the S3 bucket."""
S3.upload_file(
str(module_file),
bucket,
f"{bucket_key}/{module_file.name}",
ExtraArgs={"ServerSideEncryption": "AES256"}
)
logging.info("Uploaded: %s/%s/%s.", bucket, bucket_key, module_file.name)
S3.upload_file(
str(failure_file),
bucket,
f"{bucket_key}/{failure_file.name}",
ExtraArgs={"ServerSideEncryption": "AES256"}
)
logging.info("Uploaded: %s/%s/%s.", bucket, bucket_key, failure_file.name)
return f"{bucket}/{bucket_key}/{module_file.name}", f"{bucket}/{bucket_key}/{failure_file.name}"
def send_module_report(module_data, failure, sos_bucket, run_type, bucket_report, bucket_failures, workflow, name):
    """E-mail module report data to the SNS topic, including failure details if applicable."""
total_stats = format_output(module_data, workflow)
logging.debug(total_stats)
version = name.split("-")[2]
sos_s3_files = get_sos_s3(sos_bucket, run_type, version)
logging.debug(sos_s3_files)
topics = SNS.list_topics()
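    # list_topics returns at most 100 topics per call; this assumes the confluence-reports topic appears on the first page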
topic_arn = ""
for topic in topics["Topics"]:
if TOPIC_STRING in topic["TopicArn"]:
topic_arn = topic["TopicArn"]
if topic_arn:
date_str = datetime.datetime.now(datetime.timezone.utc).strftime("%a %b %d %H:%M:%S %Y")
if failure:
subject = f"!FAILURE! Confluence workflow report {date_str} UTC"
message = "CONFLUENCE WORKFLOW FAILURE.\n\n" \
+ f"To locate failures: See AWS Step Function: {workflow} with execution: {name}.\n\n" \
+ "Please visit the following link for documentation on how to troubleshoot: [https://wiki.jpl.nasa.gov/display/PD/Confluence#Confluence-howtohandleerrors].\n\n"
else:
subject = f"Confluence workflow report {date_str} UTC"
message = "CONFLUENCE WORKFLOW SUCCESSFULLY COMPLETED.\n\n"
message += total_stats \
+ f"{sos_s3_files}\n" \
+ f"Report written to: s3://{bucket_report}.\n" \
+ f"Failures written to: s3://{bucket_failures}.\n"
response = SNS.publish(
TopicArn = topic_arn,
Message = message,
Subject = subject
)
logging.info("Email sent to: %s.", topic_arn)
        succeeded = True
else:
logging.error("No SNS Topic was located; an e-mail notification will not be sent.")
        succeeded = False
    return succeeded
def format_output(module_data, workflow):
"""Format output as a string representation of a table for e-mailing."""
total_jobs = 0
total_succeeded = 0
total_failed = 0
for module in module_data["modules"]:
total_jobs += module["Total Jobs"]
total_succeeded += module["Succeeded"]
total_failed += module["Failed"]
    total_failed_percentage = (total_failed / total_jobs) * 100 if total_jobs else 0.0
total_execution_time = module_data["total_time"]
totals = f"{workflow} execution: '{module_data["name"]}'\n\n" \
+ f"- Total Jobs: {'{:,}'.format(total_jobs)}\n" \
+ f"- Total Succeeded: {'{:,}'.format(total_succeeded)}\n" \
+ f"- Total Failed: {'{:,}'.format(total_failed)}\n" \
+ f"- Total Failed Percentage: {'{:,.2f}'.format(total_failed_percentage)}\n" \
+ f"- Total Execution Time: {total_execution_time}\n\n"
return totals
def get_sos_s3(sos_bucket, run_type, version):
"""Return name of granules created in SOS bucket."""
s3_files = S3.list_objects_v2(
Bucket=sos_bucket,
MaxKeys=1000,
Prefix=f"{run_type}/{version}"
)
if "Contents" in s3_files.keys():
sos_s3_files = [sos_file["Key"].split("/")[-1] for sos_file in s3_files["Contents"]]
sos_s3_string = f"SOS granules stored in s3://{sos_bucket}/{run_type}/{version}\n\n"
for sos_s3_file in sos_s3_files:
sos_s3_string += f"- {sos_s3_file}\n"
else:
sos_s3_string = f"No SOS granules stored in S3://{sos_bucket}/{run_type}/{version}.\n"
return sos_s3_string
def create_args():
"""Create and return argparser with arguments."""
arg_parser = argparse.ArgumentParser(description="Report on Confluence workflow state")
arg_parser.add_argument("-e",
"--exeid",
type=str,
help="Step Function state machine execution ID.")
arg_parser.add_argument("-t",
"--temporalrange",
type=str,
help="Time parameter used to search Hydrocron.")
arg_parser.add_argument("-s",
"--sosbucket",
type=str,
help="S3 bucket with SOS granules.")
arg_parser.add_argument("-r",
"--runtype",
type=str,
choices=["unconstrained", "constrained"],
help="Run type for Confluence worfklow.")
arg_parser.add_argument("-b",
"--bucket",
type=str,
help="S3 bucket to upload reports to.")
arg_parser.add_argument("-k",
"--bucketkey",
type=str,
help="Unique prefix to upload reports to.")
arg_parser.add_argument("-f",
"--fail",
action="store_true",
help="Indicates workflow state failure.")
return arg_parser
if __name__ == "__main__":
start = datetime.datetime.now()
arg_parser = create_args()
args = arg_parser.parse_args()
exe_id = args.exeid
workflow = exe_id.split(":")[-2]
name = exe_id.split(":")[-1]
temporal_range = args.temporalrange
sos_bucket = args.sosbucket
run_type = args.runtype
bucket = args.bucket
bucket_key = args.bucketkey
failure = args.fail
logging.info("Execution ID: %s", exe_id)
logging.info("Workflow: %s", workflow)
logging.info("Name: %s", name)
logging.info("Temporal range: %s", temporal_range)
logging.info("SOS bucket: %s", sos_bucket)
logging.info("Run type: %s", run_type)
logging.info("S3 bucket: %s", bucket)
logging.info("Bucket key: %s", bucket_key)
logging.info("Failure state: %s", failure)
module_data, failure_data = aggregate_run_data(exe_id, name, temporal_range)
    with tempfile.TemporaryDirectory() as temp_dir:
        data_dir = pathlib.Path(temp_dir)
        module_file, failure_file = write_module_data(module_data, failure_data, data_dir)
        bucket_report, bucket_failures = upload_module_data(module_file, failure_file, bucket, bucket_key)
        succeeded = send_module_report(module_data, failure, sos_bucket, run_type, bucket_report, bucket_failures, workflow, name)
    if not succeeded:
        logging.error("Error encountered; exiting now...")
        sys.exit(1)
    end = datetime.datetime.now()
    logging.info("Elapsed time: %s", end - start)