From 5d34ecf3a1fe9e73d1fc6e5c80eb3b54621844a0 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Fri, 13 Dec 2024 16:38:06 +0100 Subject: [PATCH 01/12] Add CLI option to send a message to the Master for shutting down the whole cluster --- src/clidef.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/clidef.rs b/src/clidef.rs index 52da86a..95c02e5 100644 --- a/src/clidef.rs +++ b/src/clidef.rs @@ -68,6 +68,12 @@ pub fn cli(version: &'static str) -> Command { .long("state") .help("Specify a state to be processed. If none specified, default is taken ($)") ) + .arg( + Arg::new("shutdown") + .long("shutdown") + .action(ArgAction::SetTrue) + .help(format!("Notify the running master to shutdown the {}, be careful! :)", "entire cluster".bright_red())) + ) .next_help_heading("Info") .arg( From 256a6cf1f167f326a9bb2bad8a5758198a6d0a93 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Fri, 13 Dec 2024 17:51:38 +0100 Subject: [PATCH 02/12] Define internal commands --- libsysinspect/src/proto/query.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/libsysinspect/src/proto/query.rs b/libsysinspect/src/proto/query.rs index bccf776..14faa7a 100644 --- a/libsysinspect/src/proto/query.rs +++ b/libsysinspect/src/proto/query.rs @@ -1,9 +1,22 @@ -use std::sync::{Arc, Mutex}; - use crate::SysinspectError; +use std::sync::{Arc, Mutex}; /// Targeting schemes pub static SCHEME_MODEL: &str = "model://"; +pub static SCHEME_COMMAND: &str = "cmd://"; + +pub mod commands { + // Stop the entire cluster + pub const CLUSTER_SHUTDOWN: &str = "cluster/shutdown"; + + // Restart the entire cluster + // TODO: Not implemented yet + pub const CLUSTER_REBOOT: &str = "cluster/reboot"; + + // Rotate RSA/AES on the entire cluster + // TODO: Not implemented yet + pub const CLUSTER_ROTATE: &str = "cluster/rotate"; +} /// /// Query parser (scheme). 
From cf7f8afba96f370e52d7bbc8351f95c147755939 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Fri, 13 Dec 2024 17:51:57 +0100 Subject: [PATCH 03/12] Bugfix: don't crash/panic on refused TCP --- sysminion/src/minion.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sysminion/src/minion.rs b/sysminion/src/minion.rs index e919ffc..2fd6365 100644 --- a/sysminion/src/minion.rs +++ b/sysminion/src/minion.rs @@ -42,7 +42,7 @@ impl SysMinion { log::debug!("Configuration: {:#?}", cfg); log::debug!("Trying to connect at {}", cfg.master()); - let (rstm, wstm) = TcpStream::connect(cfg.master()).await.unwrap().into_split(); + let (rstm, wstm) = TcpStream::connect(cfg.master()).await?.into_split(); log::debug!("Network bound at {}", cfg.master()); let instance = SysMinion { cfg: cfg.clone(), From 09bf2818ab08e817eaac36559021285f2de0b77c Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Fri, 13 Dec 2024 17:52:33 +0100 Subject: [PATCH 04/12] Broadcast cluster shutdown message --- src/main.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main.rs b/src/main.rs index 14c04dc..754dd91 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use libsysinspect::{ cfg::{mmconf::MasterConfig, select_config_path}, inspector::SysInspectRunner, logger, + proto::query::{commands::CLUSTER_SHUTDOWN, SCHEME_COMMAND}, reactor::handlers, traits::get_minion_traits, SysinspectError, @@ -103,6 +104,10 @@ fn main() { if let Err(err) = call_master_fifo(model, query.unwrap_or(&"".to_string()), traits, &cfg.socket()) { log::error!("Cannot reach master: {err}"); } + } else if params.get_flag("shutdown") { + if let Err(err) = call_master_fifo(&format!("{}{}", SCHEME_COMMAND, CLUSTER_SHUTDOWN), "*", None, &cfg.socket()) { + log::error!("Cannot reach master: {err}"); + } } else if let Some(mpath) = params.get_one::("model") { let mut sr = SysInspectRunner::new(None); sr.set_model_path(mpath); From e7d7d44bf78f8e472b6c29b89e69855170b9bf00 Mon Sep 17 00:00:00 2001 From: Bo
Maryniuk Date: Fri, 13 Dec 2024 17:52:58 +0100 Subject: [PATCH 05/12] Broadcast minion shutdown --- sysmaster/src/master.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/sysmaster/src/master.rs b/sysmaster/src/master.rs index 2ecbee3..8d625e2 100644 --- a/sysmaster/src/master.rs +++ b/sysmaster/src/master.rs @@ -103,15 +103,15 @@ impl SysMaster { fn msg_query(&mut self, payload: &str) -> Option { let query = payload.split(";").map(|s| s.to_string()).collect::>(); - if let [scheme, query, traits] = query.as_slice() { + if let [querypath, query, traits] = query.as_slice() { + println!("Scheme: {querypath}, Query: {query}"); let mut tgt = MinionTarget::default(); - tgt.set_scheme(scheme); + tgt.set_scheme(querypath); tgt.set_traits_query(traits); for hostname in query.split(",") { tgt.add_hostname(hostname); } - // Collect downloadable model(s) files let mut out: HashMap = HashMap::default(); for em in self.cfg.fileserver_models() { for (n, cs) in scan_files_sha256(self.cfg.fileserver_mdl_root(false).join(em), Some(MODEL_FILE_EXT)) { @@ -122,10 +122,15 @@ impl SysMaster { } } + let mut payload = String::from(""); + if tgt.scheme().eq(proto::query::SCHEME_COMMAND) { + payload = query.to_owned(); + } + let mut msg = MasterMessage::new( RequestType::Command, - json!(ModStatePayload::new("12345".to_string()) - .set_uri(scheme.to_string()) + json!(ModStatePayload::new(payload) + .set_uri(querypath.to_string()) .add_files(out) .set_models_root(self.cfg.fileserver_mdl_root(true).to_str().unwrap_or_default())), // TODO: SID part ); From 37305641d1b42f0e807a82307b8772c8355ebe1e Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Fri, 13 Dec 2024 17:53:16 +0100 Subject: [PATCH 06/12] Implement minion stop --- sysminion/src/minion.rs | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/sysminion/src/minion.rs b/sysminion/src/minion.rs index 2fd6365..6d34996 100644 --- a/sysminion/src/minion.rs +++ 
b/sysminion/src/minion.rs @@ -5,7 +5,7 @@ use libsysinspect::{ proto::{ errcodes::ProtoErrorCode, payload::{ModStatePayload, PayloadType}, - query::MinionQuery, + query::{MinionQuery, SCHEME_COMMAND}, rqtypes::RequestType, MasterMessage, MinionMessage, ProtoConversion, }, @@ -371,6 +371,24 @@ impl SysMinion { log::debug!("Sysinspect model cycle finished"); } + /// Calls internal command + fn call_internal_command(self: Arc, cmd: &str) { + let cmd = cmd.strip_prefix(SCHEME_COMMAND).unwrap_or_default(); + match cmd { + libsysinspect::proto::query::commands::CLUSTER_SHUTDOWN => { + log::info!("Shutting down minion"); + std::process::exit(0); + } + libsysinspect::proto::query::commands::CLUSTER_REBOOT => { + log::warn!("Command \"reboot\" is not implemented yet"); + } + libsysinspect::proto::query::commands::CLUSTER_ROTATE => { + log::warn!("Command \"rotate\" is not implemented yet"); + } + _ => {} + } + } + async fn dispatch(self: Arc, cmd: MasterMessage) { log::debug!("Dispatching message"); let tgt = cmd.get_target(); @@ -424,9 +442,13 @@ impl SysMinion { match PayloadType::try_from(cmd.payload().clone()) { Ok(PayloadType::ModelOrStatement(pld)) => { - self.launch_sysinspect(cmd.get_target().scheme(), &pld).await; - log::debug!("Command dispatched"); - log::trace!("Command payload: {:#?}", pld); + if cmd.get_target().scheme().starts_with(SCHEME_COMMAND) { + self.as_ptr().call_internal_command(cmd.get_target().scheme()); + } else { + self.as_ptr().launch_sysinspect(cmd.get_target().scheme(), &pld).await; + log::debug!("Command dispatched"); + log::trace!("Command payload: {:#?}", pld); + } } Ok(PayloadType::Undef(pld)) => { log::error!("Unknown command: {:#?}", pld); From 5a57e9dc5bed0773aec7d2af8c39de760f5d0378 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Fri, 13 Dec 2024 19:06:54 +0100 Subject: [PATCH 07/12] Add Bye message type. 
--- libsysinspect/src/proto/rqtypes.rs | 4 ++++ sysmaster/src/master.rs | 5 ++++- sysmaster/src/registry/session.rs | 4 ++++ sysminion/src/minion.rs | 20 +++++++++++++++++--- 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/libsysinspect/src/proto/rqtypes.rs b/libsysinspect/src/proto/rqtypes.rs index 025edfb..0776875 100644 --- a/libsysinspect/src/proto/rqtypes.rs +++ b/libsysinspect/src/proto/rqtypes.rs @@ -26,6 +26,10 @@ pub enum RequestType { #[serde(rename = "ehlo")] Ehlo, + /// Bye + #[serde(rename = "b")] + Bye, + /// Retry connect (e.g. after the registration) #[serde(rename = "retry")] Reconnect, diff --git a/sysmaster/src/master.rs b/sysmaster/src/master.rs index 8d625e2..1cc7608 100644 --- a/sysmaster/src/master.rs +++ b/sysmaster/src/master.rs @@ -104,7 +104,6 @@ impl SysMaster { let query = payload.split(";").map(|s| s.to_string()).collect::>(); if let [querypath, query, traits] = query.as_slice() { - println!("Scheme: {querypath}, Query: {query}"); let mut tgt = MinionTarget::default(); tgt.set_scheme(querypath); tgt.set_traits_query(traits); @@ -273,6 +272,10 @@ impl SysMaster { guard.on_traits(c_id, c_payload).await; }); } + + RequestType::Bye => { + log::debug!("Minion at {} disconnects", req.id()); + } _ => { log::error!("Minion sends unknown request type"); } diff --git a/sysmaster/src/registry/session.rs b/sysmaster/src/registry/session.rs index c966f81..77501ee 100644 --- a/sysmaster/src/registry/session.rs +++ b/sysmaster/src/registry/session.rs @@ -86,4 +86,8 @@ impl SessionKeeper { pub(crate) fn get_id(&self, mid: &str) -> Option { self.sessions.get(mid).map(|s| s.session_id()) } + + pub(crate) fn remove(&mut self, id: &str) { + self.sessions.remove(id); + } } diff --git a/sysminion/src/minion.rs b/sysminion/src/minion.rs index 6d34996..629b429 100644 --- a/sysminion/src/minion.rs +++ b/sysminion/src/minion.rs @@ -15,7 +15,7 @@ use libsysinspect::{ SysinspectError, }; use once_cell::sync::Lazy; -use std::{fs, sync::Arc, vec}; 
+use std::{fs, sync::Arc, time::Duration, vec}; use tokio::io::AsyncReadExt; use tokio::net::{tcp::OwnedReadHalf, TcpStream}; use tokio::sync::Mutex; @@ -260,6 +260,18 @@ impl SysMinion { Ok(()) } + /// Send bye message + pub async fn send_bye(self: Arc) { + let r = MinionMessage::new( + dataconv::as_str(traits::get_minion_traits(None).get(traits::SYS_ID)), + RequestType::Bye, + MINION_SID.to_string(), + ); + + log::info!("Goodbye to {}", self.cfg.master()); + self.request(r.sendable().unwrap()).await; + } + /// Download a file from master async fn download_file(self: Arc, fname: &str) -> Result, SysinspectError> { async fn fetch_file(url: &str, filename: &str) -> Result { @@ -372,11 +384,13 @@ impl SysMinion { } /// Calls internal command - fn call_internal_command(self: Arc, cmd: &str) { + async fn call_internal_command(self: Arc, cmd: &str) { let cmd = cmd.strip_prefix(SCHEME_COMMAND).unwrap_or_default(); match cmd { libsysinspect::proto::query::commands::CLUSTER_SHUTDOWN => { log::info!("Shutting down minion"); + self.as_ptr().send_bye().await; + tokio::time::sleep(Duration::from_secs(2)).await; std::process::exit(0); } libsysinspect::proto::query::commands::CLUSTER_REBOOT => { @@ -443,7 +457,7 @@ impl SysMinion { match PayloadType::try_from(cmd.payload().clone()) { Ok(PayloadType::ModelOrStatement(pld)) => { if cmd.get_target().scheme().starts_with(SCHEME_COMMAND) { - self.as_ptr().call_internal_command(cmd.get_target().scheme()); + self.as_ptr().call_internal_command(cmd.get_target().scheme()).await; } else { self.as_ptr().launch_sysinspect(cmd.get_target().scheme(), &pld).await; log::debug!("Command dispatched"); From af3a76dacc5a320b1702d05b969f4cc475431a3c Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 14 Dec 2024 19:51:56 +0100 Subject: [PATCH 08/12] Add session expiration --- sysmaster/src/registry/session.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/sysmaster/src/registry/session.rs 
b/sysmaster/src/registry/session.rs index 77501ee..7ad1353 100644 --- a/sysmaster/src/registry/session.rs +++ b/sysmaster/src/registry/session.rs @@ -4,7 +4,10 @@ Keeps connected minions and updates their uptime via heartbeat. This prevents simultaenous connection of multiple minions on the same machine. */ -use std::{collections::HashMap, time::Instant}; +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; #[derive(Debug, Clone)] struct Session { @@ -33,6 +36,10 @@ impl Session { pub fn session_id(&self) -> String { self.sid.to_string() } + + pub fn expire(&mut self) { + self.last -= Duration::from_secs(90); + } } #[derive(Debug, Default, Clone)] @@ -90,4 +97,10 @@ impl SessionKeeper { pub(crate) fn remove(&mut self, id: &str) { self.sessions.remove(id); } + + pub(crate) fn expire(&mut self, mid: &str) { + if let Some(session) = self.sessions.get_mut(mid) { + session.expire(); + } + } } From 48732042bdd8bc89926e69e3bf78fe4525ff9c78 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 14 Dec 2024 19:53:14 +0100 Subject: [PATCH 09/12] Bugfix: Let ping also only reuse session instead of adding a new one. This prevents reintroducing it asynchronously right after removing it on a minion disconnect.
--- sysmaster/src/registry/session.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sysmaster/src/registry/session.rs b/sysmaster/src/registry/session.rs index 7ad1353..476997c 100644 --- a/sysmaster/src/registry/session.rs +++ b/sysmaster/src/registry/session.rs @@ -62,8 +62,13 @@ impl SessionKeeper { } /// Create a new session or update the existing - pub fn ping(&mut self, mid: &str, sid: &str) { - self.sessions.entry(mid.to_string()).or_insert_with(|| Session::new(sid)).update(); + pub fn ping(&mut self, mid: &str, sid: Option<&str>) { + if let Some(sid) = sid { + self.sessions.entry(mid.to_string()).or_insert_with(|| Session::new(sid)).update(); + } else if let Some(session) = self.sessions.get_mut(mid) { + session.update(); + } + self.gc(); } From 368eb1f2faf73451c978fcae960beaff7fe36985 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 14 Dec 2024 19:54:07 +0100 Subject: [PATCH 10/12] Ensure the session keeper has only one instance --- Cargo.lock | 1 + sysmaster/Cargo.toml | 1 + sysmaster/src/master.rs | 33 +++++++++++++++++++++------------ 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 48d3365..839f241 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4211,6 +4211,7 @@ dependencies = [ "libc", "libsysinspect", "log", + "once_cell", "rand", "rsa", "rustls", diff --git a/sysmaster/Cargo.toml b/sysmaster/Cargo.toml index 701b432..a455359 100644 --- a/sysmaster/Cargo.toml +++ b/sysmaster/Cargo.toml @@ -33,3 +33,4 @@ sled = "0.34.7" rsa = { version = "0.9.6", features = ["pkcs5", "sha1", "sha2"] } uuid = { version = "1.11.0", features = ["v4"] } actix-web = "4.9.0" +once_cell = "1.20.2" diff --git a/sysmaster/src/master.rs b/sysmaster/src/master.rs index 1cc7608..91de88c 100644 --- a/sysmaster/src/master.rs +++ b/sysmaster/src/master.rs @@ -16,6 +16,7 @@ use libsysinspect::{ util::iofs::scan_files_sha256, SysinspectError, }; +use once_cell::sync::Lazy; use serde_json::json; use std::{ 
collections::{HashMap, HashSet}, @@ -32,6 +33,9 @@ use tokio::{ time, }; +// Session singleton +static SHARED_SESSION: Lazy>> = Lazy::new(|| Arc::new(Mutex::new(SessionKeeper::new(30)))); + #[derive(Debug)] pub struct SysMaster { cfg: MasterConfig, @@ -39,7 +43,7 @@ pub struct SysMaster { mkr: MinionsKeyRegistry, mreg: MinionRegistry, to_drop: HashSet, - session: session::SessionKeeper, + session: Arc>, } impl SysMaster { @@ -47,7 +51,7 @@ impl SysMaster { let (tx, _) = broadcast::channel::>(100); let mkr = MinionsKeyRegistry::new(cfg.keyman_root())?; let mreg = MinionRegistry::new(cfg.minion_registry_root())?; - Ok(SysMaster { cfg, broadcast: tx, mkr, to_drop: HashSet::default(), session: SessionKeeper::new(30), mreg }) + Ok(SysMaster { cfg, broadcast: tx, mkr, to_drop: HashSet::default(), session: Arc::clone(&SHARED_SESSION), mreg }) } /// Open FIFO socket for command-line communication @@ -180,6 +184,10 @@ impl SysMaster { m } + pub fn get_session(&self) -> Arc> { + Arc::clone(&self.session) + } + /// Process incoming minion messages #[allow(clippy::while_let_loop)] pub async fn do_incoming(master: Arc>, mut rx: tokio::sync::mpsc::Receiver<(Vec, String)>) { @@ -230,7 +238,7 @@ impl SysMaster { log::info!("Minion at {minion_addr} ({}) is not registered", req.id()); guard.to_drop.insert(minion_addr); _ = c_bcast.send(guard.msg_not_registered(req.id().to_string()).sendable().unwrap()); - } else if guard.session.exists(&c_id) { + } else if guard.get_session().lock().await.exists(&c_id) { log::info!("Minion at {minion_addr} ({}) is already connected", req.id()); guard.to_drop.insert(minion_addr); _ = c_bcast.send( @@ -238,7 +246,7 @@ impl SysMaster { ); } else { log::info!("{} connected successfully", c_id); - guard.session.ping(&c_id, &c_payload); + guard.get_session().lock().await.ping(&c_id, Some(&c_payload)); _ = c_bcast .send(guard.msg_request_traits(req.id().to_string(), c_payload).sendable().unwrap()); log::info!("Syncing traits with minion at {}", c_id); @@ 
-249,11 +257,10 @@ impl SysMaster { RequestType::Pong => { let c_master = Arc::clone(&master); let c_id = req.id().to_string(); - let c_payload = req.payload().to_string(); tokio::spawn(async move { - let mut guard = c_master.lock().await; - guard.session.ping(&c_id, &c_payload); - let uptime = guard.session.uptime(req.id()).unwrap_or_default(); + let guard = c_master.lock().await; + guard.get_session().lock().await.ping(&c_id, None); + let uptime = guard.get_session().lock().await.uptime(req.id()).unwrap_or_default(); log::trace!( "Update last contacted for {} (alive for {:.2} min)", req.id().to_string(), @@ -379,24 +386,26 @@ impl SysMaster { loop { if let Ok(msg) = bcast_sub.recv().await { log::trace!("Sending message to minion at {} length of {}", local_addr.to_string(), msg.len()); + let mut guard = c_master.lock().await; if writer.write_all(&(msg.len() as u32).to_be_bytes()).await.is_err() || writer.write_all(&msg).await.is_err() || writer.flush().await.is_err() { if let Err(err) = cancel_tx.send(true) { - log::error!("Sending cancel notification: {err}"); + log::debug!("Error sending cancel notification: {err}"); } break; } - if c_master.lock().await.to_drop.contains(&local_addr.to_string()) { - c_master.lock().await.to_drop.remove(&local_addr.to_string()); + if guard.to_drop.contains(&local_addr.to_string()) { + guard.to_drop.remove(&local_addr.to_string()); log::info!("Dropping minion: {}", &local_addr.to_string()); + log::info!(""); if let Err(err) = writer.shutdown().await { log::error!("Error shutting down outgoing: {err}"); } if let Err(err) = cancel_tx.send(true) { - log::error!("Sending cancel notification: {err}"); + log::debug!("Error sending cancel notification: {err}"); } return; From 21ceb9cff5f341ec318d2657b9fbeb7ecced2f75 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 14 Dec 2024 19:54:40 +0100 Subject: [PATCH 11/12] Remove minion from the session on its disconnect --- sysmaster/src/master.rs | 8 +++++++- 1 file changed, 7 
insertions(+), 1 deletion(-) diff --git a/sysmaster/src/master.rs b/sysmaster/src/master.rs index 91de88c..d813615 100644 --- a/sysmaster/src/master.rs +++ b/sysmaster/src/master.rs @@ -281,8 +281,14 @@ impl SysMaster { } RequestType::Bye => { - log::debug!("Minion at {} disconnects", req.id()); + let c_master = Arc::clone(&master); + log::info!("Minion {} disconnects", req.id()); + tokio::spawn(async move { + let guard = c_master.lock().await; + guard.get_session().lock().await.remove(req.id()); + }); } + _ => { log::error!("Minion sends unknown request type"); } From 02d5b7c138cdf09e6611e6d3a87cab96cb6afc11 Mon Sep 17 00:00:00 2001 From: Bo Maryniuk Date: Sat, 14 Dec 2024 20:08:54 +0100 Subject: [PATCH 12/12] Rely on shutdown ack from the master when shutting down --- libsysinspect/src/proto/rqtypes.rs | 4 ++++ sysmaster/src/master.rs | 13 ++++++++++++- sysminion/src/minion.rs | 10 ++++++---- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/libsysinspect/src/proto/rqtypes.rs b/libsysinspect/src/proto/rqtypes.rs index 0776875..8ac159f 100644 --- a/libsysinspect/src/proto/rqtypes.rs +++ b/libsysinspect/src/proto/rqtypes.rs @@ -30,6 +30,10 @@ pub enum RequestType { #[serde(rename = "b")] Bye, + /// Bye Ack + #[serde(rename = "ba")] + ByeAck, + /// Retry connect (e.g. 
after the registration) #[serde(rename = "retry")] Reconnect, diff --git a/sysmaster/src/master.rs b/sysmaster/src/master.rs index d813615..761b8a9 100644 --- a/sysmaster/src/master.rs +++ b/sysmaster/src/master.rs @@ -184,6 +184,14 @@ impl SysMaster { m } + fn msg_bye_ack(&mut self, mid: String, sid: String) -> MasterMessage { + let mut m = MasterMessage::new(RequestType::ByeAck, json!(sid)); + m.set_target(MinionTarget::new(&mid, &sid)); + m.set_retcode(ProtoErrorCode::Success); + + m + } + pub fn get_session(&self) -> Arc> { Arc::clone(&self.session) } @@ -282,10 +290,13 @@ impl SysMaster { RequestType::Bye => { let c_master = Arc::clone(&master); + let c_bcast = bcast.clone(); log::info!("Minion {} disconnects", req.id()); tokio::spawn(async move { - let guard = c_master.lock().await; + let mut guard = c_master.lock().await; guard.get_session().lock().await.remove(req.id()); + let m = guard.msg_bye_ack(req.id().to_string(), req.payload().to_string()); + _ = c_bcast.send(m.sendable().unwrap()); }); } diff --git a/sysminion/src/minion.rs b/sysminion/src/minion.rs index 629b429..fef5771 100644 --- a/sysminion/src/minion.rs +++ b/sysminion/src/minion.rs @@ -15,7 +15,7 @@ use libsysinspect::{ SysinspectError, }; use once_cell::sync::Lazy; -use std::{fs, sync::Arc, time::Duration, vec}; +use std::{fs, sync::Arc, vec}; use tokio::io::AsyncReadExt; use tokio::net::{tcp::OwnedReadHalf, TcpStream}; use tokio::sync::Mutex; @@ -219,6 +219,10 @@ impl SysMinion { RequestType::Ping => { self.request(proto::msg::get_pong()).await; } + RequestType::ByeAck => { + log::info!("Master confirmed shutdown, terminating"); + std::process::exit(0); + } _ => { log::error!("Unknown request type"); } @@ -388,10 +392,8 @@ impl SysMinion { let cmd = cmd.strip_prefix(SCHEME_COMMAND).unwrap_or_default(); match cmd { libsysinspect::proto::query::commands::CLUSTER_SHUTDOWN => { - log::info!("Shutting down minion"); + log::info!("Requesting minion shutdown from a master"); 
self.as_ptr().send_bye().await; - tokio::time::sleep(Duration::from_secs(2)).await; - std::process::exit(0); } libsysinspect::proto::query::commands::CLUSTER_REBOOT => { log::warn!("Command \"reboot\" is not implemented yet");