diff --git a/pyproject.toml b/pyproject.toml index 13a83091..ac0dab29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,10 +9,13 @@ description = "With CocoIndex, users declare the transformation, CocoIndex creat authors = [{ name = "CocoIndex", email = "cocoindex.io@gmail.com" }] readme = "README.md" requires-python = ">=3.11" -dependencies = ["sentence-transformers>=3.3.1", "click>=8.1.8", "rich>=14.0.0"] +dependencies = ["sentence-transformers>=3.3.1", "click>=8.1.8", "rich>=14.0.0", "python-dotenv>=1.1.0"] license = "Apache-2.0" urls = { Homepage = "https://cocoindex.io/" } +[project.scripts] +cocoindex = "cocoindex.cli:cli" + [tool.maturin] bindings = "pyo3" python-source = "python" diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index 20b27426..29a6e134 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -1,89 +1,234 @@ import click import datetime +import sys +import importlib.util +import os +import atexit +import types +from dotenv import load_dotenv, find_dotenv from rich.console import Console from rich.table import Table -from . import flow, lib, setting +from . import flow, lib, setting, query from .setup import sync_setup, drop_setup, flow_names_with_setup, apply_setup_changes +# Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc. +COCOINDEX_HOST = 'https://cocoindex.io' + +def _parse_app_flow_specifier(specifier: str) -> tuple[str, str | None]: + """Parses 'module_or_path[:flow_name]' into (module_or_path, flow_name | None).""" + parts = specifier.split(":", 1) # Split only on the first colon + app_ref = parts[0] + + if not app_ref: + raise click.BadParameter( + f"Application module/path part is missing or invalid in specifier: '{specifier}'. " + "Expected format like 'myapp.py' or 'myapp:MyFlow'.", + param_hint="APP_SPECIFIER" + ) + + if len(parts) == 1: + return app_ref, None + + flow_ref_part = parts[1] + + if not flow_ref_part: # Handles empty string after colon + return app_ref, None + + if not flow_ref_part.isidentifier(): + raise click.BadParameter( + f"Invalid format for flow name part ('{flow_ref_part}') in specifier '{specifier}'. " + "If a colon separates the application from the flow name, the flow name should typically be " + "a valid identifier (e.g., alphanumeric with underscores, not starting with a number).", + param_hint="APP_SPECIFIER" + ) + return app_ref, flow_ref_part + +def _get_app_ref_from_specifier( + specifier: str, +) -> str: + """ + Parses the APP_TARGET to get the application reference (path or module). + Issues a warning if a flow name component is also provided in it. + """ + app_ref, flow_ref = _parse_app_flow_specifier(specifier) + + if flow_ref is not None: + click.echo( + click.style( + f"Ignoring flow name '{flow_ref}' in '{specifier}': " + f"this command operates on the entire app/module '{app_ref}'.", + fg='yellow' + ), + err=True + ) + return app_ref + +def _load_user_app(app_target: str) -> types.ModuleType: + """ + Loads the user's application, which can be a file path or an installed module name. + Exits on failure. + """ + if not app_target: + raise click.ClickException("Application target not provided.") + + looks_like_path = os.sep in app_target or app_target.lower().endswith(".py") + + if looks_like_path: + if not os.path.isfile(app_target): + raise click.ClickException(f"Application file path not found: {app_target}") + app_path = os.path.abspath(app_target) + app_dir = os.path.dirname(app_path) + module_name = os.path.splitext(os.path.basename(app_path))[0] + + if app_dir not in sys.path: + sys.path.insert(0, app_dir) + try: + spec = importlib.util.spec_from_file_location(module_name, app_path) + if spec is None: + raise ImportError(f"Could not create spec for file: {app_path}") + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + except (ImportError, FileNotFoundError, PermissionError) as e: + raise click.ClickException(f"Failed importing file '{app_path}': {e}") + finally: + if app_dir in sys.path and sys.path[0] == app_dir: + sys.path.pop(0) + + # Try as module + try: + return importlib.import_module(app_target) + except ImportError as e: + raise click.ClickException(f"Failed to load module '{app_target}': {e}") + except Exception as e: + raise click.ClickException(f"Unexpected error importing module '{app_target}': {e}") + @click.group() -def cli(): +@click.version_option(package_name="cocoindex", message="%(prog)s version %(version)s") +@click.option( + "--env-file", + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True), + help="Path to a .env file to load environment variables from. " + "If not provided, attempts to load '.env' from the current directory.", + default=None, + show_default=False +) +def cli(env_file: str | None): """ CLI for Cocoindex. """ + dotenv_path = env_file or find_dotenv(usecwd=True) + + if load_dotenv(dotenv_path=dotenv_path): + loaded_env_path = os.path.abspath(dotenv_path) + click.echo(f"Loaded environment variables from: {loaded_env_path}", err=True) + + try: + settings = setting.Settings.from_env() + lib.init(settings) + atexit.register(lib.stop) + except Exception as e: + raise click.ClickException(f"Failed to initialize CocoIndex library: {e}") @cli.command() -@click.option( - "-a", "--all", "show_all", is_flag=True, show_default=True, default=False, - help="Also show all flows with persisted setup under the current app namespace, even if not defined in the current process.") -def ls(show_all: bool): +@click.argument("app_target", type=str, required=False) +def ls(app_target: str | None): """ List all flows. + + If APP_TARGET (path/to/app.py or a module) is provided, lists flows + defined in the app and their backend setup status. + + If APP_TARGET is omitted, lists all flows that have a persisted + setup in the backend. """ - current_flow_names = flow.flow_names() - persisted_flow_names = flow_names_with_setup() - remaining_persisted_flow_names = set(persisted_flow_names) + if app_target: + app_ref = _get_app_ref_from_specifier(app_target) + _load_user_app(app_ref) - has_missing_setup = False - has_extra_setup = False + current_flow_names = set(flow.flow_names()) + persisted_flow_names = set(flow_names_with_setup()) - for name in current_flow_names: - if name in remaining_persisted_flow_names: - remaining_persisted_flow_names.remove(name) - suffix = '' - else: - suffix = ' [+]' - has_missing_setup = True - click.echo(f'{name}{suffix}') + if not current_flow_names: + click.echo(f"No flows are defined in '{app_ref}'.") + return - if show_all: - for name in persisted_flow_names: - if name in remaining_persisted_flow_names: - click.echo(f'{name} [?]') - has_extra_setup = True - - if has_missing_setup or has_extra_setup: - click.echo('') - click.echo('Notes:') - if has_missing_setup: + has_missing = False + for name in sorted(current_flow_names): + if name in persisted_flow_names: + click.echo(name) + else: + click.echo(f"{name} [+]") + has_missing = True + + if has_missing: + click.echo('') + click.echo('Notes:') click.echo(' [+]: Flows present in the current process, but missing setup.') - if has_extra_setup: - click.echo(' [?]: Flows with persisted setup, but not in the current process.') + + else: + persisted_flow_names = sorted(flow_names_with_setup()) + + if not persisted_flow_names: + click.echo("No persisted flow setups found in the backend.") + return + + for name in persisted_flow_names: + click.echo(name) @cli.command() -@click.argument("flow_name", type=str, required=False) +@click.argument("app_flow_specifier", type=str) @click.option("--color/--no-color", default=True, help="Enable or disable colored output.") @click.option("--verbose", is_flag=True, help="Show verbose output with full details.") -def show(flow_name: str | None, color: bool, verbose: bool): +def show(app_flow_specifier: str, color: bool, verbose: bool): """ - Show the flow spec and schema in a readable format with colored output. + Show the flow spec and schema. + + APP_FLOW_SPECIFIER: Specifies the application and optionally the target flow. + Can be one of the following formats: + + \b + - path/to/your_app.py + - an_installed.module_name + - path/to/your_app.py:SpecificFlowName + - an_installed.module_name:SpecificFlowName + + :SpecificFlowName can be omitted only if the application defines a single flow. """ - flow = _flow_by_name(flow_name) - console = Console(no_color=not color) - console.print(flow._render_spec(verbose=verbose)) + app_ref, flow_ref = _parse_app_flow_specifier(app_flow_specifier) + _load_user_app(app_ref) + fl = _flow_by_name(flow_ref) + console = Console(no_color=not color) + console.print(fl._render_spec(verbose=verbose)) console.print() table = Table( - title=f"Schema for Flow: {flow.full_name}", - show_header=True, + title=f"Schema for Flow: {fl.name}", + title_style="cyan", header_style="bold magenta" ) table.add_column("Field", style="cyan") table.add_column("Type", style="green") table.add_column("Attributes", style="yellow") - - for field_name, field_type, attr_str in flow._get_schema(): + for field_name, field_type, attr_str in fl._get_schema(): table.add_row(field_name, field_type, attr_str) - console.print(table) @cli.command() -def setup(): +@click.argument("app_target", type=str) +def setup(app_target: str): """ Check and apply backend setup changes for flows, including the internal and target storage (to export). + + APP_TARGET: path/to/app.py or installed_module. """ + app_ref = _get_app_ref_from_specifier(app_target) + _load_user_app(app_ref) + setup_status = sync_setup() click.echo(setup_status) if setup_status.is_up_to_date(): @@ -94,80 +239,125 @@ def setup(): return apply_setup_changes(setup_status) -@cli.command() +@cli.command("drop") +@click.argument("app_target", type=str, required=False) @click.argument("flow_name", type=str, nargs=-1) @click.option( "-a", "--all", "drop_all", is_flag=True, show_default=True, default=False, help="Drop the backend setup for all flows with persisted setup, " - "even if not defined in the current process.") -def drop(flow_name: tuple[str, ...], drop_all: bool): + "even if not defined in the current process." + "If used, APP_TARGET and any listed flow names are ignored.") +def drop(app_target: str | None, flow_name: tuple[str, ...], drop_all: bool): """ - Drop the backend setup for specified flows. - If no flow is specified, all flows defined in the current process will be dropped. + Drop the backend setup for flows. + + Modes of operation:\n + 1. Drop ALL persisted setups: `cocoindex drop --all`\n + 2. Drop all flows defined in an app: `cocoindex drop `\n + 3. Drop specific named flows: `cocoindex drop [FLOW_NAME...]` """ + app_ref = None + flow_names = [] + if drop_all: + if app_target or flow_name: + click.echo("Warning: When --all is used, APP_TARGET and any individual flow names are ignored.", err=True) flow_names = flow_names_with_setup() - elif len(flow_name) == 0: - flow_names = flow.flow_names() + elif app_target: + app_ref = _get_app_ref_from_specifier(app_target) + _load_user_app(app_ref) + if flow_name: + flow_names = list(flow_name) + click.echo(f"Preparing to drop specified flows: {', '.join(flow_names)} (in '{app_ref}').", err=True) + else: + flow_names = [fl.name for fl in flow.flows()] + if not flow_names: + click.echo(f"No flows found defined in '{app_ref}' to drop.") + return + click.echo(f"Preparing to drop all flows defined in '{app_ref}': {', '.join(flow_names)}.", err=True) else: - flow_names = list(flow_name) + raise click.UsageError( + "Missing arguments. You must either provide an APP_TARGET (to target app-specific flows) " + "or use the --all flag." + ) + + if not flow_names: + click.echo("No flows identified for the drop operation.") + return + setup_status = drop_setup(flow_names) click.echo(setup_status) if setup_status.is_up_to_date(): click.echo("No flows need to be dropped.") return if not click.confirm( - "Changes need to be pushed. Continue? [yes/N]", default=False, show_default=False): + f"\nThis will apply changes to drop setup for: {', '.join(flow_names)}. Continue? [yes/N]", + default=False, show_default=False): + click.echo("Drop operation aborted by user.") return apply_setup_changes(setup_status) @cli.command() -@click.argument("flow_name", type=str, required=False) +@click.argument("app_flow_specifier", type=str) @click.option( "-L", "--live", is_flag=True, show_default=True, default=False, help="Continuously watch changes from data sources and apply to the target index.") @click.option( "-q", "--quiet", is_flag=True, show_default=True, default=False, help="Avoid printing anything to the standard output, e.g. statistics.") -def update(flow_name: str | None, live: bool, quiet: bool): +def update(app_flow_specifier: str, live: bool, quiet: bool): """ Update the index to reflect the latest data from data sources. + + APP_FLOW_SPECIFIER: path/to/app.py, module, path/to/app.py:FlowName, or module:FlowName. + If :FlowName is omitted, updates all flows. """ + app_ref, flow_ref = _parse_app_flow_specifier(app_flow_specifier) + _load_user_app(app_ref) + options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet) - if flow_name is None: + if flow_ref is None: return flow.update_all_flows(options) else: - with flow.FlowLiveUpdater(_flow_by_name(flow_name), options) as updater: + with flow.FlowLiveUpdater(_flow_by_name(flow_ref), options) as updater: updater.wait() return updater.update_stats() @cli.command() -@click.argument("flow_name", type=str, required=False) +@click.argument("app_flow_specifier", type=str) @click.option( "-o", "--output-dir", type=str, required=False, help="The directory to dump the output to.") @click.option( "--cache/--no-cache", is_flag=True, show_default=True, default=True, - help="Use already-cached intermediate data if available. " - "Note that we only reuse existing cached data without updating the cache " - "even if it's turned on.") -def evaluate(flow_name: str | None, output_dir: str | None, cache: bool = True): + help="Use already-cached intermediate data if available.") +def evaluate(app_flow_specifier: str, output_dir: str | None, cache: bool = True): """ Evaluate the flow and dump flow outputs to files. Instead of updating the index, it dumps what should be indexed to files. Mainly used for evaluation purpose. + + APP_FLOW_SPECIFIER: Specifies the application and optionally the target flow. + Can be one of the following formats:\n + - path/to/your_app.py\n + - an_installed.module_name\n + - path/to/your_app.py:SpecificFlowName\n + - an_installed.module_name:SpecificFlowName + + :SpecificFlowName can be omitted only if the application defines a single flow. """ - fl = _flow_by_name(flow_name) + app_ref, flow_ref = _parse_app_flow_specifier(app_flow_specifier) + _load_user_app(app_ref) + + fl = _flow_by_name(flow_ref) if output_dir is None: output_dir = f"eval_{setting.get_app_namespace(trailing_delimiter='_')}{flow_name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}" options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=cache) fl.evaluate_and_dump(options) -# Create ServerSettings lazily upon first call, as environment variables may be loaded from files, etc. -COCOINDEX_HOST = 'https://cocoindex.io' - @cli.command() +@click.argument("app_target", type=str) @click.option( "-a", "--address", type=str, help="The address to bind the server to, in the format of IP:PORT. " @@ -190,13 +380,18 @@ def evaluate(flow_name: str | None, output_dir: str | None, cache: bool = True): @click.option( "-q", "--quiet", is_flag=True, show_default=True, default=False, help="Avoid printing anything to the standard output, e.g. statistics.") -def server(address: str | None, live_update: bool, quiet: bool, cors_origin: str | None, - cors_cocoindex: bool, cors_local: int | None): +def server(app_target: str, address: str | None, live_update: bool, quiet: bool, + cors_origin: str | None, cors_cocoindex: bool, cors_local: int | None): """ Start a HTTP server providing REST APIs. It will allow tools like CocoInsight to access the server. + + APP_TARGET: path/to/app.py or installed_module. """ + app_ref = _get_app_ref_from_specifier(app_target) + _load_user_app(app_ref) + server_settings = setting.ServerSettings.from_env() cors_origins: set[str] = set(server_settings.cors_origins or []) if cors_origin is not None: @@ -223,16 +418,20 @@ def server(address: str | None, live_update: bool, quiet: bool, cors_origin: str def _flow_name(name: str | None) -> str: names = flow.flow_names() + available = ', '.join(sorted(names)) if name is not None: if name not in names: - raise click.BadParameter(f"Flow {name} not found") + raise click.BadParameter(f"Flow '{name}' not found.\nAvailable: {available if names else 'None'}") return name if len(names) == 0: - raise click.UsageError("No flows available") + raise click.UsageError("No flows available in the loaded application.") elif len(names) == 1: return names[0] else: - raise click.UsageError("Multiple flows available, please specify --name") + raise click.UsageError(f"Multiple flows available, please specify which flow to target by appending :FlowName to the APP_TARGET.\nAvailable: {available}") def _flow_by_name(name: str | None) -> flow.Flow: return flow.flow_by_name(_flow_name(name)) + +if __name__ == "__main__": + cli() \ No newline at end of file diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index ba170789..db53004f 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -489,7 +489,7 @@ def _render_spec(self, verbose: bool = False) -> Tree: tree = Tree(f"Flow: {self.full_name}", style="cyan") def build_tree(label: str, lines: list): - node = Tree(label, style="bold magenta" if lines else "cyan") + node = Tree(label=label if lines else label + " None", style="cyan") for line in lines: child_node = node.add(Text(line.content, style="yellow")) child_node.children = build_tree("", line.children).children diff --git a/python/cocoindex/lib.py b/python/cocoindex/lib.py index 1ef426ad..85344f00 100644 --- a/python/cocoindex/lib.py +++ b/python/cocoindex/lib.py @@ -1,14 +1,10 @@ """ Library level functions and states. """ -import sys -import functools -import inspect +import warnings +from typing import Callable, Any -from typing import Callable - -from . import _engine -from . import flow, query, cli, setting +from . import _engine, flow, query, setting from .convert import dump_engine_object @@ -29,50 +25,42 @@ def stop(): _engine.stop() def main_fn( - settings: setting.Settings | None = None, - cocoindex_cmd: str = 'cocoindex', + settings: Any | None = None, + cocoindex_cmd: str | None = None, ) -> Callable[[Callable], Callable]: """ - A decorator to wrap the main function. - If the python binary is called with the given command, it yields control to the cocoindex CLI. + DEPRECATED: The @cocoindex.main_fn() decorator is obsolete and has no effect. + It will be removed in a future version, which will cause an AttributeError. - If the settings are not provided, they are loaded from the environment variables. + Please remove this decorator from your code and use the standalone 'cocoindex' CLI. + See the updated CLI usage examples in the warning message. """ - - def _pre_init() -> None: - effective_settings = settings or setting.Settings.from_env() - init(effective_settings) - - def _should_run_cli() -> bool: - return len(sys.argv) > 1 and sys.argv[1] == cocoindex_cmd - - def _run_cli(): - return cli.cli.main(sys.argv[2:], prog_name=f"{sys.argv[0]} {sys.argv[1]}") + warnings.warn( + "\n\n" + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n" + "CRITICAL DEPRECATION NOTICE from CocoIndex:\n" + "The @cocoindex.main_fn() decorator in your script is DEPRECATED and IGNORED.\n" + "It provides NO functionality and will be REMOVED entirely in a future version.\n" + "If not removed, your script will FAIL with an AttributeError in the future.\n\n" + "ACTION REQUIRED: Please REMOVE @cocoindex.main_fn() from your Python script.\n\n" + "To use CocoIndex, invoke the standalone 'cocoindex' CLI." + " Examples of new CLI usage:\n\n" + " To list flows from 'main.py' (previously 'python main.py cocoindex ls'):\n" + " cocoindex ls main.py\n\n" + " To list all persisted flows (previously 'python main.py cocoindex ls --all'):\n" + " cocoindex ls\n\n" + " To show 'MyFlow' defined in 'main.py' (previously 'python main.py cocoindex show MyFlow'):\n" + " cocoindex show main.py:MyFlow\n\n" + " To update all flows in 'my_package.flows_module':\n" + " cocoindex update my_package.flows_module\n\n" + " To update 'SpecificFlow' in 'my_package.flows_module':\n" + " cocoindex update my_package.flows_module:SpecificFlow\n\n" + "See cocoindex --help for more details.\n" + "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n\n", + DeprecationWarning, + stacklevel=2 + ) def _main_wrapper(fn: Callable) -> Callable: - if inspect.iscoroutinefunction(fn): - @functools.wraps(fn) - async def _inner(*args, **kwargs): - _pre_init() - try: - if _should_run_cli(): - # Schedule to a separate thread as it invokes nested event loop. - # return await asyncio.to_thread(_run_cli) - return _run_cli() - return await fn(*args, **kwargs) - finally: - stop() - return _inner - else: - @functools.wraps(fn) - def _inner(*args, **kwargs): - _pre_init() - try: - if _should_run_cli(): - return _run_cli() - return fn(*args, **kwargs) - finally: - stop() - return _inner - + return fn return _main_wrapper