1
1
import traceback
2
2
from datetime import datetime
3
- from typing import Any , Dict , List , Tuple , Union
3
+ from typing import Any , Dict , List , Tuple , Union , Optional
4
4
5
5
import argo_workflows
6
6
from argo_workflows .api import (
@@ -75,7 +75,7 @@ def __init__(self, dry_run: bool = False):
75
75
)
76
76
self .artifact_api_instance = artifact_service_api .ArtifactServiceApi (api_client )
77
77
78
- def _get_workflow_details_dict (self , workflow_name : Union [str , None ]) -> Dict :
78
+ def _get_workflow_details_dict (self , workflow_name : Optional [str ]) -> Dict :
79
79
return self .api_instance .get_workflow (
80
80
namespace = ARGO_NAMESPACE ,
81
81
name = workflow_name ,
@@ -84,7 +84,7 @@ def _get_workflow_details_dict(self, workflow_name: Union[str, None]) -> Dict:
84
84
_check_return_type = False ,
85
85
).to_dict ()
86
86
87
- def _get_archived_workflow_details_dict (self , uid : Union [str , None ]) -> Dict :
87
+ def _get_archived_workflow_details_dict (self , uid : Optional [str ]) -> Dict :
88
88
"""
89
89
Queries the archived workflows api.
90
90
Raises a argo_workflows.exceptions.NotFoundException if the workflow uid cannot be found
@@ -206,7 +206,7 @@ def _get_lock_for_user(self, username: str) -> Lock:
206
206
return self .user_locks [username ]
207
207
208
208
def get_workflow_details (
209
- self , workflow_name : Union [str , None ], uid : Union [str , None ] = None
209
+ self , workflow_name : Optional [str ], uid : Optional [str ] = None
210
210
) -> Union [Dict [str , Any ], str ]:
211
211
"""
212
212
Gets the workflow status
@@ -381,7 +381,7 @@ def _get_archived_workflow_wf_name_and_team_project(
381
381
return given_name , team_project , gen3username
382
382
383
383
def get_workflows_for_team_projects_and_user (
384
- self , team_projects : List [str ], auth_header : Union [str , None ]
384
+ self , team_projects : List [str ], auth_header : Optional [str ]
385
385
) -> List [Dict ]:
386
386
team_project_workflows = self .get_workflows_for_team_projects (team_projects )
387
387
user_workflows = self .get_workflows_for_user (auth_header )
@@ -418,7 +418,7 @@ def get_workflows_for_team_project(self, team_project: str) -> List[Dict]:
418
418
workflows = self .get_workflows_for_label_selector (label_selector = label_selector )
419
419
return workflows
420
420
421
- def get_workflows_for_user (self , auth_header : Union [str , None ]) -> List [Dict ]:
421
+ def get_workflows_for_user (self , auth_header : Optional [str ]) -> List [Dict ]:
422
422
"""
423
423
Get the list of all workflows for the current user. Each item in the list
424
424
contains the workflow name, its status, start and end time.
@@ -602,7 +602,7 @@ def get_workflow_logs(self, workflow_name: str, uid: str) -> List[Dict[str, Any]
602
602
f"could not get status of { workflow_name } , workflow does not exist"
603
603
)
604
604
605
- def workflow_submission (self , request_body : Dict , auth_header : Union [str , None ]):
605
+ def workflow_submission (self , request_body : Dict , auth_header : Optional [str ]):
606
606
# Lock function so only one can run at a time per user
607
607
username = argo_engine_helper .get_username_from_token (auth_header )
608
608
user_lock = self ._get_lock_for_user (username )
@@ -711,7 +711,7 @@ def check_user_info_for_billing_id_and_workflow_limit(self, request_token):
711
711
return None , None
712
712
713
713
def check_user_monthly_workflow_cap (
714
- self , request_token , billing_id , custom_limit : Union [int , None ]
714
+ self , request_token , billing_id , custom_limit : Optional [int ]
715
715
):
716
716
"""
717
717
Query Argo service to see how many workflow runs user already
0 commit comments