Skip to content

Commit 375f199

Browse files
authored
Merge pull request #66 from eth-cscs/health-check-refactoring
Refactored health checks
2 parents 4cedb37 + 95530f3 commit 375f199

13 files changed

+121
-84
lines changed

f7t-api-config.local-env-tests.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,7 @@ storage:
6767
max_part_size: 1073741824 # 1G
6868
parallel_runs: 3
6969
tmp_folder: "tmp"
70-
max_ops_file_size: 1048576 # 1M
70+
max_ops_file_size: 1048576 # 1M
71+
probing:
72+
interval: 60
73+
timeout: 10

f7t-api-config.local-env.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,5 @@ storage:
9191
tmp_folder: "tmp"
9292
max_ops_file_size: 1048576 # 1M
9393
probing:
94+
interval: 60
9495
timeout: 10

src/firecrest/config.py

Lines changed: 50 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
BaseModel,
1313
ConfigDict,
1414
Field,
15+
SecretStr,
1516
field_validator,
1617
)
1718
from pydantic_settings import (
@@ -75,57 +76,6 @@ def to_json(self):
7576
}
7677

7778

78-
class StorageProbing(CamelModel):
79-
"""Probing configuration to check availability of the storage system."""
80-
81-
timeout: int = Field(
82-
..., description="Timeout for storage health probing in seconds."
83-
)
84-
85-
86-
class Storage(BaseModel):
87-
"""Object storage configuration, including credentials, endpoints, and upload behavior."""
88-
89-
name: str = Field(..., description="Name identifier for the storage.")
90-
private_url: str = Field(
91-
..., description="Private/internal endpoint URL for the storage."
92-
)
93-
public_url: str = Field(..., description="Public/external URL for the storage.")
94-
access_key_id: str = Field(
95-
..., description="Access key ID for S3-compatible storage."
96-
)
97-
secret_access_key: LoadFileSecretStr = Field(
98-
...,
99-
description=(
100-
"Secret access key for storage. You can give directly the "
101-
"content or the file path using `'secret_file:/path/to/file'`."
102-
),
103-
)
104-
region: str = Field(..., description="Region of the storage bucket.")
105-
ttl: int = Field(..., description="Time-to-live (in seconds) for generated URLs.")
106-
tenant: Optional[str] = Field(
107-
None, description="Optional tenant identifier for multi-tenant setups."
108-
)
109-
multipart: MultipartUpload = Field(
110-
default_factory=MultipartUpload,
111-
description="Settings for multipart upload, including chunk size and concurrency.",
112-
)
113-
bucket_lifecycle_configuration: BucketLifecycleConfiguration = Field(
114-
default_factory=BucketLifecycleConfiguration,
115-
description="Lifecycle policy settings for auto-deleting files after a given number of days.",
116-
)
117-
max_ops_file_size: int = Field(
118-
5 * 1024 * 1024,
119-
description=(
120-
"Maximum file size (in bytes) allowed for direct upload and "
121-
"download. Larger files will go through the staging area."
122-
),
123-
)
124-
probing: Optional[StorageProbing] = Field(
125-
None, description="Configuration for probing storage availability."
126-
)
127-
128-
12979
class SchedulerType(str, Enum):
13080
"""Supported job scheduler types."""
13181

@@ -230,7 +180,7 @@ class HealthCheckException(BaseServiceHealth):
230180
pass
231181

232182

233-
class ClusterProbing(CamelModel):
183+
class Probing(CamelModel):
234184
"""Cluster monitoring attributes."""
235185

236186
interval: int = Field(
@@ -239,6 +189,53 @@ class ClusterProbing(CamelModel):
239189
timeout: int = Field(..., description="Maximum time in seconds allowed per check.")
240190

241191

192+
class Storage(BaseModel):
193+
"""Object storage configuration, including credentials, endpoints, and upload behavior."""
194+
195+
name: str = Field(..., description="Name identifier for the storage.")
196+
private_url: SecretStr = Field(
197+
..., description="Private/internal endpoint URL for the storage."
198+
)
199+
public_url: str = Field(..., description="Public/external URL for the storage.")
200+
access_key_id: SecretStr = Field(
201+
..., description="Access key ID for S3-compatible storage."
202+
)
203+
secret_access_key: LoadFileSecretStr = Field(
204+
...,
205+
description=(
206+
"Secret access key for storage. You can give directly the "
207+
"content or the file path using `'secret_file:/path/to/file'`."
208+
),
209+
)
210+
region: str = Field(..., description="Region of the storage bucket.")
211+
ttl: int = Field(..., description="Time-to-live (in seconds) for generated URLs.")
212+
tenant: Optional[str] = Field(
213+
None, description="Optional tenant identifier for multi-tenant setups."
214+
)
215+
multipart: MultipartUpload = Field(
216+
default_factory=MultipartUpload,
217+
description="Settings for multipart upload, including chunk size and concurrency.",
218+
)
219+
bucket_lifecycle_configuration: BucketLifecycleConfiguration = Field(
220+
default_factory=BucketLifecycleConfiguration,
221+
description="Lifecycle policy settings for auto-deleting files after a given number of days.",
222+
)
223+
max_ops_file_size: int = Field(
224+
5 * 1024 * 1024,
225+
description=(
226+
"Maximum file size (in bytes) allowed for direct upload and "
227+
"download. Larger files will go through the staging area."
228+
),
229+
)
230+
probing: Optional[Probing] = Field(
231+
None, description="Configuration for probing storage availability."
232+
)
233+
servicesHealth: Optional[List[S3ServiceHealth]] = Field(
234+
None,
235+
description="Optional health information for different services in the cluster.",
236+
)
237+
238+
242239
class FileSystem(CamelModel):
243240
"""Defines a cluster file system and its type."""
244241

@@ -306,14 +303,13 @@ class HPCCluster(CamelModel):
306303
SchedulerServiceHealth
307304
| FilesystemServiceHealth
308305
| SSHServiceHealth
309-
| S3ServiceHealth
310306
| HealthCheckException
311307
]
312308
] = Field(
313309
None,
314310
description="Optional health information for different services in the cluster.",
315311
)
316-
probing: ClusterProbing = Field(
312+
probing: Probing = Field(
317313
..., description="Probing configuration for monitoring the cluster."
318314
)
319315
file_systems: List[FileSystem] = Field(

src/firecrest/dependencies.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,14 +321,14 @@ def __init__(
321321
if settings.storage:
322322
self.url = settings.storage.public_url
323323
if connection == S3ClientConnectionType.private:
324-
self.url = settings.storage.private_url
324+
self.url = settings.storage.private_url.get_secret_value()
325325

326326
async def __call__(self):
327327
async with get_session().create_client(
328328
"s3",
329329
region_name=settings.storage.region,
330330
aws_secret_access_key=settings.storage.secret_access_key.get_secret_value(),
331-
aws_access_key_id=settings.storage.access_key_id,
331+
aws_access_key_id=settings.storage.access_key_id.get_secret_value(),
332332
endpoint_url=self.url,
333333
config=AioConfig(signature_version="s3v4"),
334334
) as client:

src/firecrest/main.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
from fastapi.exceptions import RequestValidationError
1818

1919
from starlette.exceptions import HTTPException as StarletteHTTPException
20+
from firecrest.status.health_check.health_checker_cluster import ClusterHealthChecker
21+
from firecrest.status.health_check.health_checker_storage import StorageHealthChecker
2022
from starlette_context import plugins
2123
from starlette_context.middleware import RawContextMiddleware
2224

@@ -34,9 +36,6 @@
3436
)
3537
from lib.ssh_clients.ssh_keygen_client import SSHKeygenClient
3638
from firecrest.dependencies import SSHClientDependency
37-
from firecrest.status.health_check.health_checker import (
38-
SchedulerHealthChecker,
39-
)
4039

4140
# routers
4241
from firecrest.status.router import (
@@ -111,10 +110,16 @@ async def schedule_tasks(scheduler: AsyncScheduler):
111110
for cluster in plugin_settings.clusters:
112111
if cluster.probing:
113112
await scheduler.add_schedule(
114-
SchedulerHealthChecker(cluster).check,
113+
ClusterHealthChecker(cluster).check,
115114
IntervalTrigger(seconds=cluster.probing.interval),
116115
id=f"check-cluster-{cluster.name}",
117116
)
117+
if settings.storage and settings.storage.probing:
118+
await scheduler.add_schedule(
119+
StorageHealthChecker(settings.storage).check,
120+
IntervalTrigger(seconds=settings.storage.probing.interval),
121+
id="check-storage",
122+
)
118123
await scheduler.add_schedule(
119124
SSHClientDependency.prune_client_pools,
120125
IntervalTrigger(seconds=5),

src/firecrest/status/health_check/checks/health_check_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
class HealthCheckBase(ABC):
1313

14-
def __init__(self, system: HPCCluster):
14+
def __init__(self, system: HPCCluster = None):
1515
self.system = system
1616

1717
async def check(self) -> BaseServiceHealth:

src/firecrest/status/health_check/checks/health_check_s3.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
# SPDX-License-Identifier: BSD-3-Clause
55

66
from firecrest.config import (
7-
HPCCluster,
87
S3ServiceHealth,
98
)
109
from firecrest.dependencies import (
@@ -16,8 +15,8 @@
1615

1716
class S3HealthCheck(HealthCheckBase):
1817

19-
def __init__(self, system: HPCCluster, timeout: int):
20-
super().__init__(system)
18+
def __init__(self, timeout: int):
19+
super().__init__()
2120
self.timeout = timeout
2221

2322
async def execute_check(self) -> S3ServiceHealth:

src/firecrest/status/health_check/health_checker.py renamed to src/firecrest/status/health_check/health_checker_cluster.py

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
from firecrest.status.health_check.checks.health_check_filesystem import (
1313
FilesystemHealthCheck,
1414
)
15-
from firecrest.status.health_check.checks.health_check_s3 import S3HealthCheck
1615
from firecrest.status.health_check.checks.health_check_scheduler import (
1716
SchedulerHealthCheck,
1817
)
@@ -25,7 +24,7 @@
2524
from firecrest.plugins import settings
2625

2726

28-
class SchedulerHealthChecker:
27+
class ClusterHealthChecker:
2928

3029
scheduler_client: SchedulerBaseClient = None
3130
cluster: HPCCluster = None
@@ -78,20 +77,12 @@ async def check(self) -> None:
7877
)
7978
checks += [filesystemCheck.check()]
8079

81-
if settings.storage and settings.storage.probing:
82-
s3Check = S3HealthCheck(
83-
system=self.cluster, timeout=settings.storage.probing.timeout
84-
)
85-
checks += [s3Check.check()]
86-
8780
results = await asyncio.gather(*checks, return_exceptions=True)
8881
self.cluster.servicesHealth = results
8982
except Exception as ex:
90-
error_message = (
91-
f"HealthChecker execution failed with error: {ex.__class__.__name__}"
92-
)
83+
error_message = f"Cluster HealthChecker execution failed with error: {ex.__class__.__name__}"
9384
if len(str(ex)) > 0:
94-
error_message = f"HealthChecker execution failed with error: {ex.__class__.__name__} - {str(ex)}"
85+
error_message = f"Cluster HealthChecker execution failed with error: {ex.__class__.__name__} - {str(ex)}"
9586
exception = HealthCheckException(service_type="exception")
9687
exception.healthy = False
9788
exception.last_checked = time.time()
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Copyright (c) 2025, ETH Zurich. All rights reserved.
2+
#
3+
# Please, refer to the LICENSE file in the root directory.
4+
# SPDX-License-Identifier: BSD-3-Clause
5+
6+
import asyncio
7+
import time
8+
9+
10+
from firecrest.config import HealthCheckException, Storage
11+
from firecrest.status.health_check.checks.health_check_s3 import S3HealthCheck
12+
from firecrest.plugins import settings
13+
14+
15+
class StorageHealthChecker:
16+
17+
storage: Storage = None
18+
19+
def __init__(self, storage: Storage):
20+
self.storage = storage
21+
22+
async def check(self) -> None:
23+
try:
24+
checks = []
25+
s3Check = S3HealthCheck(timeout=settings.storage.probing.timeout)
26+
checks += [s3Check.check()]
27+
28+
results = await asyncio.gather(*checks, return_exceptions=True)
29+
self.storage.servicesHealth = results
30+
except Exception as ex:
31+
error_message = f"Storage HealthChecker execution failed with error: {ex.__class__.__name__}"
32+
if len(str(ex)) > 0:
33+
error_message = f"Storage HealthChecker execution failed with error: {ex.__class__.__name__} - {str(ex)}"
34+
exception = HealthCheckException(service_type="exception")
35+
exception.healthy = False
36+
exception.last_checked = time.time()
37+
exception.message = error_message
38+
self.storage.servicesHealth = [exception]
39+
# Note: raising the exception might not be handled well by apscheduler.
40+
# Instead consider printing the exception with: traceback.print_exception(ex)
41+
raise ex

src/firecrest/status/models.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
# SPDX-License-Identifier: BSD-3-Clause
55

66
from datetime import datetime
7-
from typing import Dict, List
7+
from typing import Dict, List, Optional
88

99
# configs
10-
from firecrest.config import HPCCluster
10+
from firecrest.config import HPCCluster, Storage
1111

1212
# models
1313
from lib.models import CamelModel
@@ -25,6 +25,7 @@ class GetLiveness(CamelModel):
2525

2626
class GetSystemsResponse(CamelModel):
2727
systems: List[HPCCluster]
28+
storage: Optional[Storage] = None
2829

2930

3031
class GetNodesResponse(CamelModel):

src/firecrest/status/router.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
response_description="System list returned successfully",
6363
)
6464
async def get_systems() -> Any:
65-
return {"systems": settings.clusters}
65+
return {"systems": settings.clusters, "storage": settings.storage}
6666

6767

6868
@router_on_systen.get(

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ async def s3_client():
6666
"s3",
6767
region_name=settings.storage.region,
6868
aws_secret_access_key=settings.storage.secret_access_key,
69-
aws_access_key_id=settings.storage.access_key_id,
70-
endpoint_url=settings.storage.private_url,
69+
aws_access_key_id=settings.storage.access_key_id.get_secret_value(),
70+
endpoint_url=settings.storage.private_url.get_secret_value(),
7171
config=AioConfig(signature_version="s3v4"),
7272
) as client:
7373
global_s3_client = client

tests/health_check_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import pytest
1010
from aioresponses import aioresponses
1111
from pytest_httpx import HTTPXMock
12-
from firecrest.status.health_check.health_checker import SchedulerHealthChecker
12+
from firecrest.status.health_check.health_checker_cluster import ClusterHealthChecker
1313
from lib.auth.authN.OIDC_token_auth import OIDCTokenAuth
1414
from lib.models.apis.api_auth_model import ApiAuthModel
1515
from tests import mocked_api_responses
@@ -67,7 +67,7 @@ async def test_health_check(
6767
body=json.dumps(mocked_nodes_get_response),
6868
)
6969

70-
health_checker = SchedulerHealthChecker(
70+
health_checker = ClusterHealthChecker(
7171
slurm_cluster_with_api_config, token_decoder=TokenDecoderMock()
7272
)
7373
await health_checker.check()

0 commit comments

Comments
 (0)