Skip to content

Commit b71c239

Browse files
committed
Merge hsfs Job into hopsworks_common
1 parent 48c2e8a commit b71c239

File tree

4 files changed

+118
-254
lines changed

4 files changed

+118
-254
lines changed

Diff for: python/hopsworks_common/job.py

+110-15
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
# limitations under the License.
1515
#
1616

17+
from __future__ import annotations
18+
1719
import json
1820
from datetime import datetime, timezone
1921

2022
import humps
2123
from hopsworks_common import util
24+
from hopsworks_common.client.exceptions import JobExecutionException
2225
from hopsworks_common.core import execution_api, job_api
2326
from hopsworks_common.engine import execution_engine
2427
from hopsworks_common.job_schedule import JobSchedule
@@ -52,6 +55,7 @@ def __init__(
5255
self._creator = creator
5356
self._executions = executions
5457
self._project_id = project_id
58+
self._href = href
5559
self._job_schedule = (
5660
JobSchedule.from_response_json(job_schedule)
5761
if job_schedule
@@ -64,7 +68,8 @@ def __init__(
6468
self._job_api = job_api.JobsApi(project_id, project_name)
6569

6670
@classmethod
67-
def from_response_json(cls, json_dict, project_id, project_name):
71+
def from_response_json(cls, json_dict, project_id=None, project_name=None):
72+
print(json_dict)
6873
if "items" in json_dict:
6974
jobs = []
7075
for job in json_dict["items"]:
@@ -131,36 +136,92 @@ def job_schedule(self):
131136
"""Return the Job schedule"""
132137
return self._job_schedule
133138

134-
def run(self, args: str = None, await_termination: bool = False):
135-
"""Run the job, with the option of passing runtime arguments.
139+
@property
140+
def executions(self):
141+
return self._executions
136142

137-
Example of a blocking execution and downloading logs once execution is finished.
143+
@property
144+
def href(self):
145+
return self._href
138146

139-
```python
147+
@property
148+
def config(self):
149+
"""Configuration for the job"""
150+
return self._config
140151

141-
# Run the job
142-
execution = job.run(await_termination=True)
152+
def run(self, args: str = None, await_termination: bool = True):
153+
"""Run the job.
143154
144-
# True if job executed successfully
145-
print(execution.success)
155+
Run the job, by default awaiting its completion, with the option of passing runtime arguments.
146156
147-
# Download logs
148-
out_log_path, err_log_path = execution.download_logs()
157+
!!! example
158+
```python
159+
# connect to the Feature Store
160+
fs = ...
161+
162+
# get the Feature Group instances
163+
fg = fs.get_or_create_feature_group(...)
164+
165+
# insert in to feature group
166+
job, _ = fg.insert(df, write_options={"start_offline_materialization": False})
167+
168+
# run job
169+
execution = job.run()
170+
171+
# True if job executed successfully
172+
print(execution.success)
173+
174+
# Download logs
175+
out_log_path, err_log_path = execution.download_logs()
176+
```
149177
150-
```
151178
# Arguments
152-
args: optional runtime arguments for the job
153-
await_termination: if True wait until termination is complete
179+
args: Optional runtime arguments for the job.
180+
await_termination: Identifies if the client should wait for the job to complete, defaults to True.
154181
# Returns
155182
`Execution`. The execution object for the submitted run.
156183
"""
184+
print(f"Launching job: {self.name}")
157185
execution = self._execution_api._start(self, args=args)
158-
print(execution.get_url())
186+
print(
187+
f"Job started successfully, you can follow the progress at \n{execution.get_url()}"
188+
)
159189
if await_termination:
160190
return self._execution_engine.wait_until_finished(self, execution)
161191
else:
162192
return execution
163193

194+
def get_state(self):
195+
"""Get the state of the job.
196+
197+
# Returns
198+
`state`. Current state of the job, which can be one of the following:
199+
`INITIALIZING`, `INITIALIZATION_FAILED`, `FINISHED`, `RUNNING`, `ACCEPTED`,
200+
`FAILED`, `KILLED`, `NEW`, `NEW_SAVING`, `SUBMITTED`, `AGGREGATING_LOGS`,
201+
`FRAMEWORK_FAILURE`, `STARTING_APP_MASTER`, `APP_MASTER_START_FAILED`,
202+
`GENERATING_SECURITY_MATERIAL`, `CONVERTING_NOTEBOOK`
203+
"""
204+
last_execution = self._job_api.last_execution(self)
205+
if len(last_execution) != 1:
206+
raise JobExecutionException("No executions found for job")
207+
208+
return last_execution[0].state
209+
210+
def get_final_state(self):
211+
"""Get the final state of the job.
212+
213+
# Returns
214+
`final_state`. Final state of the job, which can be one of the following:
215+
`UNDEFINED`, `FINISHED`, `FAILED`, `KILLED`, `FRAMEWORK_FAILURE`,
216+
`APP_MASTER_START_FAILED`, `INITIALIZATION_FAILED`. `UNDEFINED` indicates
217+
that the job is still running.
218+
"""
219+
last_execution = self._job_api.last_execution(self)
220+
if len(last_execution) != 1:
221+
raise JobExecutionException("No executions found for job")
222+
223+
return last_execution[0].final_status
224+
164225
def get_executions(self):
165226
"""Retrieves all executions for the job ordered by submission time.
166227
@@ -239,6 +300,40 @@ def unschedule(self):
239300
self._job_api._delete_schedule_job(self._name)
240301
self._job_schedule = None
241302

303+
def resume_schedule(self):
304+
"""Resumes the schedule of a Job execution"""
305+
if self._job_schedule is None:
306+
raise JobExecutionException("No schedule found for job")
307+
308+
job_schedule = JobSchedule(
309+
id=self._job_schedule.id,
310+
start_date_time=self._job_schedule.start_date_time,
311+
cron_expression=self._job_schedule.cron_expression,
312+
end_time=self._job_schedule.end_date_time,
313+
enabled=False,
314+
)
315+
return self._update_schedule(job_schedule)
316+
317+
def pause_schedule(self):
318+
"""Pauses the schedule of a Job execution"""
319+
if self._job_schedule is None:
320+
raise JobExecutionException("No schedule found for job")
321+
322+
job_schedule = JobSchedule(
323+
id=self._job_schedule.id,
324+
start_date_time=self._job_schedule.start_date_time,
325+
cron_expression=self._job_schedule.cron_expression,
326+
end_time=self._job_schedule.end_date_time,
327+
enabled=True,
328+
)
329+
return self._update_schedule(job_schedule)
330+
331+
def _update_schedule(self, job_schedule):
332+
self._job_schedule = self._job_api.create_or_update_schedule_job(
333+
self._name, job_schedule.to_dict()
334+
)
335+
return self._job_schedule
336+
242337
def json(self):
243338
return json.dumps(self, cls=util.Encoder)
244339

0 commit comments

Comments
 (0)