Skip to content

Commit

Permalink
Add cycle Id. Each time a new instance of SysInspect runner starts, i…
Browse files Browse the repository at this point in the history
…t will have an unique Id for the current cycle
  • Loading branch information
isbm committed Feb 25, 2025
1 parent 4375e16 commit 195f663
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 19 deletions.
23 changes: 18 additions & 5 deletions libsysinspect/src/intp/actproc/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ pub struct ActionResponse {
// State Id
sid: String,

// Cycle Id
cid: String,

// Module response
pub response: ActionModResponse,
pub constraints: ConstraintResponse,
Expand All @@ -171,7 +174,7 @@ impl ActionResponse {
pub(crate) fn new(
eid: String, aid: String, sid: String, response: ActionModResponse, constraints: ConstraintResponse,
) -> Self {
Self { eid, aid, sid, response, constraints }
Self { eid, aid, sid, response, constraints, cid: "".to_string() }
}

/// Return an Entity Id to which this action was bound to
Expand All @@ -186,10 +189,20 @@ impl ActionResponse {

/// Return state Id of the action
pub fn sid(&self) -> &str {
if self.sid.is_empty() {
"$"
} else {
&self.sid
if self.sid.is_empty() { "$" } else { &self.sid }
}

/// Return cycle id. This one is set later by the callback.
pub fn cid(&self) -> &str {
&self.cid
}

/// Sets cycle id.
///
/// **NOTE: Does only once!**
pub fn set_cid(&mut self, cid: String) {
if self.cid.is_empty() {
self.cid = cid;
}
}

Expand Down
17 changes: 11 additions & 6 deletions sysmaster/src/evtreg/kvdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ impl EventData {
format!("{}/{}/{}", self.get_entity_id(), self.get_status_id(), self.get_action_id())
}

pub fn get_cycle_id(&self) -> String {
util::dataconv::as_str(self.data.get("cid").cloned())
}

pub fn get_response(&self) -> HashMap<String, Value> {
// Should work... :-)
serde_json::from_value(self.data.get("response").unwrap().clone()).unwrap()
Expand Down Expand Up @@ -132,17 +136,18 @@ impl EventsRegistry {
}

/// Add an event
pub fn add_event(&mut self, sid: EventSession, mid: EventMinion, payload: String) -> Result<(), SysinspectError> {
pub fn add_event(
&mut self, sid: EventSession, mid: EventMinion, payload: HashMap<String, Value>,
) -> Result<(), SysinspectError> {
let events = self.get_tree(&Self::to_tree_id(&sid, &mid))?;
let pl = serde_json::from_str::<HashMap<String, Value>>(&payload)?;
if let Err(err) = events.insert(
format!(
"{}/{}/{}",
util::dataconv::as_str(pl.get("eid").cloned()),
util::dataconv::as_str(pl.get("sid").cloned()),
util::dataconv::as_str(pl.get("aid").cloned())
util::dataconv::as_str(payload.get("eid").cloned()),
util::dataconv::as_str(payload.get("sid").cloned()),
util::dataconv::as_str(payload.get("aid").cloned())
),
payload.as_bytes().to_vec(),
serde_json::to_string(&payload)?.as_bytes(),
) {
Err(SysinspectError::MasterGeneralError(format!("{err}")))
} else {
Expand Down
22 changes: 17 additions & 5 deletions sysmaster/src/master.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl SysMaster {
for m in self.evtreg.get_minions(&s).unwrap() {
log::warn!(">> ... {} - {}", m.id(), util::dataconv::as_str(m.get_trait(SYS_NET_HOSTNAME).cloned()));
for e in self.evtreg.get_events(&s, &m).unwrap() {
log::warn!(">> ... ... {}: {:#?}", e.get_event_id(), e.get_response());
log::warn!(">> ... ... ({}) - {}: {:#?}", e.get_cycle_id(), e.get_event_id(), e.get_response());
}
}
}
Expand Down Expand Up @@ -327,12 +327,24 @@ impl SysMaster {
tokio::spawn(async move {
let mut m = c_master.lock().await;
let mrec = m.mreg.get(req.id()).unwrap_or_default().unwrap_or_default();
let x = mrec.get_traits().keys().map(|s| s.to_string()).collect::<Vec<String>>();

let sid = m.evtreg.open_session("test model".to_string(), "blabla".to_string()).unwrap();
let pl = match serde_json::from_str::<HashMap<String, serde_json::Value>>(&req.payload()) {
Ok(pl) => pl,
Err(err) => {
log::error!("An event message with the bogus payload: {err}");
return;
}
};

let sid = m
.evtreg
.open_session(
util::dataconv::as_str(pl.get("eid").cloned()), // TODO: Should be an actual model name!
util::dataconv::as_str(pl.get("cid").cloned()),
)
.unwrap();
let mid =
m.evtreg.ensure_minion(&sid, req.id().to_string(), mrec.get_traits().to_owned()).unwrap();
m.evtreg.add_event(sid, EventMinion::new(mid), req.payload().to_string()).unwrap();
m.evtreg.add_event(sid, EventMinion::new(mid), pl).unwrap();
});
}

Expand Down
9 changes: 6 additions & 3 deletions sysminion/src/arcb.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
use crate::minion::SysMinion;
use async_trait::async_trait;
use libsysinspect::{intp::actproc::response::ActionResponse, reactor::callback::EventProcessorCallback, SysinspectError};
use libsysinspect::{SysinspectError, intp::actproc::response::ActionResponse, reactor::callback::EventProcessorCallback};
use std::sync::Arc;
use uuid::Uuid;

#[derive(Debug)]
pub struct ActionResponseCallback {
cid: String,
minion: Arc<SysMinion>,
}

impl ActionResponseCallback {
pub(crate) fn new(minion: Arc<SysMinion>) -> Self {
Self { minion }
Self { minion, cid: Uuid::new_v4().to_string() }
}
}

#[async_trait]
impl EventProcessorCallback for ActionResponseCallback {
async fn on_action_response(&mut self, ar: ActionResponse) -> Result<(), SysinspectError> {
async fn on_action_response(&mut self, mut ar: ActionResponse) -> Result<(), SysinspectError> {
ar.set_cid(self.cid.to_owned());
self.minion.clone().send_callback(ar).await
}
}

0 comments on commit 195f663

Please sign in to comment.