Skip to content

Commit fdb53ad

Browse files
committed
Partial merge of hsfs job/execution
1 parent b71c239 commit fdb53ad

File tree

13 files changed

+54
-55
lines changed

13 files changed

+54
-55
lines changed

Diff for: python/auto_doc.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@
5757
},
5858
"api/jobs.md": {
5959
"job_api_handle": ["hopsworks.project.Project.get_jobs_api"],
60-
"job_create": ["hopsworks.core.job_api.JobsApi.create_job"],
61-
"job_get": ["hopsworks.core.job_api.JobsApi.get_job"],
62-
"job_get_all": ["hopsworks.core.job_api.JobsApi.get_jobs"],
60+
"job_create": ["hopsworks.core.job_api.JobApi.create_job"],
61+
"job_get": ["hopsworks.core.job_api.JobApi.get_job"],
62+
"job_get_all": ["hopsworks.core.job_api.JobApi.get_jobs"],
6363
"job_properties": keras_autodoc.get_properties("hopsworks.job.Job"),
64-
"job_config": ["hopsworks.core.job_api.JobsApi.get_configuration"],
64+
"job_config": ["hopsworks.core.job_api.JobApi.get_configuration"],
6565
"job_methods": keras_autodoc.get_methods(
6666
"hopsworks.job.Job", exclude=["from_response_json", "json"]
6767
),

Diff for: python/hopsworks/core/execution_api.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
#
1616

1717
from hopsworks_common.core.execution_api import (
18-
ExecutionsApi,
18+
ExecutionApi,
1919
)
2020

2121

2222
__all__ = [
23-
ExecutionsApi,
23+
ExecutionApi,
2424
]

Diff for: python/hopsworks/core/job_api.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
#
1616

1717
from hopsworks_common.core.job_api import (
18-
JobsApi,
18+
JobApi,
1919
)
2020

2121

2222
__all__ = [
23-
JobsApi,
23+
JobApi,
2424
]

Diff for: python/hopsworks/project.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def __init__(
6262

6363
self._opensearch_api = opensearch_api.OpenSearchApi(project_id, project_name)
6464
self._kafka_api = kafka_api.KafkaApi(project_id, project_name)
65-
self._jobs_api = job_api.JobsApi(project_id, project_name)
65+
self._jobs_api = job_api.JobApi(project_id, project_name)
6666
self._flink_cluster_api = flink_cluster_api.FlinkClusterApi(
6767
project_id, project_name
6868
)
@@ -230,16 +230,16 @@ def get_opensearch_api(self):
230230
_client.download_certs(self.name)
231231
return self._opensearch_api
232232

233-
def get_jobs_api(self):
234-
"""Get the jobs api for the project.
233+
def get_job_api(self):
234+
"""Get the job API for the project.
235235
236236
# Returns
237-
`JobsApi`: The Jobs Api handle
237+
`JobApi`: The Job Api handle
238238
"""
239239
return self._jobs_api
240240

241241
def get_flink_cluster_api(self):
242-
"""Get the flink cluster api for the project.
242+
"""Get the flink cluster API for the project.
243243
244244
# Returns
245245
`FlinkClusterApi`: The Flink Cluster Api handle

Diff for: python/hopsworks_common/core/execution_api.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from hopsworks_common import client, execution
1818

1919

20-
class ExecutionsApi:
20+
class ExecutionApi:
2121
def __init__(
2222
self,
2323
project_id,

Diff for: python/hopsworks_common/core/flink_cluster_api.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def __init__(
3030
):
3131
self._project_id = project_id
3232
self._project_name = project_name
33-
self._job_api = job_api.JobsApi(project_id, project_name)
33+
self._job_api = job_api.JobApi(project_id, project_name)
3434

3535
def get_configuration(self):
3636
"""Get configuration for the Flink cluster.

Diff for: python/hopsworks_common/core/job_api.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from hopsworks_common.client.exceptions import RestAPIError
2121

2222

23-
class JobsApi:
23+
class JobApi:
2424
def __init__(
2525
self,
2626
project_id,

Diff for: python/hopsworks_common/engine/execution_engine.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
class ExecutionEngine:
2727
def __init__(self, project_id=None):
2828
self._dataset_api = dataset_api.DatasetApi(project_id)
29-
self._execution_api = execution_api.ExecutionsApi(project_id)
29+
self._execution_api = execution_api.ExecutionApi(project_id)
3030
self._log = logging.getLogger(__name__)
3131

3232
def download_logs(self, execution, path=None):

Diff for: python/hopsworks_common/execution.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def __init__(
6363
self._project_id = project_id
6464

6565
self._execution_engine = execution_engine.ExecutionEngine(project_id)
66-
self._execution_api = execution_api.ExecutionsApi(project_id)
66+
self._execution_api = execution_api.ExecutionApi(project_id)
6767

6868
@classmethod
6969
def from_response_json(cls, json_dict, project_id=None, job=None):

Diff for: python/hopsworks_common/flink_cluster.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def __init__(
3535
self._execution_id = None
3636

3737
self._execution_engine = execution_engine.ExecutionEngine(project_id)
38-
self._execution_api = execution_api.ExecutionsApi(project_id)
38+
self._execution_api = execution_api.ExecutionApi(project_id)
3939
self._flink_cluster_api = flink_cluster_api.FlinkClusterApi(
4040
project_id, project_name
4141
)

Diff for: python/hopsworks_common/job.py

+7-8
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import humps
2323
from hopsworks_common import util
24-
from hopsworks_common.client.exceptions import JobExecutionException
24+
from hopsworks_common.client.exceptions import JobException
2525
from hopsworks_common.core import execution_api, job_api
2626
from hopsworks_common.engine import execution_engine
2727
from hopsworks_common.job_schedule import JobSchedule
@@ -63,13 +63,12 @@ def __init__(
6363
)
6464

6565
self._execution_engine = execution_engine.ExecutionEngine(project_id)
66-
self._execution_api = execution_api.ExecutionsApi(project_id)
66+
self._execution_api = execution_api.ExecutionApi(project_id)
6767
self._execution_engine = execution_engine.ExecutionEngine(project_id)
68-
self._job_api = job_api.JobsApi(project_id, project_name)
68+
self._job_api = job_api.JobApi(project_id, project_name)
6969

7070
@classmethod
7171
def from_response_json(cls, json_dict, project_id=None, project_name=None):
72-
print(json_dict)
7372
if "items" in json_dict:
7473
jobs = []
7574
for job in json_dict["items"]:
@@ -203,7 +202,7 @@ def get_state(self):
203202
"""
204203
last_execution = self._job_api.last_execution(self)
205204
if len(last_execution) != 1:
206-
raise JobExecutionException("No executions found for job")
205+
raise JobException("No executions found for job")
207206

208207
return last_execution[0].state
209208

@@ -218,7 +217,7 @@ def get_final_state(self):
218217
"""
219218
last_execution = self._job_api.last_execution(self)
220219
if len(last_execution) != 1:
221-
raise JobExecutionException("No executions found for job")
220+
raise JobException("No executions found for job")
222221

223222
return last_execution[0].final_status
224223

@@ -303,7 +302,7 @@ def unschedule(self):
303302
def resume_schedule(self):
304303
"""Resumes the schedule of a Job execution"""
305304
if self._job_schedule is None:
306-
raise JobExecutionException("No schedule found for job")
305+
raise JobException("No schedule found for job")
307306

308307
job_schedule = JobSchedule(
309308
id=self._job_schedule.id,
@@ -317,7 +316,7 @@ def resume_schedule(self):
317316
def pause_schedule(self):
318317
"""Pauses the schedule of a Job execution"""
319318
if self._job_schedule is None:
320-
raise JobExecutionException("No schedule found for job")
319+
raise JobException("No schedule found for job")
321320

322321
job_schedule = JobSchedule(
323322
id=self._job_schedule.id,

Diff for: python/tests/core/test_job.py

+27-27
Original file line numberDiff line numberDiff line change
@@ -49,84 +49,84 @@ def test_from_response_json_empty(self, backend_fixtures):
4949

5050
def test_wait_for_job(self, mocker, backend_fixtures):
5151
# Arrange
52-
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
52+
mocker.patch("hopsworks_common.client.get_instance")
53+
mock_execution_api = mocker.patch(
54+
"hopsworks_common.core.execution_api.ExecutionApi",
55+
)
5356

5457
json = backend_fixtures["job"]["get"]["response"]
55-
j = job.Job.from_response_json(json)
58+
x = job.Job.from_response_json(json).run(await_termination=False)
5659

5760
# Act
58-
j._wait_for_job()
61+
x.await_termination()
5962

6063
# Assert
61-
assert mock_job_api.return_value.last_execution.call_count == 1
64+
assert mock_execution_api.return_value._get.call_count == 1
6265

6366
def test_wait_for_job_wait_for_job_false(self, mocker, backend_fixtures):
6467
# Arrange
65-
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
68+
mock_job_api = mocker.patch("hopsworks_common.core.execution_api.ExecutionApi")
6669

6770
json = backend_fixtures["job"]["get"]["response"]
68-
j = job.Job.from_response_json(json)
69-
70-
# Act
71-
j._wait_for_job(False)
71+
job.Job.from_response_json(json).run(await_termination=False)
7272

7373
# Assert
74-
assert mock_job_api.return_value.last_execution.call_count == 0
74+
assert mock_job_api.return_value._get.call_count == 0
7575

7676
def test_wait_for_job_final_status_succeeded(self, mocker, backend_fixtures):
7777
# Arrange
78-
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
78+
mock_job_api = mocker.patch("hopsworks_common.core.execution_api.ExecutionApi")
7979

8080
json = backend_fixtures["job"]["get"]["response"]
81-
j = job.Job.from_response_json(json)
81+
x = job.Job.from_response_json(json).run(await_termination=False)
8282

83-
mock_job_api.return_value.last_execution.return_value = [
83+
mock_job_api.return_value._get.return_value = [
8484
execution.Execution(id=1, state=None, final_status="succeeded")
8585
]
8686

8787
# Act
88-
j._wait_for_job()
88+
x.await_termination()
8989

9090
# Assert
91-
assert mock_job_api.return_value.last_execution.call_count == 1
91+
assert mock_job_api.return_value._get.call_count == 1
9292

9393
def test_wait_for_job_final_status_failed(self, mocker, backend_fixtures):
9494
# Arrange
95-
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
95+
mock_job_api = mocker.patch("hopsworks_common.core.execution_api.ExecutionApi")
9696

9797
json = backend_fixtures["job"]["get"]["response"]
98-
j = job.Job.from_response_json(json)
98+
x = job.Job.from_response_json(json).run(await_termination=False)
9999

100-
mock_job_api.return_value.last_execution.return_value = [
100+
mock_job_api.return_value._get.return_value = [
101101
execution.Execution(id=1, state=None, final_status="failed")
102102
]
103103

104104
# Act
105-
with pytest.raises(exceptions.FeatureStoreException) as e_info:
106-
j._wait_for_job()
105+
with pytest.raises(exceptions.JobException) as e_info:
106+
x.await_termination()
107107

108108
# Assert
109-
assert mock_job_api.return_value.last_execution.call_count == 1
109+
assert mock_job_api.return_value._get.call_count == 1
110110
assert (
111111
str(e_info.value)
112112
== "The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
113113
)
114114

115115
def test_wait_for_job_final_status_killed(self, mocker, backend_fixtures):
116116
# Arrange
117-
mock_job_api = mocker.patch("hsfs.core.job_api.JobApi")
117+
mock_job_api = mocker.patch("hopsworks_common.core.execution_api.ExecutionApi")
118118

119119
json = backend_fixtures["job"]["get"]["response"]
120-
j = job.Job.from_response_json(json)
120+
x = job.Job.from_response_json(json).run(await_termination=False)
121121

122-
mock_job_api.return_value.last_execution.return_value = [
122+
mock_job_api.return_value._get.return_value = [
123123
execution.Execution(id=1, state=None, final_status="killed")
124124
]
125125

126126
# Act
127-
with pytest.raises(exceptions.FeatureStoreException) as e_info:
128-
j._wait_for_job()
127+
with pytest.raises(exceptions.JobException) as e_info:
128+
x.await_termination()
129129

130130
# Assert
131-
assert mock_job_api.return_value.last_execution.call_count == 1
131+
assert mock_job_api.return_value._get.call_count == 1
132132
assert str(e_info.value) == "The Hopsworks Job was stopped"

Diff for: python/tests/engine/test_python.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2354,7 +2354,7 @@ def test_write_training_dataset(self, mocker):
23542354
mock_td_api = mocker.patch("hsfs.core.training_dataset_api.TrainingDatasetApi")
23552355
mocker.patch("hsfs.util.get_job_url")
23562356
mock_python_engine_wait_for_job = mocker.patch(
2357-
"hsfs.core.job.Job._wait_for_job"
2357+
"hopsworks_common.engine.execution_engine.ExecutionEngine.wait_until_finished"
23582358
)
23592359

23602360
python_engine = python.Engine()

0 commit comments

Comments
 (0)