Skip to content

Commit fdc599a

Browse files
committed
improve task bridge setup, remove hardcoded endpoint
1 parent d4e037e commit fdc599a

File tree

9 files changed

+241
-156
lines changed

9 files changed

+241
-156
lines changed

worker/src/cli/command.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::operations::heartbeat::service::HeartbeatService;
1010
use crate::operations::provider::ProviderError;
1111
use crate::operations::provider::ProviderOperations;
1212
use crate::services::discovery::DiscoveryService;
13+
use crate::state::system_state::SystemState;
1314
use crate::TaskHandles;
1415
use alloy::primitives::U256;
1516
use clap::{Parser, Subcommand};
@@ -220,6 +221,10 @@ pub async fn execute_command(
220221
}
221222
};
222223

224+
let state = Arc::new(SystemState::new(
225+
state_dir_overwrite.clone(),
226+
*disable_state_storing,
227+
));
223228
let metrics_store = Arc::new(MetricsStore::new());
224229
let heartbeat_metrics_clone = metrics_store.clone();
225230
let bridge_contracts = contracts.clone();
@@ -236,6 +241,7 @@ pub async fn execute_command(
236241
Some(node_config.clone()),
237242
Some(bridge_wallet),
238243
docker_storage_path.clone(),
244+
state.clone(),
239245
));
240246

241247
let system_memory = node_config
@@ -262,13 +268,12 @@ pub async fn execute_command(
262268
});
263269
let heartbeat_service = HeartbeatService::new(
264270
Duration::from_secs(10),
265-
state_dir_overwrite.clone(),
266-
*disable_state_storing,
267271
cancellation_token.clone(),
268272
task_handles.clone(),
269273
node_wallet_instance.clone(),
270274
docker_service.clone(),
271275
heartbeat_metrics_clone.clone(),
276+
state,
272277
);
273278

274279
let mut attempts = 0;

worker/src/docker/taskbridge/bridge.rs

+35-130
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
use crate::state::system_state::SystemState;
2+
use alloy::primitives::{Address, U256};
3+
use anyhow::Result;
4+
use log::{debug, error, info};
5+
use reqwest::header::HeaderValue;
6+
use reqwest::Client;
7+
use serde::{Deserialize, Serialize};
8+
use shared::models::node::Node;
9+
use shared::security::request_signer::sign_request;
10+
use shared::web3::contracts::core::builder::Contracts;
11+
use shared::web3::wallet::Wallet;
12+
use std::str::FromStr;
13+
use std::sync::Arc;
14+
15+
#[derive(Deserialize, Serialize, Debug)]
16+
pub struct RequestUploadRequest {
17+
pub file_name: String,
18+
pub file_size: u64,
19+
pub file_type: String,
20+
}
21+
22+
/// Handles a file upload request
23+
pub async fn handle_file_upload(
24+
storage_path: &str,
25+
task_id: &str,
26+
file_name: &str,
27+
wallet: &Arc<Wallet>,
28+
state: &Arc<SystemState>,
29+
) -> Result<()> {
30+
println!("Starting file upload handler...");
31+
info!("📄 Received file upload request: {}", file_name);
32+
33+
// Get orchestrator endpoint
34+
println!("Getting orchestrator endpoint...");
35+
let endpoint = state
36+
.get_heartbeat_endpoint()
37+
.await
38+
.ok_or_else(|| {
39+
error!("Orchestrator endpoint is not set - cannot upload file.");
40+
anyhow::anyhow!("Orchestrator endpoint not set")
41+
})?
42+
.replace("/heartbeat", "");
43+
println!("Got endpoint: {}", endpoint);
44+
45+
// Construct file path
46+
let file = format!("{}/prime-task-{}/{}", storage_path, task_id, file_name);
47+
println!("Constructed file path: {}", file);
48+
debug!("File: {:?}", file);
49+
50+
// Get file size
51+
println!("Getting file size...");
52+
let file_size = std::fs::metadata(&file).map(|m| m.len()).unwrap_or(0);
53+
54+
// Calculate SHA
55+
println!("Calculating file SHA...");
56+
let file_sha = tokio::fs::read(&file)
57+
.await
58+
.map(|contents| {
59+
use sha2::{Digest, Sha256};
60+
let mut hasher = Sha256::new();
61+
hasher.update(&contents);
62+
format!("{:x}", hasher.finalize())
63+
})
64+
.unwrap_or_else(|e| {
65+
error!("Failed to calculate file SHA: {}", e);
66+
String::new()
67+
});
68+
69+
debug!("File size: {:?}", file_size);
70+
debug!("File SHA: {}", file_sha);
71+
println!("File size: {}, SHA: {}", file_size, file_sha);
72+
73+
// Create upload request
74+
println!("Creating upload request...");
75+
let client = Client::new();
76+
let request = RequestUploadRequest {
77+
file_name: file_sha.to_string(),
78+
file_size,
79+
file_type: "application/json".to_string(), // Assume JSON
80+
};
81+
82+
// Sign request
83+
println!("Signing request...");
84+
let request_value = serde_json::to_value(&request)?;
85+
let signature = sign_request("/storage/request-upload", wallet, Some(&request_value))
86+
.await
87+
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
88+
println!("Request signed with signature: {}", signature);
89+
90+
// Prepare headers
91+
println!("Preparing request headers...");
92+
let mut headers = reqwest::header::HeaderMap::new();
93+
headers.insert(
94+
"x-address",
95+
HeaderValue::from_str(&wallet.address().to_string())?,
96+
);
97+
headers.insert("x-signature", HeaderValue::from_str(&signature)?);
98+
99+
// Create upload URL
100+
let upload_url = format!("{}/storage/request-upload", endpoint);
101+
println!("Upload URL: {}", upload_url);
102+
103+
// Send request
104+
println!("Sending request to get signed URL...");
105+
let response = client
106+
.post(&upload_url)
107+
.json(&request)
108+
.headers(headers)
109+
.send()
110+
.await?;
111+
112+
println!("Response: {:?}", response);
113+
// Process response
114+
let json = response.json::<serde_json::Value>().await?;
115+
println!("Response JSON: {:?}", json);
116+
117+
if let Some(signed_url) = json["signed_url"].as_str() {
118+
info!("Got signed URL for upload: {}", signed_url);
119+
println!("Got signed URL: {}", signed_url);
120+
121+
// Read file contents
122+
println!("Reading file contents...");
123+
let file_contents = tokio::fs::read(&file).await?;
124+
println!("File contents size: {} bytes", file_contents.len());
125+
126+
// Upload file to S3 using signed URL
127+
println!("Uploading file to S3...");
128+
let upload_response = client
129+
.put(signed_url)
130+
.body(file_contents)
131+
.header("Content-Type", "application/json")
132+
.send()
133+
.await?;
134+
135+
println!("S3 upload response: {:?}", upload_response);
136+
info!("Successfully uploaded file to S3");
137+
} else {
138+
println!("Error: Missing signed_url in response");
139+
return Err(anyhow::anyhow!("Missing signed_url in response"));
140+
}
141+
142+
println!("File upload completed successfully");
143+
Ok(())
144+
}
145+
146+
/// Handles a file validation request
147+
pub async fn handle_file_validation(
148+
file_sha: &str,
149+
contracts: &Arc<Contracts>,
150+
node: &Node,
151+
) -> Result<()> {
152+
info!("📄 Received file SHA for validation: {}", file_sha);
153+
154+
let pool_id = node.compute_pool_id;
155+
let node_address = &node.id;
156+
157+
let decoded_sha = hex::decode(file_sha)?;
158+
debug!(
159+
"Decoded file sha: {:?} ({} bytes)",
160+
decoded_sha,
161+
decoded_sha.len()
162+
);
163+
164+
let result = contracts
165+
.compute_pool
166+
.submit_work(
167+
U256::from(pool_id),
168+
Address::from_str(node_address)?,
169+
decoded_sha.to_vec(),
170+
)
171+
.await;
172+
173+
debug!("Submit work result: {:?}", result);
174+
175+
Ok(())
176+
}

worker/src/docker/taskbridge/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod bridge;
2+
pub mod file_handler;
23

34
pub use bridge::TaskBridge;

worker/src/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ mod docker;
66
mod metrics;
77
mod operations;
88
mod services;
9+
mod state;
910
use clap::Parser;
1011
use cli::{execute_command, Cli};
1112
use log::{debug, LevelFilter};
-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
11
pub mod service;
2-
pub mod state;

worker/src/operations/heartbeat/service.rs

+5-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use super::state::HeartbeatState;
21
use crate::console::Console;
32
use crate::docker::DockerService;
43
use crate::metrics::store::MetricsStore;
4+
use crate::state::system_state::SystemState;
55
use crate::TaskHandles;
66
use log;
77
use log::info;
@@ -15,7 +15,7 @@ use tokio::time::{interval, Duration};
1515
use tokio_util::sync::CancellationToken;
1616
#[derive(Clone)]
1717
pub struct HeartbeatService {
18-
state: HeartbeatState,
18+
state: Arc<SystemState>,
1919
interval: Duration,
2020
client: Client,
2121
cancellation_token: CancellationToken,
@@ -36,16 +36,13 @@ impl HeartbeatService {
3636
#[allow(clippy::too_many_arguments)]
3737
pub fn new(
3838
interval: Duration,
39-
state_dir_overwrite: Option<String>,
40-
disable_state_storing: bool,
4139
cancellation_token: CancellationToken,
4240
task_handles: TaskHandles,
4341
node_wallet: Arc<Wallet>,
4442
docker_service: Arc<DockerService>,
4543
metrics_store: Arc<MetricsStore>,
44+
state: Arc<SystemState>,
4645
) -> Result<Arc<Self>, HeartbeatError> {
47-
let state = HeartbeatState::new(state_dir_overwrite.or(None), disable_state_storing);
48-
4946
let client = Client::builder()
5047
.timeout(Duration::from_secs(5)) // 5 second timeout
5148
.build()
@@ -64,7 +61,7 @@ impl HeartbeatService {
6461
}
6562

6663
pub async fn activate_heartbeat_if_endpoint_exists(&self) {
67-
if let Some(endpoint) = self.state.get_endpoint().await {
64+
if let Some(endpoint) = self.state.get_heartbeat_endpoint().await {
6865
info!("Starting heartbeat from recovered state");
6966
self.start(endpoint).await.unwrap();
7067
}
@@ -91,7 +88,7 @@ impl HeartbeatService {
9188
if !state.is_running().await {
9289
break;
9390
}
94-
match Self::send_heartbeat(&client, state.get_endpoint().await, wallet_clone.clone(), docker_service.clone(), metrics_store.clone()).await {
91+
match Self::send_heartbeat(&client, state.get_heartbeat_endpoint().await, wallet_clone.clone(), docker_service.clone(), metrics_store.clone()).await {
9592
Ok(_) => {
9693
state.update_last_heartbeat().await;
9794
log::info!("Synced with orchestrator"); // Updated message to reflect sync

worker/src/state/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod system_state;

0 commit comments

Comments
 (0)