Skip to content

Commit 2416436

Browse files
authored
Merge pull request #104 from PrimeIntellect-ai/feature/orchestrator-metrics-api
Feature/orchestrator metrics api
2 parents e7dbec0 + b89fb50 commit 2416436

File tree

13 files changed

+264
-13
lines changed

13 files changed

+264
-13
lines changed

Cargo.lock

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ members = ["discovery", "miner", "validator", "shared", "orchestrator", "dev-uti
33
resolver = "2"
44

55
[workspace.package]
6-
version = "0.1.1"
6+
version = "0.1.2"
77
edition = "2021"
88

99
[workspace.features]

miner/src/cli/command.rs

+2
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ pub async fn execute_command(
113113
}
114114

115115
let mut recover_last_state = *auto_recover;
116+
let version = env!("CARGO_PKG_VERSION");
116117
Console::section("🚀 PRIME MINER INITIALIZATION");
118+
Console::info("Version:", version);
117119
/*
118120
Initialize Wallet instances
119121
*/

miner/src/operations/heartbeat/service.rs

+2
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,15 @@ impl HeartbeatService {
143143
task_id: Some(task.id.to_string()),
144144
task_state: Some(task.state.to_string()),
145145
metrics: Some(metrics_for_task),
146+
version: Some(env!("CARGO_PKG_VERSION").to_string()),
146147
}
147148
} else {
148149
HeartbeatRequest {
149150
address: wallet.address().to_string(),
150151
task_id: None,
151152
task_state: None,
152153
metrics: None,
154+
version: Some(env!("CARGO_PKG_VERSION").to_string()),
153155
}
154156
};
155157

orchestrator/src/api/routes/heartbeat.rs

+20-2
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,16 @@ mod tests {
7474
.store_context
7575
.heartbeat_store
7676
.get_heartbeat(&node_address);
77-
assert_eq!(value, Some("{\"address\":\"0x0000000000000000000000000000000000000000\",\"task_id\":null,\"task_state\":null,\"metrics\":null}".to_string()));
77+
assert_eq!(
78+
value,
79+
Some(HeartbeatRequest {
80+
address: "0x0000000000000000000000000000000000000000".to_string(),
81+
task_id: None,
82+
task_state: None,
83+
metrics: None,
84+
version: None
85+
})
86+
);
7887
}
7988

8089
#[actix_web::test]
@@ -120,6 +129,15 @@ mod tests {
120129
.heartbeat_store
121130
.get_heartbeat(&node_address);
122131
// Task has not started yet
123-
assert_eq!(value, Some("{\"address\":\"0x0000000000000000000000000000000000000000\",\"task_id\":null,\"task_state\":null,\"metrics\":null}".to_string()));
132+
133+
let value = value.unwrap();
134+
let heartbeat = HeartbeatRequest {
135+
address: "0x0000000000000000000000000000000000000000".to_string(),
136+
task_id: None,
137+
task_state: None,
138+
metrics: None,
139+
version: None,
140+
};
141+
assert_eq!(value, heartbeat);
124142
}
125143
}

orchestrator/src/api/routes/nodes.rs

+44
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,22 @@ async fn get_node_logs(node_id: web::Path<String>, app_state: Data<AppState>) ->
133133
}
134134
}
135135

136+
async fn get_node_metrics(node_id: web::Path<String>, app_state: Data<AppState>) -> HttpResponse {
137+
println!("get_node_metrics: {}", node_id);
138+
let node_address = Address::from_str(&node_id).unwrap();
139+
let metrics = app_state
140+
.store_context
141+
.metrics_store
142+
.get_metrics_for_node(node_address);
143+
HttpResponse::Ok().json(json!({"success": true, "metrics": metrics}))
144+
}
145+
136146
pub fn nodes_routes() -> Scope {
137147
web::scope("/nodes")
138148
.route("", get().to(get_nodes))
139149
.route("/{node_id}/restart", post().to(restart_node_task))
140150
.route("/{node_id}/logs", get().to(get_node_logs))
151+
.route("/{node_id}/metrics", get().to(get_node_metrics))
141152
}
142153

143154
#[cfg(test)]
@@ -168,6 +179,7 @@ mod tests {
168179
status: NodeStatus::Discovered,
169180
task_id: None,
170181
task_state: None,
182+
version: None,
171183
};
172184
app_state.store_context.node_store.add_node(node.clone());
173185

@@ -201,4 +213,36 @@ mod tests {
201213
nodes_array[0]["address"]
202214
);
203215
}
216+
217+
#[actix_web::test]
218+
async fn test_get_metrics_for_node_not_exist() {
219+
let app_state = create_test_app_state().await;
220+
let app = test::init_service(
221+
App::new()
222+
.app_data(app_state.clone())
223+
.route("/nodes/{node_id}/metrics", get().to(get_node_metrics)),
224+
)
225+
.await;
226+
227+
let node_id = "0x0000000000000000000000000000000000000000";
228+
let req = test::TestRequest::get()
229+
.uri(&format!("/nodes/{}/metrics", node_id))
230+
.to_request();
231+
let resp = test::call_service(&app, req).await;
232+
233+
let body = test::read_body(resp).await;
234+
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
235+
println!("json {:?}", json);
236+
assert_eq!(
237+
json["success"], true,
238+
"Expected success to be true but got {:?}",
239+
json["success"]
240+
);
241+
assert_eq!(
242+
json["metrics"],
243+
json!({}),
244+
"Expected empty metrics object but got {:?}",
245+
json["metrics"]
246+
);
247+
}
204248
}

orchestrator/src/models/node.rs

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub struct OrchestratorNode {
1212

1313
pub task_id: Option<String>,
1414
pub task_state: Option<TaskState>,
15+
pub version: Option<String>,
1516
}
1617

1718
impl From<DiscoveryNode> for OrchestratorNode {
@@ -23,6 +24,7 @@ impl From<DiscoveryNode> for OrchestratorNode {
2324
status: NodeStatus::Discovered,
2425
task_id: None,
2526
task_state: None,
27+
version: None,
2628
}
2729
}
2830
}

orchestrator/src/node/status_update.rs

+21-1
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,17 @@ impl NodeStatusUpdater {
9898
.heartbeat_store
9999
.get_unhealthy_counter(&node.address);
100100
match heartbeat {
101-
Some(_) => {
101+
Some(beat) => {
102102
// We have a heartbeat
103+
if let Some(version) = &beat.version {
104+
if node.version.as_ref() != Some(version) {
105+
let _: () = self
106+
.store_context
107+
.node_store
108+
.update_node_version(&node.address, version);
109+
}
110+
}
111+
103112
if node.status == NodeStatus::Unhealthy
104113
|| node.status == NodeStatus::WaitingForHeartbeat
105114
{
@@ -181,6 +190,7 @@ mod tests {
181190
status: NodeStatus::WaitingForHeartbeat,
182191
task_id: None,
183192
task_state: None,
193+
version: None,
184194
};
185195

186196
let _: () = app_state.store_context.node_store.add_node(node.clone());
@@ -189,6 +199,7 @@ mod tests {
189199
task_id: None,
190200
task_state: None,
191201
metrics: None,
202+
version: Some(env!("CARGO_PKG_VERSION").to_string()),
192203
};
193204
let _: () = app_state.store_context.heartbeat_store.beat(&heartbeat);
194205

@@ -228,6 +239,7 @@ mod tests {
228239
status: NodeStatus::Healthy,
229240
task_id: None,
230241
task_state: None,
242+
version: None,
231243
};
232244

233245
let _: () = app_state.store_context.node_store.add_node(node.clone());
@@ -267,6 +279,7 @@ mod tests {
267279
status: NodeStatus::Unhealthy,
268280
task_id: None,
269281
task_state: None,
282+
version: None,
270283
};
271284

272285
let _: () = app_state.store_context.node_store.add_node(node.clone());
@@ -311,6 +324,7 @@ mod tests {
311324
status: NodeStatus::Unhealthy,
312325
task_id: None,
313326
task_state: None,
327+
version: None,
314328
};
315329

316330
let _: () = app_state.store_context.node_store.add_node(node.clone());
@@ -355,6 +369,7 @@ mod tests {
355369
status: NodeStatus::Unhealthy,
356370
task_id: None,
357371
task_state: None,
372+
version: None,
358373
};
359374
let _: () = app_state
360375
.store_context
@@ -366,6 +381,7 @@ mod tests {
366381
task_id: None,
367382
task_state: None,
368383
metrics: None,
384+
version: Some(env!("CARGO_PKG_VERSION").to_string()),
369385
};
370386
let _: () = app_state.store_context.heartbeat_store.beat(&heartbeat);
371387
let _: () = app_state.store_context.node_store.add_node(node.clone());
@@ -412,6 +428,7 @@ mod tests {
412428
status: NodeStatus::Unhealthy,
413429
task_id: None,
414430
task_state: None,
431+
version: None,
415432
};
416433
let _: () = app_state
417434
.store_context
@@ -426,6 +443,7 @@ mod tests {
426443
status: NodeStatus::Healthy,
427444
task_id: None,
428445
task_state: None,
446+
version: None,
429447
};
430448

431449
let _: () = app_state.store_context.node_store.add_node(node2.clone());
@@ -483,6 +501,7 @@ mod tests {
483501
status: NodeStatus::Unhealthy,
484502
task_id: None,
485503
task_state: None,
504+
version: None,
486505
};
487506

488507
let _: () = app_state.store_context.node_store.add_node(node.clone());
@@ -519,6 +538,7 @@ mod tests {
519538
task_id: None,
520539
task_state: None,
521540
metrics: None,
541+
version: Some(env!("CARGO_PKG_VERSION").to_string()),
522542
};
523543
let _: () = app_state.store_context.heartbeat_store.beat(&heartbeat);
524544

orchestrator/src/store/domains/heartbeat_store.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ impl HeartbeatStore {
3232
.unwrap();
3333
}
3434

35-
pub fn get_heartbeat(&self, address: &Address) -> Option<String> {
35+
pub fn get_heartbeat(&self, address: &Address) -> Option<HeartbeatRequest> {
3636
let mut con = self.redis.client.get_connection().unwrap();
3737
let key = format!("{}:{}", ORCHESTRATOR_HEARTBEAT_KEY, address);
3838
let value: Option<String> = con.get(key).unwrap();
39-
value
39+
value.and_then(|v| serde_json::from_str(&v).ok())
4040
}
4141

4242
pub fn get_unhealthy_counter(&self, address: &Address) -> u32 {

0 commit comments

Comments
 (0)