12
12
import os
13
13
import subprocess
14
14
import tempfile
15
+ from typing import TYPE_CHECKING
16
+ from urllib .parse import urlparse
15
17
16
18
import mock
17
19
import six
18
20
19
21
from magpie .cli import batch_update_permissions , batch_update_users , magpie_helper_cli
20
22
from magpie .constants import get_constant
23
+ from magpie .permissions import Permission , PermissionType
24
+ from magpie .services import ServiceAPI , ServiceWPS
21
25
from tests import runner , utils
22
26
23
27
if six .PY2 :
24
28
from backports import tempfile as tempfile2 # noqa # pylint: disable=E0611,no-name-in-module # Python 2
25
29
else :
26
30
tempfile2 = tempfile # pylint: disable=C0103,invalid-name
31
+ if TYPE_CHECKING :
32
+ from magpie .typedefs import JSON
27
33
28
34
KNOWN_HELPERS = [
29
35
"batch_update_users" ,
@@ -120,34 +126,34 @@ def test_magpie_batch_update_permissions_command():
120
126
"-P" , test_admin_pwd ,
121
127
]
122
128
123
- # cleanup in case of previous failure
129
+ def mock_request (method , url , * args , ** kwargs ):
130
+ _path = urlparse (url ).path
131
+ # because CLI utility does multiple login tests, we must force TestApp logout to forget session
132
+ # otherwise, error is raised because of user session mismatch between previous login and new one requested
133
+ if _path .startswith ("/signin" ):
134
+ utils .check_or_try_logout_user (test_app )
135
+ return utils .test_request (test_app , method , _path , * args , ** kwargs )
136
+
124
137
_ , test_admin_cookies = utils .check_or_try_login_user (test_app , username = test_admin_usr , password = test_admin_pwd )
138
+
139
+ # cleanup in case of previous run
125
140
utils .TestSetup .delete_TestUser (test_app , override_user_name = test_usr , override_cookies = test_admin_cookies )
126
141
utils .TestSetup .delete_TestGroup (test_app , override_group_name = test_grp , override_cookies = test_admin_cookies )
127
142
utils .TestSetup .delete_TestService (test_app , override_service_name = test_api , override_cookies = test_admin_cookies )
128
143
utils .TestSetup .delete_TestService (test_app , override_service_name = test_wps , override_cookies = test_admin_cookies )
129
144
130
- def mock_request (* args , ** kwargs ):
131
- method , url , args = args [0 ], args [1 ], args [2 :]
132
- path = url .replace (test_url , "" )
133
- # because CLI utility does multiple login tests, we must force TestApp logout to forget session
134
- # otherwise, error is raised because of user session mismatch between previous login and new one requested
135
- if path .startswith ("/signin" ):
136
- utils .check_or_try_logout_user (test_app )
137
- return utils .test_request (test_app , method , path , * args , ** kwargs )
145
+ utils .TestSetup .create_TestService (test_app ,
146
+ override_service_name = test_api ,
147
+ override_service_type = ServiceAPI .service_type )
148
+ utils .TestSetup .create_TestService (test_app ,
149
+ override_service_name = test_wps ,
150
+ override_service_type = ServiceWPS .service_type )
138
151
139
- def run_command (operation_name , operation_args ):
140
- with tempfile2 .TemporaryDirectory () as tmpdir :
141
- with mock .patch ("requests.Session.request" , side_effect = mock_request ):
142
- with mock .patch ("requests.request" , side_effect = mock_request ):
143
- err_code = batch_update_permissions .main (operation_args )
144
- assert err_code == 0 , "failed execution due to invalid arguments"
145
- assert len (os .listdir (tmpdir )) == 1 , "utility should have produced 1 output file"
146
- file = os .path .join (tmpdir , os .listdir (tmpdir )[0 ])
147
- utils .check_val_is_in (operation_name , file )
148
- assert os .path .isfile (file )
149
- with open (file , mode = "r" , encoding = "utf-8" ) as fd :
150
- file_text = fd .read ()
152
+ def run_command (__operation_name , operation_args ):
153
+ with mock .patch ("requests.Session.request" , side_effect = mock_request ):
154
+ with mock .patch ("requests.request" , side_effect = mock_request ):
155
+ err_code = batch_update_permissions .main (operation_args )
156
+ assert err_code == 0 , "failed execution due to invalid arguments"
151
157
152
158
with contextlib .ExitStack () as stack :
153
159
tmp_file = stack .enter_context (tempfile .NamedTemporaryFile (mode = "w" , suffix = ".json" ))
@@ -156,8 +162,8 @@ def run_command(operation_name, operation_args):
156
162
{
157
163
"group" : test_grp ,
158
164
"service" : test_api ,
159
- "resource" : "/api/ v1" ,
160
- "permission" : "read" ,
165
+ "resource" : "/v1" ,
166
+ "permission" : Permission . READ . value ,
161
167
"action" : "create" ,
162
168
}
163
169
]
@@ -173,7 +179,7 @@ def run_command(operation_name, operation_args):
173
179
{
174
180
"user" : test_usr ,
175
181
"service" : test_wps ,
176
- "permission" : "getcapabilities" ,
182
+ "permission" : Permission . GET_CAPABILITIES . value ,
177
183
"action" : "create" ,
178
184
}
179
185
]
@@ -186,6 +192,34 @@ def run_command(operation_name, operation_args):
186
192
test_args += ["--verbose" ]
187
193
run_command ("batch_update_permissions" , test_args )
188
194
195
+ path = f"/groups/{ test_grp } /resources"
196
+ resp = utils .test_request (test_app , "GET" , path )
197
+ body = utils .check_response_basic_info (resp )
198
+ res_api = body ["resources" ][ServiceAPI .service_type ][test_api ] # type: JSON
199
+ res_wps = body ["resources" ][ServiceWPS .service_type ][test_wps ] # type: JSON
200
+ perms_res_api = res_api ["permissions" ]
201
+ perms_res_wps = res_wps ["permissions" ]
202
+ utils .check_val_equal (len (perms_res_wps ), 0 )
203
+ utils .check_val_equal (len (perms_res_api ), 0 , msg = "Permission should not be created directly on the service." )
204
+ res_res_api = res_api ["resources" ]
205
+ utils .check_val_equal (len (res_res_api ), 1 , msg = "Resource should have been created dynamically." )
206
+ perms_sub_api = res_res_api [list (res_res_api )[0 ]]["permissions" ] # type: JSON
207
+ utils .check_val_equal (len (perms_sub_api ), 1 )
208
+ utils .check_val_equal (perms_sub_api [0 ]["name" ], Permission .READ .value )
209
+ utils .check_val_equal (perms_sub_api [0 ]["type" ], PermissionType .INHERITED .value )
210
+
211
+ path = f"/users/{ test_usr } /resources"
212
+ resp = utils .test_request (test_app , "GET" , path )
213
+ body = utils .check_response_basic_info (resp )
214
+ res_api = body ["resources" ][ServiceAPI .service_type ][test_api ]
215
+ res_wps = body ["resources" ][ServiceWPS .service_type ][test_wps ]
216
+ perms_res_api = res_api ["permissions" ]
217
+ perms_res_wps = res_wps ["permissions" ]
218
+ utils .check_val_equal (len (perms_res_api ), 0 )
219
+ utils .check_val_equal (len (perms_res_wps ), 1 )
220
+ utils .check_val_equal (perms_res_wps [0 ]["name" ], Permission .GET_CAPABILITIES .value )
221
+ utils .check_val_equal (perms_res_wps [0 ]["type" ], PermissionType .DIRECT .value )
222
+
189
223
190
224
@runner .MAGPIE_TEST_CLI
191
225
@runner .MAGPIE_TEST_LOCAL
0 commit comments