Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release v.0.1.2 #106

Merged
merged 7 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["discovery", "miner", "validator", "shared", "orchestrator", "dev-uti
resolver = "2"

[workspace.package]
version = "0.1.1"
version = "0.1.2"
edition = "2021"

[workspace.features]
Expand Down
13 changes: 12 additions & 1 deletion miner/src/cli/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ pub async fn execute_command(
}

let mut recover_last_state = *auto_recover;
let version = env!("CARGO_PKG_VERSION");
Console::section("🚀 PRIME MINER INITIALIZATION");
Console::info("Version:", version);
/*
Initialize Wallet instances
*/
Expand Down Expand Up @@ -283,7 +285,16 @@ pub async fn execute_command(
std::process::exit(1);
};

match compute_node_ops.add_compute_node().await {
let gpu_count: u32 = match &node_config.compute_specs {
Some(specs) => specs
.gpu
.as_ref()
.map(|gpu| gpu.count.unwrap_or(0))
.unwrap_or(0),
None => 0,
};

match compute_node_ops.add_compute_node(gpu_count).await {
Ok(added_node) => {
if added_node {
// If we are adding a new compute node we wait for a proper
Expand Down
7 changes: 5 additions & 2 deletions miner/src/operations/compute_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ impl<'c> ComputeNodeOperations<'c> {
}

// Returns true if the compute node was added, false if it already exists
pub async fn add_compute_node(&self) -> Result<bool, Box<dyn std::error::Error>> {
pub async fn add_compute_node(
&self,
gpu_count: u32,
) -> Result<bool, Box<dyn std::error::Error>> {
Console::section("🔄 Adding compute node");
let compute_node = self
.compute_registry
Expand Down Expand Up @@ -77,7 +80,7 @@ impl<'c> ComputeNodeOperations<'c> {
.as_bytes();

// Create the signature bytes
let compute_units: U256 = U256::from(10);
let compute_units: U256 = U256::from(1000 * gpu_count);
let add_node_tx = self
.prime_network
.add_compute_node(node_address, compute_units, signature.to_vec())
Expand Down
2 changes: 2 additions & 0 deletions miner/src/operations/heartbeat/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,15 @@ impl HeartbeatService {
task_id: Some(task.id.to_string()),
task_state: Some(task.state.to_string()),
metrics: Some(metrics_for_task),
version: Some(env!("CARGO_PKG_VERSION").to_string()),
}
} else {
HeartbeatRequest {
address: wallet.address().to_string(),
task_id: None,
task_state: None,
metrics: None,
version: Some(env!("CARGO_PKG_VERSION").to_string()),
}
};

Expand Down
22 changes: 20 additions & 2 deletions orchestrator/src/api/routes/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,16 @@ mod tests {
.store_context
.heartbeat_store
.get_heartbeat(&node_address);
assert_eq!(value, Some("{\"address\":\"0x0000000000000000000000000000000000000000\",\"task_id\":null,\"task_state\":null,\"metrics\":null}".to_string()));
assert_eq!(
value,
Some(HeartbeatRequest {
address: "0x0000000000000000000000000000000000000000".to_string(),
task_id: None,
task_state: None,
metrics: None,
version: None
})
);
}

#[actix_web::test]
Expand Down Expand Up @@ -120,6 +129,15 @@ mod tests {
.heartbeat_store
.get_heartbeat(&node_address);
// Task has not started yet
assert_eq!(value, Some("{\"address\":\"0x0000000000000000000000000000000000000000\",\"task_id\":null,\"task_state\":null,\"metrics\":null}".to_string()));

let value = value.unwrap();
let heartbeat = HeartbeatRequest {
address: "0x0000000000000000000000000000000000000000".to_string(),
task_id: None,
task_state: None,
metrics: None,
version: None,
};
assert_eq!(value, heartbeat);
}
}
44 changes: 44 additions & 0 deletions orchestrator/src/api/routes/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,22 @@ async fn get_node_logs(node_id: web::Path<String>, app_state: Data<AppState>) ->
}
}

async fn get_node_metrics(node_id: web::Path<String>, app_state: Data<AppState>) -> HttpResponse {
println!("get_node_metrics: {}", node_id);
let node_address = Address::from_str(&node_id).unwrap();
let metrics = app_state
.store_context
.metrics_store
.get_metrics_for_node(node_address);
HttpResponse::Ok().json(json!({"success": true, "metrics": metrics}))
}

pub fn nodes_routes() -> Scope {
web::scope("/nodes")
.route("", get().to(get_nodes))
.route("/{node_id}/restart", post().to(restart_node_task))
.route("/{node_id}/logs", get().to(get_node_logs))
.route("/{node_id}/metrics", get().to(get_node_metrics))
}

#[cfg(test)]
Expand Down Expand Up @@ -168,6 +179,7 @@ mod tests {
status: NodeStatus::Discovered,
task_id: None,
task_state: None,
version: None,
};
app_state.store_context.node_store.add_node(node.clone());

Expand Down Expand Up @@ -201,4 +213,36 @@ mod tests {
nodes_array[0]["address"]
);
}

#[actix_web::test]
async fn test_get_metrics_for_node_not_exist() {
let app_state = create_test_app_state().await;
let app = test::init_service(
App::new()
.app_data(app_state.clone())
.route("/nodes/{node_id}/metrics", get().to(get_node_metrics)),
)
.await;

let node_id = "0x0000000000000000000000000000000000000000";
let req = test::TestRequest::get()
.uri(&format!("/nodes/{}/metrics", node_id))
.to_request();
let resp = test::call_service(&app, req).await;

let body = test::read_body(resp).await;
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
println!("json {:?}", json);
assert_eq!(
json["success"], true,
"Expected success to be true but got {:?}",
json["success"]
);
assert_eq!(
json["metrics"],
json!({}),
"Expected empty metrics object but got {:?}",
json["metrics"]
);
}
}
2 changes: 2 additions & 0 deletions orchestrator/src/models/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct OrchestratorNode {

pub task_id: Option<String>,
pub task_state: Option<TaskState>,
pub version: Option<String>,
}

impl From<DiscoveryNode> for OrchestratorNode {
Expand All @@ -23,6 +24,7 @@ impl From<DiscoveryNode> for OrchestratorNode {
status: NodeStatus::Discovered,
task_id: None,
task_state: None,
version: None,
}
}
}
Expand Down
23 changes: 21 additions & 2 deletions orchestrator/src/node/status_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ impl NodeStatusUpdater {
let nodes = self.store_context.node_store.get_nodes();
for node in nodes {
if node.status == NodeStatus::Dead {
println!("Node is dead, checking if we need to remove from chain");
let node_in_pool: bool = match self
.contracts
.compute_pool
Expand Down Expand Up @@ -98,8 +97,17 @@ impl NodeStatusUpdater {
.heartbeat_store
.get_unhealthy_counter(&node.address);
match heartbeat {
Some(_) => {
Some(beat) => {
// We have a heartbeat
if let Some(version) = &beat.version {
if node.version.as_ref() != Some(version) {
let _: () = self
.store_context
.node_store
.update_node_version(&node.address, version);
}
}

if node.status == NodeStatus::Unhealthy
|| node.status == NodeStatus::WaitingForHeartbeat
{
Expand Down Expand Up @@ -181,6 +189,7 @@ mod tests {
status: NodeStatus::WaitingForHeartbeat,
task_id: None,
task_state: None,
version: None,
};

let _: () = app_state.store_context.node_store.add_node(node.clone());
Expand All @@ -189,6 +198,7 @@ mod tests {
task_id: None,
task_state: None,
metrics: None,
version: Some(env!("CARGO_PKG_VERSION").to_string()),
};
let _: () = app_state.store_context.heartbeat_store.beat(&heartbeat);

Expand Down Expand Up @@ -228,6 +238,7 @@ mod tests {
status: NodeStatus::Healthy,
task_id: None,
task_state: None,
version: None,
};

let _: () = app_state.store_context.node_store.add_node(node.clone());
Expand Down Expand Up @@ -267,6 +278,7 @@ mod tests {
status: NodeStatus::Unhealthy,
task_id: None,
task_state: None,
version: None,
};

let _: () = app_state.store_context.node_store.add_node(node.clone());
Expand Down Expand Up @@ -311,6 +323,7 @@ mod tests {
status: NodeStatus::Unhealthy,
task_id: None,
task_state: None,
version: None,
};

let _: () = app_state.store_context.node_store.add_node(node.clone());
Expand Down Expand Up @@ -355,6 +368,7 @@ mod tests {
status: NodeStatus::Unhealthy,
task_id: None,
task_state: None,
version: None,
};
let _: () = app_state
.store_context
Expand All @@ -366,6 +380,7 @@ mod tests {
task_id: None,
task_state: None,
metrics: None,
version: Some(env!("CARGO_PKG_VERSION").to_string()),
};
let _: () = app_state.store_context.heartbeat_store.beat(&heartbeat);
let _: () = app_state.store_context.node_store.add_node(node.clone());
Expand Down Expand Up @@ -412,6 +427,7 @@ mod tests {
status: NodeStatus::Unhealthy,
task_id: None,
task_state: None,
version: None,
};
let _: () = app_state
.store_context
Expand All @@ -426,6 +442,7 @@ mod tests {
status: NodeStatus::Healthy,
task_id: None,
task_state: None,
version: None,
};

let _: () = app_state.store_context.node_store.add_node(node2.clone());
Expand Down Expand Up @@ -483,6 +500,7 @@ mod tests {
status: NodeStatus::Unhealthy,
task_id: None,
task_state: None,
version: None,
};

let _: () = app_state.store_context.node_store.add_node(node.clone());
Expand Down Expand Up @@ -519,6 +537,7 @@ mod tests {
task_id: None,
task_state: None,
metrics: None,
version: Some(env!("CARGO_PKG_VERSION").to_string()),
};
let _: () = app_state.store_context.heartbeat_store.beat(&heartbeat);

Expand Down
4 changes: 2 additions & 2 deletions orchestrator/src/store/domains/heartbeat_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ impl HeartbeatStore {
.unwrap();
}

pub fn get_heartbeat(&self, address: &Address) -> Option<String> {
pub fn get_heartbeat(&self, address: &Address) -> Option<HeartbeatRequest> {
let mut con = self.redis.client.get_connection().unwrap();
let key = format!("{}:{}", ORCHESTRATOR_HEARTBEAT_KEY, address);
let value: Option<String> = con.get(key).unwrap();
value
value.and_then(|v| serde_json::from_str(&v).ok())
}

pub fn get_unhealthy_counter(&self, address: &Address) -> u32 {
Expand Down
Loading
Loading