|
2 | 2 | from src.utils.arguments import arg_parser
|
3 | 3 | from src.utils.log import logger
|
4 | 4 | from pytimeparse2 import parse
|
| 5 | +from time import time |
5 | 6 | import requests
|
6 |
| -import time |
| 7 | + |
7 | 8 |
|
8 | 9 | prom_addr = arg_parser().get("prom.addr")
|
| 10 | +running_tasks = False |
9 | 11 |
|
10 | 12 |
|
11 |
| -def delete_series(policy_name: str, policy: dict) -> None: |
| 13 | +def delete_series(policy_name: str, policy: dict) -> bool: |
12 | 14 | """
|
13 |
| - This function calls two Prometheus endpoints: |
14 |
| - * POST /api/v1/admin/tsdb/delete_series |
15 |
| - * POST /api/v1/admin/tsdb/clean_tombstones |
| 15 | + This function calls following Prometheus endpoint: |
| 16 | + POST /api/v1/admin/tsdb/delete_series |
16 | 17 | User-defined policies passed to this function
|
17 |
| - perform cleanup based on the specified policy settings. |
| 18 | + perform clean-up based on the specified policy settings. |
18 | 19 | """
|
19 |
| - time_range = time.time() - parse(policy["keep_for"]) |
20 |
| - start_time = time.time() |
| 20 | + time_range = time() - parse(policy["keep_for"]) |
21 | 21 | try:
|
22 | 22 | r = requests.post(
|
23 | 23 | f'{prom_addr}/api/v1/admin/tsdb/delete_series?match[]={policy["match"]}&end={time_range}')
|
24 | 24 | except BaseException as e:
|
25 | 25 | logger.error(e, extra={"policy_name": policy_name})
|
26 | 26 | else:
|
27 |
| - if r.status_code != 204: |
28 |
| - logger.error(f"Failed to delete series, {r.json().get('error')}", extra={ |
29 |
| - "status": r.status_code, "policy_name": policy_name}) |
30 |
| - return |
31 |
| - try: |
32 |
| - r = requests.post( |
33 |
| - f'{prom_addr}/api/v1/admin/tsdb/clean_tombstones') |
34 |
| - except BaseException as e: |
35 |
| - logger.error(e, extra={"policy_name": policy_name}) |
36 |
| - return |
37 |
| - else: |
38 |
| - if r.status_code != 204: |
39 |
| - logger.error(f"Failed to clean tombstones, {r.json().get('error')}", extra={ |
40 |
| - "status": r.status_code, "policy_name": policy_name}) |
41 |
| - return |
42 |
| - exec_time = float("{:.2f}".format(time.time() - start_time)) |
43 |
| - logger.debug("Task cleanup time-series has been successfully completed", |
44 |
| - extra={"policy_name": policy_name, "exec_time": exec_time}) |
45 |
| - return |
46 |
| - |
47 |
| - |
48 |
| -def task_run_policies(): |
| 27 | + if r.status_code == 204: |
| 28 | + logger.debug("Task clean-up time-series has been successfully completed", |
| 29 | + extra={"policy_name": policy_name}) |
| 30 | + return True |
| 31 | + logger.error(f"Failed to delete series, {r.json().get('error')}", extra={ |
| 32 | + "status": r.status_code, "policy_name": policy_name}) |
| 33 | + return False |
| 34 | + |
| 35 | + |
| 36 | +def clean_tombstones() -> bool: |
| 37 | + """ |
| 38 | + This function calls following Prometheus endpoint: |
| 39 | + POST /api/v1/admin/tsdb/clean_tombstones |
| 40 | + Removes the deleted data from disk and |
| 41 | + cleans up the existing tombstones |
| 42 | + """ |
| 43 | + try: |
| 44 | + r = requests.post( |
| 45 | + f'{prom_addr}/api/v1/admin/tsdb/clean_tombstones') |
| 46 | + except BaseException as e: |
| 47 | + logger.error(e) |
| 48 | + else: |
| 49 | + if r.status_code == 204: |
| 50 | + return True |
| 51 | + logger.error(f"Failed to clean tombstones, {r.json().get('error')}", extra={ |
| 52 | + "status": r.status_code}) |
| 53 | + return False |
| 54 | + |
| 55 | + |
| 56 | +def run_policies() -> bool: |
49 | 57 | """
|
50 | 58 | This function loops over user-defined metrics lifecycle
|
51 |
| - policies and executes the cleanup job one by one |
| 59 | + policies and executes the clean-up job one by one |
52 | 60 | """
|
| 61 | + global running_tasks |
| 62 | + if running_tasks: |
| 63 | + logger.warning( |
| 64 | + "Cannot create a new task. Server is currently processing another task") |
| 65 | + return False |
| 66 | + |
53 | 67 | policies = load_policies()
|
54 | 68 | if policies:
|
55 | 69 | logger.debug(
|
56 | 70 | f"Found {len(policies)} metrics lifecycle {'policies' if len(policies) > 1 else 'policy'}. "
|
57 |
| - f"Starting job to cleanup time-series.") |
| 71 | + f"Starting job to clean-up time-series.") |
| 72 | + running_tasks = True |
| 73 | + start_time = time() |
58 | 74 | for p in policies:
|
59 | 75 | logger.debug(
|
60 |
| - "Task cleanup time-series is in progress", extra={ |
| 76 | + "Task clean-up series is in progress", extra={ |
61 | 77 | "policy_name": p, "match": policies[p]["match"],
|
62 | 78 | "keep_for": policies[p]["keep_for"]})
|
63 | 79 | delete_series(policy_name=p, policy=policies[p])
|
| 80 | + clean_tombstones() |
| 81 | + exec_time = float("{:.2f}".format(time() - start_time)) |
| 82 | + running_tasks = False |
| 83 | + logger.debug( |
| 84 | + "Task clean-up series has been completed", extra={ |
| 85 | + "duration": exec_time}) |
| 86 | + return True |
0 commit comments