Skip to content

Commit

Permalink
Fix import formats
Browse files Browse the repository at this point in the history
  • Loading branch information
isbm committed Jun 28, 2024
1 parent 5f03888 commit 3f6fa8c
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 151 deletions.
3 changes: 1 addition & 2 deletions core/src/action.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use clap::{Parser, ValueEnum};
use std::{
fmt::{Debug, Display},
str::FromStr,
};

use clap::{Parser, ValueEnum};
use strum::{Display, EnumIter};
use thiserror::Error;

Expand Down
65 changes: 35 additions & 30 deletions core/src/builtin/ctl.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use std::{ops::ControlFlow, path::Path, time::Duration};

use crate::{config::yaml::TaskConfigYaml, task::ExitReason};
use anyhow::Result;
use nix::{sys::stat::Mode, unistd::mkfifo};
use smallvec::smallvec;
use smol::{fs::{create_dir_all, File}, io::{AsyncBufReadExt, BufReader}};
use tracing::{info, error};
use smol::{
fs::{create_dir_all, File},
io::{AsyncBufReadExt, BufReader},
};
use std::{ops::ControlFlow, path::Path, time::Duration};
use tracing::{error, info};

use crate::{builtin_fn, task::{ContextMap, TaskContext, TaskState}};
use crate::{
builtin_fn,
task::{ContextMap, TaskContext, TaskState},
};

use super::IntoConfig;

Expand All @@ -30,7 +35,6 @@ async fn create_ctl(_: &TaskContext, _context: ContextMap<'static>) -> Result<()
Ok(())
}


builtin_fn!(WaitForCommands: wait_for_commands);

impl IntoConfig for WaitForCommands {
Expand All @@ -44,37 +48,38 @@ impl IntoConfig for WaitForCommands {
}
}


async fn wait_for_commands(context: &TaskContext, context_map: ContextMap<'static>) -> Result<()> {
let mut buf = String::new();
loop {
if context.state().await == TaskState::Terminating {
context
.update_state(TaskState::Concluded(ExitReason::Terminated))
.await;
break Ok(());
};
let mut pipe = match create_pipe().await {
Ok(x) => x,
Err(error) => {
error!("Could not create pipe: {error}");
smol::Timer::after(Duration::from_secs(10)).await;
continue;
}
};
loop {
if context.state().await == TaskState::Terminating {
context.update_state(TaskState::Concluded(ExitReason::Terminated)).await;
break Ok(());
};
let mut pipe = match create_pipe().await {
Ok(x) => x,
Err(error) => {
error!("Could not create pipe: {error}");
smol::Timer::after(Duration::from_secs(10)).await;
continue;
}
};
loop {
match pipe.read_line(&mut buf).await {
Ok(bytes) if bytes > 0 => {
let action = buf.trim();
info!(action);
if let Err(error) = crate::perform_action::perform(action, context_map).await {
error!(%error);
}
match pipe.read_line(&mut buf).await {
Ok(bytes) if bytes > 0 => {
let action = buf.trim();
info!(action);
if let Err(error) = crate::perform_action::perform(action, context_map).await {
error!(%error);
}
_ => break,
}

buf.clear();
_ => break,
}

buf.clear();
}
}
}

async fn create_pipe() -> Result<BufReader<File>> {
Expand Down
16 changes: 7 additions & 9 deletions core/src/builtin/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use crate::task::{ContextMap, ExitReason, TaskContext};
use crate::{
config::{payload::Runnable, yaml::TaskConfigYaml},
task::TaskState,
};
use async_trait::async_trait;
use futures::{ready, Future};
use std::{
ops::ControlFlow,
pin::{pin, Pin},
task::Poll,
};

use crate::task::{ContextMap, ExitReason, TaskContext};
use async_trait::async_trait;
use futures::{ready, Future};
use tracing::{debug, info};

use crate::{
config::{payload::Runnable, yaml::TaskConfigYaml},
task::TaskState,
};

pub mod ctl;

pub trait IntoConfig {
Expand Down
35 changes: 17 additions & 18 deletions core/src/command_line/complex.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
use crate::{
config::payload::Runnable,
task::{ContextMap, ExitReason, TaskContext, TaskState},
};
use lazy_static::lazy_static;
use regex::{Captures, Regex};
use serde::{Deserialize, Serialize};
use smol::process::Command;
use std::{
env,
ops::{ControlFlow, Deref, DerefMut},
process::{ExitStatus, Stdio},
slice::Iter,
str::FromStr,
};

use lazy_static::lazy_static;
use regex::{Captures, Regex};
use serde::{Deserialize, Serialize};
use smol::process::Command;
use thiserror::Error;
use tracing::{debug, error, info};

use crate::{
config::payload::Runnable,
task::{ContextMap, ExitReason, TaskContext, TaskState},
};

#[derive(Debug, Serialize, Deserialize)]
pub struct CommandLine {
ignore_env: bool,
Expand Down Expand Up @@ -69,15 +67,15 @@ impl CommandLine {

async fn run_line(&self, context: &TaskContext) -> ControlFlow<TaskState> {
// let mut context = context.write().await;

debug!(cmd = ?self.args, "Running");
let mut child = match self.spawn() {
Ok(c) => c,
Err(CommandLineError::EmptyCommand) => return ControlFlow::Continue(()),
Err(e) => {
error!(%e);
return ControlFlow::Break(TaskState::Concluded(ExitReason::Failed));
}
Ok(c) => c,
Err(CommandLineError::EmptyCommand) => return ControlFlow::Continue(()),
Err(e) => {
error!(%e);
return ControlFlow::Break(TaskState::Concluded(ExitReason::Failed));
}
};

(*context.child.write().await) = Some(child.id() as i32);
Expand All @@ -100,7 +98,8 @@ impl CommandLine {
impl Runnable for CommandLine {
async fn run<'a>(
&'a self,
context: &'a TaskContext, _context_map: ContextMap<'static>
context: &'a TaskContext,
_context_map: ContextMap<'static>,
) -> ControlFlow<TaskState> {
self.run_line(context).await
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/command_line/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ pub use complex::*;
#[cfg(not(feature = "complex_commands"))]
mod simple;
#[cfg(not(feature = "complex_commands"))]
pub use simple::*;
pub use simple::*;
13 changes: 5 additions & 8 deletions core/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
pub mod payload;
pub mod yaml;

use self::{payload::Payload, yaml::TaskConfigYaml};
use crate::{
ordering::{construct_markers, resolve_before, sort},
validate,
};
use serde::{Deserialize, Serialize};
use smol::stream::StreamExt;
use std::{
Expand All @@ -10,15 +14,8 @@ use std::{
path::Path,
};
use tracing::{debug, info_span};

use crate::{
ordering::{construct_markers, resolve_before, sort},
validate,
};
use tracing::{error, instrument};

use self::{payload::Payload, yaml::TaskConfigYaml};

#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub enum Respawn {
/// Never retry this task (default)
Expand Down
32 changes: 20 additions & 12 deletions core/src/config/payload.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use std::{fmt::Debug, ops::ControlFlow, str::FromStr};

use crate::{
builtin::BuiltInService,
command_line::CommandLines,
task::{ContextMap, ExitReason, TaskContext, TaskState},
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};

use crate::{command_line::CommandLines, task::{ContextMap, ExitReason, TaskContext, TaskState}, builtin::BuiltInService};

use std::{fmt::Debug, ops::ControlFlow, str::FromStr};

#[async_trait::async_trait]
pub trait Runnable {
async fn run<'a>(&'a self, context: &'a TaskContext, context_map: ContextMap<'static>) -> ControlFlow<TaskState>;
async fn run<'a>(
&'a self,
context: &'a TaskContext,
context_map: ContextMap<'static>,
) -> ControlFlow<TaskState>;
}

#[derive(Serialize, Deserialize)]
Expand All @@ -19,19 +24,22 @@ pub enum Payload<T = CommandLines> {
}

impl Payload {

pub async fn run(&self, x: usize, context: &TaskContext, context_map: ContextMap<'static>) -> ControlFlow<TaskState> {

pub async fn run(
&self,
x: usize,
context: &TaskContext,
context_map: ContextMap<'static>,
) -> ControlFlow<TaskState> {
match self {
Payload::Service(command_lines) => match command_lines.get(x) {
Some(command_line) => command_line.run(context, context_map).await,
None => ControlFlow::Break(TaskState::Concluded(ExitReason::Done))
None => ControlFlow::Break(TaskState::Concluded(ExitReason::Done)),
},
Payload::Builtin(runnable) if x == 0 => runnable.run(context, context_map).await,
_ => ControlFlow::Break(TaskState::Concluded(ExitReason::Done)),
}
}

pub(crate) fn is_marker(&self) -> bool {
matches!(self, Self::Marker)
}
Expand Down Expand Up @@ -59,4 +67,4 @@ impl<T: FromStr + DeserializeOwned> FromStr for Payload<T> {
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self::Service(T::from_str(s)?))
}
}
}
29 changes: 12 additions & 17 deletions core/src/config/yaml.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@

use std::fmt::Debug;

use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize};
use smallvec::SmallVec;


use super::payload::Payload;
use crate::{
builtin::BuiltInService, command_line, config::{Respawn, TaskConfig}
builtin::BuiltInService,
command_line,
config::{Respawn, TaskConfig},
};

use super::payload::Payload;

use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize};
use smallvec::SmallVec;
use std::fmt::Debug;

#[derive(Serialize, Deserialize)]
#[serde(untagged)]
Expand All @@ -19,7 +15,7 @@ pub enum PayloadYaml {
Service(String),
#[serde(skip)]
Builtin(BuiltInService),
Marker
Marker,
}

impl Default for PayloadYaml {
Expand All @@ -33,12 +29,11 @@ impl Debug for PayloadYaml {
match self {
Self::Service(arg0) => f.debug_tuple("Service").field(arg0).finish(),
Self::Builtin(_) => f.write_str("<builtin>"),
Self::Marker => f.write_str("<marker>")
Self::Marker => f.write_str("<marker>"),
}
}
}


#[derive(Debug, Deserialize, Serialize, Eq, Clone, Hash, PartialEq)]
#[serde(untagged)]
pub enum RespawnYaml {
Expand Down Expand Up @@ -86,7 +81,7 @@ pub struct TaskConfigYaml {
pub group: Option<String>,
#[serde(default)]
#[serde(deserialize_with = "OneOrMany::read")]
pub provides: Vec<String>
pub provides: Vec<String>,
}

impl TaskConfigYaml {
Expand All @@ -108,7 +103,7 @@ impl TaskConfigYaml {
payload: match self.cmd {
PayloadYaml::Service(x) => x.parse()?,
PayloadYaml::Builtin(builtin) => Payload::Builtin(builtin),
PayloadYaml::Marker => Payload::Marker
PayloadYaml::Marker => Payload::Marker,
},
with: self.with,
after: self.after.into_vec(),
Expand Down Expand Up @@ -170,7 +165,7 @@ mod test {
r#"
name: bar
cmd: [echo, "hello from inside bar"]
_after:
_after:
- foo
- bar
"#,
Expand Down
13 changes: 5 additions & 8 deletions core/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use crate::{config::read_config, task::TaskContext};
use crate::config::yaml::TaskConfigYaml;

use crate::task::ContextMap;
use crate::{config::read_config, task::TaskContext};
use anyhow::Result;
use futures::StreamExt;
use nix::
libc::{SIGABRT, SIGHUP, SIGPIPE, SIGTERM, SIGTSTP}
;
use nix::libc::{SIGABRT, SIGHUP, SIGPIPE, SIGTERM, SIGTSTP};
use signal_hook::iterator::exfiltrator::WithOrigin;
use signal_hook_async_std::SignalsInfo;
use std::env;
use tracing::info;

use crate::task::ContextMap;

const SIGS: &[i32] = &[SIGABRT, SIGTERM, SIGHUP, SIGPIPE, SIGTSTP];

pub struct Alfad {
Expand Down Expand Up @@ -40,7 +36,8 @@ impl Alfad {
.collect(),
)));
info!("Done parsing ({} tasks)", context.0.len());
context.0
context
.0
.values()
.for_each(|config| crate::task::spawn(config, context));
// smol::block_on(async { wait_for_commands(context).await });
Expand Down
Loading

0 comments on commit 3f6fa8c

Please sign in to comment.