diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index 5c95be7f..a119bc8c 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -55,16 +55,17 @@ def ls(show_all: bool): @cli.command() @click.argument("flow_name", type=str, required=False) -@click.option("--color/--no-color", default=True) -def show(flow_name: str | None, color: bool): +@click.option("--color/--no-color", default=True, help="Enable or disable colored output.") +@click.option("--verbose", is_flag=True, help="Show verbose output with full details.") +def show(flow_name: str | None, color: bool, verbose: bool): """ - Show the flow spec in a readable format with colored output, - including the schema. + Show the flow spec and schema in a readable format with colored output. """ flow = _flow_by_name(flow_name) console = Console(no_color=not color) - console.print(flow._render_text()) + console.print(flow._render_spec(verbose=verbose)) + console.print() table = Table( title=f"Schema for Flow: {flow.name}", show_header=True, @@ -74,7 +75,7 @@ def show(flow_name: str | None, color: bool): table.add_column("Type", style="green") table.add_column("Attributes", style="yellow") - for field_name, field_type, attr_str in flow._render_schema(): + for field_name, field_type, attr_str in flow._get_schema(): table.add_row(field_name, field_type, attr_str) console.print(table) diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index cbc095ea..20c6d285 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -15,7 +15,7 @@ from enum import Enum from dataclasses import dataclass from rich.text import Text -from rich.console import Console +from rich.tree import Tree from . import _engine from . import index @@ -454,61 +454,33 @@ def _lazy_engine_flow() -> _engine.Flow: return engine_flow self._lazy_engine_flow = _lazy_engine_flow - def _format_flow(self, flow_dict: dict) -> Text: - output = Text() + def _render_spec(self, verbose: bool = False) -> Tree: + """ + Render the flow spec as a styled rich Tree with hierarchical structure. + """ + spec = self._get_spec(verbose=verbose) + tree = Tree(f"Flow: {self.name}", style="cyan") - def add_line(content, indent=0, style=None, end="\n"): - output.append(" " * indent) - output.append(content, style=style) - output.append(end) + def build_tree(label: str, lines: list): + node = Tree(label, style="bold magenta" if lines else "cyan") + for line in lines: + child_node = node.add(Text(line.content, style="yellow")) + child_node.children = build_tree("", line.children).children + return node - def format_key_value(key, value, indent): - if isinstance(value, (dict, list)): - add_line(f"- {key}:", indent, style="green") - format_data(value, indent + 2) - else: - add_line(f"- {key}:", indent, style="green", end="") - add_line(f" {value}", style="yellow") - - def format_data(data, indent=0): - if isinstance(data, dict): - for key, value in data.items(): - format_key_value(key, value, indent) - elif isinstance(data, list): - for i, item in enumerate(data): - format_key_value(f"[{i}]", item, indent) - else: - add_line(str(data), indent, style="yellow") - - # Header - flow_name = flow_dict.get("name", "Unnamed") - add_line(f"Flow: {flow_name}", style="bold cyan") - - # Section - for section_title, section_key in [ - ("Sources:", "import_ops"), - ("Processing:", "reactive_ops"), - ("Targets:", "export_ops"), - ]: - add_line("") - add_line(section_title, style="bold cyan") - format_data(flow_dict.get(section_key, []), indent=0) - - return output - - def _render_text(self) -> Text: - flow_spec_str = str(self._lazy_engine_flow()) - try: - flow_dict = json.loads(flow_spec_str) - return self._format_flow(flow_dict) - except json.JSONDecodeError: - return Text(flow_spec_str) + for section, lines in spec.sections: + section_node = build_tree(f"{section}:", lines) + tree.children.append(section_node) + return tree + + def _get_spec(self, verbose: bool = False) -> list[tuple[str, str, int]]: + return self._lazy_engine_flow().get_spec(output_mode="verbose" if verbose else "concise") - def _render_schema(self) -> list[tuple[str, str, str]]: + def _get_schema(self) -> list[tuple[str, str, str]]: return self._lazy_engine_flow().get_schema() def __str__(self): - return str(self._render_text()) + return str(self._get_spec()) def __repr__(self): return repr(self._lazy_engine_flow()) diff --git a/src/base/spec.rs b/src/base/spec.rs index 0b61f0b2..6a426908 100644 --- a/src/base/spec.rs +++ b/src/base/spec.rs @@ -1,8 +1,23 @@ use crate::prelude::*; use super::schema::{EnrichedValueType, FieldSchema}; +use serde::{Deserialize, Serialize}; +use std::fmt; use std::ops::Deref; +/// OutputMode enum for displaying spec info in different granularity +#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum OutputMode { + Concise, + Verbose, +} + +/// Formatting spec per output mode +pub trait SpecFormatter { + fn format(&self, mode: OutputMode) -> String; +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind")] pub enum SpecString { @@ -34,8 +49,8 @@ impl Deref for FieldPath { } } -impl std::fmt::Display for FieldPath { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for FieldPath { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { if self.is_empty() { write!(f, "*") } else { @@ -49,8 +64,8 @@ impl std::fmt::Display for FieldPath { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] pub struct OpArgName(pub Option); -impl std::fmt::Display for OpArgName { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for OpArgName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { if let Some(arg_name) = &self.0 { write!(f, "${}", arg_name) } else { @@ -73,6 +88,12 @@ pub struct NamedSpec { pub spec: T, } +impl fmt::Display for NamedSpec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}: {}", self.name, self.spec) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FieldMapping { /// If unspecified, means the current scope. @@ -83,12 +104,35 @@ pub struct FieldMapping { pub field_path: FieldPath, } +impl fmt::Display for FieldMapping { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let scope = self.scope.as_deref().unwrap_or(""); + write!( + f, + "{}{}", + if scope.is_empty() { + "".to_string() + } else { + format!("{}.", scope) + }, + self.field_path + ) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ConstantMapping { pub schema: EnrichedValueType, pub value: serde_json::Value, } +impl fmt::Display for ConstantMapping { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let value = serde_json::to_string(&self.value).unwrap_or("#serde_error".to_string()); + write!(f, "{}", value) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CollectionMapping { pub field: FieldMapping, @@ -100,6 +144,18 @@ pub struct StructMapping { pub fields: Vec>, } +impl fmt::Display for StructMapping { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let fields = self + .fields + .iter() + .map(|field| field.name.clone()) + .collect::>() + .join(","); + write!(f, "{}", fields) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind")] pub enum ValueMapping { @@ -122,7 +178,7 @@ impl ValueMapping { } impl std::fmt::Display for ValueMapping { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result { match self { ValueMapping::Constant(v) => write!( f, @@ -155,6 +211,16 @@ pub struct OpArgBinding { pub value: ValueMapping, } +impl fmt::Display for OpArgBinding { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.arg_name.is_unnamed() { + write!(f, "{}", self.value) + } else { + write!(f, "{}={}", self.arg_name, self.value) + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct OpSpec { pub kind: String, @@ -162,11 +228,46 @@ pub struct OpSpec { pub spec: serde_json::Map, } +impl SpecFormatter for OpSpec { + fn format(&self, mode: OutputMode) -> String { + match mode { + OutputMode::Concise => self.kind.clone(), + OutputMode::Verbose => { + let spec_str = serde_json::to_string_pretty(&self.spec) + .map(|s| { + let lines: Vec<&str> = s.lines().collect(); + if lines.len() < s.lines().count() { + lines + .into_iter() + .chain(["..."]) + .collect::>() + .join("\n ") + } else { + lines.join("\n ") + } + }) + .unwrap_or("#serde_error".to_string()); + format!("{}({})", self.kind, spec_str) + } + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct SourceRefreshOptions { pub refresh_interval: Option, } +impl fmt::Display for SourceRefreshOptions { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let refresh = self + .refresh_interval + .map(|d| format!("{:?}", d)) + .unwrap_or("none".to_string()); + write!(f, "{}", refresh) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ImportOpSpec { pub source: OpSpec, @@ -175,6 +276,19 @@ pub struct ImportOpSpec { pub refresh_options: SourceRefreshOptions, } +impl SpecFormatter for ImportOpSpec { + fn format(&self, mode: OutputMode) -> String { + let source = self.source.format(mode); + format!("source={}, refresh={}", source, self.refresh_options) + } +} + +impl fmt::Display for ImportOpSpec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.format(OutputMode::Concise)) + } +} + /// Transform data using a given operator. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TransformOpSpec { @@ -182,6 +296,22 @@ pub struct TransformOpSpec { pub op: OpSpec, } +impl SpecFormatter for TransformOpSpec { + fn format(&self, mode: OutputMode) -> String { + let inputs = self + .inputs + .iter() + .map(ToString::to_string) + .collect::>() + .join(","); + let op_str = self.op.format(mode); + match mode { + OutputMode::Concise => format!("op={}, inputs={}", op_str, inputs), + OutputMode::Verbose => format!("op={}, inputs=[{}]", op_str, inputs), + } + } +} + /// Apply reactive operations to each row of the input field. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ForEachOpSpec { @@ -190,6 +320,21 @@ pub struct ForEachOpSpec { pub op_scope: ReactiveOpScope, } +impl ForEachOpSpec { + pub fn get_label(&self) -> String { + format!("Loop over {}", self.field_path) + } +} + +impl SpecFormatter for ForEachOpSpec { + fn format(&self, mode: OutputMode) -> String { + match mode { + OutputMode::Concise => self.get_label(), + OutputMode::Verbose => format!("field={}", self.field_path), + } + } +} + /// Emit data to a given collector at the given scope. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CollectOpSpec { @@ -204,6 +349,26 @@ pub struct CollectOpSpec { pub auto_uuid_field: Option, } +impl SpecFormatter for CollectOpSpec { + fn format(&self, mode: OutputMode) -> String { + let uuid = self.auto_uuid_field.as_deref().unwrap_or("none"); + match mode { + OutputMode::Concise => { + format!( + "collector={}, input={}, uuid={}", + self.collector_name, self.input, uuid + ) + } + OutputMode::Verbose => { + format!( + "scope={}, collector={}, input=[{}], uuid={}", + self.scope_name, self.collector_name, self.input, uuid + ) + } + } + } +} + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub enum VectorSimilarityMetric { CosineSimilarity, @@ -211,8 +376,8 @@ pub enum VectorSimilarityMetric { InnerProduct, } -impl std::fmt::Display for VectorSimilarityMetric { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for VectorSimilarityMetric { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { VectorSimilarityMetric::CosineSimilarity => write!(f, "Cosine"), VectorSimilarityMetric::L2Distance => write!(f, "L2"), @@ -227,6 +392,12 @@ pub struct VectorIndexDef { pub metric: VectorSimilarityMetric, } +impl fmt::Display for VectorIndexDef { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}:{}", self.field_name, self.metric) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct IndexOptions { #[serde(default, skip_serializing_if = "Option::is_none")] @@ -235,6 +406,23 @@ pub struct IndexOptions { pub vector_indexes: Vec, } +impl fmt::Display for IndexOptions { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let primary_keys = self + .primary_key_fields + .as_ref() + .map(|p| p.join(",")) + .unwrap_or_default(); + let vector_indexes = self + .vector_indexes + .iter() + .map(|v| v.to_string()) + .collect::>() + .join(","); + write!(f, "keys={}, indexes={}", primary_keys, vector_indexes) + } +} + /// Store data to a given sink. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ExportOpSpec { @@ -244,6 +432,20 @@ pub struct ExportOpSpec { pub setup_by_user: bool, } +impl SpecFormatter for ExportOpSpec { + fn format(&self, mode: OutputMode) -> String { + let target_str = self.target.format(mode); + let base = format!( + "collector={}, target={}, {}", + self.collector_name, target_str, self.index_options + ); + match mode { + OutputMode::Concise => base, + OutputMode::Verbose => format!("{}, setup_by_user={}", base, self.setup_by_user), + } + } +} + /// A reactive operation reacts on given input values. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "action")] @@ -253,6 +455,19 @@ pub enum ReactiveOpSpec { Collect(CollectOpSpec), } +impl SpecFormatter for ReactiveOpSpec { + fn format(&self, mode: OutputMode) -> String { + match self { + ReactiveOpSpec::Transform(t) => format!("Transform: {}", t.format(mode)), + ReactiveOpSpec::ForEach(fe) => match mode { + OutputMode::Concise => format!("{}", fe.get_label()), + OutputMode::Verbose => format!("ForEach: {}", fe.format(mode)), + }, + ReactiveOpSpec::Collect(c) => format!("Collect: {}", c.format(mode)), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ReactiveOpScope { pub name: ScopeName, @@ -260,6 +475,12 @@ pub struct ReactiveOpScope { // TODO: Suport collectors } +impl fmt::Display for ReactiveOpScope { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Scope: name={}", self.name) + } +} + /// A flow defines the rule to sync data from given sources to given sinks with given transformations. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FlowInstanceSpec { @@ -301,14 +522,14 @@ pub struct AuthEntryReference { _phantom: std::marker::PhantomData, } -impl std::fmt::Debug for AuthEntryReference { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Debug for AuthEntryReference { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "AuthEntryReference({})", self.key) } } -impl std::fmt::Display for AuthEntryReference { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for AuthEntryReference { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "AuthEntryReference({})", self.key) } } diff --git a/src/py/mod.rs b/src/py/mod.rs index c02001ca..3c09c99f 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -2,6 +2,7 @@ use crate::prelude::*; use crate::base::schema::{FieldSchema, ValueType}; use crate::base::spec::VectorSimilarityMetric; +use crate::base::spec::{NamedSpec, OutputMode, ReactiveOpSpec, SpecFormatter}; use crate::execution::query; use crate::lib_context::{clear_lib_context, get_auth_registry, init_lib_context}; use crate::ops::interface::{QueryResult, QueryResults}; @@ -113,6 +114,24 @@ impl IndexUpdateInfo { #[pyclass] pub struct Flow(pub Arc); +/// A single line in the rendered spec, with hierarchical children +#[pyclass(get_all, set_all)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RenderedSpecLine { + /// The formatted content of the line (e.g., "Import: name=documents, source=LocalFile") + pub content: String, + /// Child lines in the hierarchy + pub children: Vec, +} + +/// A rendered specification, grouped by sections +#[pyclass(get_all, set_all)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RenderedSpec { + /// List of (section_name, lines) pairs + pub sections: Vec<(String, Vec)>, +} + #[pyclass] pub struct FlowLiveUpdater(pub Arc>); @@ -196,6 +215,75 @@ impl Flow { }) } + #[pyo3(signature = (output_mode=None))] + pub fn get_spec(&self, output_mode: Option>) -> PyResult { + let mode = output_mode.map_or(OutputMode::Concise, |m| m.into_inner()); + let spec = &self.0.flow.flow_instance; + let mut sections: IndexMap> = IndexMap::new(); + + // Sources + sections.insert( + "Source".to_string(), + spec.import_ops + .iter() + .map(|op| RenderedSpecLine { + content: format!("Import: name={}, {}", op.name, op.spec.format(mode)), + children: vec![], + }) + .collect(), + ); + + // Processing + fn walk(op: &NamedSpec, mode: OutputMode) -> RenderedSpecLine { + let content = format!("{}: {}", op.name, op.spec.format(mode)); + + let children = match &op.spec { + ReactiveOpSpec::ForEach(fe) => fe + .op_scope + .ops + .iter() + .map(|nested| walk(nested, mode)) + .collect(), + _ => vec![], + }; + + RenderedSpecLine { content, children } + } + + sections.insert( + "Processing".to_string(), + spec.reactive_ops.iter().map(|op| walk(op, mode)).collect(), + ); + + // Targets + sections.insert( + "Targets".to_string(), + spec.export_ops + .iter() + .map(|op| RenderedSpecLine { + content: format!("Export: name={}, {}", op.name, op.spec.format(mode)), + children: vec![], + }) + .collect(), + ); + + // Declarations + sections.insert( + "Declarations".to_string(), + spec.declarations + .iter() + .map(|decl| RenderedSpecLine { + content: format!("Declaration: {}", decl.format(mode)), + children: vec![], + }) + .collect(), + ); + + Ok(RenderedSpec { + sections: sections.into_iter().collect(), + }) + } + pub fn get_schema(&self) -> Vec<(String, String, String)> { let schema = &self.0.flow.data_schema; let mut result = Vec::new(); @@ -444,6 +532,8 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) }