diff --git a/Cargo.lock b/Cargo.lock index 48d33658..839f2417 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4211,6 +4211,7 @@ dependencies = [ "libc", "libsysinspect", "log", + "once_cell", "rand", "rsa", "rustls", diff --git a/libsysinspect/src/proto/query.rs b/libsysinspect/src/proto/query.rs index bccf776c..14faa7a8 100644 --- a/libsysinspect/src/proto/query.rs +++ b/libsysinspect/src/proto/query.rs @@ -1,9 +1,22 @@ -use std::sync::{Arc, Mutex}; - use crate::SysinspectError; +use std::sync::{Arc, Mutex}; /// Targeting schemes pub static SCHEME_MODEL: &str = "model://"; +pub static SCHEME_COMMAND: &str = "cmd://"; + +pub mod commands { + // Stop the entire cluster + pub const CLUSTER_SHUTDOWN: &str = "cluster/shutdown"; + + // Restart the entire cluster + // TODO: Not implemented yet + pub const CLUSTER_REBOOT: &str = "cluster/reboot"; + + // Rotate RSA/AES on the entire cluster + // TODO: Not implemented yet + pub const CLUSTER_ROTATE: &str = "cluster/rotate"; +} /// /// Query parser (scheme). diff --git a/libsysinspect/src/proto/rqtypes.rs b/libsysinspect/src/proto/rqtypes.rs index 025edfb6..8ac159f0 100644 --- a/libsysinspect/src/proto/rqtypes.rs +++ b/libsysinspect/src/proto/rqtypes.rs @@ -26,6 +26,14 @@ pub enum RequestType { #[serde(rename = "ehlo")] Ehlo, + /// Bye + #[serde(rename = "b")] + Bye, + + /// Bye Ack + #[serde(rename = "ba")] + ByeAck, + /// Retry connect (e.g. after the registration) #[serde(rename = "retry")] Reconnect, diff --git a/src/clidef.rs b/src/clidef.rs index 52da86a4..95c02e50 100644 --- a/src/clidef.rs +++ b/src/clidef.rs @@ -68,6 +68,12 @@ pub fn cli(version: &'static str) -> Command { .long("state") .help("Specify a state to be processed. If none specified, default is taken ($)") ) + .arg( + Arg::new("shutdown") + .long("shutdown") + .action(ArgAction::SetTrue) + .help(format!("Notify the running master to shutdown the {}, be careful! :)", "entire cluster".bright_red())) + ) .next_help_heading("Info") .arg( diff --git a/src/main.rs b/src/main.rs index 14c04dcc..754dd917 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use libsysinspect::{ cfg::{mmconf::MasterConfig, select_config_path}, inspector::SysInspectRunner, logger, + proto::query::{commands::CLUSTER_SHUTDOWN, SCHEME_COMMAND}, reactor::handlers, traits::get_minion_traits, SysinspectError, @@ -103,6 +104,10 @@ fn main() { if let Err(err) = call_master_fifo(model, query.unwrap_or(&"".to_string()), traits, &cfg.socket()) { log::error!("Cannot reach master: {err}"); } + } else if params.get_flag("shutdown") { + if let Err(err) = call_master_fifo(&format!("{}{}", SCHEME_COMMAND, CLUSTER_SHUTDOWN), "*", None, &cfg.socket()) { + log::error!("Cannot reach master: {err}"); + } } else if let Some(mpath) = params.get_one::("model") { let mut sr = SysInspectRunner::new(None); sr.set_model_path(mpath); diff --git a/sysmaster/Cargo.toml b/sysmaster/Cargo.toml index 701b4323..a4553597 100644 --- a/sysmaster/Cargo.toml +++ b/sysmaster/Cargo.toml @@ -33,3 +33,4 @@ sled = "0.34.7" rsa = { version = "0.9.6", features = ["pkcs5", "sha1", "sha2"] } uuid = { version = "1.11.0", features = ["v4"] } actix-web = "4.9.0" +once_cell = "1.20.2" diff --git a/sysmaster/src/master.rs b/sysmaster/src/master.rs index 2ecbee3e..761b8a96 100644 --- a/sysmaster/src/master.rs +++ b/sysmaster/src/master.rs @@ -16,6 +16,7 @@ use libsysinspect::{ util::iofs::scan_files_sha256, SysinspectError, }; +use once_cell::sync::Lazy; use serde_json::json; use std::{ collections::{HashMap, HashSet}, @@ -32,6 +33,9 @@ use tokio::{ time, }; +// Session singleton +static SHARED_SESSION: Lazy>> = Lazy::new(|| Arc::new(Mutex::new(SessionKeeper::new(30)))); + #[derive(Debug)] pub struct SysMaster { cfg: MasterConfig, @@ -39,7 +43,7 @@ pub struct SysMaster { mkr: MinionsKeyRegistry, mreg: MinionRegistry, to_drop: HashSet, - session: session::SessionKeeper, + session: Arc>, } impl SysMaster { @@ -47,7 +51,7 @@ impl SysMaster { let (tx, _) = broadcast::channel::>(100); let mkr = MinionsKeyRegistry::new(cfg.keyman_root())?; let mreg = MinionRegistry::new(cfg.minion_registry_root())?; - Ok(SysMaster { cfg, broadcast: tx, mkr, to_drop: HashSet::default(), session: SessionKeeper::new(30), mreg }) + Ok(SysMaster { cfg, broadcast: tx, mkr, to_drop: HashSet::default(), session: Arc::clone(&SHARED_SESSION), mreg }) } /// Open FIFO socket for command-line communication @@ -103,15 +107,14 @@ impl SysMaster { fn msg_query(&mut self, payload: &str) -> Option { let query = payload.split(";").map(|s| s.to_string()).collect::>(); - if let [scheme, query, traits] = query.as_slice() { + if let [querypath, query, traits] = query.as_slice() { let mut tgt = MinionTarget::default(); - tgt.set_scheme(scheme); + tgt.set_scheme(querypath); tgt.set_traits_query(traits); for hostname in query.split(",") { tgt.add_hostname(hostname); } - // Collect downloadable model(s) files let mut out: HashMap = HashMap::default(); for em in self.cfg.fileserver_models() { for (n, cs) in scan_files_sha256(self.cfg.fileserver_mdl_root(false).join(em), Some(MODEL_FILE_EXT)) { @@ -122,10 +125,15 @@ impl SysMaster { } } + let mut payload = String::from(""); + if tgt.scheme().eq(proto::query::SCHEME_COMMAND) { + payload = query.to_owned(); + } + let mut msg = MasterMessage::new( RequestType::Command, - json!(ModStatePayload::new("12345".to_string()) - .set_uri(scheme.to_string()) + json!(ModStatePayload::new(payload) + .set_uri(querypath.to_string()) .add_files(out) .set_models_root(self.cfg.fileserver_mdl_root(true).to_str().unwrap_or_default())), // TODO: SID part ); @@ -176,6 +184,18 @@ impl SysMaster { m } + fn msg_bye_ack(&mut self, mid: String, sid: String) -> MasterMessage { + let mut m = MasterMessage::new(RequestType::ByeAck, json!(sid)); + m.set_target(MinionTarget::new(&mid, &sid)); + m.set_retcode(ProtoErrorCode::Success); + + m + } + + pub fn get_session(&self) -> Arc> { + Arc::clone(&self.session) + } + /// Process incoming minion messages #[allow(clippy::while_let_loop)] pub async fn do_incoming(master: Arc>, mut rx: tokio::sync::mpsc::Receiver<(Vec, String)>) { @@ -226,7 +246,7 @@ impl SysMaster { log::info!("Minion at {minion_addr} ({}) is not registered", req.id()); guard.to_drop.insert(minion_addr); _ = c_bcast.send(guard.msg_not_registered(req.id().to_string()).sendable().unwrap()); - } else if guard.session.exists(&c_id) { + } else if guard.get_session().lock().await.exists(&c_id) { log::info!("Minion at {minion_addr} ({}) is already connected", req.id()); guard.to_drop.insert(minion_addr); _ = c_bcast.send( @@ -234,7 +254,7 @@ impl SysMaster { ); } else { log::info!("{} connected successfully", c_id); - guard.session.ping(&c_id, &c_payload); + guard.get_session().lock().await.ping(&c_id, Some(&c_payload)); _ = c_bcast .send(guard.msg_request_traits(req.id().to_string(), c_payload).sendable().unwrap()); log::info!("Syncing traits with minion at {}", c_id); @@ -245,11 +265,10 @@ impl SysMaster { RequestType::Pong => { let c_master = Arc::clone(&master); let c_id = req.id().to_string(); - let c_payload = req.payload().to_string(); tokio::spawn(async move { - let mut guard = c_master.lock().await; - guard.session.ping(&c_id, &c_payload); - let uptime = guard.session.uptime(req.id()).unwrap_or_default(); + let guard = c_master.lock().await; + guard.get_session().lock().await.ping(&c_id, None); + let uptime = guard.get_session().lock().await.uptime(req.id()).unwrap_or_default(); log::trace!( "Update last contacted for {} (alive for {:.2} min)", req.id().to_string(), @@ -268,6 +287,19 @@ impl SysMaster { guard.on_traits(c_id, c_payload).await; }); } + + RequestType::Bye => { + let c_master = Arc::clone(&master); + let c_bcast = bcast.clone(); + log::info!("Minion {} disconnects", req.id()); + tokio::spawn(async move { + let mut guard = c_master.lock().await; + guard.get_session().lock().await.remove(req.id()); + let m = guard.msg_bye_ack(req.id().to_string(), req.payload().to_string()); + _ = c_bcast.send(m.sendable().unwrap()); + }); + } + _ => { log::error!("Minion sends unknown request type"); } @@ -371,24 +403,26 @@ impl SysMaster { loop { if let Ok(msg) = bcast_sub.recv().await { log::trace!("Sending message to minion at {} length of {}", local_addr.to_string(), msg.len()); + let mut guard = c_master.lock().await; if writer.write_all(&(msg.len() as u32).to_be_bytes()).await.is_err() || writer.write_all(&msg).await.is_err() || writer.flush().await.is_err() { if let Err(err) = cancel_tx.send(true) { - log::error!("Sending cancel notification: {err}"); + log::debug!("Error sending cancel notification: {err}"); } break; } - if c_master.lock().await.to_drop.contains(&local_addr.to_string()) { - c_master.lock().await.to_drop.remove(&local_addr.to_string()); + if guard.to_drop.contains(&local_addr.to_string()) { + guard.to_drop.remove(&local_addr.to_string()); log::info!("Dropping minion: {}", &local_addr.to_string()); + log::info!(""); if let Err(err) = writer.shutdown().await { log::error!("Error shutting down outgoing: {err}"); } if let Err(err) = cancel_tx.send(true) { - log::error!("Sending cancel notification: {err}"); + log::debug!("Error sending cancel notification: {err}"); } return; diff --git a/sysmaster/src/registry/session.rs b/sysmaster/src/registry/session.rs index c966f814..476997c4 100644 --- a/sysmaster/src/registry/session.rs +++ b/sysmaster/src/registry/session.rs @@ -4,7 +4,10 @@ Keeps connected minions and updates their uptime via heartbeat. This prevents simultaenous connection of multiple minions on the same machine. */ -use std::{collections::HashMap, time::Instant}; +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; #[derive(Debug, Clone)] struct Session { @@ -33,6 +36,10 @@ impl Session { pub fn session_id(&self) -> String { self.sid.to_string() } + + pub fn expire(&mut self) { + self.last -= Duration::from_secs(90); + } } #[derive(Debug, Default, Clone)] @@ -55,8 +62,13 @@ impl SessionKeeper { } /// Create a new session or update the existing - pub fn ping(&mut self, mid: &str, sid: &str) { - self.sessions.entry(mid.to_string()).or_insert_with(|| Session::new(sid)).update(); + pub fn ping(&mut self, mid: &str, sid: Option<&str>) { + if let Some(sid) = sid { + self.sessions.entry(mid.to_string()).or_insert_with(|| Session::new(sid)).update(); + } else if let Some(session) = self.sessions.get_mut(mid) { + session.update(); + } + self.gc(); } @@ -86,4 +98,14 @@ impl SessionKeeper { pub(crate) fn get_id(&self, mid: &str) -> Option { self.sessions.get(mid).map(|s| s.session_id()) } + + pub(crate) fn remove(&mut self, id: &str) { + self.sessions.remove(id); + } + + pub(crate) fn expire(&mut self, mid: &str) { + if let Some(session) = self.sessions.get_mut(mid) { + session.expire(); + } + } } diff --git a/sysminion/src/minion.rs b/sysminion/src/minion.rs index e919ffcb..fef5771b 100644 --- a/sysminion/src/minion.rs +++ b/sysminion/src/minion.rs @@ -5,7 +5,7 @@ use libsysinspect::{ proto::{ errcodes::ProtoErrorCode, payload::{ModStatePayload, PayloadType}, - query::MinionQuery, + query::{MinionQuery, SCHEME_COMMAND}, rqtypes::RequestType, MasterMessage, MinionMessage, ProtoConversion, }, @@ -42,7 +42,7 @@ impl SysMinion { log::debug!("Configuration: {:#?}", cfg); log::debug!("Trying to connect at {}", cfg.master()); - let (rstm, wstm) = TcpStream::connect(cfg.master()).await.unwrap().into_split(); + let (rstm, wstm) = TcpStream::connect(cfg.master()).await?.into_split(); log::debug!("Network bound at {}", cfg.master()); let instance = SysMinion { cfg: cfg.clone(), @@ -219,6 +219,10 @@ impl SysMinion { RequestType::Ping => { self.request(proto::msg::get_pong()).await; } + RequestType::ByeAck => { + log::info!("Master confirmed shutdown, terminating"); + std::process::exit(0); + } _ => { log::error!("Unknown request type"); } @@ -260,6 +264,18 @@ impl SysMinion { Ok(()) } + /// Send bye message + pub async fn send_bye(self: Arc) { + let r = MinionMessage::new( + dataconv::as_str(traits::get_minion_traits(None).get(traits::SYS_ID)), + RequestType::Bye, + MINION_SID.to_string(), + ); + + log::info!("Goodbye to {}", self.cfg.master()); + self.request(r.sendable().unwrap()).await; + } + /// Download a file from master async fn download_file(self: Arc, fname: &str) -> Result, SysinspectError> { async fn fetch_file(url: &str, filename: &str) -> Result { @@ -371,6 +387,24 @@ impl SysMinion { log::debug!("Sysinspect model cycle finished"); } + /// Calls internal command + async fn call_internal_command(self: Arc, cmd: &str) { + let cmd = cmd.strip_prefix(SCHEME_COMMAND).unwrap_or_default(); + match cmd { + libsysinspect::proto::query::commands::CLUSTER_SHUTDOWN => { + log::info!("Requesting minion shutdown from a master"); + self.as_ptr().send_bye().await; + } + libsysinspect::proto::query::commands::CLUSTER_REBOOT => { + log::warn!("Command \"reboot\" is not implemented yet"); + } + libsysinspect::proto::query::commands::CLUSTER_ROTATE => { + log::warn!("Command \"rotate\" is not implemented yet"); + } + _ => {} + } + } + async fn dispatch(self: Arc, cmd: MasterMessage) { log::debug!("Dispatching message"); let tgt = cmd.get_target(); @@ -424,9 +458,13 @@ impl SysMinion { match PayloadType::try_from(cmd.payload().clone()) { Ok(PayloadType::ModelOrStatement(pld)) => { - self.launch_sysinspect(cmd.get_target().scheme(), &pld).await; - log::debug!("Command dispatched"); - log::trace!("Command payload: {:#?}", pld); + if cmd.get_target().scheme().starts_with(SCHEME_COMMAND) { + self.as_ptr().call_internal_command(cmd.get_target().scheme()).await; + } else { + self.as_ptr().launch_sysinspect(cmd.get_target().scheme(), &pld).await; + log::debug!("Command dispatched"); + log::trace!("Command payload: {:#?}", pld); + } } Ok(PayloadType::Undef(pld)) => { log::error!("Unknown command: {:#?}", pld);