diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index 5c95be7f..2bec1ae6 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -85,15 +85,15 @@ def setup(): Check and apply backend setup changes for flows, including the internal and target storage (to export). """ - status_check = sync_setup() - click.echo(status_check) - if status_check.is_up_to_date(): + setup_status = sync_setup() + click.echo(setup_status) + if setup_status.is_up_to_date(): click.echo("No changes need to be pushed.") return if not click.confirm( "Changes need to be pushed. Continue? [yes/N]", default=False, show_default=False): return - apply_setup_changes(status_check) + apply_setup_changes(setup_status) @cli.command() @click.argument("flow_name", type=str, nargs=-1) @@ -112,15 +112,15 @@ def drop(flow_name: tuple[str, ...], drop_all: bool): flow_names = [fl.name for fl in flow.flows()] else: flow_names = list(flow_name) - status_check = drop_setup(flow_names) - click.echo(status_check) - if status_check.is_up_to_date(): + setup_status = drop_setup(flow_names) + click.echo(setup_status) + if setup_status.is_up_to_date(): click.echo("No flows need to be dropped.") return if not click.confirm( "Changes need to be pushed. Continue? [yes/N]", default=False, show_default=False): return - apply_setup_changes(status_check) + apply_setup_changes(setup_status) @cli.command() @click.argument("flow_name", type=str, required=False) diff --git a/python/cocoindex/setup.py b/python/cocoindex/setup.py index 529751a8..0e0d20c1 100644 --- a/python/cocoindex/setup.py +++ b/python/cocoindex/setup.py @@ -1,16 +1,16 @@ from . import flow from . import _engine -def sync_setup() -> _engine.SetupStatusCheck: +def sync_setup() -> _engine.SetupStatus: flow.ensure_all_flows_built() return _engine.sync_setup() -def drop_setup(flow_names: list[str]) -> _engine.SetupStatusCheck: +def drop_setup(flow_names: list[str]) -> _engine.SetupStatus: flow.ensure_all_flows_built() return _engine.drop_setup(flow_names) def flow_names_with_setup() -> list[str]: return _engine.flow_names_with_setup() -def apply_setup_changes(status_check: _engine.SetupStatusCheck): - _engine.apply_setup_changes(status_check) +def apply_setup_changes(setup_status: _engine.SetupStatus): + _engine.apply_setup_changes(setup_status) diff --git a/src/builder/analyzed_flow.rs b/src/builder/analyzed_flow.rs index b08122e2..7435c5ba 100644 --- a/src/builder/analyzed_flow.rs +++ b/src/builder/analyzed_flow.rs @@ -4,7 +4,7 @@ use super::{analyzer, plan}; use crate::{ ops::registry::ExecutorFactoryRegistry, service::error::{shared_ok, SharedError, SharedResultExt}, - setup::{self, ObjectSetupStatusCheck}, + setup::{self, ObjectSetupStatus}, }; pub struct AnalyzedFlow { @@ -29,9 +29,9 @@ impl AnalyzedFlow { existing_flow_ss, registry, )?; - let setup_status_check = + let setup_status = setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss).await?; - let execution_plan = if setup_status_check.is_up_to_date() { + let execution_plan = if setup_status.is_up_to_date() { Some( async move { shared_ok(Arc::new( diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index 40399922..53fc054f 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -1,6 +1,6 @@ use crate::prelude::*; -use crate::setup::{CombinedState, ResourceSetupInfo, ResourceSetupStatusCheck, SetupChangeType}; +use crate::setup::{CombinedState, ResourceSetupInfo, ResourceSetupStatus, SetupChangeType}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; @@ -53,7 +53,7 @@ pub struct TrackingTableSetupState { } #[derive(Debug)] -pub struct TrackingTableSetupStatusCheck { +pub struct TrackingTableSetupStatus { pub desired_state: Option, pub legacy_table_names: Vec, @@ -62,7 +62,7 @@ pub struct TrackingTableSetupStatusCheck { pub source_ids_to_delete: Vec, } -impl TrackingTableSetupStatusCheck { +impl TrackingTableSetupStatus { pub fn new( desired: Option<&TrackingTableSetupState>, existing: &CombinedState, @@ -91,19 +91,19 @@ impl TrackingTableSetupStatusCheck { pub fn into_setup_info( self, - ) -> ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupStatusCheck> { + ) -> ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupStatus> { ResourceSetupInfo { key: (), state: self.desired_state.clone(), description: "Tracking Table".to_string(), - status_check: Some(self), + setup_status: Some(self), legacy_key: None, } } } #[async_trait] -impl ResourceSetupStatusCheck for TrackingTableSetupStatusCheck { +impl ResourceSetupStatus for TrackingTableSetupStatus { fn describe_changes(&self) -> Vec { let mut changes: Vec = vec![]; if self.desired_state.is_some() && !self.legacy_table_names.is_empty() { diff --git a/src/llm/anthropic.rs b/src/llm/anthropic.rs index 65cfcb49..1001f908 100644 --- a/src/llm/anthropic.rs +++ b/src/llm/anthropic.rs @@ -1,8 +1,11 @@ +use crate::llm::{ + LlmGenerateRequest, LlmGenerateResponse, LlmGenerationClient, LlmSpec, OutputFormat, + ToJsonSchemaOptions, +}; +use anyhow::{bail, Context, Result}; use async_trait::async_trait; -use crate::llm::{LlmGenerationClient, LlmSpec, LlmGenerateRequest, LlmGenerateResponse, ToJsonSchemaOptions, OutputFormat}; -use anyhow::{Result, bail, Context}; -use serde_json::Value; use json5; +use serde_json::Value; use crate::api_bail; use urlencoding::encode; @@ -64,7 +67,8 @@ impl LlmGenerationClient for Client { let encoded_api_key = encode(&self.api_key); - let resp = self.client + let resp = self + .client .post(url) .header("x-api-key", encoded_api_key.as_ref()) .header("anthropic-version", "2023-06-01") @@ -81,7 +85,7 @@ impl LlmGenerationClient for Client { // println!("Anthropic API full response: {resp_json:?}"); let resp_content = &resp_json["content"]; - let tool_name = "report_result"; + let tool_name = "report_result"; let mut extracted_json: Option = None; if let Some(array) = resp_content.as_array() { for item in array { @@ -116,14 +120,16 @@ impl LlmGenerationClient for Client { } } } - }, - _ => return Err(anyhow::anyhow!("No structured tool output or text found in response")), + } + _ => { + return Err(anyhow::anyhow!( + "No structured tool output or text found in response" + )) + } } }; - Ok(LlmGenerateResponse { - text, - }) + Ok(LlmGenerateResponse { text }) } fn json_schema_options(&self) -> ToJsonSchemaOptions { diff --git a/src/llm/gemini.rs b/src/llm/gemini.rs index f5e274ad..11c34ebb 100644 --- a/src/llm/gemini.rs +++ b/src/llm/gemini.rs @@ -1,8 +1,11 @@ +use crate::api_bail; +use crate::llm::{ + LlmGenerateRequest, LlmGenerateResponse, LlmGenerationClient, LlmSpec, OutputFormat, + ToJsonSchemaOptions, +}; +use anyhow::{bail, Context, Result}; use async_trait::async_trait; -use crate::llm::{LlmGenerationClient, LlmSpec, LlmGenerateRequest, LlmGenerateResponse, ToJsonSchemaOptions, OutputFormat}; -use anyhow::{Result, bail, Context}; use serde_json::Value; -use crate::api_bail; use urlencoding::encode; pub struct Client { @@ -76,10 +79,13 @@ impl LlmGenerationClient for Client { let api_key = &self.api_key; let url = format!( "https://generativelanguage.googleapis.com/v1beta/models/{}:generateContent?key={}", - encode(&self.model), encode(api_key) + encode(&self.model), + encode(api_key) ); - let resp = self.client.post(&url) + let resp = self + .client + .post(&url) .json(&payload) .send() .await @@ -107,4 +113,4 @@ impl LlmGenerationClient for Client { top_level_must_be_object: true, } } -} \ No newline at end of file +} diff --git a/src/llm/mod.rs b/src/llm/mod.rs index b91eed1c..5a2706aa 100644 --- a/src/llm/mod.rs +++ b/src/llm/mod.rs @@ -52,10 +52,10 @@ pub trait LlmGenerationClient: Send + Sync { fn json_schema_options(&self) -> ToJsonSchemaOptions; } +mod anthropic; +mod gemini; mod ollama; mod openai; -mod gemini; -mod anthropic; pub async fn new_llm_generation_client(spec: LlmSpec) -> Result> { let client = match spec.api_type { diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index b255f00f..36aba809 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -310,7 +310,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { desired_state: Option, existing_states: setup::CombinedState, auth_registry: &Arc, - ) -> Result; + ) -> Result; fn check_state_compatibility( &self, @@ -398,13 +398,13 @@ impl ExportTargetFactory for T { desired_state: Option, existing_states: setup::CombinedState, auth_registry: &Arc, - ) -> Result> { + ) -> Result> { let key: T::Key = serde_json::from_value(key.clone())?; let desired_state: Option = desired_state .map(|v| serde_json::from_value(v.clone())) .transpose()?; let existing_states = from_json_combined_state(existing_states)?; - let status_check = StorageFactoryBase::check_setup_status( + let setup_status = StorageFactoryBase::check_setup_status( self, key, desired_state, @@ -412,7 +412,7 @@ impl ExportTargetFactory for T { auth_registry, ) .await?; - Ok(Box::new(status_check)) + Ok(Box::new(setup_status)) } fn describe_resource(&self, key: &serde_json::Value) -> Result { diff --git a/src/ops/interface.rs b/src/ops/interface.rs index 68e1f2bf..9cec7d7e 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -196,7 +196,7 @@ pub trait ExportTargetFactory: Send + Sync { desired_state: Option, existing_states: setup::CombinedState, auth_registry: &Arc, - ) -> Result>; + ) -> Result>; /// Normalize the key. e.g. the JSON format may change (after code change, e.g. new optional field or field ordering), even if the underlying value is not changed. /// This should always return the canonical serialized form. diff --git a/src/ops/storages/neo4j.rs b/src/ops/storages/neo4j.rs index 289f49b3..98bd5585 100644 --- a/src/ops/storages/neo4j.rs +++ b/src/ops/storages/neo4j.rs @@ -2,7 +2,7 @@ use crate::prelude::*; use super::spec::{GraphDeclaration, GraphElementMapping, NodeFromFieldsSpec, TargetFieldMapping}; use crate::setup::components::{self, State}; -use crate::setup::{ResourceSetupStatusCheck, SetupChangeType}; +use crate::setup::{ResourceSetupStatus, SetupChangeType}; use crate::{ops::sdk::*, setup::CombinedState}; use indoc::formatdoc; @@ -855,7 +855,7 @@ fn build_composite_field_names(qualifier: &str, field_names: &[String]) -> Strin } #[derive(Derivative)] #[derivative(Debug)] -struct SetupStatusCheck { +struct SetupStatus { key: GraphElement, #[derivative(Debug = "ignore")] graph_pool: Arc, @@ -864,7 +864,7 @@ struct SetupStatusCheck { change_type: SetupChangeType, } -impl SetupStatusCheck { +impl SetupStatus { fn new( key: GraphElement, graph_pool: Arc, @@ -908,7 +908,7 @@ impl SetupStatusCheck { } #[async_trait] -impl ResourceSetupStatusCheck for SetupStatusCheck { +impl ResourceSetupStatus for SetupStatus { fn describe_changes(&self) -> Vec { let mut result = vec![]; if let Some(data_clear) = &self.data_clear { @@ -1255,16 +1255,16 @@ impl StorageFactoryBase for Factory { desired: Option, existing: CombinedState, auth_registry: &Arc, - ) -> Result { + ) -> Result { let conn_spec = auth_registry.get::(&key.connection)?; - let base = SetupStatusCheck::new( + let base = SetupStatus::new( key, self.graph_pool.clone(), conn_spec.clone(), desired.as_ref(), &existing, ); - let comp = components::StatusCheck::create( + let comp = components::Status::create( SetupComponentOperator { graph_pool: self.graph_pool.clone(), conn_spec: conn_spec.clone(), @@ -1272,7 +1272,7 @@ impl StorageFactoryBase for Factory { desired, existing, )?; - Ok(components::combine_status_checks(base, comp)) + Ok(components::combine_setup_statuss(base, comp)) } fn check_state_compatibility( diff --git a/src/ops/storages/postgres.rs b/src/ops/storages/postgres.rs index 7d0818cf..10b2af69 100644 --- a/src/ops/storages/postgres.rs +++ b/src/ops/storages/postgres.rs @@ -575,7 +575,7 @@ impl TableSetupAction { } #[derive(Debug)] -pub struct SetupStatusCheck { +pub struct SetupStatus { db_pool: PgPool, table_name: String, @@ -585,7 +585,7 @@ pub struct SetupStatusCheck { desired_table_setup: Option, } -impl SetupStatusCheck { +impl SetupStatus { fn new( db_pool: PgPool, table_name: String, @@ -739,7 +739,7 @@ fn describe_index_spec(index_name: &str, index_spec: &VectorIndexDef) -> String } #[async_trait] -impl setup::ResourceSetupStatusCheck for SetupStatusCheck { +impl setup::ResourceSetupStatus for SetupStatus { fn describe_changes(&self) -> Vec { let mut descriptions = vec![]; if self.drop_existing { @@ -976,8 +976,8 @@ impl StorageFactoryBase for Factory { desired: Option, existing: setup::CombinedState, auth_registry: &Arc, - ) -> Result { - Ok(SetupStatusCheck::new( + ) -> Result { + Ok(SetupStatus::new( get_db_pool(key.database.as_ref(), auth_registry).await?, key.table_name, desired, diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs index da908095..cac6b8eb 100644 --- a/src/ops/storages/qdrant.rs +++ b/src/ops/storages/qdrant.rs @@ -390,7 +390,7 @@ impl StorageFactoryBase for Arc { _desired: Option<()>, _existing: setup::CombinedState<()>, _auth_registry: &Arc, - ) -> Result { + ) -> Result { Err(anyhow!("Set `setup_by_user` to `true` to export to Qdrant")) as Result } diff --git a/src/py/mod.rs b/src/py/mod.rs index c02001ca..280e855d 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -340,10 +340,10 @@ impl SimpleSemanticsQueryHandler { } #[pyclass] -pub struct SetupStatusCheck(setup::AllSetupStatusCheck); +pub struct SetupStatus(setup::AllSetupStatus); #[pymethods] -impl SetupStatusCheck { +impl SetupStatus { pub fn __str__(&self) -> String { format!("{}", &self.0) } @@ -358,7 +358,7 @@ impl SetupStatusCheck { } #[pyfunction] -fn sync_setup(py: Python<'_>) -> PyResult { +fn sync_setup(py: Python<'_>) -> PyResult { let lib_context = get_lib_context().into_py_result()?; let flows = lib_context.flows.lock().unwrap(); let all_setup_states = lib_context.all_setup_states.read().unwrap(); @@ -366,21 +366,21 @@ fn sync_setup(py: Python<'_>) -> PyResult { get_runtime() .block_on(async { let setup_status = setup::sync_setup(&flows, &all_setup_states).await?; - anyhow::Ok(SetupStatusCheck(setup_status)) + anyhow::Ok(SetupStatus(setup_status)) }) .into_py_result() }) } #[pyfunction] -fn drop_setup(py: Python<'_>, flow_names: Vec) -> PyResult { +fn drop_setup(py: Python<'_>, flow_names: Vec) -> PyResult { let lib_context = get_lib_context().into_py_result()?; let all_setup_states = lib_context.all_setup_states.read().unwrap(); py.allow_threads(|| { get_runtime() .block_on(async { let setup_status = setup::drop_setup(flow_names, &all_setup_states).await?; - anyhow::Ok(SetupStatusCheck(setup_status)) + anyhow::Ok(SetupStatus(setup_status)) }) .into_py_result() }) @@ -395,7 +395,7 @@ fn flow_names_with_setup() -> PyResult> { } #[pyfunction] -fn apply_setup_changes(py: Python<'_>, setup_status: &SetupStatusCheck) -> PyResult<()> { +fn apply_setup_changes(py: Python<'_>, setup_status: &SetupStatus) -> PyResult<()> { py.allow_threads(|| { get_runtime() .block_on(async { @@ -442,7 +442,7 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; + m.add_class::()?; m.add_class::()?; Ok(()) diff --git a/src/setup/components.rs b/src/setup/components.rs index 5cd817cf..7036ce30 100644 --- a/src/setup/components.rs +++ b/src/setup/components.rs @@ -1,4 +1,4 @@ -use super::{CombinedState, ResourceSetupStatusCheck, SetupChangeType, StateChange}; +use super::{CombinedState, ResourceSetupStatus, SetupChangeType, StateChange}; use crate::prelude::*; use std::fmt::Debug; @@ -36,14 +36,14 @@ struct CompositeStateUpsert { #[derive(Derivative)] #[derivative(Debug)] -pub struct StatusCheck { +pub struct Status { #[derivative(Debug = "ignore")] desc: D, keys_to_delete: IndexSet, states_to_upsert: Vec>, } -impl StatusCheck { +impl Status { pub fn create( desc: D, desired: Option, @@ -109,7 +109,7 @@ impl StatusCheck { } #[async_trait] -impl ResourceSetupStatusCheck for StatusCheck { +impl ResourceSetupStatus for Status { fn describe_changes(&self) -> Vec { let mut result = vec![]; @@ -164,15 +164,13 @@ impl ResourceSetupStatusCheck for StatusCheck { } #[derive(Debug)] -struct CombinedStatusCheck { +struct CombinedStatus { a: A, b: B, } #[async_trait] -impl ResourceSetupStatusCheck - for CombinedStatusCheck -{ +impl ResourceSetupStatus for CombinedStatus { fn describe_changes(&self) -> Vec { let mut result = vec![]; result.extend(self.a.describe_changes()); @@ -196,9 +194,9 @@ impl ResourceSetupStat } } -pub fn combine_status_checks( +pub fn combine_setup_statuss( a: A, b: B, -) -> impl ResourceSetupStatusCheck { - CombinedStatusCheck { a, b } +) -> impl ResourceSetupStatus { + CombinedStatus { a, b } } diff --git a/src/setup/db_metadata.rs b/src/setup/db_metadata.rs index 5e40ba9c..39e4976f 100644 --- a/src/setup/db_metadata.rs +++ b/src/setup/db_metadata.rs @@ -1,6 +1,6 @@ use crate::prelude::*; -use super::{ResourceSetupInfo, ResourceSetupStatusCheck, SetupChangeType, StateChange}; +use super::{ResourceSetupInfo, ResourceSetupStatus, SetupChangeType, StateChange}; use crate::utils::db::WriteAction; use axum::http::StatusCode; use sqlx::PgPool; @@ -324,14 +324,14 @@ impl MetadataTableSetup { key: (), state: None, description: "CocoIndex Metadata Table".to_string(), - status_check: Some(self), + setup_status: Some(self), legacy_key: None, } } } #[async_trait] -impl ResourceSetupStatusCheck for MetadataTableSetup { +impl ResourceSetupStatus for MetadataTableSetup { fn describe_changes(&self) -> Vec { if self.metadata_table_missing { vec![format!( diff --git a/src/setup/driver.rs b/src/setup/driver.rs index 68a154c0..b26cbf2f 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -7,11 +7,11 @@ use std::{ }; use super::{ - db_metadata, CombinedState, DesiredMode, ExistingMode, FlowSetupState, FlowSetupStatusCheck, - ObjectSetupStatusCheck, ObjectStatus, ResourceIdentifier, ResourceSetupInfo, - ResourceSetupStatusCheck, SetupChangeType, StateChange, TargetSetupState, + db_metadata, CombinedState, DesiredMode, ExistingMode, FlowSetupState, FlowSetupStatus, + ObjectSetupStatus, ObjectStatus, ResourceIdentifier, ResourceSetupInfo, + ResourceSetupStatus, SetupChangeType, StateChange, TargetSetupState, }; -use super::{AllSetupState, AllSetupStatusCheck}; +use super::{AllSetupState, AllSetupStatus}; use crate::execution::db_tracking_setup; use crate::{ lib_context::FlowContext, @@ -243,7 +243,7 @@ fn group_resource_states<'a>( pub async fn check_flow_setup_status( desired_state: Option<&FlowSetupState>, existing_state: Option<&FlowSetupState>, -) -> Result { +) -> Result { let metadata_change = diff_state( existing_state.map(|e| &e.metadata), desired_state.map(|d| &d.metadata), @@ -254,7 +254,7 @@ pub async fn check_flow_setup_status( .iter() .flat_map(|d| d.metadata.sources.values().map(|v| v.source_id)) .collect::>(); - let tracking_table_change = db_tracking_setup::TrackingTableSetupStatusCheck::new( + let tracking_table_change = db_tracking_setup::TrackingTableSetupStatus::new( desired_state.map(|d| &d.tracking_table), &existing_state .map(|e| Cow::Borrowed(&e.tracking_table)) @@ -318,7 +318,7 @@ pub async fn check_flow_setup_status( let never_setup_by_sys = target_state.is_none() && existing_without_setup_by_user.current.is_none() && existing_without_setup_by_user.staging.is_empty(); - let status_check = if never_setup_by_sys { + let setup_status = if never_setup_by_sys { None } else { Some( @@ -336,7 +336,7 @@ pub async fn check_flow_setup_status( key: resource_id.clone(), state, description: factory.describe_resource(&resource_id.key)?, - status_check, + setup_status, legacy_key: v .existing .legacy_state_key @@ -346,7 +346,7 @@ pub async fn check_flow_setup_status( }), }); } - Ok(FlowSetupStatusCheck { + Ok(FlowSetupStatus { status: to_object_status(existing_state, desired_state)?, seen_flow_metadata_version: existing_state.and_then(|s| s.seen_flow_metadata_version), metadata_change, @@ -359,61 +359,61 @@ pub async fn check_flow_setup_status( pub async fn sync_setup( flows: &BTreeMap>, all_setup_state: &AllSetupState, -) -> Result { - let mut flow_status_checks = BTreeMap::new(); +) -> Result { + let mut flow_setup_statuss = BTreeMap::new(); for (flow_name, flow_context) in flows { let existing_state = all_setup_state.flows.get(flow_name); - flow_status_checks.insert( + flow_setup_statuss.insert( flow_name.clone(), check_flow_setup_status(Some(&flow_context.flow.desired_state), existing_state).await?, ); } - Ok(AllSetupStatusCheck { + Ok(AllSetupStatus { metadata_table: db_metadata::MetadataTableSetup { metadata_table_missing: !all_setup_state.has_metadata_table, } .into_setup_info(), - flows: flow_status_checks, + flows: flow_setup_statuss, }) } pub async fn drop_setup( flow_names: impl IntoIterator, all_setup_state: &AllSetupState, -) -> Result { +) -> Result { if !all_setup_state.has_metadata_table { api_bail!("CocoIndex metadata table is missing."); } - let mut flow_status_checks = BTreeMap::new(); + let mut flow_setup_statuss = BTreeMap::new(); for flow_name in flow_names.into_iter() { if let Some(existing_state) = all_setup_state.flows.get(&flow_name) { - flow_status_checks.insert( + flow_setup_statuss.insert( flow_name, check_flow_setup_status(None, Some(existing_state)).await?, ); } } - Ok(AllSetupStatusCheck { + Ok(AllSetupStatus { metadata_table: db_metadata::MetadataTableSetup { metadata_table_missing: false, } .into_setup_info(), - flows: flow_status_checks, + flows: flow_setup_statuss, }) } -async fn maybe_update_resource_setup( +async fn maybe_update_resource_setup( write: &mut impl std::io::Write, resource: &ResourceSetupInfo, ) -> Result<()> { - if let Some(status_check) = &resource.status_check { - if status_check.change_type() != SetupChangeType::NoChange { + if let Some(setup_status) = &resource.setup_status { + if setup_status.change_type() != SetupChangeType::NoChange { writeln!(write, "{}:", resource.description)?; - for change in status_check.describe_changes() { + for change in setup_status.describe_changes() { writeln!(write, " - {}", change)?; } write!(write, "Pushing...")?; - status_check.apply_change().await?; + setup_status.apply_change().await?; writeln!(write, "DONE")?; } } @@ -422,12 +422,12 @@ async fn maybe_update_resource_setup( pub async fn apply_changes( write: &mut impl std::io::Write, - status_check: &AllSetupStatusCheck, + setup_status: &AllSetupStatus, pool: &PgPool, ) -> Result<()> { - maybe_update_resource_setup(write, &status_check.metadata_table).await?; + maybe_update_resource_setup(write, &setup_status.metadata_table).await?; - for (flow_name, flow_status) in &status_check.flows { + for (flow_name, flow_status) in &setup_status.flows { if flow_status.is_up_to_date() { continue; } @@ -453,7 +453,7 @@ pub async fn apply_changes( } if let Some(tracking_table) = &flow_status.tracking_table { if tracking_table - .status_check + .setup_status .as_ref() .map(|c| c.change_type() != SetupChangeType::NoChange) .unwrap_or_default() diff --git a/src/setup/states.rs b/src/setup/states.rs index 8924ea69..ad508d35 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -21,7 +21,7 @@ use std::hash::Hash; use super::db_metadata; use crate::execution::db_tracking_setup::{ - self, TrackingTableSetupState, TrackingTableSetupStatusCheck, + self, TrackingTableSetupState, TrackingTableSetupStatus, }; const INDENT: &str = " "; @@ -219,7 +219,7 @@ pub enum SetupChangeType { } #[async_trait] -pub trait ResourceSetupStatusCheck: Send + Sync + Debug { +pub trait ResourceSetupStatus: Send + Sync + Debug { fn describe_changes(&self) -> Vec; fn change_type(&self) -> SetupChangeType; @@ -228,7 +228,7 @@ pub trait ResourceSetupStatusCheck: Send + Sync + Debug { } #[async_trait] -impl ResourceSetupStatusCheck for Box { +impl ResourceSetupStatus for Box { fn describe_changes(&self) -> Vec { self.as_ref().describe_changes() } @@ -243,7 +243,7 @@ impl ResourceSetupStatusCheck for Box { } #[async_trait] -impl ResourceSetupStatusCheck for std::convert::Infallible { +impl ResourceSetupStatus for std::convert::Infallible { fn describe_changes(&self) -> Vec { unreachable!() } @@ -258,20 +258,20 @@ impl ResourceSetupStatusCheck for std::convert::Infallible { } #[derive(Debug)] -pub struct ResourceSetupInfo { +pub struct ResourceSetupInfo { pub key: K, pub state: Option, pub description: String, /// If `None`, the resource is managed by users. - pub status_check: Option, + pub setup_status: Option, pub legacy_key: Option, } -impl std::fmt::Display for ResourceSetupInfo { +impl std::fmt::Display for ResourceSetupInfo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let status_code = match self.status_check.as_ref().map(|c| c.change_type()) { + let status_code = match self.setup_status.as_ref().map(|c| c.change_type()) { Some(SetupChangeType::NoChange) => "READY", Some(SetupChangeType::Create) => "TO CREATE", Some(SetupChangeType::Update) => "TO UPDATE", @@ -283,8 +283,8 @@ impl std::fmt::Display for ResourceSetupInfo< let status_full = status_str.color(AnsiColors::Cyan); let desc_colored = &self.description; writeln!(f, "{} {}", status_full, desc_colored)?; - if let Some(status_check) = &self.status_check { - let changes = status_check.describe_changes(); + if let Some(setup_status) = &self.setup_status { + let changes = setup_status.describe_changes(); if !changes.is_empty() { let mut f = indented(f).with_str(INDENT); writeln!(f, "{}", "TODO:".color(AnsiColors::BrightBlack))?; @@ -298,9 +298,9 @@ impl std::fmt::Display for ResourceSetupInfo< } } -impl ResourceSetupInfo { +impl ResourceSetupInfo { pub fn is_up_to_date(&self) -> bool { - self.status_check + self.setup_status .as_ref() .is_none_or(|c| c.change_type() == SetupChangeType::NoChange) } @@ -314,28 +314,27 @@ pub enum ObjectStatus { Deleted, } -pub trait ObjectSetupStatusCheck { +pub trait ObjectSetupStatus { fn status(&self) -> ObjectStatus; fn is_up_to_date(&self) -> bool; } #[derive(Debug)] -pub struct FlowSetupStatusCheck { +pub struct FlowSetupStatus { pub status: ObjectStatus, pub seen_flow_metadata_version: Option, pub metadata_change: Option>, pub tracking_table: - Option>, - pub target_resources: Vec< - ResourceSetupInfo>, - >, + Option>, + pub target_resources: + Vec>>, pub unknown_resources: Vec, } -impl ObjectSetupStatusCheck for FlowSetupStatusCheck { +impl ObjectSetupStatus for FlowSetupStatus { fn status(&self) -> ObjectStatus { self.status } @@ -354,23 +353,21 @@ impl ObjectSetupStatusCheck for FlowSetupStatusCheck { } #[derive(Debug)] -pub struct AllSetupStatusCheck { +pub struct AllSetupStatus { pub metadata_table: ResourceSetupInfo<(), (), db_metadata::MetadataTableSetup>, - pub flows: BTreeMap, + pub flows: BTreeMap, } -impl AllSetupStatusCheck { +impl AllSetupStatus { pub fn is_up_to_date(&self) -> bool { self.metadata_table.is_up_to_date() && self.flows.iter().all(|(_, flow)| flow.is_up_to_date()) } } -pub struct ObjectSetupStatusCode<'a, StatusCheck: ObjectSetupStatusCheck>(&'a StatusCheck); -impl std::fmt::Display - for ObjectSetupStatusCode<'_, StatusCheck> -{ +pub struct ObjectSetupStatusCode<'a, Status: ObjectSetupStatus>(&'a Status); +impl std::fmt::Display for ObjectSetupStatusCode<'_, Status> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, @@ -390,9 +387,9 @@ impl std::fmt::Display } } -pub struct FormattedFlowSetupStatusCheck<'a>(&'a str, &'a FlowSetupStatusCheck); +pub struct FormattedFlowSetupStatus<'a>(&'a str, &'a FlowSetupStatus); -impl std::fmt::Display for FormattedFlowSetupStatusCheck<'_> { +impl std::fmt::Display for FormattedFlowSetupStatus<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let flow_ssc = self.1; @@ -420,15 +417,11 @@ impl std::fmt::Display for FormattedFlowSetupStatusCheck<'_> { } } -impl std::fmt::Display for AllSetupStatusCheck { +impl std::fmt::Display for AllSetupStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "{}", self.metadata_table)?; for (flow_name, flow_status) in &self.flows { - writeln!( - f, - "{}", - FormattedFlowSetupStatusCheck(flow_name, flow_status) - )?; + writeln!(f, "{}", FormattedFlowSetupStatus(flow_name, flow_status))?; } Ok(()) }