From 88dcd16679c56429b3c535687f84e5721244ddc6 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Wed, 5 Feb 2025 15:08:36 +0100 Subject: [PATCH 1/5] add ability to get metrics for a single node --- orchestrator/src/api/routes/nodes.rs | 41 ++++++++++ .../src/store/domains/metrics_store.rs | 77 ++++++++++++++++++- 2 files changed, 117 insertions(+), 1 deletion(-) diff --git a/orchestrator/src/api/routes/nodes.rs b/orchestrator/src/api/routes/nodes.rs index 566ac67..4b51e46 100644 --- a/orchestrator/src/api/routes/nodes.rs +++ b/orchestrator/src/api/routes/nodes.rs @@ -133,11 +133,20 @@ async fn get_node_logs(node_id: web::Path, app_state: Data) -> } } +async fn get_node_metrics(node_id: web::Path, app_state: Data) -> HttpResponse { + println!("get_node_metrics: {}", node_id); + let node_address = Address::from_str(&node_id).unwrap(); + let metrics = app_state.store_context.metrics_store.get_metrics_for_node(node_address); + HttpResponse::Ok().json(json!({"success": true, "metrics": metrics})) +} + + pub fn nodes_routes() -> Scope { web::scope("/nodes") .route("", get().to(get_nodes)) .route("/{node_id}/restart", post().to(restart_node_task)) .route("/{node_id}/logs", get().to(get_node_logs)) + .route("/{node_id}/metrics", get().to(get_node_metrics)) } #[cfg(test)] @@ -201,4 +210,36 @@ mod tests { nodes_array[0]["address"] ); } + + #[actix_web::test] + async fn test_get_metrics_for_node_not_exist() { + let app_state = create_test_app_state().await; + let app = test::init_service( + App::new() + .app_data(app_state.clone()) + .route("/nodes/{node_id}/metrics", get().to(get_node_metrics)), + ) + .await; + + let node_id = "0x0000000000000000000000000000000000000000"; + let req = test::TestRequest::get() + .uri(&format!("/nodes/{}/metrics", node_id)) + .to_request(); + let resp = test::call_service(&app, req).await; + + + let body = test::read_body(resp).await; + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + println!("json {:?}", json); + assert_eq!( + json["success"], true, + "Expected success to be true but got {:?}", + json["success"] + ); + assert_eq!( + json["metrics"], json!({}), + "Expected empty metrics object but got {:?}", + json["metrics"] + ); + } } diff --git a/orchestrator/src/store/domains/metrics_store.rs b/orchestrator/src/store/domains/metrics_store.rs index 4464b65..1246faa 100644 --- a/orchestrator/src/store/domains/metrics_store.rs +++ b/orchestrator/src/store/domains/metrics_store.rs @@ -94,6 +94,35 @@ impl MetricsStore { } println!("result {:?}", result); + result + } + + pub fn get_metrics_for_node(&self, node_address: Address) -> HashMap> { + let mut con = self.redis.client.get_connection().unwrap(); + let all_keys: Vec = con + .keys(format!("{}:*:*", ORCHESTRATOR_METRICS_STORE)) + .unwrap(); + let mut result: HashMap> = HashMap::new(); + + for key in all_keys { + let values: HashMap = con.hgetall(&key).unwrap(); + + // Get the metric value for this specific node address + if let Some(value) = values.get(&node_address.to_string()) { + if let Ok(val) = value.parse::() { + // Extract task ID and metric name from the key + let parts: Vec<&str> = key.split(":").collect(); + let task_id = parts[2].to_string(); + let metric_name = parts[3].to_string(); + + result + .entry(task_id) + .or_insert_with(HashMap::new) + .insert(metric_name, val); + } + } + } + result } } @@ -103,6 +132,8 @@ mod tests { use super::*; use crate::api::tests::helper::create_test_app_state; use shared::models::metric::MetricKey; + use shared::models::metric::MetricEntry; + use std::str::FromStr; #[tokio::test] async fn test_store_metrics() { @@ -136,9 +167,53 @@ mod tests { metrics_store.store_metrics(Some(metrics), Address::ZERO); let metrics: HashMap = metrics_store.get_aggregate_metrics_for_task("task_1"); - println!("metrics {:?}", metrics); assert_eq!(metrics.get("cpu_usage"), Some(&1.0)); let metrics: HashMap = metrics_store.get_aggregate_metrics_for_all_tasks(); assert_eq!(metrics.get("cpu_usage"), Some(&3.0)); } + + #[tokio::test] + async fn test_get_metrics_for_node() { + let app_state = create_test_app_state().await; + let metrics_store = app_state.store_context.metrics_store.clone(); + + let node_addr_0 = Address::ZERO; + let node_addr_1 = Address::from_str("0x1234567890123456789012345678901234567890").unwrap(); + + let mut metrics = Vec::new(); + let task_id = "task_1"; + let metric_key = MetricKey { + task_id: task_id.to_string(), + label: "cpu_usage".to_string(), + }; + let metric = MetricEntry { + key: metric_key, + value: 1.0, + }; + metrics.push(metric); + let metrics2 = metrics.clone(); + metrics_store.store_metrics(Some(metrics), node_addr_0); + metrics_store.store_metrics(Some(metrics2), node_addr_1); + + let mut metrics = Vec::new(); + let task_id = "task_2"; + let metric_key = MetricKey { + task_id: task_id.to_string(), + label: "cpu_usage".to_string(), + }; + let metric = MetricEntry { + key: metric_key, + value: 1.0, + }; + metrics.push(metric); + metrics_store.store_metrics(Some(metrics), node_addr_1); + + let metrics = metrics_store.get_metrics_for_node(node_addr_0); + assert_eq!(metrics.get("task_1").unwrap().get("cpu_usage"), Some(&1.0)); + assert_eq!(metrics.get("task_2"), None); + + let metrics_1 = metrics_store.get_metrics_for_node(node_addr_1); + assert_eq!(metrics_1.get("task_1").unwrap().get("cpu_usage"), Some(&1.0)); + assert_eq!(metrics_1.get("task_2").unwrap().get("cpu_usage"), Some(&1.0)); + } } From 3b7fb10725a9db1c6789d07ef9b3c1319685ce97 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Wed, 5 Feb 2025 15:32:28 +0100 Subject: [PATCH 2/5] properly sort nodes, add version to heartbeat and nodes --- miner/src/cli/command.rs | 2 + miner/src/operations/heartbeat/service.rs | 2 + orchestrator/src/api/routes/heartbeat.rs | 22 ++++- orchestrator/src/api/routes/nodes.rs | 11 ++- orchestrator/src/models/node.rs | 2 + orchestrator/src/node/status_update.rs | 22 ++++- .../src/store/domains/heartbeat_store.rs | 4 +- .../src/store/domains/metrics_store.rs | 28 ++++--- orchestrator/src/store/domains/node_store.rs | 80 +++++++++++++++++++ shared/src/models/heartbeat.rs | 4 +- shared/src/models/metric.rs | 2 +- 11 files changed, 157 insertions(+), 22 deletions(-) diff --git a/miner/src/cli/command.rs b/miner/src/cli/command.rs index 3d9983d..6e03589 100644 --- a/miner/src/cli/command.rs +++ b/miner/src/cli/command.rs @@ -113,7 +113,9 @@ pub async fn execute_command( } let mut recover_last_state = *auto_recover; + let version = env!("CARGO_PKG_VERSION"); Console::section("🚀 PRIME MINER INITIALIZATION"); + Console::info("Version:", version); /* Initialize Wallet instances */ diff --git a/miner/src/operations/heartbeat/service.rs b/miner/src/operations/heartbeat/service.rs index 55b7b6d..c8eb974 100644 --- a/miner/src/operations/heartbeat/service.rs +++ b/miner/src/operations/heartbeat/service.rs @@ -143,6 +143,7 @@ impl HeartbeatService { task_id: Some(task.id.to_string()), task_state: Some(task.state.to_string()), metrics: Some(metrics_for_task), + version: Some(env!("CARGO_PKG_VERSION").to_string()), } } else { HeartbeatRequest { @@ -150,6 +151,7 @@ impl HeartbeatService { task_id: None, task_state: None, metrics: None, + version: Some(env!("CARGO_PKG_VERSION").to_string()), } }; diff --git a/orchestrator/src/api/routes/heartbeat.rs b/orchestrator/src/api/routes/heartbeat.rs index cf7aceb..4653623 100644 --- a/orchestrator/src/api/routes/heartbeat.rs +++ b/orchestrator/src/api/routes/heartbeat.rs @@ -74,7 +74,16 @@ mod tests { .store_context .heartbeat_store .get_heartbeat(&node_address); - assert_eq!(value, Some("{\"address\":\"0x0000000000000000000000000000000000000000\",\"task_id\":null,\"task_state\":null,\"metrics\":null}".to_string())); + assert_eq!( + value, + Some(HeartbeatRequest { + address: "0x0000000000000000000000000000000000000000".to_string(), + task_id: None, + task_state: None, + metrics: None, + version: None + }) + ); } #[actix_web::test] @@ -120,6 +129,15 @@ mod tests { .heartbeat_store .get_heartbeat(&node_address); // Task has not started yet - assert_eq!(value, Some("{\"address\":\"0x0000000000000000000000000000000000000000\",\"task_id\":null,\"task_state\":null,\"metrics\":null}".to_string())); + + let value = value.unwrap(); + let heartbeat = HeartbeatRequest { + address: "0x0000000000000000000000000000000000000000".to_string(), + task_id: None, + task_state: None, + metrics: None, + version: None, + }; + assert_eq!(value, heartbeat); } } diff --git a/orchestrator/src/api/routes/nodes.rs b/orchestrator/src/api/routes/nodes.rs index 4b51e46..26587f2 100644 --- a/orchestrator/src/api/routes/nodes.rs +++ b/orchestrator/src/api/routes/nodes.rs @@ -136,10 +136,12 @@ async fn get_node_logs(node_id: web::Path, app_state: Data) -> async fn get_node_metrics(node_id: web::Path, app_state: Data) -> HttpResponse { println!("get_node_metrics: {}", node_id); let node_address = Address::from_str(&node_id).unwrap(); - let metrics = app_state.store_context.metrics_store.get_metrics_for_node(node_address); + let metrics = app_state + .store_context + .metrics_store + .get_metrics_for_node(node_address); HttpResponse::Ok().json(json!({"success": true, "metrics": metrics})) } - pub fn nodes_routes() -> Scope { web::scope("/nodes") @@ -177,6 +179,7 @@ mod tests { status: NodeStatus::Discovered, task_id: None, task_state: None, + version: None, }; app_state.store_context.node_store.add_node(node.clone()); @@ -226,7 +229,6 @@ mod tests { .uri(&format!("/nodes/{}/metrics", node_id)) .to_request(); let resp = test::call_service(&app, req).await; - let body = test::read_body(resp).await; let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); @@ -237,7 +239,8 @@ mod tests { json["success"] ); assert_eq!( - json["metrics"], json!({}), + json["metrics"], + json!({}), "Expected empty metrics object but got {:?}", json["metrics"] ); diff --git a/orchestrator/src/models/node.rs b/orchestrator/src/models/node.rs index 70d3872..a00df4a 100644 --- a/orchestrator/src/models/node.rs +++ b/orchestrator/src/models/node.rs @@ -12,6 +12,7 @@ pub struct OrchestratorNode { pub task_id: Option, pub task_state: Option, + pub version: Option, } impl From for OrchestratorNode { @@ -23,6 +24,7 @@ impl From for OrchestratorNode { status: NodeStatus::Discovered, task_id: None, task_state: None, + version: None, } } } diff --git a/orchestrator/src/node/status_update.rs b/orchestrator/src/node/status_update.rs index 462d62f..0d66402 100644 --- a/orchestrator/src/node/status_update.rs +++ b/orchestrator/src/node/status_update.rs @@ -98,8 +98,17 @@ impl NodeStatusUpdater { .heartbeat_store .get_unhealthy_counter(&node.address); match heartbeat { - Some(_) => { + Some(beat) => { // We have a heartbeat + if let Some(version) = &beat.version { + if node.version.as_ref() != Some(version) { + let _: () = self + .store_context + .node_store + .update_node_version(&node.address, version); + } + } + if node.status == NodeStatus::Unhealthy || node.status == NodeStatus::WaitingForHeartbeat { @@ -181,6 +190,7 @@ mod tests { status: NodeStatus::WaitingForHeartbeat, task_id: None, task_state: None, + version: None, }; let _: () = app_state.store_context.node_store.add_node(node.clone()); @@ -189,6 +199,7 @@ mod tests { task_id: None, task_state: None, metrics: None, + version: Some(env!("CARGO_PKG_VERSION").to_string()), }; let _: () = app_state.store_context.heartbeat_store.beat(&heartbeat); @@ -228,6 +239,7 @@ mod tests { status: NodeStatus::Healthy, task_id: None, task_state: None, + version: None, }; let _: () = app_state.store_context.node_store.add_node(node.clone()); @@ -267,6 +279,7 @@ mod tests { status: NodeStatus::Unhealthy, task_id: None, task_state: None, + version: None, }; let _: () = app_state.store_context.node_store.add_node(node.clone()); @@ -311,6 +324,7 @@ mod tests { status: NodeStatus::Unhealthy, task_id: None, task_state: None, + version: None, }; let _: () = app_state.store_context.node_store.add_node(node.clone()); @@ -355,6 +369,7 @@ mod tests { status: NodeStatus::Unhealthy, task_id: None, task_state: None, + version: None, }; let _: () = app_state .store_context @@ -366,6 +381,7 @@ mod tests { task_id: None, task_state: None, metrics: None, + version: Some(env!("CARGO_PKG_VERSION").to_string()), }; let _: () = app_state.store_context.heartbeat_store.beat(&heartbeat); let _: () = app_state.store_context.node_store.add_node(node.clone()); @@ -412,6 +428,7 @@ mod tests { status: NodeStatus::Unhealthy, task_id: None, task_state: None, + version: None, }; let _: () = app_state .store_context @@ -426,6 +443,7 @@ mod tests { status: NodeStatus::Healthy, task_id: None, task_state: None, + version: None, }; let _: () = app_state.store_context.node_store.add_node(node2.clone()); @@ -483,6 +501,7 @@ mod tests { status: NodeStatus::Unhealthy, task_id: None, task_state: None, + version: None, }; let _: () = app_state.store_context.node_store.add_node(node.clone()); @@ -519,6 +538,7 @@ mod tests { task_id: None, task_state: None, metrics: None, + version: Some(env!("CARGO_PKG_VERSION").to_string()), }; let _: () = app_state.store_context.heartbeat_store.beat(&heartbeat); diff --git a/orchestrator/src/store/domains/heartbeat_store.rs b/orchestrator/src/store/domains/heartbeat_store.rs index 2ff0fc1..8cdd1a9 100644 --- a/orchestrator/src/store/domains/heartbeat_store.rs +++ b/orchestrator/src/store/domains/heartbeat_store.rs @@ -32,11 +32,11 @@ impl HeartbeatStore { .unwrap(); } - pub fn get_heartbeat(&self, address: &Address) -> Option { + pub fn get_heartbeat(&self, address: &Address) -> Option { let mut con = self.redis.client.get_connection().unwrap(); let key = format!("{}:{}", ORCHESTRATOR_HEARTBEAT_KEY, address); let value: Option = con.get(key).unwrap(); - value + value.and_then(|v| serde_json::from_str(&v).ok()) } pub fn get_unhealthy_counter(&self, address: &Address) -> u32 { diff --git a/orchestrator/src/store/domains/metrics_store.rs b/orchestrator/src/store/domains/metrics_store.rs index 1246faa..ad55d4b 100644 --- a/orchestrator/src/store/domains/metrics_store.rs +++ b/orchestrator/src/store/domains/metrics_store.rs @@ -95,9 +95,12 @@ impl MetricsStore { println!("result {:?}", result); result - } + } - pub fn get_metrics_for_node(&self, node_address: Address) -> HashMap> { + pub fn get_metrics_for_node( + &self, + node_address: Address, + ) -> HashMap> { let mut con = self.redis.client.get_connection().unwrap(); let all_keys: Vec = con .keys(format!("{}:*:*", ORCHESTRATOR_METRICS_STORE)) @@ -106,7 +109,7 @@ impl MetricsStore { for key in all_keys { let values: HashMap = con.hgetall(&key).unwrap(); - + // Get the metric value for this specific node address if let Some(value) = values.get(&node_address.to_string()) { if let Ok(val) = value.parse::() { @@ -115,10 +118,7 @@ impl MetricsStore { let task_id = parts[2].to_string(); let metric_name = parts[3].to_string(); - result - .entry(task_id) - .or_insert_with(HashMap::new) - .insert(metric_name, val); + result.entry(task_id).or_default().insert(metric_name, val); } } } @@ -131,9 +131,9 @@ impl MetricsStore { mod tests { use super::*; use crate::api::tests::helper::create_test_app_state; - use shared::models::metric::MetricKey; use shared::models::metric::MetricEntry; - use std::str::FromStr; + use shared::models::metric::MetricKey; + use std::str::FromStr; #[tokio::test] async fn test_store_metrics() { @@ -213,7 +213,13 @@ mod tests { assert_eq!(metrics.get("task_2"), None); let metrics_1 = metrics_store.get_metrics_for_node(node_addr_1); - assert_eq!(metrics_1.get("task_1").unwrap().get("cpu_usage"), Some(&1.0)); - assert_eq!(metrics_1.get("task_2").unwrap().get("cpu_usage"), Some(&1.0)); + assert_eq!( + metrics_1.get("task_1").unwrap().get("cpu_usage"), + Some(&1.0) + ); + assert_eq!( + metrics_1.get("task_2").unwrap().get("cpu_usage"), + Some(&1.0) + ); } } diff --git a/orchestrator/src/store/domains/node_store.rs b/orchestrator/src/store/domains/node_store.rs index f2cb316..2180710 100644 --- a/orchestrator/src/store/domains/node_store.rs +++ b/orchestrator/src/store/domains/node_store.rs @@ -28,6 +28,20 @@ impl NodeStore { let node: OrchestratorNode = OrchestratorNode::from_string(&node_string); nodes.push(node); } + + nodes.sort_by(|a, b| match (&a.status, &b.status) { + (NodeStatus::Healthy, NodeStatus::Healthy) => std::cmp::Ordering::Equal, + (NodeStatus::Healthy, _) => std::cmp::Ordering::Less, + (_, NodeStatus::Healthy) => std::cmp::Ordering::Greater, + (NodeStatus::Discovered, NodeStatus::Discovered) => std::cmp::Ordering::Equal, + (NodeStatus::Discovered, _) => std::cmp::Ordering::Less, + (_, NodeStatus::Discovered) => std::cmp::Ordering::Greater, + (NodeStatus::Dead, NodeStatus::Dead) => std::cmp::Ordering::Equal, + (NodeStatus::Dead, _) => std::cmp::Ordering::Greater, + (_, NodeStatus::Dead) => std::cmp::Ordering::Less, + _ => std::cmp::Ordering::Equal, + }); + nodes } @@ -75,6 +89,16 @@ impl NodeStore { let _: () = con.set(&node_key, node_string).unwrap(); } + pub fn update_node_version(&self, node_address: &Address, version: &str) { + let mut con = self.redis.client.get_connection().unwrap(); + let node_key: String = format!("{}:{}", ORCHESTRATOR_BASE_KEY, node_address); + let node_string: String = con.get(&node_key).unwrap(); + let mut node: OrchestratorNode = serde_json::from_str(&node_string).unwrap(); + node.version = Some(version.to_string()); + let node_string = node.to_string(); + let _: () = con.set(&node_key, node_string).unwrap(); + } + pub fn update_node_task( &self, node_address: Address, @@ -141,6 +165,7 @@ mod tests { status: NodeStatus::Discovered, task_id: None, task_state: None, + version: None, }; let healthy_node = OrchestratorNode { @@ -150,6 +175,7 @@ mod tests { status: NodeStatus::Healthy, task_id: None, task_state: None, + version: None, }; node_store.add_node(uninvited_node.clone()); @@ -159,4 +185,58 @@ mod tests { assert_eq!(uninvited_nodes.len(), 1); assert_eq!(uninvited_nodes[0].address, uninvited_node.address); } + + #[tokio::test] + async fn test_node_sorting() { + let app_state = create_test_app_state().await; + let node_store = &app_state.store_context.node_store; + + let nodes = vec![ + OrchestratorNode { + address: Address::from_str("0x0000000000000000000000000000000000000003").unwrap(), + ip_address: "192.168.1.3".to_string(), + port: 8082, + status: NodeStatus::Dead, + task_id: None, + task_state: None, + version: None, + }, + OrchestratorNode { + address: Address::from_str("0x0000000000000000000000000000000000000002").unwrap(), + ip_address: "192.168.1.2".to_string(), + port: 8081, + status: NodeStatus::Discovered, + task_id: None, + task_state: None, + version: None, + }, + OrchestratorNode { + address: Address::from_str("0x0000000000000000000000000000000000000001").unwrap(), + ip_address: "192.168.1.1".to_string(), + port: 8080, + status: NodeStatus::Healthy, + task_id: None, + task_state: None, + version: None, + }, + ]; + for node in nodes { + node_store.add_node(node); + } + + let nodes = node_store.get_nodes(); + assert_eq!(nodes.len(), 3); + assert_eq!( + nodes[0].address, + Address::from_str("0x0000000000000000000000000000000000000001").unwrap() + ); + assert_eq!( + nodes[1].address, + Address::from_str("0x0000000000000000000000000000000000000002").unwrap() + ); + assert_eq!( + nodes[2].address, + Address::from_str("0x0000000000000000000000000000000000000003").unwrap() + ); + } } diff --git a/shared/src/models/heartbeat.rs b/shared/src/models/heartbeat.rs index de973b6..140be5f 100644 --- a/shared/src/models/heartbeat.rs +++ b/shared/src/models/heartbeat.rs @@ -20,10 +20,12 @@ impl From for HttpResponse { } } -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct HeartbeatRequest { pub address: String, pub task_id: Option, pub task_state: Option, pub metrics: Option>, + #[serde(default)] + pub version: Option, } diff --git a/shared/src/models/metric.rs b/shared/src/models/metric.rs index 1a1989e..431c938 100644 --- a/shared/src/models/metric.rs +++ b/shared/src/models/metric.rs @@ -1,7 +1,7 @@ use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct MetricEntry { pub key: MetricKey, pub value: f64, From b89fb5027434b443671378a6cc278aaedb8b71f7 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Wed, 5 Feb 2025 15:33:08 +0100 Subject: [PATCH 3/5] bump version --- Cargo.lock | 8 ++++---- Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ea14aad..f9bdaf1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2211,7 +2211,7 @@ dependencies = [ [[package]] name = "discovery" -version = "0.1.1" +version = "0.1.2" dependencies = [ "actix-web", "alloy", @@ -3596,7 +3596,7 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "miner" -version = "0.1.1" +version = "0.1.2" dependencies = [ "actix-web", "alloy", @@ -4020,7 +4020,7 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "orchestrator" -version = "0.1.1" +version = "0.1.2" dependencies = [ "actix-web", "alloy", @@ -5868,7 +5868,7 @@ dependencies = [ [[package]] name = "validator" -version = "0.1.1" +version = "0.1.2" dependencies = [ "actix-web", "alloy", diff --git a/Cargo.toml b/Cargo.toml index dd9ed70..a270743 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["discovery", "miner", "validator", "shared", "orchestrator", "dev-uti resolver = "2" [workspace.package] -version = "0.1.1" +version = "0.1.2" edition = "2021" [workspace.features] From aeb2077b2024d1dc0cd360db9afe61604203a90e Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Wed, 5 Feb 2025 17:10:57 +0100 Subject: [PATCH 4/5] remove log and ajust compute unit value --- miner/src/operations/compute_node.rs | 2 +- orchestrator/src/node/status_update.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/miner/src/operations/compute_node.rs b/miner/src/operations/compute_node.rs index e90e5ee..3f3208c 100644 --- a/miner/src/operations/compute_node.rs +++ b/miner/src/operations/compute_node.rs @@ -77,7 +77,7 @@ impl<'c> ComputeNodeOperations<'c> { .as_bytes(); // Create the signature bytes - let compute_units: U256 = U256::from(10); + let compute_units: U256 = U256::from(1000); let add_node_tx = self .prime_network .add_compute_node(node_address, compute_units, signature.to_vec()) diff --git a/orchestrator/src/node/status_update.rs b/orchestrator/src/node/status_update.rs index 0d66402..386e968 100644 --- a/orchestrator/src/node/status_update.rs +++ b/orchestrator/src/node/status_update.rs @@ -52,7 +52,6 @@ impl NodeStatusUpdater { let nodes = self.store_context.node_store.get_nodes(); for node in nodes { if node.status == NodeStatus::Dead { - println!("Node is dead, checking if we need to remove from chain"); let node_in_pool: bool = match self .contracts .compute_pool From bbf77f13079762b02c538994b414fec052e13c77 Mon Sep 17 00:00:00 2001 From: Jannik Straube Date: Wed, 5 Feb 2025 17:18:43 +0100 Subject: [PATCH 5/5] fmt --- miner/src/cli/command.rs | 11 ++++++++++- miner/src/operations/compute_node.rs | 7 +++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/miner/src/cli/command.rs b/miner/src/cli/command.rs index 6e03589..b21cc79 100644 --- a/miner/src/cli/command.rs +++ b/miner/src/cli/command.rs @@ -285,7 +285,16 @@ pub async fn execute_command( std::process::exit(1); }; - match compute_node_ops.add_compute_node().await { + let gpu_count: u32 = match &node_config.compute_specs { + Some(specs) => specs + .gpu + .as_ref() + .map(|gpu| gpu.count.unwrap_or(0)) + .unwrap_or(0), + None => 0, + }; + + match compute_node_ops.add_compute_node(gpu_count).await { Ok(added_node) => { if added_node { // If we are adding a new compute node we wait for a proper diff --git a/miner/src/operations/compute_node.rs b/miner/src/operations/compute_node.rs index 3f3208c..d53106f 100644 --- a/miner/src/operations/compute_node.rs +++ b/miner/src/operations/compute_node.rs @@ -29,7 +29,10 @@ impl<'c> ComputeNodeOperations<'c> { } // Returns true if the compute node was added, false if it already exists - pub async fn add_compute_node(&self) -> Result> { + pub async fn add_compute_node( + &self, + gpu_count: u32, + ) -> Result> { Console::section("🔄 Adding compute node"); let compute_node = self .compute_registry @@ -77,7 +80,7 @@ impl<'c> ComputeNodeOperations<'c> { .as_bytes(); // Create the signature bytes - let compute_units: U256 = U256::from(1000); + let compute_units: U256 = U256::from(1000 * gpu_count); let add_node_tx = self .prime_network .add_compute_node(node_address, compute_units, signature.to_vec())