|
1 | 1 | import logging
|
2 |
| -from http import HTTPStatus |
3 |
| -from time import sleep |
4 | 2 |
|
5 | 3 | import requests
|
6 | 4 | from celery import shared_task
|
7 | 5 | from celery.exceptions import SoftTimeLimitExceeded
|
8 | 6 | from django.conf import settings
|
9 |
| -from django.core.cache import cache |
10 | 7 | from django.core.exceptions import ObjectDoesNotExist
|
11 |
| -from openwisp_notifications.signals import notify |
12 |
| -from requests.exceptions import RequestException |
13 | 8 | from swapper import load_model
|
14 | 9 |
|
15 |
| -from openwisp_controller.config.api.zerotier_service import ZerotierService |
16 | 10 | from openwisp_utils.tasks import OpenwispCeleryTask
|
17 | 11 |
|
18 |
| -from .settings import API_TASK_RETRY_OPTIONS |
19 |
| - |
20 | 12 | logger = logging.getLogger(__name__)
|
21 | 13 |
|
22 | 14 |
|
23 |
| -class OpenwispApiTask(OpenwispCeleryTask): |
24 |
| - |
25 |
| - _RECOVERABLE_API_CODES = [ |
26 |
| - HTTPStatus.TOO_MANY_REQUESTS, # 429 |
27 |
| - HTTPStatus.INTERNAL_SERVER_ERROR, # 500 |
28 |
| - HTTPStatus.BAD_GATEWAY, # 502 |
29 |
| - HTTPStatus.SERVICE_UNAVAILABLE, # 503 |
30 |
| - HTTPStatus.GATEWAY_TIMEOUT, # 504 |
31 |
| - ] |
32 |
| - |
33 |
| - def _send_api_task_notification(self, type, **kwargs): |
34 |
| - vpn = kwargs.get('instance') |
35 |
| - action = kwargs.get('action').replace('_', ' ') |
36 |
| - status_code = kwargs.get('status_code') |
37 |
| - # Adding some delay here to prevent overlapping |
38 |
| - # of the django success message container |
39 |
| - # with the ow-notification container |
40 |
| - # https://github.com/openwisp/openwisp-notifications/issues/264 |
41 |
| - sleep(2) |
42 |
| - notify.send( |
43 |
| - type=f'api_task_{type}', |
44 |
| - sender=vpn, |
45 |
| - target=vpn, |
46 |
| - action=action, |
47 |
| - status_code=status_code, |
48 |
| - ) |
49 |
| - |
50 |
| - def handle_api_call(self, fn, *args, send_notification=True, **kwargs): |
51 |
| - """ |
52 |
| - This method handles API calls and their responses |
53 |
| - and triggers appropriate web notifications, which include: |
54 |
| -
|
55 |
| - Error notification |
56 |
| - - Sent on any unrecoverable API call failure |
57 |
| - Recovery notification |
58 |
| - - Sent only when an error notification was previously triggered |
59 |
| -
|
60 |
| - Also raises an exception for recoverable |
61 |
| - API calls leading to the retrying of the API task |
62 |
| -
|
63 |
| - NOTE: The method utilizes a cache key |
64 |
| - to prevent flooding of similar task notifications |
65 |
| -
|
66 |
| - Parameters: |
67 |
| - fn: API service method |
68 |
| - *args: Arguments for the API service method |
69 |
| - send_notification: If True, send notifications for API tasks |
70 |
| - **kwargs: Arguments used by the _send_api_task_notification method |
71 |
| - """ |
72 |
| - updated_config = None |
73 |
| - err_msg = kwargs.get('err') |
74 |
| - info_msg = kwargs.get('info') |
75 |
| - vpn = kwargs.get('instance') |
76 |
| - if send_notification: |
77 |
| - task_key = f'{self.name}_{vpn.pk.hex}_last_operation' |
78 |
| - # Execute API call and get response |
79 |
| - response = fn(*args) |
80 |
| - if isinstance(response, tuple): |
81 |
| - response, updated_config = response |
82 |
| - try: |
83 |
| - response.raise_for_status() |
84 |
| - logger.info(info_msg) |
85 |
| - if send_notification: |
86 |
| - task_result = cache.get(task_key) |
87 |
| - if task_result == 'error': |
88 |
| - self._send_api_task_notification('recovery', **kwargs) |
89 |
| - cache.set(task_key, 'success', None) |
90 |
| - except RequestException as exc: |
91 |
| - if response.status_code in self._RECOVERABLE_API_CODES: |
92 |
| - retry_logger = logger.warn |
93 |
| - # When retry limit is reached, use error logging |
94 |
| - if self.request.retries == self.max_retries: |
95 |
| - retry_logger = logger.error |
96 |
| - retry_logger( |
97 |
| - f'Try [{self.request.retries}/{self.max_retries}] ' |
98 |
| - f'{err_msg}, Error: {exc}' |
99 |
| - ) |
100 |
| - raise exc |
101 |
| - logger.error(f'{err_msg}, Error: {exc}') |
102 |
| - if send_notification: |
103 |
| - task_result = cache.get(task_key) |
104 |
| - if task_result in (None, 'success'): |
105 |
| - cache.set(task_key, 'error', None) |
106 |
| - self._send_api_task_notification( |
107 |
| - 'error', status_code=response.status_code, **kwargs |
108 |
| - ) |
109 |
| - return (response, updated_config) if updated_config else response |
110 |
| - |
111 |
| - |
112 | 15 | @shared_task(soft_time_limit=7200)
|
113 | 16 | def update_template_related_config_status(template_pk):
|
114 | 17 | """
|
@@ -211,150 +114,6 @@ def trigger_vpn_server_endpoint(endpoint, auth_token, vpn_id):
|
211 | 114 | )
|
212 | 115 |
|
213 | 116 |
|
214 |
| -@shared_task( |
215 |
| - bind=True, |
216 |
| - base=OpenwispApiTask, |
217 |
| - autoretry_for=(RequestException,), |
218 |
| - **API_TASK_RETRY_OPTIONS, |
219 |
| -) |
220 |
| -def trigger_zerotier_server_update(self, config, vpn_id): |
221 |
| - Vpn = load_model('config', 'Vpn') |
222 |
| - vpn = Vpn.objects.get(pk=vpn_id) |
223 |
| - service_method = ZerotierService( |
224 |
| - vpn.host, vpn.auth_token, vpn.subnet.subnet |
225 |
| - ).update_network |
226 |
| - response, updated_config = self.handle_api_call( |
227 |
| - service_method, |
228 |
| - config, |
229 |
| - vpn.network_id, |
230 |
| - instance=vpn, |
231 |
| - action='update', |
232 |
| - info=( |
233 |
| - f'Successfully updated the configuration of ' |
234 |
| - f'ZeroTier VPN Server with UUID: {vpn_id}' |
235 |
| - ), |
236 |
| - err=( |
237 |
| - f'Failed to update ZeroTier VPN Server configuration, ' |
238 |
| - f'VPN Server UUID: {vpn_id}' |
239 |
| - ), |
240 |
| - ) |
241 |
| - if response.status_code == 200: |
242 |
| - vpn.network_id = updated_config.pop('id', None) |
243 |
| - vpn.config = {**vpn.config, 'zerotier': [updated_config]} |
244 |
| - # Update zerotier network controller |
245 |
| - trigger_zerotier_server_update_member.delay(vpn_id) |
246 |
| - |
247 |
| - |
248 |
| -@shared_task( |
249 |
| - bind=True, |
250 |
| - base=OpenwispApiTask, |
251 |
| - autoretry_for=(RequestException,), |
252 |
| - **API_TASK_RETRY_OPTIONS, |
253 |
| -) |
254 |
| -def trigger_zerotier_server_update_member(self, vpn_id): |
255 |
| - Vpn = load_model('config', 'Vpn') |
256 |
| - vpn = Vpn.objects.get(pk=vpn_id) |
257 |
| - service_method = ZerotierService( |
258 |
| - vpn.host, |
259 |
| - vpn.auth_token, |
260 |
| - ).update_network_member |
261 |
| - self.handle_api_call( |
262 |
| - service_method, |
263 |
| - vpn.node_id, |
264 |
| - vpn.network_id, |
265 |
| - vpn.ip.ip_address, |
266 |
| - instance=vpn, |
267 |
| - action='update_member', |
268 |
| - info=( |
269 |
| - f'Successfully updated ZeroTier network member: {vpn.node_id}, ' |
270 |
| - f'ZeroTier network: {vpn.network_id}, ' |
271 |
| - f'ZeroTier VPN server UUID: {vpn_id}' |
272 |
| - ), |
273 |
| - err=( |
274 |
| - f'Failed to update ZeroTier network member: {vpn.node_id}, ' |
275 |
| - f'ZeroTier network: {vpn.network_id}, ' |
276 |
| - f'ZeroTier VPN server UUID: {vpn_id}' |
277 |
| - ), |
278 |
| - ) |
279 |
| - |
280 |
| - |
281 |
| -@shared_task( |
282 |
| - bind=True, |
283 |
| - base=OpenwispApiTask, |
284 |
| - autoretry_for=(RequestException,), |
285 |
| - **API_TASK_RETRY_OPTIONS, |
286 |
| -) |
287 |
| -def trigger_zerotier_server_join(self, vpn_id): |
288 |
| - Vpn = load_model('config', 'Vpn') |
289 |
| - vpn = Vpn.objects.get(pk=vpn_id) |
290 |
| - service_method = ZerotierService( |
291 |
| - vpn.host, |
292 |
| - vpn.auth_token, |
293 |
| - ).join_network |
294 |
| - response = self.handle_api_call( |
295 |
| - service_method, |
296 |
| - vpn.network_id, |
297 |
| - instance=vpn, |
298 |
| - action='network_join', |
299 |
| - info=( |
300 |
| - f'Successfully joined the ZeroTier network: {vpn.network_id}, ' |
301 |
| - f'ZeroTier VPN Server UUID: {vpn_id}' |
302 |
| - ), |
303 |
| - err=( |
304 |
| - f'Failed to join ZeroTier network: {vpn.network_id}, ' |
305 |
| - f'VPN Server UUID: {vpn_id}' |
306 |
| - ), |
307 |
| - ) |
308 |
| - if response.ok: |
309 |
| - # Update zerotier network controller |
310 |
| - trigger_zerotier_server_update_member.delay(vpn_id) |
311 |
| - |
312 |
| - |
313 |
| -@shared_task( |
314 |
| - bind=True, |
315 |
| - base=OpenwispApiTask, |
316 |
| - autoretry_for=(RequestException,), |
317 |
| - **API_TASK_RETRY_OPTIONS, |
318 |
| -) |
319 |
| -def trigger_zerotier_server_delete(self, host, auth_token, network_id, vpn_id): |
320 |
| - service_method = ZerotierService(host, auth_token).delete_network |
321 |
| - response = self.handle_api_call( |
322 |
| - service_method, |
323 |
| - network_id, |
324 |
| - info=( |
325 |
| - f'Successfully deleted the ZeroTier VPN Server ' |
326 |
| - f'with UUID: {vpn_id}, Network ID: {network_id}' |
327 |
| - ), |
328 |
| - err=( |
329 |
| - 'Failed to delete ZeroTier VPN Server ' |
330 |
| - f'with UUID: {vpn_id}, Network ID: {network_id}, ' |
331 |
| - 'as it does not exist on the ZeroTier Controller Networks' |
332 |
| - ), |
333 |
| - send_notification=False, |
334 |
| - ) |
335 |
| - # In case of successful deletion of the network |
336 |
| - # we should also remove controller node from the network |
337 |
| - if response.status_code == 200: |
338 |
| - trigger_zerotier_server_leave.delay(host, auth_token, network_id) |
339 |
| - |
340 |
| - |
341 |
| -@shared_task( |
342 |
| - bind=True, |
343 |
| - base=OpenwispApiTask, |
344 |
| - autoretry_for=(RequestException,), |
345 |
| - **API_TASK_RETRY_OPTIONS, |
346 |
| -) |
347 |
| -def trigger_zerotier_server_leave(self, host, auth_token, network_id): |
348 |
| - service_method = ZerotierService(host, auth_token).leave_network |
349 |
| - self.handle_api_call( |
350 |
| - service_method, |
351 |
| - network_id, |
352 |
| - info=f'Successfully left the ZeroTier Network with ID: {network_id}', |
353 |
| - err=f'Failed to leave ZeroTier Network with ID: {network_id}', |
354 |
| - send_notification=False, |
355 |
| - ) |
356 |
| - |
357 |
| - |
358 | 117 | @shared_task(base=OpenwispCeleryTask)
|
359 | 118 | def change_devices_templates(instance_id, model_name, **kwargs):
|
360 | 119 | Device = load_model('config', 'Device')
|
|
0 commit comments