Skip to content

Commit 50fe749

Browse files
added artifacts_dir and artifacts fields to job messages and results
1 parent 11e03d3 commit 50fe749

File tree

18 files changed

+107
-5
lines changed

18 files changed

+107
-5
lines changed

compute_horde/compute_horde/em_protocol/executor_requests.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class V0FinishedRequest(BaseExecutorRequest, JobMixin):
5555
message_type: RequestType = RequestType.V0FinishedRequest
5656
docker_process_stdout: str # TODO: add max_length
5757
docker_process_stderr: str # TODO: add max_length
58+
artifacts: dict[str, str] | None = None
5859

5960

6061
class GenericError(BaseExecutorRequest):

compute_horde/compute_horde/em_protocol/miner_requests.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class V0JobRequest(BaseMinerRequest, JobMixin):
4848
docker_run_cmd: list[str]
4949
volume: Volume | None = None
5050
output_upload: OutputUpload | None = None
51+
artifacts_dir: str | None = None
5152

5253
@model_validator(mode="after")
5354
def validate_at_least_docker_image_or_raw_script(self) -> Self:

compute_horde/compute_horde/fv_protocol/facilitator_requests.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ class SignedFields(BaseModel):
122122
args: str
123123
env: dict[str, str]
124124
use_gpu: bool
125+
artifacts_dir: str
125126

126127
volumes: list[JsonValue]
127128
uploads: list[JsonValue]
@@ -137,6 +138,7 @@ def from_facilitator_sdk_json(data: JsonValue):
137138
args=str(data.get("args", "")),
138139
env=typing.cast(dict[str, str], data.get("env", None)),
139140
use_gpu=typing.cast(bool, data.get("use_gpu")),
141+
artifacts_dir=str(data.get("artifacts_dir", "")),
140142
volumes=typing.cast(list[JsonValue], data.get("volumes", [])),
141143
uploads=typing.cast(list[JsonValue], data.get("uploads", [])),
142144
)
@@ -166,6 +168,7 @@ class V2JobRequest(BaseModel, extra="forbid"):
166168
use_gpu: bool
167169
volume: Volume | None = None
168170
output_upload: OutputUpload | None = None
171+
artifacts_dir: str | None = None
169172
# !!! all fields above are included in the signed json payload
170173

171174
def get_args(self):
@@ -198,6 +201,7 @@ def get_signed_fields(self) -> SignedFields:
198201
args=" ".join(self.args),
199202
env=self.env,
200203
use_gpu=self.use_gpu,
204+
artifacts_dir=self.artifacts_dir or "",
201205
volumes=volumes,
202206
uploads=uploads,
203207
)

compute_horde/compute_horde/miner_client/organic.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ class OrganicJobDetails:
381381
total_job_timeout: int = 300
382382
volume: Volume | None = None
383383
output: OutputUpload | None = None
384+
artifacts_dir: str | None = None
384385

385386
def __post_init__(self):
386387
if (self.docker_image, self.raw_script) == (None, None):
@@ -470,6 +471,7 @@ async def run_organic_job(
470471
docker_run_cmd=job_details.docker_run_cmd,
471472
volume=job_details.volume,
472473
output_upload=job_details.output,
474+
artifacts_dir=job_details.artifacts_dir,
473475
)
474476
)
475477

@@ -487,7 +489,11 @@ async def run_organic_job(
487489
score=0, # no score for organic jobs (at least right now)
488490
)
489491

490-
return final_response.docker_process_stdout, final_response.docker_process_stderr
492+
return (
493+
final_response.docker_process_stdout,
494+
final_response.docker_process_stderr,
495+
final_response.artifacts,
496+
)
491497
except TimeoutError as exc:
492498
raise OrganicJobError(FailureReason.FINAL_RESPONSE_TIMED_OUT) from exc
493499
except Exception:

compute_horde/compute_horde/mv_protocol/miner_requests.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ class V0JobFinishedRequest(BaseMinerRequest, JobMixin):
9090
message_type: RequestType = RequestType.V0JobFinishedRequest
9191
docker_process_stdout: str # TODO: add max_length
9292
docker_process_stderr: str # TODO: add max_length
93+
artifacts: dict[str, str] | None = None
9394

9495

9596
class V0MachineSpecsRequest(BaseMinerRequest, JobMixin):

compute_horde/compute_horde/mv_protocol/validator_requests.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class V0JobRequest(BaseValidatorRequest, JobMixin):
8686
docker_run_cmd: list[str]
8787
volume: Volume | None = None
8888
output_upload: OutputUpload | None = None
89+
artifacts_dir: str | None = None
8990

9091
@model_validator(mode="after")
9192
def validate_at_least_docker_image_or_raw_script(self) -> Self:

compute_horde/tests/test_run_organic_job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ async def test_run_organic_job__success(keypair):
5656
transport=mock_transport,
5757
)
5858
job_details = OrganicJobDetails(job_uuid=JOB_UUID, docker_image="mock")
59-
stdout, stderr = await run_organic_job(client, job_details, wait_timeout=2)
59+
stdout, stderr, artifacts = await run_organic_job(client, job_details, wait_timeout=2)
6060

6161
assert stdout == "stdout"
6262
assert stderr == "stderr"

executor/app/src/compute_horde_executor/executor/management/commands/run_executor.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import csv
44
import io
55
import logging
6+
import os
67
import pathlib
78
import random
89
import re
@@ -69,6 +70,7 @@
6970
CVE_2022_0492_TIMEOUT_SECONDS = 120
7071
CVE_2024_0132_TIMEOUT_SECONDS = 120
7172
MAX_RESULT_SIZE_IN_RESPONSE = 2000
73+
MAX_ARTIFACT_SIZE = 16000
7274
TRUNCATED_RESPONSE_PREFIX_LEN = 1000
7375
TRUNCATED_RESPONSE_SUFFIX_LEN = 1000
7476
INPUT_VOLUME_UNPACK_TIMEOUT_SECONDS = 60 * 15
@@ -234,6 +236,7 @@ async def send_finished(self, job_result: "JobResult"):
234236
job_uuid=self.job_uuid,
235237
docker_process_stdout=job_result.stdout,
236238
docker_process_stderr=job_result.stderr,
239+
artifacts=job_result.artifacts,
237240
)
238241
)
239242

@@ -276,6 +279,7 @@ class JobResult(pydantic.BaseModel):
276279
timeout: bool
277280
stdout: str
278281
stderr: str
282+
artifacts: dict[str, str]
279283
specs: MachineSpecs | None = None
280284

281285

@@ -474,6 +478,7 @@ def __init__(self, initial_job_request: V0InitialJobRequest | V1InitialJobReques
474478
self.volume_mount_dir = self.temp_dir / "volume"
475479
self.output_volume_mount_dir = self.temp_dir / "output"
476480
self.specs_volume_mount_dir = self.temp_dir / "specs"
481+
self.artifacts_mount_dir = self.temp_dir / "artifacts"
477482
self.download_manager = DownloadManager()
478483

479484
self.job_container_name = f"ch-{settings.EXECUTOR_TOKEN}-job"
@@ -507,6 +512,7 @@ async def cleanup_potential_old_jobs(self):
507512
async def prepare(self):
508513
self.volume_mount_dir.mkdir(exist_ok=True)
509514
self.output_volume_mount_dir.mkdir(exist_ok=True)
515+
self.artifacts_mount_dir.mkdir(exist_ok=True)
510516

511517
logger.info("preparing in progress")
512518

@@ -554,6 +560,7 @@ async def start_job(self, job_request: V0JobRequest) -> JobResult | None:
554560
timeout=False,
555561
stdout=ex.description,
556562
stderr="",
563+
artifacts={},
557564
)
558565

559566
docker_image = job_request.docker_image_name
@@ -582,6 +589,7 @@ async def start_job(self, job_request: V0JobRequest) -> JobResult | None:
582589
timeout=False,
583590
stdout="",
584591
stderr="",
592+
artifacts={},
585593
exit_status=None,
586594
)
587595

@@ -595,6 +603,12 @@ async def start_job(self, job_request: V0JobRequest) -> JobResult | None:
595603
)
596604
await process.wait()
597605

606+
if job_request.artifacts_dir:
607+
extra_volume_flags += [
608+
"-v",
609+
f"{self.artifacts_mount_dir.as_posix()}/:{job_request.artifacts_dir}",
610+
]
611+
598612
self.cmd = [
599613
"docker",
600614
"run",
@@ -648,6 +662,7 @@ async def start_job(self, job_request: V0JobRequest) -> JobResult | None:
648662
stdout="",
649663
stderr=msg,
650664
exit_status=None,
665+
artifacts={},
651666
)
652667

653668
return None
@@ -708,6 +723,18 @@ async def wait_for_job(self, job_request: V0JobRequest) -> JobResult:
708723

709724
success = exit_status == 0
710725

726+
artifacts = {}
727+
for artifact_filename in os.listdir(self.artifacts_mount_dir):
728+
artifact_path = self.artifacts_mount_dir / artifact_filename
729+
if os.path.isfile(artifact_path):
730+
with open(artifact_path, "rb") as f:
731+
content = f.read()
732+
artifact_size = len(content)
733+
if artifact_size < MAX_ARTIFACT_SIZE:
734+
artifacts[artifact_filename] = base64.b64encode(content).decode()
735+
else:
736+
logger.error(f"Artefact {artifact_filename} too large: {artifact_size:,} bytes")
737+
711738
if success:
712739
# upload the output if requested and job succeeded
713740
if job_request.output_upload:
@@ -740,6 +767,7 @@ async def wait_for_job(self, job_request: V0JobRequest) -> JobResult:
740767
timeout=timeout,
741768
stdout=stdout,
742769
stderr=stderr,
770+
artifacts=artifacts,
743771
)
744772

745773
async def clean(self):

miner/app/src/compute_horde_miner/miner/liveness_check.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ async def drive_executor() -> float:
182182
contents=get_dummy_inline_zip_volume(),
183183
),
184184
output_upload=None,
185+
artifacts_dir=None,
185186
)
186187

187188
validator, _ = await Validator.objects.aget_or_create(
@@ -227,6 +228,7 @@ async def drive_executor() -> float:
227228
docker_run_cmd=job_request.docker_run_cmd,
228229
volume=job_request.volume,
229230
output_upload=job_request.output_upload,
231+
artifacts_dir=job_request.artifacts_dir,
230232
).model_dump(),
231233
},
232234
)
miner/app/src/compute_horde_miner/miner/migrations/0012_acceptedjob_artifacts.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
# Generated by Django 4.2.16 on 2025-02-07 13:52

from django.db import migrations, models

import compute_horde_miner.miner.models


class Migration(migrations.Migration):
    """Add ``AcceptedJob.artifacts``.

    Nullable JSON field holding the artifact mapping (filename ->
    base64-encoded content) that the executor reports back when a job
    finishes.
    """

    # Builds on the migration that introduced AcceptedJob.executor_address.
    dependencies = [
        ("miner", "0011_acceptedjob_executor_address"),
    ]

    operations = [
        migrations.AddField(
            model_name="acceptedjob",
            name="artifacts",
            # Same encoder the model declares for this field in models.py.
            field=models.JSONField(
                encoder=compute_horde_miner.miner.models.EnumEncoder,
                null=True,
            ),
        ),
    ]

miner/app/src/compute_horde_miner/miner/miner_consumer/executor_interface.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,15 @@ async def handle(self, msg: BaseExecutorRequest):
131131
self.job.status = AcceptedJob.Status.FINISHED
132132
self.job.stderr = msg.docker_process_stderr
133133
self.job.stdout = msg.docker_process_stdout
134+
self.job.artifacts = msg.artifacts or {}
134135

135136
await self.job.asave()
136137
await self.send_executor_finished(
137138
job_uuid=msg.job_uuid,
138139
executor_token=self.executor_token,
139140
stdout=msg.docker_process_stdout,
140141
stderr=msg.docker_process_stderr,
142+
artifacts=msg.artifacts,
141143
)
142144
if isinstance(msg, executor_requests.V0MachineSpecsRequest):
143145
await self.send_executor_specs(
@@ -168,6 +170,7 @@ async def _miner_job_request(self, msg: JobRequest):
168170
docker_run_cmd=msg.docker_run_cmd,
169171
volume=msg.volume,
170172
output_upload=msg.output_upload,
173+
artifacts_dir=msg.artifacts_dir,
171174
).model_dump_json()
172175
)
173176

miner/app/src/compute_horde_miner/miner/miner_consumer/layer_utils.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class JobRequest(pydantic.BaseModel):
4545
docker_run_cmd: list[str]
4646
volume: Volume | None = None
4747
output_upload: OutputUpload | None = None
48+
artifacts_dir: str | None = None
4849

4950
@model_validator(mode="after")
5051
def validate_at_least_docker_image_or_raw_script(self) -> Self:
@@ -62,6 +63,7 @@ class ExecutorFinished(pydantic.BaseModel):
6263
job_uuid: str
6364
docker_process_stdout: str
6465
docker_process_stderr: str
66+
artifacts: dict[str, str] | None
6567

6668

6769
class ExecutorFailed(pydantic.BaseModel):
@@ -180,6 +182,7 @@ async def send_job_request(self, executor_token, job_request: validator_requests
180182
docker_run_cmd=job_request.docker_run_cmd,
181183
volume=job_request.volume,
182184
output_upload=job_request.output_upload,
185+
artifacts_dir=job_request.artifacts_dir,
183186
).model_dump(),
184187
},
185188
)
@@ -252,7 +255,12 @@ async def send_executor_specs(self, job_uuid: str, executor_token: str, specs: M
252255
)
253256

254257
async def send_executor_finished(
255-
self, job_uuid: str, executor_token: str, stdout: str, stderr: str
258+
self,
259+
job_uuid: str,
260+
executor_token: str,
261+
stdout: str,
262+
stderr: str,
263+
artifacts: dict[str, str] | None,
256264
):
257265
group_name = ValidatorInterfaceMixin.group_name(executor_token)
258266
await self.channel_layer.group_send(
@@ -263,6 +271,7 @@ async def send_executor_finished(
263271
job_uuid=job_uuid,
264272
docker_process_stdout=stdout,
265273
docker_process_stderr=stderr,
274+
artifacts=artifacts,
266275
).model_dump(),
267276
},
268277
)

miner/app/src/compute_horde_miner/miner/miner_consumer/validator_interface.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ async def handle_authentication(self, msg: validator_requests.V0AuthenticateRequ
228228
job_uuid=str(job.job_uuid),
229229
docker_process_stdout=job.stdout,
230230
docker_process_stderr=job.stderr,
231+
artifacts=job.artifacts,
231232
).model_dump_json()
232233
)
233234
logger.debug(
@@ -568,6 +569,7 @@ async def _executor_finished(self, msg: ExecutorFinished):
568569
job_uuid=msg.job_uuid,
569570
docker_process_stdout=msg.docker_process_stdout,
570571
docker_process_stderr=msg.docker_process_stderr,
572+
artifacts=msg.artifacts,
571573
).model_dump_json()
572574
)
573575
logger.debug(f"Finished job {msg.job_uuid} reported to validator {self.validator_key}")

miner/app/src/compute_horde_miner/miner/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class Status(models.TextChoices):
5656
created_at = models.DateTimeField(auto_now_add=True)
5757
updated_at = models.DateTimeField(auto_now=True)
5858
executor_address = models.TextField(null=True)
59+
artifacts = models.JSONField(encoder=EnumEncoder, null=True)
5960

6061
def __str__(self):
6162
return (

validator/app/src/compute_horde_validator/validator/management/commands/debug_run_mock_streaming_job_to_miner.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ async def run_streaming_job(options, wait_timeout: int = 300):
127127
docker_run_cmd=job_details.docker_run_cmd,
128128
volume=job_details.volume,
129129
output_upload=job_details.output,
130+
artifacts_dir=job_details.artifacts_dir,
130131
)
131132
)
132133

validator/app/src/compute_horde_validator/validator/migrations/0050_organicjob_artifacts.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
# Generated by Django 4.2.16 on 2025-02-07 13:53

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add ``OrganicJob.artifacts``.

    JSON field (defaults to an empty dict) storing the artifacts returned
    by the miner in the job-finished message.
    """

    dependencies = [
        ("validator", "0049_alter_syntheticjobbatch_block_and_more"),
    ]

    operations = [
        migrations.AddField(
            model_name="organicjob",
            name="artifacts",
            field=models.JSONField(blank=True, default=dict),
        ),
    ]

validator/app/src/compute_horde_validator/validator/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ class SyntheticJob(JobBase):
233233
class OrganicJob(JobBase):
234234
stdout = models.TextField(blank=True, default="")
235235
stderr = models.TextField(blank=True, default="")
236+
artifacts = models.JSONField(blank=True, default=dict)
236237
block = models.BigIntegerField(
237238
null=True, help_text="Block number on which this job is scheduled"
238239
)

0 commit comments

Comments
 (0)