Skip to content

Commit

Permalink
Merge pull request #23 from tinythings/advanced-groups
Browse files Browse the repository at this point in the history
Refactor Payloads and adds Markers
  • Loading branch information
Ichmed authored Jun 17, 2024
2 parents f8a892e + ae0e7ed commit b13bf30
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 59 deletions.
12 changes: 6 additions & 6 deletions core/src/builtin/ctl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{ops::ControlFlow, path::Path, time::Duration};

use crate::config::{payload::Payload, yaml::TaskConfigYaml};
use crate::config::yaml::TaskConfigYaml;
use anyhow::Result;
use nix::{sys::stat::Mode, unistd::mkfifo};
use smallvec::smallvec;
Expand All @@ -16,9 +16,9 @@ builtin_fn!(CreateCtlPipe: create_ctl);
impl IntoConfig for CreateCtlPipe {
fn into_config(self) -> TaskConfigYaml {
TaskConfigYaml {
name: "builtin::ctl-create".to_string(),
cmd: Payload::Normal("mkdir -p /run/var\nmkfifo /run/var/alfad-ctl".to_string()),
after: smallvec!["mount-sys-fs".to_owned()],
name: "builtin::ctl::create".to_string(),
cmd: Self::box_fn(),
after: smallvec!["feature::fs::run".to_owned()],
..Default::default()
}
}
Expand All @@ -36,8 +36,8 @@ builtin_fn!(WaitForCommands: wait_for_commands);
impl IntoConfig for WaitForCommands {
fn into_config(self) -> TaskConfigYaml {
TaskConfigYaml {
name: "builtin::ctl-commands".to_string(),
after: smallvec!["builtin::ctl-create".to_owned()],
name: "builtin::ctl::daemon".to_string(),
after: smallvec!["builtin::ctl::create".to_owned()],
cmd: Self::box_fn(),
..Default::default()
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/builtin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ macro_rules! builtin_fn {
pub struct $name;

impl $name {
pub fn box_fn() -> $crate::config::payload::Payload<String> {
$crate::config::payload::Payload::Builtin(Box::leak(Box::new($name)))
pub fn box_fn() -> $crate::config::yaml::PayloadYaml {
$crate::config::yaml::PayloadYaml::Builtin(Box::leak(Box::new($name)))
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
use tracing::{debug, info_span};

use crate::{
ordering::{construct_groups, resolve_before, sort},
ordering::{construct_markers, resolve_before, sort},
validate,
};
use tracing::{error, instrument};
Expand Down Expand Up @@ -125,7 +125,7 @@ pub fn read_yaml_configs(path: &Path, builtin: Vec<TaskConfigYaml>) -> Vec<TaskC
});

configs.extend(builtin);
let groups = construct_groups(&configs);
let groups = construct_markers(&configs);
configs.extend(groups);

#[cfg(feature = "before")]
Expand Down
47 changes: 18 additions & 29 deletions core/src/config/payload.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{fmt::Debug, ops::ControlFlow, str::FromStr};

use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use smol::lock::RwLock;

use crate::{command_line::CommandLines, task::{ContextMap, TaskContext, TaskState}};
Expand All @@ -11,64 +11,53 @@ pub trait Runnable {
async fn run<'a>(&'a self, context: &'a RwLock<TaskContext>, context_map: ContextMap<'static>) -> ControlFlow<TaskState>;
}

#[derive(Serialize, Deserialize)]
pub enum Payload<T = CommandLines> {
Normal(T),
Builtin(&'static mut (dyn Runnable + Sync))
Marker,
Service(T),
#[serde(skip)]
Builtin(&'static mut (dyn Runnable + Sync)),
}

impl Payload {

pub async fn run(&self, x: usize, context: &RwLock<TaskContext>, context_map: ContextMap<'static>) -> ControlFlow<TaskState> {

match self {
Payload::Normal(command_lines) => match command_lines.get(x) {
Payload::Service(command_lines) => match command_lines.get(x) {
Some(command_line) => command_line.run(context, context_map).await,
None => ControlFlow::Break(TaskState::Done)
},
Payload::Builtin(runnable) if x == 0 => runnable.run(context, context_map).await,
Payload::Builtin(_) => ControlFlow::Break(TaskState::Done),
_ => ControlFlow::Break(TaskState::Done),
}
}
}

impl Serialize for Payload {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer {
match self {
Payload::Normal(n) => n.serialize(serializer),
Payload::Builtin(_) => Err(<S::Error as serde::ser::Error>::custom("Cannot serialize builtin tasks")),
}
}
}

impl<'de, T: Deserialize<'de>> Deserialize<'de> for Payload<T> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de> {
Ok(Self::Normal(T::deserialize(deserializer)?))

pub(crate) fn is_marker(&self) -> bool {
matches!(self, Self::Marker)
}
}

impl<T: Debug> Debug for Payload<T> {
impl<T: Debug + DeserializeOwned> Debug for Payload<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Normal(arg0) => f.debug_tuple("Normal").field(arg0).finish(),
Self::Service(arg0) => f.debug_tuple("Service").field(arg0).finish(),
Self::Builtin(_) => f.write_str("<builtin>"),
Self::Marker => f.write_str("<marker>"),
}
}
}

impl<T: Default> Default for Payload<T> {
impl<T: Default + DeserializeOwned> Default for Payload<T> {
fn default() -> Self {
Self::Normal(T::default())
Self::Service(T::default())
}
}

impl<T: FromStr> FromStr for Payload<T> {
impl<T: FromStr + DeserializeOwned> FromStr for Payload<T> {
type Err = <T as FromStr>::Err;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self::Normal(T::from_str(s)?))
Ok(Self::Service(T::from_str(s)?))
}
}
40 changes: 36 additions & 4 deletions core/src/config/yaml.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

use std::fmt::Debug;

use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize};
use smallvec::SmallVec;

Expand All @@ -8,8 +10,34 @@ use crate::{
config::{Respawn, TaskConfig},
};

use super::payload::Payload;
use super::payload::{Payload, Runnable};


#[derive(Serialize, Deserialize)]
#[serde(untagged)]
pub enum PayloadYaml {
// #[serde(deserialize_with = "T::deserialize")]
Service(String),
#[serde(skip)]
Builtin(&'static mut (dyn Runnable + Sync)),
Marker
}

impl Default for PayloadYaml {
fn default() -> Self {
Self::Service(String::new())
}
}

impl Debug for PayloadYaml {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Service(arg0) => f.debug_tuple("Service").field(arg0).finish(),
Self::Builtin(_) => f.write_str("<builtin>"),
Self::Marker => f.write_str("<marker>")
}
}
}


#[derive(Debug, Deserialize, Serialize, Eq, Clone, Hash, PartialEq)]
Expand Down Expand Up @@ -43,7 +71,7 @@ impl From<RespawnYaml> for Respawn {
pub struct TaskConfigYaml {
pub name: String,
#[serde(default)]
pub cmd: Payload<String>,
pub cmd: PayloadYaml,
#[cfg(feature = "before")]
#[serde(default)]
#[serde(deserialize_with = "OneOrMany::read")]
Expand All @@ -57,6 +85,9 @@ pub struct TaskConfigYaml {
#[serde(default)]
pub respawn: RespawnYaml,
pub group: Option<String>,
#[serde(default)]
#[serde(deserialize_with = "OneOrMany::read")]
pub provides: Vec<String>
}

impl TaskConfigYaml {
Expand All @@ -76,8 +107,9 @@ impl TaskConfigYaml {
Ok(TaskConfig {
name: self.name,
payload: match self.cmd {
Payload::Normal(x) => x.parse()?,
Payload::Builtin(builtin) => Payload::Builtin(builtin),
PayloadYaml::Service(x) => x.parse()?,
PayloadYaml::Builtin(builtin) => Payload::Builtin(builtin),
PayloadYaml::Marker => Payload::Marker
},
with: self.with,
after: self.after.into_vec(),
Expand Down
12 changes: 8 additions & 4 deletions core/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use anyhow::{Context, Result};
use clap::Parser;

use alfad::action::{Action, SystemCommand};
use config::{read_yaml_configs, yaml::TaskConfigYaml};
use config::{read_yaml_configs, yaml::TaskConfigYaml, TaskConfig};
use itertools::Itertools;
use tracing::Level;
use tracing::{Level};
use tracing_subscriber::FmtSubscriber;

use crate::builtin::{
Expand All @@ -36,7 +36,7 @@ fn main() -> Result<()> {
let name = Path::new(&name).file_name().unwrap().to_str().unwrap();

let subscriber = FmtSubscriber::builder()
.with_max_level(Level::INFO)
.with_max_level(Level::TRACE)
.finish();

tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
Expand Down Expand Up @@ -87,9 +87,13 @@ fn compile() -> Result<()> {
.filter(|x| builtin.iter().all(|bi| bi.name != x.name))
.collect_vec(),
);

let data = postcard::to_allocvec(&configs)?;
let (_, _): (String, Vec<TaskConfig>) = postcard::from_bytes(data.as_ref())?;

fs::write(
cli.target.join("alfad.bin"),
postcard::to_allocvec(&configs)?,
data,
)?;
Ok(())
}
44 changes: 35 additions & 9 deletions core/src/ordering.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,44 @@
use std::collections::HashMap;

use itertools::Itertools;
use tracing::warn;

use crate::{config::yaml::TaskConfigYaml, config::TaskConfig};
use crate::config::{
yaml::{PayloadYaml, TaskConfigYaml},
TaskConfig,
};

pub fn construct_groups(configs: &[TaskConfigYaml]) -> Vec<TaskConfigYaml> {
pub fn construct_markers(configs: &[TaskConfigYaml]) -> Vec<TaskConfigYaml> {
let mut map = HashMap::new();
configs.iter().for_each(|config| {
config
.group
.as_ref()
.map(|group| format!("~{group}"))
.map(|group| {
map.entry(group.clone())
.or_insert_with(|| TaskConfigYaml::new(group.clone()))
.map(|group| format!("group::{group}"))
.map(|name| {
map.entry(name.clone())
.or_insert_with(|| TaskConfigYaml {
name,
cmd: PayloadYaml::Marker,
..Default::default()
})
.after(&config.name)
});
});
configs.iter().for_each(|config| {
for feature in config.provides.iter() {
let name = format!("feature::{feature}");
let mut conf = TaskConfigYaml {
name: name.clone(),
cmd: PayloadYaml::Marker,
..Default::default()
};
conf.after(&config.name);
if let Some(old) = map.insert(name, conf) {
warn!("Overriding feature::{feature}, already provided by {}", old.name)
}
}
});
map.into_values().collect()
}

Expand Down Expand Up @@ -63,8 +85,8 @@ pub fn sort(configs: Vec<TaskConfig>) -> Vec<TaskConfig> {
continue;
}

// Move groups to the back of the list because they must always wait
if t.name.starts_with('~') {
// Move markers to the back of the list because they must always wait
if t.payload.is_marker() {
continue;
}

Expand All @@ -81,7 +103,11 @@ pub fn sort(configs: Vec<TaskConfig>) -> Vec<TaskConfig> {
}
}
}
let mut res = no_deps.into_iter().flat_map(|x| map.remove(&x)).collect_vec();

let mut res = no_deps
.into_iter()
.flat_map(|x| map.remove(&x))
.collect_vec();
res.extend(sorter.flat_map(|x| map.remove(&x)));

// Add all cyclical and orphaned tasks to the end, we may still want to force start them
Expand Down
14 changes: 11 additions & 3 deletions core/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tracing::{error, info, info_span, trace, warn};
use serde::Deserialize;
use smol::{lock::RwLock, ready};

use crate::{command_line::Child, config::{Respawn, TaskConfig}};
use crate::{command_line::Child, config::{payload::Payload, Respawn, TaskConfig}};

pub type ContextMap<'a> = &'a HashMap<&'a str, RwLock<TaskContext>>;

Expand Down Expand Up @@ -57,6 +57,7 @@ impl Default for TaskState {
#[derive(Debug)]
pub struct Task<'a> {
pub state: TaskState,
old_state: TaskState,
pub config: &'a TaskConfig,
pub context_map: ContextMap<'static>,
context: &'a RwLock<TaskContext>,
Expand Down Expand Up @@ -114,7 +115,9 @@ impl<'a> Task<'a> {
}

pub fn spawn(config: &'static TaskConfig, context_map: ContextMap<'static>) {
info!("Spawning {}", config.name);
if matches!(config.payload, Payload::Service(_) | Payload::Builtin(_)) {
info!("Spawning {}", config.name);
}
smol::spawn(async move { Self::new(config, context_map).await }).detach()
}

Expand All @@ -124,6 +127,7 @@ impl<'a> Task<'a> {
) -> Self {
Self {
state: TaskState::Waiting,
old_state: TaskState::Waiting,
config,
context_map,
context: context_map
Expand Down Expand Up @@ -218,6 +222,10 @@ impl<'a> Task<'a> {
}

async fn propagate_state(&mut self) {
if self.state == self.old_state {
return;
}
self.old_state = self.state;
self.trace();
let state = self.state;
self.context.write().await.update_state(state);
Expand Down Expand Up @@ -279,8 +287,8 @@ impl TaskContext {
}

pub async fn wait_for_terminate(&mut self) -> TaskState {
info!("Killing {:?}", self.child.as_ref().map(|c| c.id()));
if let Some(child) = self.child.as_mut() {
info!("Killing {:?}", child.id());
child.status().await.ok();
self.state = TaskState::Terminated;
self.child = None;
Expand Down

0 comments on commit b13bf30

Please sign in to comment.