From 62a714e99c75c13053c68604e13c7fff534fb60c Mon Sep 17 00:00:00 2001 From: lemorage Date: Tue, 13 May 2025 11:02:55 +0200 Subject: [PATCH 1/7] feat(flow): update text label to handle empty lines instead of color style --- python/cocoindex/flow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index f85f77b7..37d1f209 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -484,7 +484,7 @@ def _render_spec(self, verbose: bool = False) -> Tree: tree = Tree(f"Flow: {self.name}", style="cyan") def build_tree(label: str, lines: list): - node = Tree(label, style="bold magenta" if lines else "cyan") + node = Tree(label=label if lines else label + " None", style="cyan") for line in lines: child_node = node.add(Text(line.content, style="yellow")) child_node.children = build_tree("", line.children).children From 2c28bed7a2bd8e27a0c0fda420e6ad29b2fe5a11 Mon Sep 17 00:00:00 2001 From: lemorage Date: Wed, 14 May 2025 05:04:00 +0200 Subject: [PATCH 2/7] feat: update to use standalone cocoindex cli --- pyproject.toml | 3 + python/cocoindex/cli.py | 137 +++++++++++++++++++++++++++++++++++----- python/cocoindex/lib.py | 73 +++++++-------------- 3 files changed, 147 insertions(+), 66 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 13a83091..3b1366ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,9 @@ dependencies = ["sentence-transformers>=3.3.1", "click>=8.1.8", "rich>=14.0.0"] license = "Apache-2.0" urls = { Homepage = "https://cocoindex.io/" } +[project.scripts] +cocoindex = "cocoindex.cli:cli" + [tool.maturin] bindings = "pyo3" python-source = "python" diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index 11dfe00c..45326973 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -1,27 +1,90 @@ import click import datetime +import sys +import importlib.util +import os +import atexit from rich.console import Console from rich.table import Table -from . import flow, lib, setting +from . import flow, lib, setting, query from .setup import sync_setup, drop_setup, flow_names_with_setup, apply_setup_changes +# Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc. +COCOINDEX_HOST = 'https://cocoindex.io' + +def _load_user_app(app_path: str): + """Loads the user's application file as a module. Exits on failure.""" + if not app_path: + click.echo("Internal Error: Application path not provided.", err=True) + sys.exit(1) + + app_path = os.path.abspath(app_path) + app_dir = os.path.dirname(app_path) + module_name = os.path.splitext(os.path.basename(app_path))[0] + + original_sys_path = list(sys.path) + if app_dir not in sys.path: + sys.path.insert(0, app_dir) + + try: + spec = importlib.util.spec_from_file_location(module_name, app_path) + if spec is None: + raise ImportError(f"Could not load spec for file: {app_path}") + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + except Exception as e: + raise click.ClickException(f"Failed importing application module '{os.path.basename(app_path)}': {e}") + finally: + sys.path = original_sys_path + +def _ensure_flows_and_handlers_built(): + """Builds flows and handlers after app load. Exits on failure.""" + try: + flow.ensure_all_flows_built() + query.ensure_all_handlers_built() + except Exception as e: + click.echo(f"\nError: Failed processing flows/handlers from application.", err=True) + click.echo(f"Reason: {e}", err=True) + sys.exit(1) + @click.group() +@click.version_option(package_name="cocoindex", message="%(prog)s version %(version)s") def cli(): """ - CLI for Cocoindex. + CLI for Cocoindex. Requires --app for most commands. """ + try: + settings = setting.Settings.from_env() + lib.init(settings) + atexit.register(lib.stop) + except Exception as e: + raise click.ClickException(f"Failed to initialize CocoIndex library: {e}") @cli.command() +@click.option( + '--app', 'app_path', required=False, + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), + help="Path to the Python file defining flows." +) @click.option( "-a", "--all", "show_all", is_flag=True, show_default=True, default=False, help="Also show all flows with persisted setup, even if not defined in the current process.") -def ls(show_all: bool): +def ls(app_path: str | None, show_all: bool): """ List all flows. """ - current_flow_names = flow.flow_names() + current_flow_names = set() + + if app_path: + _load_user_app(app_path) + current_flow_names = set(flow.flow_names()) + elif not show_all: + raise click.UsageError("The --app option is required unless using --all.") + persisted_flow_names = flow_names_with_setup() remaining_persisted_flow_names = set(persisted_flow_names) @@ -52,13 +115,20 @@ def ls(show_all: bool): click.echo(' [?]: Flows with persisted setup, but not in the current process.') @cli.command() +@click.option( + '--app', 'app_path', required=True, + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), + help="Path to the Python file defining the flow." +) @click.argument("flow_name", type=str, required=False) @click.option("--color/--no-color", default=True, help="Enable or disable colored output.") @click.option("--verbose", is_flag=True, help="Show verbose output with full details.") -def show(flow_name: str | None, color: bool, verbose: bool): +def show(app_path: str, flow_name: str | None, color: bool, verbose: bool): """ - Show the flow spec and schema in a readable format with colored output. + Show the flow spec and schema in a readable format. """ + _load_user_app(app_path) + flow = _flow_by_name(flow_name) console = Console(no_color=not color) console.print(flow._render_spec(verbose=verbose)) @@ -66,7 +136,7 @@ def show(flow_name: str | None, color: bool, verbose: bool): console.print() table = Table( title=f"Schema for Flow: {flow.name}", - show_header=True, + title_style="cyan", header_style="bold magenta" ) table.add_column("Field", style="cyan") @@ -79,11 +149,17 @@ def show(flow_name: str | None, color: bool, verbose: bool): console.print(table) @cli.command() -def setup(): +@click.option( + '--app', 'app_path', required=True, + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), + help="Path to the Python file defining flows to set up." +) +def setup(app_path: str): """ Check and apply backend setup changes for flows, including the internal and target storage (to export). """ + _load_user_app(app_path) setup_status = sync_setup() click.echo(setup_status) if setup_status.is_up_to_date(): @@ -95,16 +171,25 @@ def setup(): apply_setup_changes(setup_status) @cli.command() +@click.option( + '--app', 'app_path', required=False, + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), + help="Path to the app file (needed if not using --all or specific names)." +) @click.argument("flow_name", type=str, nargs=-1) @click.option( "-a", "--all", "drop_all", is_flag=True, show_default=True, default=False, help="Drop the backend setup for all flows with persisted setup, " "even if not defined in the current process.") -def drop(flow_name: tuple[str, ...], drop_all: bool): +def drop(app_path: str | None, flow_name: tuple[str, ...], drop_all: bool): """ Drop the backend setup for specified flows. If no flow is specified, all flows defined in the current process will be dropped. """ + if not app_path: + raise click.UsageError("The --app option is required when dropping flows defined in the app (and not using --all or specific flow names).") + _load_user_app(app_path) + if drop_all: flow_names = flow_names_with_setup() elif len(flow_name) == 0: @@ -122,6 +207,11 @@ def drop(flow_name: tuple[str, ...], drop_all: bool): apply_setup_changes(setup_status) @cli.command() +@click.option( + '--app', 'app_path', required=True, + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), + help="Path to the Python file defining flows." +) @click.argument("flow_name", type=str, required=False) @click.option( "-L", "--live", is_flag=True, show_default=True, default=False, @@ -129,10 +219,11 @@ def drop(flow_name: tuple[str, ...], drop_all: bool): @click.option( "-q", "--quiet", is_flag=True, show_default=True, default=False, help="Avoid printing anything to the standard output, e.g. statistics.") -def update(flow_name: str | None, live: bool, quiet: bool): +def update(app_path: str, flow_name: str | None, live: bool, quiet: bool): """ Update the index to reflect the latest data from data sources. """ + _load_user_app(app_path) options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet) if flow_name is None: return flow.update_all_flows(options) @@ -142,6 +233,11 @@ def update(flow_name: str | None, live: bool, quiet: bool): return updater.update_stats() @cli.command() +@click.option( + '--app', 'app_path', required=True, + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), + help="Path to the Python file defining the flow." +) @click.argument("flow_name", type=str, required=False) @click.option( "-o", "--output-dir", type=str, required=False, @@ -151,23 +247,26 @@ def update(flow_name: str | None, live: bool, quiet: bool): help="Use already-cached intermediate data if available. " "Note that we only reuse existing cached data without updating the cache " "even if it's turned on.") -def evaluate(flow_name: str | None, output_dir: str | None, cache: bool = True): +def evaluate(app_path: str, flow_name: str | None, output_dir: str | None, cache: bool = True): """ Evaluate the flow and dump flow outputs to files. Instead of updating the index, it dumps what should be indexed to files. Mainly used for evaluation purpose. """ + _load_user_app(app_path) fl = _flow_by_name(flow_name) if output_dir is None: output_dir = f"eval_{fl.name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}" options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=cache) fl.evaluate_and_dump(options) -# Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc. -COCOINDEX_HOST = 'https://cocoindex.io' - @cli.command() +@click.option( + "--app", "app_path", required=True, + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), + help="Path to the Python file defining flows and handlers." +) @click.option( "-a", "--address", type=str, help="The address to bind the server to, in the format of IP:PORT. " @@ -190,13 +289,16 @@ def evaluate(flow_name: str | None, output_dir: str | None, cache: bool = True): @click.option( "-q", "--quiet", is_flag=True, show_default=True, default=False, help="Avoid printing anything to the standard output, e.g. statistics.") -def server(address: str | None, live_update: bool, quiet: bool, cors_origin: str | None, - cors_cocoindex: bool, cors_local: int | None): +def server(app_path: str, address: str | None, live_update: bool, quiet: bool, + cors_origin: str | None, cors_cocoindex: bool, cors_local: int | None): """ Start a HTTP server providing REST APIs. It will allow tools like CocoInsight to access the server. """ + _load_user_app(app_path) + _ensure_flows_and_handlers_built() + server_settings = setting.ServerSettings.from_env() cors_origins: set[str] = set(server_settings.cors_origins or []) if cors_origin is not None: @@ -235,3 +337,6 @@ def _flow_name(name: str | None) -> str: def _flow_by_name(name: str | None) -> flow.Flow: return flow.flow_by_name(_flow_name(name)) + +if __name__ == "__main__": + cli() \ No newline at end of file diff --git a/python/cocoindex/lib.py b/python/cocoindex/lib.py index c83627f7..88738b7d 100644 --- a/python/cocoindex/lib.py +++ b/python/cocoindex/lib.py @@ -1,14 +1,10 @@ """ Library level functions and states. """ -import sys -import functools -import inspect +import warnings +from typing import Callable, Any -from typing import Callable - -from . import _engine -from . import flow, query, cli, setting +from . import _engine, setting from .convert import dump_engine_object @@ -19,8 +15,6 @@ def init(settings: setting.Settings): def start_server(settings: setting.ServerSettings): """Start the cocoindex server.""" - flow.ensure_all_flows_built() - query.ensure_all_handlers_built() _engine.start_server(settings.__dict__) def stop(): @@ -28,50 +22,29 @@ def stop(): _engine.stop() def main_fn( - settings: setting.Settings | None = None, - cocoindex_cmd: str = 'cocoindex', + settings: Any | None = None, + cocoindex_cmd: str | None = None, ) -> Callable[[Callable], Callable]: """ - A decorator to wrap the main function. - If the python binary is called with the given command, it yields control to the cocoindex CLI. - - If the settings are not provided, they are loaded from the environment variables. + DEPRECATED: Using @cocoindex.main_fn() is no longer supported and has no effect. + This decorator will be removed in a future version, which will cause an AttributeError. + Please remove it from your code and use the standalone 'cocoindex' CLI. """ - - def _pre_init() -> None: - effective_settings = settings or setting.Settings.from_env() - init(effective_settings) - - def _should_run_cli() -> bool: - return len(sys.argv) > 1 and sys.argv[1] == cocoindex_cmd - - def _run_cli(): - return cli.cli.main(sys.argv[2:], prog_name=f"{sys.argv[0]} {sys.argv[1]}") + warnings.warn( + "\n\n" + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" + "CRITICAL DEPRECATION NOTICE from CocoIndex:\n" + "The @cocoindex.main_fn() decorator found in your script is DEPRECATED and IGNORED.\n" + "It provides NO functionality and will be REMOVED entirely in a future version.\n" + "If not removed, your script will FAIL with an AttributeError in the future.\n\n" + "ACTION REQUIRED: Please REMOVE @cocoindex.main_fn() from your Python script.\n\n" + "To use CocoIndex commands, invoke the standalone 'cocoindex' CLI:\n" + " cocoindex [options] --app \n" + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n\n", + DeprecationWarning, + stacklevel=2 + ) def _main_wrapper(fn: Callable) -> Callable: - if inspect.iscoroutinefunction(fn): - @functools.wraps(fn) - async def _inner(*args, **kwargs): - _pre_init() - try: - if _should_run_cli(): - # Schedule to a separate thread as it invokes nested event loop. - # return await asyncio.to_thread(_run_cli) - return _run_cli() - return await fn(*args, **kwargs) - finally: - stop() - return _inner - else: - @functools.wraps(fn) - def _inner(*args, **kwargs): - _pre_init() - try: - if _should_run_cli(): - return _run_cli() - return fn(*args, **kwargs) - finally: - stop() - return _inner - + return fn return _main_wrapper From 468c5506156396b019dcad177c26bc74ed0045b2 Mon Sep 17 00:00:00 2001 From: lemorage Date: Thu, 15 May 2025 06:30:30 +0200 Subject: [PATCH 3/7] feat: pass in `:` directly in new CLI --- python/cocoindex/cli.py | 351 ++++++++++++++++++++++++---------------- python/cocoindex/lib.py | 29 +++- 2 files changed, 236 insertions(+), 144 deletions(-) diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index 45326973..a80a97b4 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -14,48 +14,83 @@ # Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc. COCOINDEX_HOST = 'https://cocoindex.io' -def _load_user_app(app_path: str): - """Loads the user's application file as a module. Exits on failure.""" - if not app_path: - click.echo("Internal Error: Application path not provided.", err=True) - sys.exit(1) - - app_path = os.path.abspath(app_path) - app_dir = os.path.dirname(app_path) - module_name = os.path.splitext(os.path.basename(app_path))[0] - - original_sys_path = list(sys.path) - if app_dir not in sys.path: - sys.path.insert(0, app_dir) +def _parse_app_flow_specifier(specifier: str) -> tuple[str, str | None]: + """Parses 'module_or_path[:flow_name]' into (module_or_path, flow_name | None).""" + parts = specifier.split(":", 1) # Split only on the first colon + app_ref = parts[0] + + if not app_ref: + raise click.BadParameter( + f"Application module/path part is missing or invalid in specifier: '{specifier}'. " + "Expected format like 'myapp.py' or 'myapp:MyFlow'.", + param_hint="APP_SPECIFIER" + ) + + if len(parts) > 1: + flow_ref_part = parts[1] + + if not flow_ref_part: + # Handles "app_ref:" (empty string after colon) + return app_ref, None + + if not flow_ref_part.isidentifier(): + raise click.BadParameter( + f"Invalid format for flow name part ('{flow_ref_part}') in specifier '{specifier}'. " + "If a colon separates the application from the flow name, the flow name should typically be " + "a valid identifier (e.g., alphanumeric with underscores, not starting with a number).", + param_hint="APP_SPECIFIER" + ) + return app_ref, flow_ref_part + else: + return app_ref, None +def _load_user_app(app_target: str): + """ + Loads the user's application, which can be a file path or an installed module name. + Exits on failure. + """ + if not app_target: + raise click.ClickException("Application target not provided.") + + looks_like_path = os.sep in app_target or app_target.lower().endswith(".py") + + if looks_like_path or os.path.isfile(app_target): + if not os.path.isfile(app_target): + if looks_like_path and not os.path.exists(app_target): + raise click.ClickException(f"Application file path not found: {app_target}") + app_path = os.path.abspath(app_target) + app_dir = os.path.dirname(app_path) + module_name = os.path.splitext(os.path.basename(app_path))[0] + + if app_dir not in sys.path: + sys.path.insert(0, app_dir) + try: + spec = importlib.util.spec_from_file_location(module_name, app_path) + if spec is None: + raise ImportError(f"Could not create spec for file: {app_path}") + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + except (ImportError, FileNotFoundError, PermissionError) as e: + raise click.ClickException(f"Failed importing file '{app_path}': {e}") + finally: + if app_dir in sys.path and sys.path[0] == app_dir: + sys.path.pop(0) + + # Try as module try: - spec = importlib.util.spec_from_file_location(module_name, app_path) - if spec is None: - raise ImportError(f"Could not load spec for file: {app_path}") - module = importlib.util.module_from_spec(spec) - sys.modules[spec.name] = module - spec.loader.exec_module(module) - return module + return importlib.import_module(app_target) + except ImportError as e: + raise click.ClickException(f"Failed to load module '{app_target}': {e}") except Exception as e: - raise click.ClickException(f"Failed importing application module '{os.path.basename(app_path)}': {e}") - finally: - sys.path = original_sys_path - -def _ensure_flows_and_handlers_built(): - """Builds flows and handlers after app load. Exits on failure.""" - try: - flow.ensure_all_flows_built() - query.ensure_all_handlers_built() - except Exception as e: - click.echo(f"\nError: Failed processing flows/handlers from application.", err=True) - click.echo(f"Reason: {e}", err=True) - sys.exit(1) + raise click.ClickException(f"Unexpected error importing module '{app_target}': {e}") @click.group() @click.version_option(package_name="cocoindex", message="%(prog)s version %(version)s") def cli(): """ - CLI for Cocoindex. Requires --app for most commands. + CLI for Cocoindex. """ try: settings = setting.Settings.from_env() @@ -65,101 +100,118 @@ def cli(): raise click.ClickException(f"Failed to initialize CocoIndex library: {e}") @cli.command() -@click.option( - '--app', 'app_path', required=False, - type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), - help="Path to the Python file defining flows." -) -@click.option( - "-a", "--all", "show_all", is_flag=True, show_default=True, default=False, - help="Also show all flows with persisted setup, even if not defined in the current process.") -def ls(app_path: str | None, show_all: bool): +@click.argument("app_target", type=str, required=False) +def ls(app_target: str | None): """ List all flows. + + If APP_TARGET (path/to/app.py or module) is provided, lists flows + defined in the app and their backend setup status, also showing any + other flows with persisted setup not defined in the app. + + If APP_TARGET is omitted, lists all flows that have a persisted + setup in the backend. """ current_flow_names = set() + app_ref = None + + if app_target: + app_ref, _ = _parse_app_flow_specifier(app_target) # Ignore flow name for ls + _load_user_app(app_ref) + if current_flow_names := set(flow.flow_names()): + click.echo(f"Displaying flows relative to application: '{app_ref}'", err=True) + else: + click.echo(f"No flows are defined in '{app_ref}'. Here are all the flows in the backend:", err=True) + else: + click.echo("Displaying all flows with persisted setup in the backend:", err=True) - if app_path: - _load_user_app(app_path) - current_flow_names = set(flow.flow_names()) - elif not show_all: - raise click.UsageError("The --app option is required unless using --all.") + persisted_flow_names = set(flow_names_with_setup()) + all_known_names = sorted(list(current_flow_names | persisted_flow_names)) - persisted_flow_names = flow_names_with_setup() - remaining_persisted_flow_names = set(persisted_flow_names) + if not all_known_names: + click.echo("No persisted flow setups found in the backend.") + return has_missing_setup = False has_extra_setup = False - for name in current_flow_names: - if name in remaining_persisted_flow_names: - remaining_persisted_flow_names.remove(name) - suffix = '' + for name in all_known_names: + is_in_current_app = name in current_flow_names + is_persisted = name in persisted_flow_names + suffix = '' + + if is_in_current_app: + if is_persisted: + suffix = '' # Defined in app and setup exists + else: + suffix = ' [+]' # Defined in app, setup missing + has_missing_setup = True + elif is_persisted: + suffix = ' [?]' # Setup exists in backend, not defined in loaded app + has_extra_setup = True else: - suffix = ' [+]' - has_missing_setup = True + # Ideally not gonna happen + continue click.echo(f'{name}{suffix}') - if show_all: - for name in persisted_flow_names: - if name in remaining_persisted_flow_names: - click.echo(f'{name} [?]') - has_extra_setup = True - if has_missing_setup or has_extra_setup: click.echo('') click.echo('Notes:') if has_missing_setup: - click.echo(' [+]: Flows present in the current process, but missing setup.') + app_context_message = f" (in '{app_ref}')" if app_ref else "" + click.echo(f' [+]: Flows defined in the application{app_context_message}, but backend setup is missing.') if has_extra_setup: - click.echo(' [?]: Flows with persisted setup, but not in the current process.') + app_context_message = f" (not defined in'{app_ref}')" if app_ref else "" + click.echo(f' [?]: Flows with persisted setup in the backend{app_context_message}.') @cli.command() -@click.option( - '--app', 'app_path', required=True, - type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), - help="Path to the Python file defining the flow." -) -@click.argument("flow_name", type=str, required=False) +@click.argument("app_flow_specifier", type=str) @click.option("--color/--no-color", default=True, help="Enable or disable colored output.") @click.option("--verbose", is_flag=True, help="Show verbose output with full details.") -def show(app_path: str, flow_name: str | None, color: bool, verbose: bool): +def show(app_flow_specifier: str, color: bool, verbose: bool): """ - Show the flow spec and schema in a readable format. + Show the flow spec and schema. + + APP_FLOW_SPECIFIER: Specifies the application and optionally the target flow. + Can be one of the following formats:\n + - path/to/your_app.py\n + - an_installed.module_name\n + - path/to/your_app.py:SpecificFlowName\n + - an_installed.module_name:SpecificFlowName + + :SpecificFlowName can be omitted only if the application defines a single flow. """ - _load_user_app(app_path) + app_ref, flow_ref = _parse_app_flow_specifier(app_flow_specifier) + _load_user_app(app_ref) - flow = _flow_by_name(flow_name) + fl = _flow_by_name(flow_ref) console = Console(no_color=not color) - console.print(flow._render_spec(verbose=verbose)) - + console.print(fl._render_spec(verbose=verbose)) console.print() table = Table( - title=f"Schema for Flow: {flow.name}", + title=f"Schema for Flow: {fl.name}", title_style="cyan", header_style="bold magenta" ) table.add_column("Field", style="cyan") table.add_column("Type", style="green") table.add_column("Attributes", style="yellow") - - for field_name, field_type, attr_str in flow._get_schema(): + for field_name, field_type, attr_str in fl._get_schema(): table.add_row(field_name, field_type, attr_str) - console.print(table) @cli.command() -@click.option( - '--app', 'app_path', required=True, - type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), - help="Path to the Python file defining flows to set up." -) -def setup(app_path: str): +@click.argument("app_target", type=str) +def setup(app_target: str): """ Check and apply backend setup changes for flows, including the internal and target storage (to export). + + APP_TARGET: path/to/app.py or installed_module. """ - _load_user_app(app_path) + app_ref, _ = _parse_app_flow_specifier(app_target) # Ignore flow name for setup + _load_user_app(app_ref) + setup_status = sync_setup() click.echo(setup_status) if setup_status.is_up_to_date(): @@ -170,103 +222,125 @@ def setup(app_path: str): return apply_setup_changes(setup_status) -@cli.command() -@click.option( - '--app', 'app_path', required=False, - type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), - help="Path to the app file (needed if not using --all or specific names)." -) +@cli.command("drop") +@click.argument("app_target", type=str, required=False) @click.argument("flow_name", type=str, nargs=-1) @click.option( "-a", "--all", "drop_all", is_flag=True, show_default=True, default=False, help="Drop the backend setup for all flows with persisted setup, " - "even if not defined in the current process.") -def drop(app_path: str | None, flow_name: tuple[str, ...], drop_all: bool): + "even if not defined in the current process." + "If used, APP_TARGET and any listed flow names are ignored.") +def drop(app_target: str | None, flow_name: tuple[str, ...], drop_all: bool): """ - Drop the backend setup for specified flows. - If no flow is specified, all flows defined in the current process will be dropped. + Drop the backend setup for flows. + + Modes of operation:\n + 1. Drop ALL persisted setups: `cocoindex drop --all`\n + 2. Drop all flows defined in an app: `cocoindex drop `\n + 3. Drop specific named flows: `cocoindex drop [FLOW_NAME...]` """ - if not app_path: - raise click.UsageError("The --app option is required when dropping flows defined in the app (and not using --all or specific flow names).") - _load_user_app(app_path) + app_ref = None + flow_names = [] if drop_all: + if app_target or flow_name: + click.echo("Warning: When --all is used, APP_TARGET and any individual flow names are ignored.", err=True) flow_names = flow_names_with_setup() - elif len(flow_name) == 0: - flow_names = [fl.name for fl in flow.flows()] + elif app_target: + app_ref, _ = _parse_app_flow_specifier(app_target) # Ignore any :FlowName part + _load_user_app(app_ref) + if flow_name: + flow_names = list(flow_name) + click.echo(f"Preparing to drop specified flows: {', '.join(flow_names)} (in '{app_ref}').", err=True) + else: + flow_names = [fl.name for fl in flow.flows()] + if not flow_names: + click.echo(f"No flows found defined in '{app_ref}' to drop.") + return + click.echo(f"Preparing to drop all flows defined in '{app_ref}': {', '.join(flow_names)}.", err=True) else: - flow_names = list(flow_name) + raise click.UsageError( + "Missing arguments. You must either provide an APP_TARGET (to target app-specific flows) " + "or use the --all flag." + ) + + if not flow_names: + click.echo("No flows identified for the drop operation.") + return + setup_status = drop_setup(flow_names) click.echo(setup_status) if setup_status.is_up_to_date(): click.echo("No flows need to be dropped.") return if not click.confirm( - "Changes need to be pushed. Continue? [yes/N]", default=False, show_default=False): + f"\nThis will apply changes to drop setup for: {', '.join(flow_names)}. Continue? [yes/N]", + default=False, show_default=False): + click.echo("Drop operation aborted by user.") return apply_setup_changes(setup_status) @cli.command() -@click.option( - '--app', 'app_path', required=True, - type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), - help="Path to the Python file defining flows." -) -@click.argument("flow_name", type=str, required=False) +@click.argument("app_flow_specifier", type=str) @click.option( "-L", "--live", is_flag=True, show_default=True, default=False, help="Continuously watch changes from data sources and apply to the target index.") @click.option( "-q", "--quiet", is_flag=True, show_default=True, default=False, help="Avoid printing anything to the standard output, e.g. statistics.") -def update(app_path: str, flow_name: str | None, live: bool, quiet: bool): +def update(app_flow_specifier: str, live: bool, quiet: bool): """ Update the index to reflect the latest data from data sources. + + APP_FLOW_SPECIFIER: path/to/app.py, module, path/to/app.py:FlowName, or module:FlowName. + If :FlowName is omitted, updates all flows. """ - _load_user_app(app_path) + app_ref, flow_ref = _parse_app_flow_specifier(app_flow_specifier) + _load_user_app(app_ref) + options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet) - if flow_name is None: + if flow_ref is None: return flow.update_all_flows(options) else: - with flow.FlowLiveUpdater(_flow_by_name(flow_name), options) as updater: + with flow.FlowLiveUpdater(_flow_by_name(flow_ref), options) as updater: updater.wait() return updater.update_stats() @cli.command() -@click.option( - '--app', 'app_path', required=True, - type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), - help="Path to the Python file defining the flow." -) -@click.argument("flow_name", type=str, required=False) +@click.argument("app_flow_specifier", type=str) @click.option( "-o", "--output-dir", type=str, required=False, help="The directory to dump the output to.") @click.option( "--cache/--no-cache", is_flag=True, show_default=True, default=True, - help="Use already-cached intermediate data if available. " - "Note that we only reuse existing cached data without updating the cache " - "even if it's turned on.") -def evaluate(app_path: str, flow_name: str | None, output_dir: str | None, cache: bool = True): + help="Use already-cached intermediate data if available.") +def evaluate(app_flow_specifier: str, output_dir: str | None, cache: bool = True): """ Evaluate the flow and dump flow outputs to files. Instead of updating the index, it dumps what should be indexed to files. Mainly used for evaluation purpose. + + APP_FLOW_SPECIFIER: Specifies the application and optionally the target flow. + Can be one of the following formats:\n + - path/to/your_app.py\n + - an_installed.module_name\n + - path/to/your_app.py:SpecificFlowName\n + - an_installed.module_name:SpecificFlowName + + :SpecificFlowName can be omitted only if the application defines a single flow. """ - _load_user_app(app_path) - fl = _flow_by_name(flow_name) + app_ref, flow_ref = _parse_app_flow_specifier(app_flow_specifier) + _load_user_app(app_ref) + + fl = _flow_by_name(flow_ref) if output_dir is None: output_dir = f"eval_{fl.name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}" options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=cache) fl.evaluate_and_dump(options) @cli.command() -@click.option( - "--app", "app_path", required=True, - type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), - help="Path to the Python file defining flows and handlers." -) +@click.argument("app_target", type=str) @click.option( "-a", "--address", type=str, help="The address to bind the server to, in the format of IP:PORT. " @@ -289,15 +363,17 @@ def evaluate(app_path: str, flow_name: str | None, output_dir: str | None, cache @click.option( "-q", "--quiet", is_flag=True, show_default=True, default=False, help="Avoid printing anything to the standard output, e.g. statistics.") -def server(app_path: str, address: str | None, live_update: bool, quiet: bool, +def server(app_target: str, address: str | None, live_update: bool, quiet: bool, cors_origin: str | None, cors_cocoindex: bool, cors_local: int | None): """ Start a HTTP server providing REST APIs. It will allow tools like CocoInsight to access the server. + + APP_TARGET: path/to/app.py or installed_module. """ - _load_user_app(app_path) - _ensure_flows_and_handlers_built() + app_ref, _ = _parse_app_flow_specifier(app_target) # Ignore flow name for server + _load_user_app(app_ref) server_settings = setting.ServerSettings.from_env() cors_origins: set[str] = set(server_settings.cors_origins or []) @@ -324,16 +400,17 @@ def server(app_path: str, address: str | None, live_update: bool, quiet: bool, def _flow_name(name: str | None) -> str: names = flow.flow_names() + available = ', '.join(sorted(names)) if name is not None: if name not in names: - raise click.BadParameter(f"Flow {name} not found") + raise click.BadParameter(f"Flow '{name}' not found.\nAvailable: {available if names else 'None'}") return name if len(names) == 0: - raise click.UsageError("No flows available") + raise click.UsageError("No flows available in the loaded application.") elif len(names) == 1: return names[0] else: - raise click.UsageError("Multiple flows available, please specify --name") + raise click.UsageError(f"Multiple flows available, please specify which flow to target by appending :FlowName to the APP_TARGET.\nAvailable: {available}") def _flow_by_name(name: str | None) -> flow.Flow: return flow.flow_by_name(_flow_name(name)) diff --git a/python/cocoindex/lib.py b/python/cocoindex/lib.py index 88738b7d..7f27688b 100644 --- a/python/cocoindex/lib.py +++ b/python/cocoindex/lib.py @@ -4,7 +4,7 @@ import warnings from typing import Callable, Any -from . import _engine, setting +from . import _engine, flow, query, setting from .convert import dump_engine_object @@ -15,6 +15,8 @@ def init(settings: setting.Settings): def start_server(settings: setting.ServerSettings): """Start the cocoindex server.""" + flow.ensure_all_flows_built() + query.ensure_all_handlers_built() _engine.start_server(settings.__dict__) def stop(): @@ -26,20 +28,33 @@ def main_fn( cocoindex_cmd: str | None = None, ) -> Callable[[Callable], Callable]: """ - DEPRECATED: Using @cocoindex.main_fn() is no longer supported and has no effect. - This decorator will be removed in a future version, which will cause an AttributeError. - Please remove it from your code and use the standalone 'cocoindex' CLI. + DEPRECATED: The @cocoindex.main_fn() decorator is obsolete and has no effect. + It will be removed in a future version, which will cause an AttributeError. + + Please remove this decorator from your code and use the standalone 'cocoindex' CLI. + See the updated CLI usage examples in the warning message. """ warnings.warn( "\n\n" "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" "CRITICAL DEPRECATION NOTICE from CocoIndex:\n" - "The @cocoindex.main_fn() decorator found in your script is DEPRECATED and IGNORED.\n" + "The @cocoindex.main_fn() decorator in your script is DEPRECATED and IGNORED.\n" "It provides NO functionality and will be REMOVED entirely in a future version.\n" "If not removed, your script will FAIL with an AttributeError in the future.\n\n" "ACTION REQUIRED: Please REMOVE @cocoindex.main_fn() from your Python script.\n\n" - "To use CocoIndex commands, invoke the standalone 'cocoindex' CLI:\n" - " cocoindex [options] --app \n" + "To use CocoIndex, invoke the standalone 'cocoindex' CLI." + " Examples of new CLI usage:\n\n" + " To list flows from 'main.py' (previously 'python main.py cocoindex ls'):\n" + " cocoindex ls main.py\n\n" + " To list all persisted flows (previously 'python main.py cocoindex ls --all'):\n" + " cocoindex ls\n\n" + " To show 'MyFlow' defined in 'main.py' (previously 'python main.py cocoindex show MyFlow'):\n" + " cocoindex show main.py:MyFlow\n\n" + " To update all flows in 'my_package.flows_module':\n" + " cocoindex update my_package.flows_module\n\n" + " To update 'SpecificFlow' in 'my_package.flows_module':\n" + " cocoindex update my_package.flows_module:SpecificFlow\n\n" + "See cocoindex --help for more details.\n" "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n\n", DeprecationWarning, stacklevel=2 From f96eeb2cae271b80cfe4fc59cf480b0540cd929b Mon Sep 17 00:00:00 2001 From: lemorage Date: Fri, 16 May 2025 17:04:11 +0200 Subject: [PATCH 4/7] feat: only show relevant flows given ls targets --- python/cocoindex/cli.py | 114 +++++++++++++++++----------------------- 1 file changed, 47 insertions(+), 67 deletions(-) diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index a80a97b4..a8515106 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -4,6 +4,7 @@ import importlib.util import os import atexit +import types from rich.console import Console from rich.table import Table @@ -26,25 +27,24 @@ def _parse_app_flow_specifier(specifier: str) -> tuple[str, str | None]: param_hint="APP_SPECIFIER" ) - if len(parts) > 1: - flow_ref_part = parts[1] - - if not flow_ref_part: - # Handles "app_ref:" (empty string after colon) - return app_ref, None - - if not flow_ref_part.isidentifier(): - raise click.BadParameter( - f"Invalid format for flow name part ('{flow_ref_part}') in specifier '{specifier}'. " - "If a colon separates the application from the flow name, the flow name should typically be " - "a valid identifier (e.g., alphanumeric with underscores, not starting with a number).", - param_hint="APP_SPECIFIER" - ) - return app_ref, flow_ref_part - else: + if len(parts) == 1: return app_ref, None -def _load_user_app(app_target: str): + flow_ref_part = parts[1] + + if not flow_ref_part: # Handles empty string after colon + return app_ref, None + + if not flow_ref_part.isidentifier(): + raise click.BadParameter( + f"Invalid format for flow name part ('{flow_ref_part}') in specifier '{specifier}'. " + "If a colon separates the application from the flow name, the flow name should typically be " + "a valid identifier (e.g., alphanumeric with underscores, not starting with a number).", + param_hint="APP_SPECIFIER" + ) + return app_ref, flow_ref_part + +def _load_user_app(app_target: str) -> types.ModuleType: """ Loads the user's application, which can be a file path or an installed module name. Exits on failure. @@ -54,10 +54,9 @@ def _load_user_app(app_target: str): looks_like_path = os.sep in app_target or app_target.lower().endswith(".py") - if looks_like_path or os.path.isfile(app_target): + if looks_like_path: if not os.path.isfile(app_target): - if looks_like_path and not os.path.exists(app_target): - raise click.ClickException(f"Application file path not found: {app_target}") + raise click.ClickException(f"Application file path not found: {app_target}") app_path = os.path.abspath(app_target) app_dir = os.path.dirname(app_path) module_name = os.path.splitext(os.path.basename(app_path))[0] @@ -105,64 +104,45 @@ def ls(app_target: str | None): """ List all flows. - If APP_TARGET (path/to/app.py or module) is provided, lists flows - defined in the app and their backend setup status, also showing any - other flows with persisted setup not defined in the app. + If APP_TARGET (path/to/app.py or a module) is provided, lists flows + defined in the app and their backend setup status. If APP_TARGET is omitted, lists all flows that have a persisted setup in the backend. """ - current_flow_names = set() - app_ref = None - if app_target: - app_ref, _ = _parse_app_flow_specifier(app_target) # Ignore flow name for ls + app_ref, _ = _parse_app_flow_specifier(app_target) _load_user_app(app_ref) - if current_flow_names := set(flow.flow_names()): - click.echo(f"Displaying flows relative to application: '{app_ref}'", err=True) - else: - click.echo(f"No flows are defined in '{app_ref}'. Here are all the flows in the backend:", err=True) - else: - click.echo("Displaying all flows with persisted setup in the backend:", err=True) - persisted_flow_names = set(flow_names_with_setup()) - all_known_names = sorted(list(current_flow_names | persisted_flow_names)) + current_flow_names = set(flow.flow_names()) + persisted_flow_names = set(flow_names_with_setup()) - if not all_known_names: - click.echo("No persisted flow setups found in the backend.") - return + if not current_flow_names: + click.echo(f"No flows are defined in '{app_ref}'.") + return - has_missing_setup = False - has_extra_setup = False + has_missing = False + for name in sorted(current_flow_names): + if name in persisted_flow_names: + click.echo(name) + else: + click.echo(f"{name} [+]") + has_missing = True - for name in all_known_names: - is_in_current_app = name in current_flow_names - is_persisted = name in persisted_flow_names - suffix = '' + if has_missing: + click.echo('') + click.echo('Notes:') + click.echo(' [+]: Flows present in the current process, but missing setup.') - if is_in_current_app: - if is_persisted: - suffix = '' # Defined in app and setup exists - else: - suffix = ' [+]' # Defined in app, setup missing - has_missing_setup = True - elif is_persisted: - suffix = ' [?]' # Setup exists in backend, not defined in loaded app - has_extra_setup = True - else: - # Ideally not gonna happen - continue - click.echo(f'{name}{suffix}') - - if has_missing_setup or has_extra_setup: - click.echo('') - click.echo('Notes:') - if has_missing_setup: - app_context_message = f" (in '{app_ref}')" if app_ref else "" - click.echo(f' [+]: Flows defined in the application{app_context_message}, but backend setup is missing.') - if has_extra_setup: - app_context_message = f" (not defined in'{app_ref}')" if app_ref else "" - click.echo(f' [?]: Flows with persisted setup in the backend{app_context_message}.') + else: + persisted_flow_names = sorted(flow_names_with_setup()) + + if not persisted_flow_names: + click.echo("No persisted flow setups found in the backend.") + return + + for name in persisted_flow_names: + click.echo(name) @cli.command() @click.argument("app_flow_specifier", type=str) From c28f650ffebd0fb49f03d6a02bc1ce2057c53a09 Mon Sep 17 00:00:00 2001 From: lemorage Date: Sun, 18 May 2025 16:45:03 +0200 Subject: [PATCH 5/7] feat: add function to parse app target and warn on flow name usage --- python/cocoindex/cli.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index a8515106..f74760d8 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -44,6 +44,26 @@ def _parse_app_flow_specifier(specifier: str) -> tuple[str, str | None]: ) return app_ref, flow_ref_part +def _get_app_ref_from_specifier( + specifier: str, +) -> str: + """ + Parses the APP_TARGET to get the application reference (path or module). + Issues a warning if a flow name component is also provided in it. + """ + app_ref, flow_ref = _parse_app_flow_specifier(specifier) + + if flow_ref is not None: + click.echo( + click.style( + f"Ignoring flow name '{flow_ref}' in '{specifier}': " + f"this command operates on the entire app/module '{app_ref}'.", + fg='yellow' + ), + err=True + ) + return app_ref + def _load_user_app(app_target: str) -> types.ModuleType: """ Loads the user's application, which can be a file path or an installed module name. @@ -111,7 +131,7 @@ def ls(app_target: str | None): setup in the backend. """ if app_target: - app_ref, _ = _parse_app_flow_specifier(app_target) + app_ref = _get_app_ref_from_specifier(app_target) _load_user_app(app_ref) current_flow_names = set(flow.flow_names()) @@ -189,7 +209,7 @@ def setup(app_target: str): APP_TARGET: path/to/app.py or installed_module. """ - app_ref, _ = _parse_app_flow_specifier(app_target) # Ignore flow name for setup + app_ref = _get_app_ref_from_specifier(app_target) _load_user_app(app_ref) setup_status = sync_setup() @@ -227,7 +247,7 @@ def drop(app_target: str | None, flow_name: tuple[str, ...], drop_all: bool): click.echo("Warning: When --all is used, APP_TARGET and any individual flow names are ignored.", err=True) flow_names = flow_names_with_setup() elif app_target: - app_ref, _ = _parse_app_flow_specifier(app_target) # Ignore any :FlowName part + app_ref = _get_app_ref_from_specifier(app_target) _load_user_app(app_ref) if flow_name: flow_names = list(flow_name) @@ -352,7 +372,7 @@ def server(app_target: str, address: str | None, live_update: bool, quiet: bool, APP_TARGET: path/to/app.py or installed_module. """ - app_ref, _ = _parse_app_flow_specifier(app_target) # Ignore flow name for server + app_ref = _get_app_ref_from_specifier(app_target) _load_user_app(app_ref) server_settings = setting.ServerSettings.from_env() From 75e231658864ce77babbc35b6e229c0427eec3ea Mon Sep 17 00:00:00 2001 From: lemorage Date: Sun, 18 May 2025 17:23:45 +0200 Subject: [PATCH 6/7] feat: add support for loading dot-env files --- pyproject.toml | 2 +- python/cocoindex/cli.py | 18 +++++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3b1366ae..ac0dab29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ description = "With CocoIndex, users declare the transformation, CocoIndex creat authors = [{ name = "CocoIndex", email = "cocoindex.io@gmail.com" }] readme = "README.md" requires-python = ">=3.11" -dependencies = ["sentence-transformers>=3.3.1", "click>=8.1.8", "rich>=14.0.0"] +dependencies = ["sentence-transformers>=3.3.1", "click>=8.1.8", "rich>=14.0.0", "python-dotenv>=1.1.0"] license = "Apache-2.0" urls = { Homepage = "https://cocoindex.io/" } diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index f74760d8..d2c3fe97 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -6,6 +6,7 @@ import atexit import types +from dotenv import load_dotenv from rich.console import Console from rich.table import Table @@ -107,10 +108,25 @@ def _load_user_app(app_target: str) -> types.ModuleType: @click.group() @click.version_option(package_name="cocoindex", message="%(prog)s version %(version)s") -def cli(): +@click.option( + "--env-file", + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), + help="Path to a .env file to load environment variables from. " + "If not provided, attempts to load '.env' from the current directory.", + default=None, + show_default=False +) +def cli(env_file: str | None): """ CLI for Cocoindex. """ + loaded_env_path = None + dotenv_path = env_file or os.path.join(os.getcwd(), ".env") + + if load_dotenv(dotenv_path=dotenv_path, override=True): + loaded_env_path = os.path.abspath(dotenv_path) + click.echo(f"Loaded environment variables from: {loaded_env_path}", err=True) + try: settings = setting.Settings.from_env() lib.init(settings) From 3c66582b976a863f28f880b7c1a80214ab30420d Mon Sep 17 00:00:00 2001 From: lemorage Date: Tue, 20 May 2025 08:11:22 +0200 Subject: [PATCH 7/7] feat: use `find_dotenv` for better path resolution --- python/cocoindex/cli.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index 090295ec..29a6e134 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -6,7 +6,7 @@ import atexit import types -from dotenv import load_dotenv +from dotenv import load_dotenv, find_dotenv from rich.console import Console from rich.table import Table @@ -120,8 +120,7 @@ def cli(env_file: str | None): """ CLI for Cocoindex. """ - loaded_env_path = None - dotenv_path = env_file or os.path.join(os.getcwd(), ".env") + dotenv_path = env_file or find_dotenv(usecwd=True) if load_dotenv(dotenv_path=dotenv_path): loaded_env_path = os.path.abspath(dotenv_path)