Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support ability to restore metrics via orchestrator #112

Merged
merged 1 commit into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 51 additions & 2 deletions orchestrator/src/api/routes/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
use crate::api::server::AppState;
use actix_web::{
web::{self, get, Data},
web::{self, delete, get, post, Data, Path},
HttpResponse, Scope,
};
use serde::Deserialize;
use serde_json::json;

#[derive(Deserialize)]
struct ManualMetricEntry {
label: String,
value: f64,
}

#[derive(Deserialize)]
struct DeleteMetricRequest {
label: String,
address: String,
}
async fn get_metrics(app_state: Data<AppState>) -> HttpResponse {
let metrics = app_state
.store_context
Expand All @@ -13,6 +25,43 @@ async fn get_metrics(app_state: Data<AppState>) -> HttpResponse {
HttpResponse::Ok().json(json!({"success": true, "metrics": metrics}))
}

async fn get_all_metrics(app_state: Data<AppState>) -> HttpResponse {
let metrics = app_state.store_context.metrics_store.get_all_metrics();
HttpResponse::Ok().json(json!({"success": true, "metrics": metrics}))
}

// for potential backup restore purposes
async fn create_metric(
app_state: Data<AppState>,
metric: web::Json<ManualMetricEntry>,
) -> HttpResponse {
app_state
.store_context
.metrics_store
.store_manual_metrics(metric.label.clone(), metric.value);
HttpResponse::Ok().json(json!({"success": true}))
}

async fn delete_metric(
app_state: Data<AppState>,
task_id: Path<String>,
body: web::Json<DeleteMetricRequest>,
) -> HttpResponse {
let success =
app_state
.store_context
.metrics_store
.delete_metric(&task_id, &body.label, &body.address);

HttpResponse::Ok().json(json!({
"success": success
}))
}

pub fn metrics_routes() -> Scope {
web::scope("/metrics").route("", get().to(get_metrics))
web::scope("/metrics")
.route("", get().to(get_metrics))
.route("/all", get().to(get_all_metrics))
.route("", post().to(create_metric))
.route("/{task_id}", delete().to(delete_metric))
}
79 changes: 73 additions & 6 deletions orchestrator/src/store/domains/metrics_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,63 @@ impl MetricsStore {
}

for entry in metrics {
let task_id = if entry.key.task_id.is_empty() {
"manual".to_string()
} else {
entry.key.task_id
};

let cleaned_label = self.clean_label(&entry.key.label);
let redis_key = format!(
"{}:{}:{}",
ORCHESTRATOR_METRICS_STORE, entry.key.task_id, cleaned_label
ORCHESTRATOR_METRICS_STORE, task_id, cleaned_label
);
let mut con = self.redis.client.get_connection().unwrap();
if let Err(err) = con.hset(
redis_key,
sender_address.to_string(),
entry.value.to_string(),
) as RedisResult<()>

let address = if task_id == "manual" {
Address::ZERO.to_string()
} else {
sender_address.to_string()
};

if let Err(err) =
con.hset(redis_key, address, entry.value.to_string()) as RedisResult<()>
{
error!("Could not update metric value in redis: {}", err);
}
}
}

pub fn store_manual_metrics(&self, label: String, value: f64) {
self.store_metrics(
Some(vec![MetricEntry {
key: shared::models::metric::MetricKey {
task_id: "".to_string(),
label,
},
value,
}]),
Address::ZERO,
);
}

pub fn delete_metric(&self, task_id: &str, label: &str, address: &str) -> bool {
let mut con = self.redis.client.get_connection().unwrap();
let cleaned_label = self.clean_label(label);
let redis_key = format!(
"{}:{}:{}",
ORCHESTRATOR_METRICS_STORE, task_id, cleaned_label
);

match con.hdel::<_, _, i64>(redis_key, address.to_string()) {
Ok(deleted) => deleted == 1,
Err(err) => {
error!("Could not delete metric from redis: {}", err);
false
}
}
}

pub fn get_aggregate_metrics_for_task(&self, task_id: &str) -> HashMap<String, f64> {
let mut con = self.redis.client.get_connection().unwrap();
let all_keys: Vec<String> = con
Expand Down Expand Up @@ -125,6 +165,33 @@ impl MetricsStore {

result
}

pub fn get_all_metrics(&self) -> HashMap<String, HashMap<String, HashMap<String, f64>>> {
let mut con = self.redis.client.get_connection().unwrap();
let all_keys: Vec<String> = con
.keys(format!("{}:*:*", ORCHESTRATOR_METRICS_STORE))
.unwrap();
let mut result: HashMap<String, HashMap<String, HashMap<String, f64>>> = HashMap::new();

for key in all_keys {
if let [_, _, task_id, metric_name] = key.split(":").collect::<Vec<&str>>()[..] {
let values: HashMap<String, String> = con.hgetall(&key).unwrap();

for (node_addr, value) in values {
if let Ok(val) = value.parse::<f64>() {
result
.entry(task_id.to_string())
.or_default()
.entry(metric_name.to_string())
.or_default()
.insert(node_addr, val);
}
}
}
}

result
}
}

#[cfg(test)]
Expand Down