-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathregister.py
1200 lines (1052 loc) · 58 KB
/
register.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import logging
import os
import random
import string
import subprocess # nosec
import time
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, overload
import requests
import six
import transaction
import yaml
from pyramid.httpexceptions import HTTPException
from sqlalchemy.orm.session import Session
from ziggurat_foundations.models.services.group import GroupService
from ziggurat_foundations.models.services.resource import ResourceService
from ziggurat_foundations.models.services.user import UserService
from ziggurat_foundations.models.services.user_resource_permission import UserResourcePermissionService
from magpie import models
from magpie.api.schemas import (
GroupResourcePermissionsAPI,
GroupsAPI,
ServiceAPI,
ServiceResourcesAPI,
ServicesAPI,
SigninAPI,
SignoutAPI,
UserResourcePermissionsAPI,
UsersAPI
)
from magpie.config import validate_services_config
from magpie.constants import get_constant
from magpie.permissions import Permission, PermissionSet
from magpie.services import SERVICE_TYPE_DICT, ServiceWPS
from magpie.utils import (
bool2str,
get_admin_cookies,
get_json,
get_logger,
get_magpie_url,
get_phoenix_url,
get_twitcher_protected_service_url,
islambda,
print_log,
raise_log
)
if TYPE_CHECKING:
# pylint: disable=W0611,unused-import
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from magpie.typedefs import (
JSON,
AnyCookiesType,
AnyResolvedSettings,
AnyResponseType,
AnySettingsContainer,
CombinedConfig,
CookiesOrSessionType,
GroupsConfig,
GroupsSettings,
Literal,
MultiConfigs,
PermissionConfigItem,
PermissionsConfig,
ServicesConfig,
ServicesSettings,
Str,
UsersConfig,
UsersSettings,
WebhooksConfig
)
LOGGER = get_logger(__name__)
LOGIN_ATTEMPT = 5 # max attempts for login
LOGIN_TIMEOUT = 2 # delay (s) between each login attempt
CREATE_SERVICE_INTERVAL = 2 # delay (s) between creations to allow server to respond/process
GETCAPABILITIES_INTERVAL = 10 # delay (s) between 'GetCapabilities' Phoenix calls to validate service registration
GETCAPABILITIES_ATTEMPTS = 12 # max attempts for 'GetCapabilities' validations
# controls
SERVICES_MAGPIE = "MAGPIE"
SERVICES_PHOENIX = "PHOENIX"
SERVICES_PHOENIX_ALLOWED = [ServiceWPS.service_type]
class RegistrationError(RuntimeError):
"""
Generic error during registration operation.
"""
class RegistrationValueError(RegistrationError, ValueError):
"""
Registration error caused by an invalid value precondition.
"""
class RegistrationLoginError(RegistrationError):
"""
Registration error caused by a failure to complete required login operation.
"""
class RegistrationConfigurationError(RegistrationValueError):
"""
Registration error caused by an invalid configuration entry or definition.
"""
def _login_loop(login_url, cookies_file, data=None, message="Login response"):
cookies_dir = os.path.dirname(cookies_file)
if not os.path.isdir(cookies_dir):
os.makedirs(cookies_dir) # don't use "exist_ok" for backward compatibility (Python<3.5)
data_str = ""
if data is not None and isinstance(data, dict):
for key in data:
data_str = data_str + "&" + str(key) + "=" + str(data[key])
if isinstance(data, six.string_types):
data_str = data
attempt = 0
while True:
err, http = _request_curl(login_url, cookie_jar=cookies_file, form_params=data_str, msg=message)
if not err and http == 200:
break
attempt += 1
LOGGER.warning("Login failed, retrying in %ss (%s/%s)", LOGIN_TIMEOUT, attempt, LOGIN_ATTEMPT)
time.sleep(LOGIN_TIMEOUT)
if attempt >= LOGIN_ATTEMPT:
raise RegistrationLoginError("Cannot log in to {0}".format(login_url))
def _request_curl(url, cookie_jar=None, cookies=None, form_params=None, msg="Response"):
# type: (Str, Optional[Str], Optional[Str], Optional[Str], Optional[Str]) -> Tuple[int, int]
"""
Executes a request using cURL.
:returns: tuple of the returned system command code and the response http code
"""
# arg -k allows to ignore insecure SSL errors, ie: access 'https' page not configured for it
# curl_cmd = 'curl -k -L -s -o /dev/null -w "{msg_out} : %{{http_code}}\\n" {params} {url}'
# curl_cmd = curl_cmd.format(msg_out=msg, params=params, url=url)
msg_sep = msg + ": "
params = ["curl", "-k", "-L", "-s", "-o", "/dev/null", "-w", msg_sep + "%{http_code}"]
if cookie_jar is not None and cookies is not None:
raise RegistrationValueError("CookiesType and Cookie_Jar cannot be both set simultaneously")
if cookie_jar is not None:
params.extend(["--cookie-jar", cookie_jar]) # save cookies
if cookies is not None:
params.extend(["--cookie", cookies]) # use cookies
if form_params is not None:
params.extend(["--data", form_params])
params.extend([url])
with subprocess.Popen(params, stdout=subprocess.PIPE) as curl_proc: # nosec
curl_msg = curl_proc.communicate()[0] # type: Str
curl_err = curl_proc.returncode # type: int
http_code = int(six.ensure_text(curl_msg).split(msg_sep)[1])
print_log("[{url}] {response}".format(response=curl_msg, url=url), logger=LOGGER)
return curl_err, http_code
def _phoenix_update_services(services_dict):
# type: (JSON) -> bool
if not _phoenix_remove_services():
print_log("Could not remove services, aborting register sync services to Phoenix", logger=LOGGER)
return False
success, _ = _phoenix_register_services(services_dict)
if not success:
print_log("Failed services registration from Magpie to Phoenix\n"
"[warning: services could have been removed but could not be re-added]", logger=LOGGER)
return False
return True
def _phoenix_login(cookies_file):
# type: (Str) -> bool
"""
Performs Phoenix login using provided cookies.
"""
phoenix_pwd = get_constant("PHOENIX_PASSWORD")
phoenix_url = get_phoenix_url()
login_url = phoenix_url + "/account/login/phoenix"
login_data = {"password": phoenix_pwd, "submit": "submit"}
_login_loop(login_url, cookies_file, login_data, "Phoenix login response")
return _phoenix_login_check(cookies_file)
def _phoenix_login_check(cookies):
# type: (Str) -> bool
"""
Since Phoenix always return 200, even on invalid login, 'hack' check unauthorized access.
:param cookies: temporary cookies file storage used for login with :func:`_phoenix_login`.
:return: status indicating if login access was granted with defined credentials.
"""
no_access_error = "<ExceptionText>Unauthorized: Services failed permission check</ExceptionText>"
svc_url = get_phoenix_url() + "/services"
command = ["curl", "-s", "--cookie", cookies, svc_url]
with subprocess.Popen(command, stdout=subprocess.PIPE) as curl_process: # nosec
curl_http_resp = curl_process.communicate() # nosec
has_access = no_access_error not in curl_http_resp[0]
return has_access
def _phoenix_remove_services():
# type: () -> bool
"""
Removes the Phoenix services using temporary cookies retrieved from login with defined `PHOENIX` constants.
:returns: success status of the procedure.
"""
error = 0
try:
with NamedTemporaryFile() as phoenix_cookies_file:
if not _phoenix_login(phoenix_cookies_file.name):
print_log("Login unsuccessful from post-login check, aborting...", logger=LOGGER)
return False
phoenix_url = get_phoenix_url()
remove_services_url = phoenix_url + "/clear_services"
error, _ = _request_curl(remove_services_url, cookies=phoenix_cookies_file.name,
msg="Phoenix remove services")
except Exception as exc:
print_log("Exception during phoenix remove services: [{!r}]".format(exc), logger=LOGGER, level=logging.ERROR)
return error == 0
def _phoenix_register_services(services_dict, allowed_service_types=None):
# type: (Dict[Str, Dict[Str, Any]], Optional[List[Str]]) -> Tuple[bool, Dict[Str, int]]
success = False
statuses = {}
try:
with NamedTemporaryFile() as phoenix_cookies_file:
allowed_service_types = SERVICES_PHOENIX_ALLOWED if allowed_service_types is None else allowed_service_types
allowed_service_types = [svc.upper() for svc in allowed_service_types]
if not _phoenix_login(phoenix_cookies_file.name):
print_log("Login unsuccessful from post-login check, aborting...", logger=LOGGER, level=logging.WARN)
return False, {}
# Filter specific services to push
filtered_services_dict = {}
for svc in services_dict:
if str(services_dict[svc].get("type")).upper() in allowed_service_types:
filtered_services_dict[svc] = services_dict[svc]
filtered_services_dict[svc]["type"] = filtered_services_dict[svc]["type"].upper()
# Register services
success, statuses = _register_services(SERVICES_PHOENIX, filtered_services_dict,
phoenix_cookies_file.name, "Phoenix register service")
except Exception as exc:
print_log("Exception during phoenix register services: [{!r}]".format(exc), logger=LOGGER, level=logging.ERROR)
return success, statuses
def _register_services(where, # type: Optional[Str]
services_dict, # type: Dict[Str, Dict[Str, Str]]
cookies, # type: Str
message="Register response", # type: Optional[Str]
): # type: (...) -> Tuple[bool, Dict[Str, int]]
"""
Registers services on desired location using provided configurations and access cookies.
:returns: tuple of overall success and individual http response of each service registration.
"""
success = True
svc_url = None
statuses = {}
if where == SERVICES_MAGPIE:
svc_url_tag = "service_url"
register_service_url = get_magpie_url() + ServicesAPI.path
elif where == SERVICES_PHOENIX:
svc_url_tag = "url"
register_service_url = get_phoenix_url() + "/services/register"
else:
raise RegistrationValueError("Unknown location for service registration", where)
for service_name in services_dict:
cfg = services_dict[service_name]
cfg["public"] = bool2str(cfg.get("public"))
cfg["c4i"] = bool2str(cfg.get("c4i"))
cfg["url"] = cfg.get("url")
if where == SERVICES_MAGPIE:
svc_url = cfg["url"]
elif where == SERVICES_PHOENIX:
svc_url = get_twitcher_protected_service_url(service_name)
params = "service_name={name}&" \
"{svc_url_tag}={svc_url}&" \
"service_title={cfg[title]}&" \
"public={cfg[public]}&" \
"c4i={cfg[c4i]}&" \
"service_type={cfg[type]}&" \
"register=register" \
.format(name=service_name, cfg=cfg, svc_url_tag=svc_url_tag, svc_url=svc_url)
service_msg = "{msg} ({svc}) [{url}]".format(msg=message, svc=service_name, url=svc_url)
error, http_code = _request_curl(register_service_url, cookies=cookies, form_params=params, msg=service_msg)
statuses[service_name] = http_code
success = success and not error and ((where == SERVICES_PHOENIX and http_code == 200) or
(where == SERVICES_MAGPIE and http_code == 201))
time.sleep(CREATE_SERVICE_INTERVAL)
return success, statuses
def sync_services_phoenix(services, services_as_dicts=False):
# type: (Union[Iterable[models.Service], JSON], bool) -> bool
"""
Syncs Magpie services by pushing updates to Phoenix.
Services must be one of types specified in :py:data:`magpie.register.SERVICES_PHOENIX_ALLOWED`.
:param services:
An iterable of :class:`models.Service` by default, or a dictionary of ``{svc-name: {<service-info>}}`` JSON
objects containing each service's information if :paramref:`services_ad_dicts` is ``True``.
where ``<service-info>`` is defined as::
{"public_url": <url>, "service_name": <name>, "service_type": <type>}
:param services_as_dicts: indicate if services must be parsed as JSON definitions.
"""
services_dict = {}
for svc in services:
if services_as_dicts:
svc_dict = services[svc] # type: JSON
services_dict[svc] = {"url": svc_dict["public_url"], "title": svc_dict["service_name"],
"type": svc_dict["service_type"], "c4i": False, "public": True}
else:
services_dict[svc.resource_name] = {"url": svc.url, "title": svc.resource_name,
"type": svc.type, "c4i": False, "public": True}
return _phoenix_update_services(services_dict)
def _magpie_add_register_services_perms(services, statuses, curl_cookies, request_cookies, disable_getcapabilities):
# type: (ServicesSettings, Dict[Str, int], str, AnyCookiesType, bool) -> None
magpie_url = get_magpie_url()
anon_group = get_constant("MAGPIE_ANONYMOUS_GROUP")
for service_name in services:
svc_available_perms_url = "{magpie}/services/{svc}/permissions" \
.format(magpie=magpie_url, svc=service_name)
resp_available_perms = requests.get(svc_available_perms_url, cookies=request_cookies, timeout=5)
if resp_available_perms.status_code == 401:
raise_log("Invalid credentials, cannot update service permissions",
exception=RegistrationLoginError, logger=LOGGER)
available_perms = get_json(resp_available_perms).get("permission_names", [])
# only applicable to services supporting "GetCapabilities" request
if resp_available_perms.status_code and Permission.GET_CAPABILITIES.value in available_perms:
# enforce 'getcapabilities' permission if available for service just updated (200) or created (201)
# update 'getcapabilities' permission when the service existed and it allowed
if ((not disable_getcapabilities and statuses[service_name] == 409)
or statuses[service_name] == 200 or statuses[service_name] == 201):
svc_anonym_add_perms_url = "{magpie}/groups/{grp}/services/{svc}/permissions" \
.format(magpie=magpie_url, grp=anon_group, svc=service_name)
svc_anonym_perm_data = {"permission_name": Permission.GET_CAPABILITIES.value}
requests.post(svc_anonym_add_perms_url, data=svc_anonym_perm_data, cookies=request_cookies, timeout=5)
# check service response so Phoenix doesn't refuse registration
# try with both the 'direct' URL and the 'GetCapabilities' URL
attempt = 0
service_info_url = "{magpie}/services/{svc}".format(magpie=magpie_url, svc=service_name)
service_info_resp = requests.get(service_info_url, cookies=request_cookies, timeout=5)
service_url = get_json(service_info_resp).get(service_name).get("service_url")
svc_getcap_url = "{svc_url}/wps?service=WPS&version=1.0.0&request=GetCapabilities" \
.format(svc_url=service_url)
while True:
service_msg_direct = "Service response ({svc})".format(svc=service_name)
service_msg_getcap = "Service response ({svc}, GetCapabilities)".format(svc=service_name)
err, http = _request_curl(service_url, cookies=curl_cookies, msg=service_msg_direct)
if not err and http == 200:
break
err, http = _request_curl(svc_getcap_url, cookies=curl_cookies, msg=service_msg_getcap)
if not err and http == 200:
break
print_log("[{url}] Bad response from service '{svc}' retrying after {sec}s..."
.format(svc=service_name, url=service_url, sec=GETCAPABILITIES_INTERVAL), logger=LOGGER)
time.sleep(GETCAPABILITIES_INTERVAL)
attempt += 1
if attempt >= GETCAPABILITIES_ATTEMPTS:
msg = "[{url}] No response from service '{svc}' after {tries} attempts. Skipping..." \
.format(svc=service_name, url=service_url, tries=attempt)
print_log(msg, logger=LOGGER)
break
def _magpie_update_services_conflict(conflict_services, services_dict, request_cookies):
# type: (List[Str], ServicesSettings, AnyCookiesType) -> Dict[Str, int]
"""
Resolve conflicting services by name during registration by updating them only if pointing to different URL.
"""
magpie_url = get_magpie_url()
statuses = {}
for svc_name in conflict_services:
statuses[svc_name] = 409
svc_url_new = services_dict[svc_name]["url"]
svc_url_db = "{magpie}/services/{svc}".format(magpie=magpie_url, svc=svc_name)
svc_resp = requests.get(svc_url_db, cookies=request_cookies, timeout=5)
svc_info = get_json(svc_resp).get(svc_name)
svc_url_old = svc_info["service_url"]
if svc_url_old != svc_url_new:
svc_info["service_url"] = svc_url_new
res_svc_put = requests.patch(svc_url_db, data=svc_info, cookies=request_cookies, timeout=5)
statuses[svc_name] = res_svc_put.status_code
print_log("[{url_old}] => [{url_new}] Service URL update ({svc}): {resp}"
.format(svc=svc_name, url_old=svc_url_old, url_new=svc_url_new, resp=res_svc_put.status_code),
logger=LOGGER)
return statuses
def _magpie_register_services_with_requests(services_dict, push_to_phoenix, username, password, provider,
force_update=False, disable_getcapabilities=False):
# type: (ServicesSettings, bool, Str, Str, Str, bool, bool) -> bool
"""
Registers :term:`Services` of loaded ``providers`` configuration using API requests.
.. seealso::
:func:`magpie_register_services_from_config`
:param services_dict: services configuration definition.
:param push_to_phoenix: push registered Magpie services to Phoenix for synced configurations.
:param username: login username to use to obtain permissions for services registration.
:param password: login password to use to obtain permissions for services registration.
:param provider: login provider to use to obtain permissions for services registration.
:param force_update: override existing services matched by name
:param disable_getcapabilities: do not execute 'GetCapabilities' validation for applicable services.
:return: successful operation status
"""
magpie_url = get_magpie_url()
session = requests.Session()
success = False
try:
with NamedTemporaryFile() as magpie_cookies_file:
# Need to login first as admin
login_url = magpie_url + SigninAPI.path
login_data = {"user_name": username, "password": password, "provider_name": provider}
_login_loop(login_url, magpie_cookies_file.name, login_data, "Magpie login response")
login_resp = session.post(login_url, data=login_data)
if login_resp.status_code != 200:
raise_log("Failed login with specified credentials", exception=RegistrationLoginError, logger=LOGGER)
request_cookies = login_resp.cookies
# Register services
# Magpie will not overwrite existing services by default, 409 Conflict instead of 201 Created
success, statuses_register = _register_services(SERVICES_MAGPIE, services_dict,
magpie_cookies_file.name, "Magpie register service")
# Service URL update if conflicting and requested
if force_update and not success:
conflict_services = [svc_name for svc_name, http_code in statuses_register.items() if http_code == 409]
statuses_update = _magpie_update_services_conflict(conflict_services, services_dict, request_cookies)
statuses_register.update(statuses_update) # update previous statuses with new ones
# Add 'GetCapabilities' permissions on newly created services to allow 'ping' from Phoenix
# Phoenix doesn't register the service if it cannot be checked with this request
_magpie_add_register_services_perms(services_dict, statuses_register,
magpie_cookies_file.name, request_cookies, disable_getcapabilities)
session.get(magpie_url + SignoutAPI.path)
# Push updated services to Phoenix
if push_to_phoenix:
success = _phoenix_update_services(services_dict)
except Exception as exc:
print_log("Exception during magpie register services: [{!r}]".format(exc), logger=LOGGER, level=logging.ERROR)
finally:
session.cookies.clear()
return success
def _magpie_register_services_with_db_session(services_dict, db_session, push_to_phoenix=False,
force_update=False, update_getcapabilities_permissions=False):
# type: (ServicesSettings, Session, bool, bool, bool) -> bool
"""
Registration procedure of :term:`Services` from ``providers`` section using pre-established database session.
.. seealso::
:func:`magpie_register_services_from_config`
"""
db_session.begin(subtransactions=True)
existing_services_names = [n[0] for n in db_session.query(models.Service.resource_name)]
magpie_anonymous_user = get_constant("MAGPIE_ANONYMOUS_USER")
anonymous_user = UserService.by_user_name(magpie_anonymous_user, db_session=db_session)
for svc_name, svc_values in services_dict.items():
svc_new_url = svc_values["url"]
svc_type = svc_values["type"]
svc_config = svc_values.get("configuration")
svc_sync_type = svc_values.get("sync_type")
if force_update and svc_name in existing_services_names:
svc = models.Service.by_service_name(svc_name, db_session=db_session)
if svc.url == svc_new_url:
print_log("Service URL already properly set [{url}] ({svc})"
.format(url=svc.url, svc=svc_name), logger=LOGGER)
else:
print_log("Service URL update [{url_old}] => [{url_new}] ({svc})"
.format(url_old=svc.url, url_new=svc_new_url, svc=svc_name),
logger=LOGGER, level=logging.WARN)
svc.url = svc_new_url
if svc.type != svc_type:
print_log("Service type update [{type_old}] => [{type_new}] ({svc}). "
"If children resources/permissions are not compatible, this could break the instance."
.format(type_old=svc.type, type_new=svc_type, svc=svc_name),
logger=LOGGER, level=logging.WARN)
svc.type = svc_type
svc.sync_type = svc_sync_type
svc.configuration = svc_config
elif not force_update and svc_name in existing_services_names:
print_log("Skipping service [{svc}] (conflict)" .format(svc=svc_name), logger=LOGGER)
else:
print_log("Adding service [{svc}]".format(svc=svc_name), logger=LOGGER)
svc = models.Service(
resource_name=svc_name,
resource_type=models.Service.resource_type_name,
url=svc_new_url,
type=svc_type,
configuration=svc_config,
sync_type=svc_sync_type
)
db_session.add(svc)
getcap_perm = Permission.GET_CAPABILITIES
if update_getcapabilities_permissions and anonymous_user is None:
print_log("Cannot update 'getcapabilities' permission of non-existing anonymous user",
level=logging.WARN, logger=LOGGER)
elif update_getcapabilities_permissions and getcap_perm in SERVICE_TYPE_DICT[svc_type].permissions:
svc = db_session.query(models.Service.resource_id).filter_by(resource_name=svc_name).first()
svc_perm_getcapabilities = UserResourcePermissionService.by_resource_user_and_perm(
user_id=anonymous_user.id,
perm_name=getcap_perm.value,
resource_id=svc.resource_id,
db_session=db_session
)
if svc_perm_getcapabilities is None:
print_log("Adding '{}' permission to anonymous user.".format(getcap_perm.value), logger=LOGGER)
svc_perm_getcapabilities = models.UserResourcePermission(
user_id=anonymous_user.id,
perm_name=getcap_perm.value,
resource_id=svc.resource_id
)
db_session.add(svc_perm_getcapabilities)
transaction.commit()
if push_to_phoenix:
return _phoenix_update_services(services_dict)
return True
def _load_config(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Str, bool) -> Union
"""
Loads a YAML/JSON file path or pre-loaded dictionary configuration.
"""
try:
if isinstance(path_or_dict, six.string_types):
with open(path_or_dict, mode="r", encoding="utf-8") as yml_file:
cfg = yaml.safe_load(yml_file)
else:
cfg = path_or_dict
return _expand_all(cfg[section])
except KeyError:
msg = "Config file section [{!s}] not found.".format(section)
if allow_missing:
print_log(msg, level=logging.WARNING, logger=LOGGER)
return {}
raise_log(msg, exception=RegistrationError, logger=LOGGER)
except Exception as exc:
raise_log("Invalid config file [{!r}]".format(exc), exception=RegistrationError, logger=LOGGER)
CONFIG_KNOWN_EXTENSIONS = frozenset([".cfg", ".json", ".yml", ".yaml"])
@overload
def get_all_configs(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Literal["groups"], bool) -> GroupsConfig
...
@overload
def get_all_configs(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Literal["users"], bool) -> UsersConfig
...
@overload
def get_all_configs(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Literal["permissions"], bool) -> PermissionsConfig
...
@overload
def get_all_configs(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Literal["services"], bool) -> ServicesConfig
...
@overload
def get_all_configs(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Literal["webhooks"], bool) -> WebhooksConfig
...
def get_all_configs(path_or_dict, section, allow_missing=False):
# type: (Union[Str, CombinedConfig], Str, bool) -> MultiConfigs
"""
Loads all matched configurations.
Configurations are considered a valid match if they have one of the :py:data:`CONFIG_KNOWN_EXTENSIONS` (if path)
and that loaded (or passed) configurations contain the specified :paramref:`section` name.
If the input is a directory path, loads any number of files contained in it that fulfill matching conditions.
If it is a path pointing to a single valid configuration file, loads it by itself.
If a dictionary is passed, returns it directly if it fulfills validation.
:param path_or_dict: directory path, file path or literal dictionary.
:param section: section name that must be inside every matched configuration file to be loaded.
:param allow_missing: allow to have no valid configuration after all are resolved, otherwise raises.
:raises RegistrationError: when no valid configuration can be found and empty one is not allowed.
:returns:
- list of configurations loaded if input was a directory path
- list of single configuration if input was a file path
- list of single configuration if input was a JSON dict
- empty list if none of the other cases where matched
.. note::
Order of file loading will be resolved by alphabetically sorted filename if specifying a directory path.
"""
if isinstance(path_or_dict, six.string_types):
if os.path.isdir(path_or_dict):
dir_path = os.path.abspath(path_or_dict)
cfg_names = list(sorted({fn for fn in os.listdir(dir_path)
if any([fn.endswith(ext) for ext in CONFIG_KNOWN_EXTENSIONS])}))
return [_load_config(os.path.join(dir_path, fn), section, allow_missing) for fn in cfg_names]
if os.path.isfile(path_or_dict):
return [_load_config(path_or_dict, section, allow_missing)]
elif isinstance(path_or_dict, dict):
return [_load_config(path_or_dict, section, allow_missing)]
return []
def _expand_all(config):
# type: (JSON) -> JSON
"""
Applies environment variable expansion recursively to all applicable fields of a configuration definition.
"""
if isinstance(config, dict):
for cfg in list(config):
cfg_key = os.path.expandvars(cfg)
if cfg_key != cfg:
config[cfg_key] = config.pop(cfg)
config[cfg_key] = _expand_all(config[cfg_key])
elif isinstance(config, (list, set)):
for i, cfg in enumerate(config):
config[i] = _expand_all(cfg)
elif isinstance(config, six.string_types):
config = os.path.expandvars(str(config))
elif isinstance(config, (int, bool, float, type(None))):
pass
else:
raise NotImplementedError("unknown parsing of config of type: {}".format(type(config)))
return config
def magpie_register_services_from_config(service_config_path, push_to_phoenix=False, skip_registration=False,
force_update=False, disable_getcapabilities=False, db_session=None):
# type: (Str, bool, bool, bool, bool, Optional[Session]) -> ServicesSettings
"""
Registers Magpie services from one or many `providers.cfg` file.
Uses the provided DB session to directly update service definitions, or uses API request routes as admin. Optionally
pushes updates to Phoenix.
:param service_config_path: where to look for `providers` configuration(s). Directory or file path.
:param push_to_phoenix: whether to push loaded service definitions to remote `Phoenix` service.
:param skip_registration: Load, validate and combine :term:`Service` configurations, but don't register them.
:param force_update: override service definitions that conflict by name with registered ones.
:param disable_getcapabilities:
Skip `GetCapabilities` request validation and permission update.
By default, any service with `type` that allows `GetCapabilities` permissions will be tested to ensure it can
be reached on the provided `url`. Once validated, this permission is applied to `anonymous` group to make its
entrypoint accessible by anyone.
Services that cannot have `GetCapabilities` permission are ignored regardless.
:param db_session: Use a pre-established database connection for registration. Otherwise, API requests are employed.
:returns: loaded service configurations.
"""
LOGGER.info("Starting services processing.")
services_configs = get_all_configs(service_config_path, "providers") # type: List[ServicesConfig]
services_config_count = len(services_configs)
LOGGER.log(logging.INFO if services_config_count else logging.WARNING,
"Found %s service configurations to process", services_config_count)
merged_service_configs = {}
for services in services_configs:
if not services:
LOGGER.warning("Services configuration are empty.")
continue
if force_update:
merged_service_configs.update(services)
else:
for svc, svc_cfg in services.items():
merged_service_configs.setdefault(svc, svc_cfg)
merged_service_configs = validate_services_config(merged_service_configs)
if not skip_registration:
# register services using API POSTs
if db_session is None:
admin_usr = get_constant("MAGPIE_ADMIN_USER")
admin_pwd = get_constant("MAGPIE_ADMIN_PASSWORD")
local_provider = get_constant("MAGPIE_DEFAULT_PROVIDER")
_magpie_register_services_with_requests(merged_service_configs, push_to_phoenix,
admin_usr, admin_pwd, local_provider,
force_update=force_update,
disable_getcapabilities=disable_getcapabilities)
# register services directly to db using session
else:
_magpie_register_services_with_db_session(merged_service_configs, db_session,
push_to_phoenix=push_to_phoenix, force_update=force_update,
update_getcapabilities_permissions=not disable_getcapabilities)
LOGGER.info("All services processed.")
return merged_service_configs
def _handle_permission(message, permission_index, trail=", skipping...", detail=None, permission=None,
level=logging.WARN, raise_errors=False):
# type: (Str, int, Str, Optional[Str], Optional[Str], Union[Str, int], bool) -> None
"""
Logs a message related to a 'permission' entry and raises an error if required.
Log message format is as follows (detail portion omitted if none provided)::
{message} [permission: #{permission_index}] [{permission}]{trail}
Detail: [{detail}]
Such that the following logging entry is generated (omitting any additional logging formatters)::
>> log_permission("test", 1, " skip test...", "just a test", "fake")
test [permission: #1] [fake] skip test...
Detail: [just a test]
:param message: base message to log
:param permission_index: index of the permission in the configuration list for traceability
:param trail: trailing message appended after the base message
:param detail: additional details appended after the trailing message after moving to another line.
:param permission: permission name to log just before the trailing message.
:param level: logging level (default: ``logging.WARN``)
:param raise_errors: raises errors related to permissions, instead of just logging the info.
.. seealso::
`magpie/config/permissions.cfg`
"""
trail = "{}\nDetail: [{!s}]".format(trail, detail) if detail else (trail or "")
permission = " [{!s}]".format(permission) if permission else ""
msg = "{} [permission #{}]{}{}".format(message, permission_index, permission, trail)
LOGGER.log(level, msg)
if raise_errors:
raise RegistrationConfigurationError(msg)
def _use_request(cookies_or_session):
return not isinstance(cookies_or_session, Session)
def _parse_resource_path(permission_config_entry, # type: PermissionConfigItem
entry_index, # type: int
service_info, # type: JSON
cookies_or_session=None, # type: CookiesOrSessionType
magpie_url=None, # type: Optional[Str]
raise_errors=False # type: bool
): # type: (...) -> Tuple[Optional[int], bool]
"""
Parses the `resource` field of a permission config entry and retrieves the final resource id. Creates missing
resources as necessary if they can be automatically resolved.
If `cookies` are provided, uses requests to a running `Magpie` instance (with ``magpie_url``) to apply permission.
If `session` to db is provided, uses direct db connection instead to apply permission.
:returns: tuple of found id (if any, ``None`` otherwise), and success status of the parsing operation (error)
"""
# pylint: disable=C0415 # avoid circular imports
if not magpie_url and _use_request(cookies_or_session):
raise RegistrationValueError("cannot use cookies without corresponding request URL")
resource = None
resource_path = permission_config_entry.get("resource", "").strip("/")
resource_type_config = permission_config_entry.get("type")
if resource_path:
try:
svc_name = service_info["service_name"]
svc_type = service_info["service_type"]
# Prepare a list of types that fits with the list of resources
resource_type_list = resource_type_config.strip("/").split("/") if resource_type_config else [None]
resource_list = resource_path.split("/")
if len(resource_type_list) == 1:
# if only one type specified, assume every path of the resource uses the same resource type
resource_type_list = resource_type_list * len(resource_list)
if len(resource_list) != len(resource_type_list):
raise RegistrationConfigurationError("Invalid resource type found in configuration : " +
permission_config_entry.get("type"))
res_path = None
if _use_request(cookies_or_session):
res_path = magpie_url + ServiceResourcesAPI.path.format(service_name=svc_name)
res_resp = requests.get(res_path, cookies=cookies_or_session, timeout=5)
res_dict = get_json(res_resp)[svc_name] # type: JSON
else:
from magpie.api.management.service.service_formats import format_service_resources
svc = models.Service.by_service_name(svc_name, db_session=cookies_or_session)
res_dict = format_service_resources(svc, show_all_children=True, db_session=cookies_or_session)
parent = res_dict["resource_id"]
child_resources = res_dict["resources"] # type: Dict[Str, JSON]
for res, resource_type in zip(resource_list, resource_type_list):
# search in existing children resources
if len(child_resources):
res_id = list(filter(lambda r: res in [r, child_resources[r]["resource_name"]], child_resources))
if res_id:
res_info = child_resources[res_id[0]] # type: Dict[Str, JSON]
child_resources = res_info["children"] # update next sub-resource iteration
parent = res_info["resource_id"]
continue
# missing resource, attempt creation
svc_res_types = SERVICE_TYPE_DICT[svc_type].resource_type_names
type_count = len(svc_res_types)
if type_count == 0:
_handle_permission("Cannot generate resource", entry_index, raise_errors=True,
detail="Service [{!s}] of type [{!s}] doesn't allow any sub-resource types. "
.format(svc_name, svc_type))
if type_count != 1 and not (isinstance(resource_type, six.string_types) and resource_type):
_handle_permission("Cannot automatically generate resource", entry_index, raise_errors=True,
detail="Service [{!s}] of type [{!s}] allows more than 1 sub-resource "
"types ({}). Type must be explicitly specified for auto-creation. "
"Available choices are: {}"
.format(svc_name, svc_type, type_count, svc_res_types))
if type_count != 1 and resource_type not in svc_res_types:
_handle_permission("Cannot generate resource", entry_index, raise_errors=True,
detail="Service [{!s}] of type [{!s}] allows more than 1 sub-resource "
"types ({}). Specified type [{!s}] doesn't match any of the allowed "
"resource types. Available choices are: {}"
.format(svc_name, svc_type, type_count, resource_type, svc_res_types))
res_type = resource_type or svc_res_types[0]
if _use_request(cookies_or_session):
body = {"resource_name": res, "resource_type": res_type, "parent_id": parent}
resp = requests.post(res_path, json=body, cookies=cookies_or_session, timeout=5)
else:
from magpie.api.management.resource.resource_utils import create_resource
resp = create_resource(res, res, res_type, parent, db_session=cookies_or_session)
if resp.status_code != 201:
resp.raise_for_status()
child_resources = {}
parent = get_json(resp)["resource"]["resource_id"]
resource = parent
if not resource:
raise RegistrationConfigurationError("Could not extract child resource from resource path.")
except HTTPException as exc:
detail = "{} ({}), {!s}".format(type(exc).__name__, exc.code, exc)
_handle_permission("Failed resources parsing.", entry_index, detail=detail, raise_errors=raise_errors)
return None, False
except Exception as exc:
_handle_permission("Failed resources parsing.", entry_index, detail=repr(exc), raise_errors=raise_errors)
return None, False
return resource, True
def _apply_permission_entry(permission_config_entry, # type: PermissionConfigItem
entry_index, # type: int
resource_id, # type: int
cookies_or_session, # type: CookiesOrSessionType
magpie_url, # type: Str
users, # type: UsersSettings
groups, # type: GroupsSettings
raise_errors=False, # type: bool
): # type: (...) -> None
"""
Applies the single permission entry retrieved from the permission configuration.
Assumes that permissions fields where pre-validated. Permission is applied for the user/group/resource using request
or db session accordingly to arguments.
"""
def _apply_request(_usr_name=None, _grp_name=None):
# type: (Optional[Str], Optional[Str]) -> Optional[AnyResponseType]
"""
Apply operation using HTTP request.
"""
action_oper = None
if _usr_name:
action_oper = UserResourcePermissionsAPI.path.format(user_name=_usr_name, resource_id=resource_id)
if _grp_name:
action_oper = GroupResourcePermissionsAPI.path.format(group_name=_grp_name, resource_id=resource_id)
if not action_oper:
return None
action_func = requests.post if create_perm else requests.delete
action_body = {"permission": perm.json()}
action_path = "{url}{path}".format(url=magpie_url, path=action_oper)
action_resp = action_func(action_path, json=action_body, cookies=cookies_or_session, timeout=5)
return action_resp
def _apply_session(_usr_name=None, _grp_name=None):
# type: (Optional[Str], Optional[Str]) -> AnyResponseType
"""
Apply operation using db session.
"""
# pylint: disable=C0415 # avoid circular imports
# pylint: disable=R1705 # aligned methods are easier to read
from magpie.api.management.group import group_utils as gt
from magpie.api.management.user import user_utils as ut
res = ResourceService.by_resource_id(resource_id, db_session=cookies_or_session)
if _usr_name:
usr = UserService.by_user_name(_usr_name, db_session=cookies_or_session)
if create_perm:
return ut.create_user_resource_permission_response(usr, res, perm, overwrite=True,
db_session=cookies_or_session)
else:
return ut.delete_user_resource_permission_response(usr, res, perm,
db_session=cookies_or_session)
if _grp_name:
grp = GroupService.by_group_name(_grp_name, db_session=cookies_or_session)
if create_perm:
return gt.create_group_resource_permission_response(grp, res, perm, overwrite=True,
db_session=cookies_or_session)
else:
return gt.delete_group_resource_permission_response(grp, res, perm,
db_session=cookies_or_session)
def _apply_profile(_usr_name=None, _grp_name=None):
# type: (Optional[Str], Optional[Str]) -> AnyResponseType
"""
Creates the user/group profile as required.
"""
password = pseudo_random_string(length=get_constant("MAGPIE_PASSWORD_MIN_LENGTH"))
usr_data = {
"user_name": _usr_name,
"password": users.get(_usr_name, {}).get("password", password),
"email": users.get(_usr_name, {}).get("email", "{}@mail.com".format(_usr_name)),
"group_name": users.get(_usr_name, {}).get("group", get_constant("MAGPIE_ANONYMOUS_GROUP"))
}
grp_data = {
"group_name": _grp_name,
"description": groups.get(_grp_name, {}).get("description", ""),
"discoverable": groups.get(_grp_name, {}).get("discoverable", False),
"terms": groups.get(_grp_name, {}).get("terms", "")
}
if _use_request(cookies_or_session):
if _usr_name:
path = "{url}{path}".format(url=magpie_url, path=UsersAPI.path)
return requests.post(path, json=usr_data, cookies=cookies_or_session, timeout=5)
if _grp_name:
path = "{url}{path}".format(url=magpie_url, path=GroupsAPI.path)
return requests.post(path, json=grp_data, cookies=cookies_or_session, timeout=5)
else:
if _usr_name:
from magpie.api.management.user.user_utils import create_user
usr_data["db_session"] = cookies_or_session # back-compatibility python 2 cannot have kw after **unpack
return create_user(**usr_data)
if _grp_name:
grp_data["db_session"] = cookies_or_session # back-compatibility python 2 cannot have kw after **unpack
from magpie.api.management.group.group_utils import create_group
return create_group(**grp_data)
def _validate_response(operation, is_create, item_type="Permission"):
# type: (Callable[[], Optional[AnyResponseType]], bool, str) -> None
"""
Validate action/operation applied and handles raised ``HTTPException`` as returned response.
"""
if not islambda(operation):
raise TypeError("invalid use of method")
try:
_resp = operation()
if _resp is None:
return
except HTTPException as exc:
_resp = exc
except Exception:
raise
# validation according to status code returned
if is_create:
if _resp.status_code in [200, 201]: # update/create
_handle_permission("{} successfully created.".format(item_type), entry_index,
level=logging.INFO, trail="")
elif _resp.status_code == 409:
_handle_permission("{} already exists.".format(item_type), entry_index, level=logging.INFO)
else:
_handle_permission("Unknown response [{}]".format(_resp.status_code), entry_index,
permission=permission_config_entry, level=logging.ERROR, raise_errors=raise_errors)
else:
if _resp.status_code == 200:
_handle_permission("{} successfully removed.".format(item_type), entry_index,
level=logging.INFO, trail="")
elif _resp.status_code == 404: