diff --git a/core/src/action.rs b/core/src/action.rs index cda3da5..07e98bd 100644 --- a/core/src/action.rs +++ b/core/src/action.rs @@ -1,9 +1,8 @@ +use clap::{Parser, ValueEnum}; use std::{ fmt::{Debug, Display}, str::FromStr, }; - -use clap::{Parser, ValueEnum}; use strum::{Display, EnumIter}; use thiserror::Error; diff --git a/core/src/builtin/ctl.rs b/core/src/builtin/ctl.rs index 34b7c58..ba8ebac 100644 --- a/core/src/builtin/ctl.rs +++ b/core/src/builtin/ctl.rs @@ -1,13 +1,18 @@ -use std::{ops::ControlFlow, path::Path, time::Duration}; - use crate::{config::yaml::TaskConfigYaml, task::ExitReason}; use anyhow::Result; use nix::{sys::stat::Mode, unistd::mkfifo}; use smallvec::smallvec; -use smol::{fs::{create_dir_all, File}, io::{AsyncBufReadExt, BufReader}}; -use tracing::{info, error}; +use smol::{ + fs::{create_dir_all, File}, + io::{AsyncBufReadExt, BufReader}, +}; +use std::{ops::ControlFlow, path::Path, time::Duration}; +use tracing::{error, info}; -use crate::{builtin_fn, task::{ContextMap, TaskContext, TaskState}}; +use crate::{ + builtin_fn, + task::{ContextMap, TaskContext, TaskState}, +}; use super::IntoConfig; @@ -30,7 +35,6 @@ async fn create_ctl(_: &TaskContext, _context: ContextMap<'static>) -> Result<() Ok(()) } - builtin_fn!(WaitForCommands: wait_for_commands); impl IntoConfig for WaitForCommands { @@ -44,37 +48,38 @@ impl IntoConfig for WaitForCommands { } } - async fn wait_for_commands(context: &TaskContext, context_map: ContextMap<'static>) -> Result<()> { let mut buf = String::new(); + loop { + if context.state().await == TaskState::Terminating { + context + .update_state(TaskState::Concluded(ExitReason::Terminated)) + .await; + break Ok(()); + }; + let mut pipe = match create_pipe().await { + Ok(x) => x, + Err(error) => { + error!("Could not create pipe: {error}"); + smol::Timer::after(Duration::from_secs(10)).await; + continue; + } + }; loop { - if context.state().await == TaskState::Terminating { - context.update_state(TaskState::Concluded(ExitReason::Terminated)).await; - break Ok(()); - }; - let mut pipe = match create_pipe().await { - Ok(x) => x, - Err(error) => { - error!("Could not create pipe: {error}"); - smol::Timer::after(Duration::from_secs(10)).await; - continue; - } - }; - loop { - match pipe.read_line(&mut buf).await { - Ok(bytes) if bytes > 0 => { - let action = buf.trim(); - info!(action); - if let Err(error) = crate::perform_action::perform(action, context_map).await { - error!(%error); - } + match pipe.read_line(&mut buf).await { + Ok(bytes) if bytes > 0 => { + let action = buf.trim(); + info!(action); + if let Err(error) = crate::perform_action::perform(action, context_map).await { + error!(%error); } - _ => break, } - - buf.clear(); + _ => break, } + + buf.clear(); } + } } async fn create_pipe() -> Result> { diff --git a/core/src/builtin/mod.rs b/core/src/builtin/mod.rs index c30c0d3..53c33ca 100644 --- a/core/src/builtin/mod.rs +++ b/core/src/builtin/mod.rs @@ -1,19 +1,17 @@ +use crate::task::{ContextMap, ExitReason, TaskContext}; +use crate::{ + config::{payload::Runnable, yaml::TaskConfigYaml}, + task::TaskState, +}; +use async_trait::async_trait; +use futures::{ready, Future}; use std::{ ops::ControlFlow, pin::{pin, Pin}, task::Poll, }; - -use crate::task::{ContextMap, ExitReason, TaskContext}; -use async_trait::async_trait; -use futures::{ready, Future}; use tracing::{debug, info}; -use crate::{ - config::{payload::Runnable, yaml::TaskConfigYaml}, - task::TaskState, -}; - pub mod ctl; pub trait IntoConfig { diff --git a/core/src/command_line/complex.rs b/core/src/command_line/complex.rs index ee5c8fe..4d9ab4b 100644 --- a/core/src/command_line/complex.rs +++ b/core/src/command_line/complex.rs @@ -1,3 +1,11 @@ +use crate::{ + config::payload::Runnable, + task::{ContextMap, ExitReason, TaskContext, TaskState}, +}; +use lazy_static::lazy_static; +use regex::{Captures, Regex}; +use serde::{Deserialize, Serialize}; +use smol::process::Command; use std::{ env, ops::{ControlFlow, Deref, DerefMut}, @@ -5,19 +13,9 @@ use std::{ slice::Iter, str::FromStr, }; - -use lazy_static::lazy_static; -use regex::{Captures, Regex}; -use serde::{Deserialize, Serialize}; -use smol::process::Command; use thiserror::Error; use tracing::{debug, error, info}; -use crate::{ - config::payload::Runnable, - task::{ContextMap, ExitReason, TaskContext, TaskState}, -}; - #[derive(Debug, Serialize, Deserialize)] pub struct CommandLine { ignore_env: bool, @@ -69,15 +67,15 @@ impl CommandLine { async fn run_line(&self, context: &TaskContext) -> ControlFlow { // let mut context = context.write().await; - + debug!(cmd = ?self.args, "Running"); let mut child = match self.spawn() { - Ok(c) => c, - Err(CommandLineError::EmptyCommand) => return ControlFlow::Continue(()), - Err(e) => { - error!(%e); - return ControlFlow::Break(TaskState::Concluded(ExitReason::Failed)); - } + Ok(c) => c, + Err(CommandLineError::EmptyCommand) => return ControlFlow::Continue(()), + Err(e) => { + error!(%e); + return ControlFlow::Break(TaskState::Concluded(ExitReason::Failed)); + } }; (*context.child.write().await) = Some(child.id() as i32); @@ -100,7 +98,8 @@ impl CommandLine { impl Runnable for CommandLine { async fn run<'a>( &'a self, - context: &'a TaskContext, _context_map: ContextMap<'static> + context: &'a TaskContext, + _context_map: ContextMap<'static>, ) -> ControlFlow { self.run_line(context).await } diff --git a/core/src/command_line/mod.rs b/core/src/command_line/mod.rs index efcb15f..045674f 100644 --- a/core/src/command_line/mod.rs +++ b/core/src/command_line/mod.rs @@ -6,4 +6,4 @@ pub use complex::*; #[cfg(not(feature = "complex_commands"))] mod simple; #[cfg(not(feature = "complex_commands"))] -pub use simple::*; \ No newline at end of file +pub use simple::*; diff --git a/core/src/config/mod.rs b/core/src/config/mod.rs index 1ad0197..f7cd7f5 100644 --- a/core/src/config/mod.rs +++ b/core/src/config/mod.rs @@ -1,6 +1,10 @@ pub mod payload; pub mod yaml; - +use self::{payload::Payload, yaml::TaskConfigYaml}; +use crate::{ + ordering::{construct_markers, resolve_before, sort}, + validate, +}; use serde::{Deserialize, Serialize}; use smol::stream::StreamExt; use std::{ @@ -10,15 +14,8 @@ use std::{ path::Path, }; use tracing::{debug, info_span}; - -use crate::{ - ordering::{construct_markers, resolve_before, sort}, - validate, -}; use tracing::{error, instrument}; -use self::{payload::Payload, yaml::TaskConfigYaml}; - #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)] pub enum Respawn { /// Never retry this task (default) diff --git a/core/src/config/payload.rs b/core/src/config/payload.rs index a089b6e..7dce284 100644 --- a/core/src/config/payload.rs +++ b/core/src/config/payload.rs @@ -1,13 +1,18 @@ -use std::{fmt::Debug, ops::ControlFlow, str::FromStr}; - +use crate::{ + builtin::BuiltInService, + command_line::CommandLines, + task::{ContextMap, ExitReason, TaskContext, TaskState}, +}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; - -use crate::{command_line::CommandLines, task::{ContextMap, ExitReason, TaskContext, TaskState}, builtin::BuiltInService}; - +use std::{fmt::Debug, ops::ControlFlow, str::FromStr}; #[async_trait::async_trait] pub trait Runnable { - async fn run<'a>(&'a self, context: &'a TaskContext, context_map: ContextMap<'static>) -> ControlFlow; + async fn run<'a>( + &'a self, + context: &'a TaskContext, + context_map: ContextMap<'static>, + ) -> ControlFlow; } #[derive(Serialize, Deserialize)] @@ -19,19 +24,22 @@ pub enum Payload { } impl Payload { - - pub async fn run(&self, x: usize, context: &TaskContext, context_map: ContextMap<'static>) -> ControlFlow { - + pub async fn run( + &self, + x: usize, + context: &TaskContext, + context_map: ContextMap<'static>, + ) -> ControlFlow { match self { Payload::Service(command_lines) => match command_lines.get(x) { Some(command_line) => command_line.run(context, context_map).await, - None => ControlFlow::Break(TaskState::Concluded(ExitReason::Done)) + None => ControlFlow::Break(TaskState::Concluded(ExitReason::Done)), }, Payload::Builtin(runnable) if x == 0 => runnable.run(context, context_map).await, _ => ControlFlow::Break(TaskState::Concluded(ExitReason::Done)), } } - + pub(crate) fn is_marker(&self) -> bool { matches!(self, Self::Marker) } @@ -59,4 +67,4 @@ impl FromStr for Payload { fn from_str(s: &str) -> Result { Ok(Self::Service(T::from_str(s)?)) } -} \ No newline at end of file +} diff --git a/core/src/config/yaml.rs b/core/src/config/yaml.rs index d42ea8a..d9db286 100644 --- a/core/src/config/yaml.rs +++ b/core/src/config/yaml.rs @@ -1,16 +1,12 @@ - -use std::fmt::Debug; - -use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize}; -use smallvec::SmallVec; - - +use super::payload::Payload; use crate::{ - builtin::BuiltInService, command_line, config::{Respawn, TaskConfig} + builtin::BuiltInService, + command_line, + config::{Respawn, TaskConfig}, }; - -use super::payload::Payload; - +use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize}; +use smallvec::SmallVec; +use std::fmt::Debug; #[derive(Serialize, Deserialize)] #[serde(untagged)] @@ -19,7 +15,7 @@ pub enum PayloadYaml { Service(String), #[serde(skip)] Builtin(BuiltInService), - Marker + Marker, } impl Default for PayloadYaml { @@ -33,12 +29,11 @@ impl Debug for PayloadYaml { match self { Self::Service(arg0) => f.debug_tuple("Service").field(arg0).finish(), Self::Builtin(_) => f.write_str(""), - Self::Marker => f.write_str("") + Self::Marker => f.write_str(""), } } } - #[derive(Debug, Deserialize, Serialize, Eq, Clone, Hash, PartialEq)] #[serde(untagged)] pub enum RespawnYaml { @@ -86,7 +81,7 @@ pub struct TaskConfigYaml { pub group: Option, #[serde(default)] #[serde(deserialize_with = "OneOrMany::read")] - pub provides: Vec + pub provides: Vec, } impl TaskConfigYaml { @@ -108,7 +103,7 @@ impl TaskConfigYaml { payload: match self.cmd { PayloadYaml::Service(x) => x.parse()?, PayloadYaml::Builtin(builtin) => Payload::Builtin(builtin), - PayloadYaml::Marker => Payload::Marker + PayloadYaml::Marker => Payload::Marker, }, with: self.with, after: self.after.into_vec(), @@ -170,7 +165,7 @@ mod test { r#" name: bar cmd: [echo, "hello from inside bar"] - _after: + _after: - foo - bar "#, diff --git a/core/src/init.rs b/core/src/init.rs index 24cd879..58d148b 100644 --- a/core/src/init.rs +++ b/core/src/init.rs @@ -1,18 +1,14 @@ -use crate::{config::read_config, task::TaskContext}; use crate::config::yaml::TaskConfigYaml; - +use crate::task::ContextMap; +use crate::{config::read_config, task::TaskContext}; use anyhow::Result; use futures::StreamExt; -use nix:: - libc::{SIGABRT, SIGHUP, SIGPIPE, SIGTERM, SIGTSTP} -; +use nix::libc::{SIGABRT, SIGHUP, SIGPIPE, SIGTERM, SIGTSTP}; use signal_hook::iterator::exfiltrator::WithOrigin; use signal_hook_async_std::SignalsInfo; use std::env; use tracing::info; -use crate::task::ContextMap; - const SIGS: &[i32] = &[SIGABRT, SIGTERM, SIGHUP, SIGPIPE, SIGTSTP]; pub struct Alfad { @@ -40,7 +36,8 @@ impl Alfad { .collect(), ))); info!("Done parsing ({} tasks)", context.0.len()); - context.0 + context + .0 .values() .for_each(|config| crate::task::spawn(config, context)); // smol::block_on(async { wait_for_commands(context).await }); diff --git a/core/src/main.rs b/core/src/main.rs index d1481d6..af91a6e 100644 --- a/core/src/main.rs +++ b/core/src/main.rs @@ -8,27 +8,24 @@ mod perform_action; pub mod task; mod validate; +use crate::builtin::{ + ctl::{CreateCtlPipe, WaitForCommands}, + IntoConfig, +}; +use alfad::action::{Action, SystemCommand}; +use anyhow::{Context, Result}; +use clap::Parser; +use config::{read_yaml_configs, yaml::TaskConfigYaml, TaskConfig}; +use itertools::Itertools; use std::{ env, fs::{self, OpenOptions}, io::Write, path::{Path, PathBuf}, }; - -use anyhow::{Context, Result}; -use clap::Parser; - -use alfad::action::{Action, SystemCommand}; -use config::{read_yaml_configs, yaml::TaskConfigYaml, TaskConfig}; -use itertools::Itertools; -use tracing::{Level}; +use tracing::Level; use tracing_subscriber::FmtSubscriber; -use crate::builtin::{ - ctl::{CreateCtlPipe, WaitForCommands}, - IntoConfig, -}; - pub static VERSION: &str = "0.1"; fn main() -> Result<()> { @@ -91,9 +88,6 @@ fn compile() -> Result<()> { let data = postcard::to_allocvec(&configs)?; let (_, _): (String, Vec) = postcard::from_bytes(data.as_ref())?; - fs::write( - cli.target.join("alfad.bin"), - data, - )?; + fs::write(cli.target.join("alfad.bin"), data)?; Ok(()) } diff --git a/core/src/ordering.rs b/core/src/ordering.rs index 0d20d51..83dd25d 100644 --- a/core/src/ordering.rs +++ b/core/src/ordering.rs @@ -1,12 +1,10 @@ -use std::collections::HashMap; - -use itertools::Itertools; -use tracing::warn; - use crate::config::{ yaml::{PayloadYaml, TaskConfigYaml}, TaskConfig, }; +use itertools::Itertools; +use std::collections::HashMap; +use tracing::warn; pub fn construct_markers(configs: &[TaskConfigYaml]) -> Vec { let mut map = HashMap::new(); @@ -35,7 +33,10 @@ pub fn construct_markers(configs: &[TaskConfigYaml]) -> Vec { }; conf.after(&config.name); if let Some(old) = map.insert(name, conf) { - warn!("Overriding feature::{feature}, already provided by {}", old.name) + warn!( + "Overriding feature::{feature}, already provided by {}", + old.name + ) } } }); @@ -103,7 +104,7 @@ pub fn sort(configs: Vec) -> Vec { } } } - + let mut res = no_deps .into_iter() .flat_map(|x| map.remove(&x)) diff --git a/core/src/perform_action.rs b/core/src/perform_action.rs index 2c66eb5..f0b4b75 100644 --- a/core/src/perform_action.rs +++ b/core/src/perform_action.rs @@ -1,5 +1,3 @@ -use std::{ffi::c_int, str::FromStr, time::Duration}; - use crate::{ action::{Action, ActionError, SystemCommand}, task::{ContextMap, ExitReason, TaskContext, TaskState}, @@ -12,6 +10,7 @@ use nix::{ }, sys::signal::Signal, }; +use std::{ffi::c_int, str::FromStr, time::Duration}; use thiserror::Error; use tracing::{error, info}; diff --git a/core/src/task.rs b/core/src/task.rs index f31c3f9..da3266e 100644 --- a/core/src/task.rs +++ b/core/src/task.rs @@ -1,3 +1,10 @@ +use crate::config::{payload::Payload, Respawn, TaskConfig}; +use nix::{sys::signal::Signal, unistd::Pid}; +use serde::Deserialize; +use smol::{ + lock::{RwLock, RwLockUpgradableReadGuard}, + ready, +}; use std::{ collections::HashMap, future::Future, @@ -5,21 +12,9 @@ use std::{ pin::{pin, Pin}, task::{Context, Poll, Waker}, }; - use strum::Display; - -use nix::{sys::signal::Signal, unistd::Pid}; - use tracing::{debug, error, info, trace, trace_span}; -use serde::Deserialize; -use smol::{ - lock::{RwLock, RwLockUpgradableReadGuard}, - ready, -}; - -use crate::config::{payload::Payload, Respawn, TaskConfig}; - #[derive(Debug, Clone, Copy)] pub struct ContextMap<'a>(pub &'a HashMap<&'a str, TaskContext>); @@ -102,7 +97,7 @@ pub enum ExitReason { Done, Failed, Terminated, - Deactivated + Deactivated, } impl TaskState { @@ -141,7 +136,9 @@ pub async fn drive(context: &'static TaskContext, context_map: ContextMap<'stati for task in context.config.with.iter() { trace!("{} waiting for {task} to be Running", context.config.name); if context_map.wait_for_running(task).await.is_none() { - context.update_state(TaskState::Concluded(ExitReason::Deactivated)).await; + context + .update_state(TaskState::Concluded(ExitReason::Deactivated)) + .await; return; } } @@ -154,15 +151,20 @@ pub async fn drive(context: &'static TaskContext, context_map: ContextMap<'stati trace!("{} waiting for {task} to be Done", context.config.name); if context_map .wait_for(task, TaskState::Concluded(ExitReason::Done)) - .await.is_none() + .await + .is_none() { - context.update_state(TaskState::Concluded(ExitReason::Deactivated)).await; + context + .update_state(TaskState::Concluded(ExitReason::Deactivated)) + .await; return; } } if context.config.payload.is_marker() { - context.update_state(TaskState::Concluded(ExitReason::Done)).await; + context + .update_state(TaskState::Concluded(ExitReason::Done)) + .await; break; } diff --git a/core/src/validate.rs b/core/src/validate.rs index 04f314b..9dd8d32 100644 --- a/core/src/validate.rs +++ b/core/src/validate.rs @@ -1,9 +1,7 @@ +use crate::config::TaskConfig; use std::collections::HashMap; - use tracing::{error, warn}; -use crate::config::TaskConfig; - pub fn validate(configs: Vec) -> Vec { let map: HashMap<_, _> = configs .iter()