Skip to content

Commit

Permalink
Merge pull request #25 from tinythings:move-tasks-to-async-fn
Browse files Browse the repository at this point in the history
Move-tasks-to-async-fn
  • Loading branch information
Ichmed authored Jun 26, 2024
2 parents b13bf30 + 16c4235 commit 99910d5
Show file tree
Hide file tree
Showing 9 changed files with 340 additions and 312 deletions.
16 changes: 9 additions & 7 deletions core/src/builtin/ctl.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{ops::ControlFlow, path::Path, time::Duration};

use crate::config::yaml::TaskConfigYaml;
use crate::{config::yaml::TaskConfigYaml, task::ExitReason};
use anyhow::Result;
use nix::{sys::stat::Mode, unistd::mkfifo};
use smallvec::smallvec;
use smol::{fs::{create_dir_all, File}, io::{AsyncBufReadExt, BufReader}, lock::RwLock};
use smol::{fs::{create_dir_all, File}, io::{AsyncBufReadExt, BufReader}};
use tracing::{info, error};

use crate::{builtin_fn, task::{ContextMap, TaskContext, TaskState}};
Expand All @@ -24,7 +24,7 @@ impl IntoConfig for CreateCtlPipe {
}
}

async fn create_ctl(_: &RwLock<TaskContext>, _context: ContextMap<'static>) -> Result<()> {
async fn create_ctl(_: &TaskContext, _context: ContextMap<'static>) -> Result<()> {
create_dir_all("/run/var").await?;
mkfifo("/run/var/alfad-ctl", Mode::S_IRWXU | Mode::S_IWOTH)?;
Ok(())
Expand All @@ -45,10 +45,13 @@ impl IntoConfig for WaitForCommands {
}


async fn wait_for_commands(_: &RwLock<TaskContext>, context: ContextMap<'static>) -> Result<()> {
async fn wait_for_commands(context: &TaskContext, context_map: ContextMap<'static>) -> Result<()> {
let mut buf = String::new();
smol::block_on(async {
loop {
if context.state().await == TaskState::Terminating {
context.update_state(TaskState::Concluded(ExitReason::Terminated)).await;
break Ok(());
};
let mut pipe = match create_pipe().await {
Ok(x) => x,
Err(error) => {
Expand All @@ -62,7 +65,7 @@ async fn wait_for_commands(_: &RwLock<TaskContext>, context: ContextMap<'static>
Ok(bytes) if bytes > 0 => {
let action = buf.trim();
info!(action);
if let Err(error) = crate::perform_action::perform(action, context).await {
if let Err(error) = crate::perform_action::perform(action, context_map).await {
error!(%error);
}
}
Expand All @@ -72,7 +75,6 @@ async fn wait_for_commands(_: &RwLock<TaskContext>, context: ContextMap<'static>
buf.clear();
}
}
})
}

async fn create_pipe() -> Result<BufReader<File>> {
Expand Down
69 changes: 64 additions & 5 deletions core/src/builtin/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,78 @@
use crate::config::yaml::TaskConfigYaml;
use std::{
ops::ControlFlow,
pin::{pin, Pin},
task::Poll,
};

use crate::task::{ContextMap, ExitReason, TaskContext};
use async_trait::async_trait;
use futures::{ready, Future};
use tracing::{debug, info};

use crate::{
config::{payload::Runnable, yaml::TaskConfigYaml},
task::TaskState,
};

pub mod ctl;

pub trait IntoConfig {
fn into_config(self) -> TaskConfigYaml;
}

pub struct BuiltInService {
function: &'static (dyn Runnable + Sync + Send),
}

#[async_trait]
impl Runnable for BuiltInService {
async fn run<'a>(
&'a self,
context: &'a TaskContext,
context_map: ContextMap<'static>,
) -> ControlFlow<TaskState> {
BuiltInServiceManager {
function: pin!(self.function.run(context, context_map)),
context,
}
.await
}
}

pub struct BuiltInServiceManager<'a, T: Future<Output = ControlFlow<TaskState>>> {
function: Pin<&'a mut T>,
context: &'a TaskContext,
}

impl<'a, T: Future<Output = ControlFlow<TaskState>>> Future for BuiltInServiceManager<'a, T> {
type Output = ControlFlow<TaskState>;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let state = ready!(pin!(self.context.state()).poll(cx));
debug!(name = self.context.config.name, %state);
if state == TaskState::Terminating {
info!(name = self.context.config.name, "Terminating");
Poll::Ready(ControlFlow::Break(TaskState::Concluded(
ExitReason::Terminated,
)))
} else {
ready!(pin!(self.context.set_waker(cx.waker())).poll(cx));
self.function.as_mut().poll(cx)
}
}
}

#[macro_export]
macro_rules! builtin_fn {
($name:ident: $function:ident) => {
pub struct $name;

impl $name {
pub fn box_fn() -> $crate::config::yaml::PayloadYaml {
$crate::config::yaml::PayloadYaml::Builtin(Box::leak(Box::new($name)))
$crate::config::yaml::PayloadYaml::Builtin($crate::builtin::BuiltInService{ function: Box::leak(Box::new($name))})
}
}

Expand All @@ -22,18 +81,18 @@ macro_rules! builtin_fn {
impl $crate::config::payload::Runnable for $name {
async fn run<'a>(
&'a self,
context: &'a RwLock<TaskContext>,
context: &'a TaskContext,
context_map: ContextMap<'static>,
) -> ControlFlow<TaskState> {
match $function(context, context_map).await {
Ok(_) => ControlFlow::Continue(()),
Err(error) => {
tracing::error!(%error);
ControlFlow::Break($crate::task::TaskState::Failed)
ControlFlow::Break($crate::task::TaskState::Concluded($crate::task::ExitReason::Failed))
}
}
}
}

};
}
}
30 changes: 15 additions & 15 deletions core/src/command_line/complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use std::{
use lazy_static::lazy_static;
use regex::{Captures, Regex};
use serde::{Deserialize, Serialize};
use smol::{lock::RwLock, process::Command};
use smol::process::Command;
use thiserror::Error;
use tracing::{error, info};
use tracing::{debug, error, info};

use crate::{
config::payload::Runnable,
task::{ContextMap, TaskContext, TaskState},
task::{ContextMap, ExitReason, TaskContext, TaskState},
};

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -67,30 +67,30 @@ impl CommandLine {
Ok(Child(self.to_command()?.spawn()?, self.ignore_return))
}

async fn run_line(&self, context: &RwLock<TaskContext>) -> ControlFlow<TaskState> {
let mut context = context.write().await;

let child = match context.child.as_mut() {
Some(child) => child,
None => match self.spawn() {
Ok(c) => context.child.insert(c),
async fn run_line(&self, context: &TaskContext) -> ControlFlow<TaskState> {
// let mut context = context.write().await;

debug!(cmd = ?self.args, "Running");
let mut child = match self.spawn() {
Ok(c) => c,
Err(CommandLineError::EmptyCommand) => return ControlFlow::Continue(()),
Err(e) => {
error!(%e);
return ControlFlow::Break(TaskState::Failed);
return ControlFlow::Break(TaskState::Concluded(ExitReason::Failed));
}
},
};

(*context.child.write().await) = Some(child.id() as i32);

match child.status().await {
Ok(status) if status.success() => {
info!(?status);
context.child = None;
(*context.child.write().await) = None;
ControlFlow::Continue(())
}
status => {
error!(exit = ?status);
ControlFlow::Break(TaskState::Failed)
ControlFlow::Break(TaskState::Concluded(ExitReason::Failed))
}
}
}
Expand All @@ -100,7 +100,7 @@ impl CommandLine {
impl Runnable for CommandLine {
async fn run<'a>(
&'a self,
context: &'a RwLock<TaskContext>, _context_map: ContextMap<'static>
context: &'a TaskContext, _context_map: ContextMap<'static>
) -> ControlFlow<TaskState> {
self.run_line(context).await
}
Expand Down
13 changes: 6 additions & 7 deletions core/src/config/payload.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,34 @@
use std::{fmt::Debug, ops::ControlFlow, str::FromStr};

use serde::{de::DeserializeOwned, Deserialize, Serialize};
use smol::lock::RwLock;

use crate::{command_line::CommandLines, task::{ContextMap, TaskContext, TaskState}};
use crate::{command_line::CommandLines, task::{ContextMap, ExitReason, TaskContext, TaskState}, builtin::BuiltInService};


#[async_trait::async_trait]
pub trait Runnable {
async fn run<'a>(&'a self, context: &'a RwLock<TaskContext>, context_map: ContextMap<'static>) -> ControlFlow<TaskState>;
async fn run<'a>(&'a self, context: &'a TaskContext, context_map: ContextMap<'static>) -> ControlFlow<TaskState>;
}

#[derive(Serialize, Deserialize)]
pub enum Payload<T = CommandLines> {
Marker,
Service(T),
#[serde(skip)]
Builtin(&'static mut (dyn Runnable + Sync)),
Builtin(BuiltInService),
}

impl Payload {

pub async fn run(&self, x: usize, context: &RwLock<TaskContext>, context_map: ContextMap<'static>) -> ControlFlow<TaskState> {
pub async fn run(&self, x: usize, context: &TaskContext, context_map: ContextMap<'static>) -> ControlFlow<TaskState> {

match self {
Payload::Service(command_lines) => match command_lines.get(x) {
Some(command_line) => command_line.run(context, context_map).await,
None => ControlFlow::Break(TaskState::Done)
None => ControlFlow::Break(TaskState::Concluded(ExitReason::Done))
},
Payload::Builtin(runnable) if x == 0 => runnable.run(context, context_map).await,
_ => ControlFlow::Break(TaskState::Done),
_ => ControlFlow::Break(TaskState::Concluded(ExitReason::Done)),
}
}

Expand Down
7 changes: 3 additions & 4 deletions core/src/config/yaml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use smallvec::SmallVec;


use crate::{
command_line,
config::{Respawn, TaskConfig},
builtin::BuiltInService, command_line, config::{Respawn, TaskConfig}
};

use super::payload::{Payload, Runnable};
use super::payload::Payload;


#[derive(Serialize, Deserialize)]
Expand All @@ -19,7 +18,7 @@ pub enum PayloadYaml {
// #[serde(deserialize_with = "T::deserialize")]
Service(String),
#[serde(skip)]
Builtin(&'static mut (dyn Runnable + Sync)),
Builtin(BuiltInService),
Marker
}

Expand Down
46 changes: 18 additions & 28 deletions core/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use crate::config::read_config;
use crate::{config::read_config, task::TaskContext};
use crate::config::yaml::TaskConfigYaml;

use anyhow::Result;
use futures::StreamExt;
use nix::{
libc::{SIGABRT, SIGCHLD, SIGHUP, SIGPIPE, SIGTERM, SIGTSTP},
sys::wait::waitpid,
unistd::Pid,
};
use signal_hook::{iterator::exfiltrator::WithOrigin, low_level::siginfo::Origin};
use nix::
libc::{SIGABRT, SIGHUP, SIGPIPE, SIGTERM, SIGTSTP}
;
use signal_hook::iterator::exfiltrator::WithOrigin;
use signal_hook_async_std::SignalsInfo;
use std::env;
use tracing::info;

use crate::task::{ContextMap, Task};
use crate::task::ContextMap;

const SIGS: &[i32] = &[SIGABRT, SIGTERM, SIGCHLD, SIGHUP, SIGPIPE, SIGTSTP];
const SIGS: &[i32] = &[SIGABRT, SIGTERM, SIGHUP, SIGPIPE, SIGTSTP];

pub struct Alfad {
pub builtin: Vec<TaskConfigYaml>,
Expand All @@ -26,33 +24,25 @@ impl Alfad {
let mut signals = SignalsInfo::<WithOrigin>::new(SIGS).unwrap();

smol::spawn(async move {
while let Some(sig) = signals.next().await {
if let Origin {
signal: SIGCHLD,
process: Some(proc),
..
} = sig
{
// Ignore Err(_) since ECHILD is expected
waitpid(Some(Pid::from_raw(proc.pid)), None).ok();
}
loop {
signals.next().await;
}
})
.detach();

env::set_var("SMOL_THREADS", "8");
info!("Starting alfad");
let configs = Box::leak(Box::new(read_config(self.builtin)));
let context: ContextMap = Box::leak(Box::new(
let configs = read_config(self.builtin);
let context: ContextMap = ContextMap(Box::leak(Box::new(
configs
.iter()
.map(|config| (config.name.as_str(), Default::default()))
.into_iter()
.map(|config| (&*config.name.clone().leak(), TaskContext::new(config)))
.collect(),
));
info!("Done parsing");
configs
.iter()
.for_each(|config| Task::spawn(config, context));
)));
info!("Done parsing ({} tasks)", context.0.len());
context.0
.values()
.for_each(|config| crate::task::spawn(config, context));
// smol::block_on(async { wait_for_commands(context).await });
smol::block_on(smol::Timer::never());
Ok(())
Expand Down
2 changes: 2 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ pub mod config;
pub mod task;
pub mod ordering;
pub mod validate;
pub mod builtin;
pub mod command_line;
pub mod perform_action;

pub static VERSION: &str = "0.1";
Loading

0 comments on commit 99910d5

Please sign in to comment.