Skip to content

Commit

Permalink
Merge pull request #54 from tinythings/isbm-minion-drop
Browse files Browse the repository at this point in the history
Add minion drop
  • Loading branch information
isbm authored Dec 14, 2024
2 parents a612645 + 02d5b7c commit 407bef6
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 15 additions & 2 deletions libsysinspect/src/proto/query.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
use std::sync::{Arc, Mutex};

use crate::SysinspectError;
use std::sync::{Arc, Mutex};

/// Targeting schemes
pub static SCHEME_MODEL: &str = "model://";
pub static SCHEME_COMMAND: &str = "cmd://";

pub mod commands {
// Stop the entire cluster
pub const CLUSTER_SHUTDOWN: &str = "cluster/shutdown";

// Restart the entire cluster
// TODO: Not implemented yet
pub const CLUSTER_REBOOT: &str = "cluster/reboot";

// Rotate RSA/AES on the entire cluster
// TODO: Not implemented yet
pub const CLUSTER_ROTATE: &str = "cluster/rotate";
}

///
/// Query parser (scheme).
Expand Down
8 changes: 8 additions & 0 deletions libsysinspect/src/proto/rqtypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ pub enum RequestType {
#[serde(rename = "ehlo")]
Ehlo,

/// Bye
#[serde(rename = "b")]
Bye,

/// Bye Ack
#[serde(rename = "ba")]
ByeAck,

/// Retry connect (e.g. after the registration)
#[serde(rename = "retry")]
Reconnect,
Expand Down
6 changes: 6 additions & 0 deletions src/clidef.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ pub fn cli(version: &'static str) -> Command {
.long("state")
.help("Specify a state to be processed. If none specified, default is taken ($)")
)
.arg(
Arg::new("shutdown")
.long("shutdown")
.action(ArgAction::SetTrue)
.help(format!("Notify the running master to shutdown the {}, be careful! :)", "entire cluster".bright_red()))
)

.next_help_heading("Info")
.arg(
Expand Down
5 changes: 5 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use libsysinspect::{
cfg::{mmconf::MasterConfig, select_config_path},
inspector::SysInspectRunner,
logger,
proto::query::{commands::CLUSTER_SHUTDOWN, SCHEME_COMMAND},
reactor::handlers,
traits::get_minion_traits,
SysinspectError,
Expand Down Expand Up @@ -103,6 +104,10 @@ fn main() {
if let Err(err) = call_master_fifo(model, query.unwrap_or(&"".to_string()), traits, &cfg.socket()) {
log::error!("Cannot reach master: {err}");
}
} else if params.get_flag("shutdown") {
if let Err(err) = call_master_fifo(&format!("{}{}", SCHEME_COMMAND, CLUSTER_SHUTDOWN), "*", None, &cfg.socket()) {
log::error!("Cannot reach master: {err}");
}
} else if let Some(mpath) = params.get_one::<String>("model") {
let mut sr = SysInspectRunner::new(None);
sr.set_model_path(mpath);
Expand Down
1 change: 1 addition & 0 deletions sysmaster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ sled = "0.34.7"
rsa = { version = "0.9.6", features = ["pkcs5", "sha1", "sha2"] }
uuid = { version = "1.11.0", features = ["v4"] }
actix-web = "4.9.0"
once_cell = "1.20.2"
68 changes: 51 additions & 17 deletions sysmaster/src/master.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use libsysinspect::{
util::iofs::scan_files_sha256,
SysinspectError,
};
use once_cell::sync::Lazy;
use serde_json::json;
use std::{
collections::{HashMap, HashSet},
Expand All @@ -32,22 +33,25 @@ use tokio::{
time,
};

// Session singleton
static SHARED_SESSION: Lazy<Arc<Mutex<SessionKeeper>>> = Lazy::new(|| Arc::new(Mutex::new(SessionKeeper::new(30))));

#[derive(Debug)]
pub struct SysMaster {
cfg: MasterConfig,
broadcast: broadcast::Sender<Vec<u8>>,
mkr: MinionsKeyRegistry,
mreg: MinionRegistry,
to_drop: HashSet<String>,
session: session::SessionKeeper,
session: Arc<Mutex<session::SessionKeeper>>,
}

impl SysMaster {
pub fn new(cfg: MasterConfig) -> Result<SysMaster, SysinspectError> {
let (tx, _) = broadcast::channel::<Vec<u8>>(100);
let mkr = MinionsKeyRegistry::new(cfg.keyman_root())?;
let mreg = MinionRegistry::new(cfg.minion_registry_root())?;
Ok(SysMaster { cfg, broadcast: tx, mkr, to_drop: HashSet::default(), session: SessionKeeper::new(30), mreg })
Ok(SysMaster { cfg, broadcast: tx, mkr, to_drop: HashSet::default(), session: Arc::clone(&SHARED_SESSION), mreg })
}

/// Open FIFO socket for command-line communication
Expand Down Expand Up @@ -103,15 +107,14 @@ impl SysMaster {
fn msg_query(&mut self, payload: &str) -> Option<MasterMessage> {
let query = payload.split(";").map(|s| s.to_string()).collect::<Vec<String>>();

if let [scheme, query, traits] = query.as_slice() {
if let [querypath, query, traits] = query.as_slice() {
let mut tgt = MinionTarget::default();
tgt.set_scheme(scheme);
tgt.set_scheme(querypath);
tgt.set_traits_query(traits);
for hostname in query.split(",") {
tgt.add_hostname(hostname);
}

// Collect downloadable model(s) files
let mut out: HashMap<String, String> = HashMap::default();
for em in self.cfg.fileserver_models() {
for (n, cs) in scan_files_sha256(self.cfg.fileserver_mdl_root(false).join(em), Some(MODEL_FILE_EXT)) {
Expand All @@ -122,10 +125,15 @@ impl SysMaster {
}
}

let mut payload = String::from("");
if tgt.scheme().eq(proto::query::SCHEME_COMMAND) {
payload = query.to_owned();
}

let mut msg = MasterMessage::new(
RequestType::Command,
json!(ModStatePayload::new("12345".to_string())
.set_uri(scheme.to_string())
json!(ModStatePayload::new(payload)
.set_uri(querypath.to_string())
.add_files(out)
.set_models_root(self.cfg.fileserver_mdl_root(true).to_str().unwrap_or_default())), // TODO: SID part
);
Expand Down Expand Up @@ -176,6 +184,18 @@ impl SysMaster {
m
}

fn msg_bye_ack(&mut self, mid: String, sid: String) -> MasterMessage {
let mut m = MasterMessage::new(RequestType::ByeAck, json!(sid));
m.set_target(MinionTarget::new(&mid, &sid));
m.set_retcode(ProtoErrorCode::Success);

m
}

pub fn get_session(&self) -> Arc<Mutex<session::SessionKeeper>> {
Arc::clone(&self.session)
}

/// Process incoming minion messages
#[allow(clippy::while_let_loop)]
pub async fn do_incoming(master: Arc<Mutex<Self>>, mut rx: tokio::sync::mpsc::Receiver<(Vec<u8>, String)>) {
Expand Down Expand Up @@ -226,15 +246,15 @@ impl SysMaster {
log::info!("Minion at {minion_addr} ({}) is not registered", req.id());
guard.to_drop.insert(minion_addr);
_ = c_bcast.send(guard.msg_not_registered(req.id().to_string()).sendable().unwrap());
} else if guard.session.exists(&c_id) {
} else if guard.get_session().lock().await.exists(&c_id) {
log::info!("Minion at {minion_addr} ({}) is already connected", req.id());
guard.to_drop.insert(minion_addr);
_ = c_bcast.send(
guard.msg_already_connected(req.id().to_string(), c_payload).sendable().unwrap(),
);
} else {
log::info!("{} connected successfully", c_id);
guard.session.ping(&c_id, &c_payload);
guard.get_session().lock().await.ping(&c_id, Some(&c_payload));
_ = c_bcast
.send(guard.msg_request_traits(req.id().to_string(), c_payload).sendable().unwrap());
log::info!("Syncing traits with minion at {}", c_id);
Expand All @@ -245,11 +265,10 @@ impl SysMaster {
RequestType::Pong => {
let c_master = Arc::clone(&master);
let c_id = req.id().to_string();
let c_payload = req.payload().to_string();
tokio::spawn(async move {
let mut guard = c_master.lock().await;
guard.session.ping(&c_id, &c_payload);
let uptime = guard.session.uptime(req.id()).unwrap_or_default();
let guard = c_master.lock().await;
guard.get_session().lock().await.ping(&c_id, None);
let uptime = guard.get_session().lock().await.uptime(req.id()).unwrap_or_default();
log::trace!(
"Update last contacted for {} (alive for {:.2} min)",
req.id().to_string(),
Expand All @@ -268,6 +287,19 @@ impl SysMaster {
guard.on_traits(c_id, c_payload).await;
});
}

RequestType::Bye => {
let c_master = Arc::clone(&master);
let c_bcast = bcast.clone();
log::info!("Minion {} disconnects", req.id());
tokio::spawn(async move {
let mut guard = c_master.lock().await;
guard.get_session().lock().await.remove(req.id());
let m = guard.msg_bye_ack(req.id().to_string(), req.payload().to_string());
_ = c_bcast.send(m.sendable().unwrap());
});
}

_ => {
log::error!("Minion sends unknown request type");
}
Expand Down Expand Up @@ -371,24 +403,26 @@ impl SysMaster {
loop {
if let Ok(msg) = bcast_sub.recv().await {
log::trace!("Sending message to minion at {} length of {}", local_addr.to_string(), msg.len());
let mut guard = c_master.lock().await;
if writer.write_all(&(msg.len() as u32).to_be_bytes()).await.is_err()
|| writer.write_all(&msg).await.is_err()
|| writer.flush().await.is_err()
{
if let Err(err) = cancel_tx.send(true) {
log::error!("Sending cancel notification: {err}");
log::debug!("Error sending cancel notification: {err}");
}
break;
}

if c_master.lock().await.to_drop.contains(&local_addr.to_string()) {
c_master.lock().await.to_drop.remove(&local_addr.to_string());
if guard.to_drop.contains(&local_addr.to_string()) {
guard.to_drop.remove(&local_addr.to_string());
log::info!("Dropping minion: {}", &local_addr.to_string());
log::info!("");
if let Err(err) = writer.shutdown().await {
log::error!("Error shutting down outgoing: {err}");
}
if let Err(err) = cancel_tx.send(true) {
log::error!("Sending cancel notification: {err}");
log::debug!("Error sending cancel notification: {err}");
}

return;
Expand Down
28 changes: 25 additions & 3 deletions sysmaster/src/registry/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ Keeps connected minions and updates their uptime via heartbeat.
This prevents simultaenous connection of multiple minions on the same machine.
*/

use std::{collections::HashMap, time::Instant};
use std::{
collections::HashMap,
time::{Duration, Instant},
};

#[derive(Debug, Clone)]
struct Session {
Expand Down Expand Up @@ -33,6 +36,10 @@ impl Session {
pub fn session_id(&self) -> String {
self.sid.to_string()
}

pub fn expire(&mut self) {
self.last -= Duration::from_secs(90);
}
}

#[derive(Debug, Default, Clone)]
Expand All @@ -55,8 +62,13 @@ impl SessionKeeper {
}

/// Create a new session or update the existing
pub fn ping(&mut self, mid: &str, sid: &str) {
self.sessions.entry(mid.to_string()).or_insert_with(|| Session::new(sid)).update();
pub fn ping(&mut self, mid: &str, sid: Option<&str>) {
if let Some(sid) = sid {
self.sessions.entry(mid.to_string()).or_insert_with(|| Session::new(sid)).update();
} else if let Some(session) = self.sessions.get_mut(mid) {
session.update();
}

self.gc();
}

Expand Down Expand Up @@ -86,4 +98,14 @@ impl SessionKeeper {
pub(crate) fn get_id(&self, mid: &str) -> Option<String> {
self.sessions.get(mid).map(|s| s.session_id())
}

pub(crate) fn remove(&mut self, id: &str) {
self.sessions.remove(id);
}

pub(crate) fn expire(&mut self, mid: &str) {
if let Some(session) = self.sessions.get_mut(mid) {
session.expire();
}
}
}
Loading

0 comments on commit 407bef6

Please sign in to comment.