From 33c478e8f01f69d233212832e4f8a38a2174001e Mon Sep 17 00:00:00 2001
From: Jannik Straube
Date: Thu, 6 Feb 2025 16:41:49 +0100
Subject: [PATCH] support ability to restore metrics via orchestrator

---
Reviewer notes (free-form text between "---" and the diffstat; not part of the
commit):

- Routes added under /metrics: GET /all (get_all_metrics), POST "" (create_metric)
  and DELETE /{task_id} (delete_metric). The pre-existing GET "" is kept.
- store_metrics now files entries whose task_id is empty under the task id
  "manual" and uses Address::ZERO as the hash field for them, regardless of
  sender_address. store_manual_metrics relies on this by sending an empty
  task_id.
- delete_metric (store) reports true only when HDEL removed exactly one field;
  the HTTP handler answers 200 either way and carries the flag in "success".
- get_all_metrics skips values that do not parse as f64 and keys that do not
  split into exactly four ':'-separated parts.
- NOTE(review): this copy of the patch had lost its line breaks and every
  "<...>" span. The generic arguments below (Data<AppState>, web::Json<..>,
  Path<String>, Vec<String>, HashMap<String, ..>, parse::<f64>, ...) and the
  indentation of context lines were reconstructed from usage - verify against
  the repository before applying. The author e-mail could not be recovered.
- NOTE(review): a real task whose id is literally "manual" would presumably
  share keys with manual entries - confirm that task ids cannot take that value.

 orchestrator/src/api/routes/metrics.rs        | 53 ++++++++++++-
 .../src/store/domains/metrics_store.rs        | 79 +++++++++++++++++--
 2 files changed, 124 insertions(+), 8 deletions(-)

diff --git a/orchestrator/src/api/routes/metrics.rs b/orchestrator/src/api/routes/metrics.rs
index ce692c9..384b5cc 100644
--- a/orchestrator/src/api/routes/metrics.rs
+++ b/orchestrator/src/api/routes/metrics.rs
@@ -1,10 +1,22 @@
 use crate::api::server::AppState;
 use actix_web::{
-    web::{self, get, Data},
+    web::{self, delete, get, post, Data, Path},
     HttpResponse, Scope,
 };
+use serde::Deserialize;
 use serde_json::json;
 
+#[derive(Deserialize)]
+struct ManualMetricEntry {
+    label: String,
+    value: f64,
+}
+
+#[derive(Deserialize)]
+struct DeleteMetricRequest {
+    label: String,
+    address: String,
+}
 async fn get_metrics(app_state: Data<AppState>) -> HttpResponse {
     let metrics = app_state
         .store_context
@@ -13,6 +25,43 @@ async fn get_metrics(app_state: Data<AppState>) -> HttpResponse {
     HttpResponse::Ok().json(json!({"success": true, "metrics": metrics}))
 }
 
+async fn get_all_metrics(app_state: Data<AppState>) -> HttpResponse {
+    let metrics = app_state.store_context.metrics_store.get_all_metrics();
+    HttpResponse::Ok().json(json!({"success": true, "metrics": metrics}))
+}
+
+// for potential backup restore purposes
+async fn create_metric(
+    app_state: Data<AppState>,
+    metric: web::Json<ManualMetricEntry>,
+) -> HttpResponse {
+    app_state
+        .store_context
+        .metrics_store
+        .store_manual_metrics(metric.label.clone(), metric.value);
+    HttpResponse::Ok().json(json!({"success": true}))
+}
+
+async fn delete_metric(
+    app_state: Data<AppState>,
+    task_id: Path<String>,
+    body: web::Json<DeleteMetricRequest>,
+) -> HttpResponse {
+    let success =
+        app_state
+            .store_context
+            .metrics_store
+            .delete_metric(&task_id, &body.label, &body.address);
+
+    HttpResponse::Ok().json(json!({
+        "success": success
+    }))
+}
+
 pub fn metrics_routes() -> Scope {
-    web::scope("/metrics").route("", get().to(get_metrics))
+    web::scope("/metrics")
+        .route("", get().to(get_metrics))
+        .route("/all", get().to(get_all_metrics))
+        .route("", post().to(create_metric))
+        .route("/{task_id}", delete().to(delete_metric))
 }
diff --git a/orchestrator/src/store/domains/metrics_store.rs b/orchestrator/src/store/domains/metrics_store.rs
index ad55d4b..53f0f65 100644
--- a/orchestrator/src/store/domains/metrics_store.rs
+++ b/orchestrator/src/store/domains/metrics_store.rs
@@ -32,23 +32,63 @@ impl MetricsStore {
         }
 
         for entry in metrics {
+            let task_id = if entry.key.task_id.is_empty() {
+                "manual".to_string()
+            } else {
+                entry.key.task_id
+            };
+
             let cleaned_label = self.clean_label(&entry.key.label);
             let redis_key = format!(
                 "{}:{}:{}",
-                ORCHESTRATOR_METRICS_STORE, entry.key.task_id, cleaned_label
+                ORCHESTRATOR_METRICS_STORE, task_id, cleaned_label
             );
             let mut con = self.redis.client.get_connection().unwrap();
-            if let Err(err) = con.hset(
-                redis_key,
-                sender_address.to_string(),
-                entry.value.to_string(),
-            ) as RedisResult<()>
+
+            let address = if task_id == "manual" {
+                Address::ZERO.to_string()
+            } else {
+                sender_address.to_string()
+            };
+
+            if let Err(err) =
+                con.hset(redis_key, address, entry.value.to_string()) as RedisResult<()>
             {
                 error!("Could not update metric value in redis: {}", err);
             }
         }
     }
 
+    pub fn store_manual_metrics(&self, label: String, value: f64) {
+        self.store_metrics(
+            Some(vec![MetricEntry {
+                key: shared::models::metric::MetricKey {
+                    task_id: "".to_string(),
+                    label,
+                },
+                value,
+            }]),
+            Address::ZERO,
+        );
+    }
+
+    pub fn delete_metric(&self, task_id: &str, label: &str, address: &str) -> bool {
+        let mut con = self.redis.client.get_connection().unwrap();
+        let cleaned_label = self.clean_label(label);
+        let redis_key = format!(
+            "{}:{}:{}",
+            ORCHESTRATOR_METRICS_STORE, task_id, cleaned_label
+        );
+
+        match con.hdel::<_, _, i64>(redis_key, address.to_string()) {
+            Ok(deleted) => deleted == 1,
+            Err(err) => {
+                error!("Could not delete metric from redis: {}", err);
+                false
+            }
+        }
+    }
+
     pub fn get_aggregate_metrics_for_task(&self, task_id: &str) -> HashMap<String, f64> {
         let mut con = self.redis.client.get_connection().unwrap();
         let all_keys: Vec<String> = con
@@ -125,6 +165,33 @@ impl MetricsStore {
 
         result
     }
+
+    pub fn get_all_metrics(&self) -> HashMap<String, HashMap<String, HashMap<String, f64>>> {
+        let mut con = self.redis.client.get_connection().unwrap();
+        let all_keys: Vec<String> = con
+            .keys(format!("{}:*:*", ORCHESTRATOR_METRICS_STORE))
+            .unwrap();
+        let mut result: HashMap<String, HashMap<String, HashMap<String, f64>>> = HashMap::new();
+
+        for key in all_keys {
+            if let [_, _, task_id, metric_name] = key.split(":").collect::<Vec<&str>>()[..] {
+                let values: HashMap<String, String> = con.hgetall(&key).unwrap();
+
+                for (node_addr, value) in values {
+                    if let Ok(val) = value.parse::<f64>() {
+                        result
+                            .entry(task_id.to_string())
+                            .or_default()
+                            .entry(metric_name.to_string())
+                            .or_default()
+                            .insert(node_addr, val);
+                    }
+                }
+            }
+        }
+
+        result
+    }
 }
 
 #[cfg(test)]