From 93d2496d5cf69877525ed70eda488fe70732478a Mon Sep 17 00:00:00 2001 From: LJ Date: Fri, 16 May 2025 12:11:21 -0700 Subject: [PATCH] feat(app-namespace): support app namespace --- examples/docs_to_knowledge_graph/.env | 1 + python/cocoindex/cli.py | 6 ++--- python/cocoindex/flow.py | 34 +++++++++++++++++---------- python/cocoindex/lib.py | 1 + python/cocoindex/setting.py | 25 +++++++++++++++++++- python/cocoindex/setup.py | 10 ++++++-- src/execution/db_tracking_setup.rs | 9 ++++--- src/ops/storages/postgres.rs | 12 ++++++---- src/prelude.rs | 8 +++---- src/utils/db.rs | 12 ++++++++++ 10 files changed, 85 insertions(+), 33 deletions(-) diff --git a/examples/docs_to_knowledge_graph/.env b/examples/docs_to_knowledge_graph/.env index 335f3060..2ff10b65 100644 --- a/examples/docs_to_knowledge_graph/.env +++ b/examples/docs_to_knowledge_graph/.env @@ -1,2 +1,3 @@ # Postgres database address for cocoindex COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex +COCOINDEX_APP_NAMESPACE=Dev0 diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index fc82be92..69def96c 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -65,7 +65,7 @@ def show(flow_name: str | None, color: bool, verbose: bool): console.print() table = Table( - title=f"Schema for Flow: {flow.name}", + title=f"Schema for Flow: {flow.full_name}", show_header=True, header_style="bold magenta" ) @@ -108,7 +108,7 @@ def drop(flow_name: tuple[str, ...], drop_all: bool): if drop_all: flow_names = flow_names_with_setup() elif len(flow_name) == 0: - flow_names = [fl.name for fl in flow.flows()] + flow_names = flow.flow_names() else: flow_names = list(flow_name) setup_status = drop_setup(flow_names) @@ -160,7 +160,7 @@ def evaluate(flow_name: str | None, output_dir: str | None, cache: bool = True): """ fl = _flow_by_name(flow_name) if output_dir is None: - output_dir = f"eval_{fl.name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}" + output_dir = f"eval_{setting.get_app_namespace(trailing_delimiter='_')}{flow_name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}" options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=cache) fl.evaluate_and_dump(options) diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index f85f77b7..79382d41 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -19,6 +19,7 @@ from . import _engine from . import index from . import op +from . import setting from .convert import dump_engine_object from .typing import encode_enriched_type from .runtime import execution_context @@ -310,7 +311,7 @@ class _FlowBuilderState: def __init__(self, /, name: str | None = None): flow_name = _flow_name_builder.build_name(name, prefix="_flow_") - self.engine_flow_builder = _engine.FlowBuilder(flow_name) + self.engine_flow_builder = _engine.FlowBuilder(get_full_flow_name(flow_name)) self.field_name_builder = _NameBuilder() def get_data_slice(self, v: Any) -> _engine.DataSlice: @@ -481,7 +482,7 @@ def _render_spec(self, verbose: bool = False) -> Tree: Render the flow spec as a styled rich Tree with hierarchical structure. """ spec = self._get_spec(verbose=verbose) - tree = Tree(f"Flow: {self.name}", style="cyan") + tree = Tree(f"Flow: {self.full_name}", style="cyan") def build_tree(label: str, lines: list): node = Tree(label, style="bold magenta" if lines else "cyan") @@ -508,9 +509,9 @@ def __repr__(self): return repr(self._lazy_engine_flow()) @property - def name(self) -> str: + def full_name(self) -> str: """ - Get the name of the flow. + Get the full name of the flow. """ return self._lazy_engine_flow().name() @@ -566,8 +567,16 @@ def _create_engine_flow() -> _engine.Flow: _flows_lock = Lock() _flows: dict[str, Flow] = {} +def get_full_flow_name(name: str) -> str: + """ + Get the full name of a flow. + """ + return f"{setting.get_app_namespace(trailing_delimiter='.')}{name}" + def add_flow_def(name: str, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow: """Add a flow definition to the cocoindex library.""" + if not all(c.isalnum() or c == '_' for c in name): + raise ValueError(f"Flow name '{name}' contains invalid characters. Only alphanumeric characters and underscores are allowed.") with _flows_lock: if name in _flows: raise KeyError(f"Flow with name {name} already exists") @@ -587,12 +596,12 @@ def flow_names() -> list[str]: with _flows_lock: return list(_flows.keys()) -def flows() -> list[Flow]: +def flows() -> dict[str, Flow]: """ Get all flows. """ with _flows_lock: - return list(_flows.values()) + return dict(_flows) def flow_by_name(name: str) -> Flow: """ @@ -605,14 +614,13 @@ def ensure_all_flows_built() -> None: """ Ensure all flows are built. """ - for fl in flows(): - fl.internal_flow() + execution_context.run(ensure_all_flows_built_async()) async def ensure_all_flows_built_async() -> None: """ Ensure all flows are built. """ - for fl in flows(): + for fl in flows().values(): await fl.internal_flow_async() def update_all_flows(options: FlowLiveUpdaterOptions) -> dict[str, _engine.IndexUpdateInfo]: @@ -626,13 +634,13 @@ async def update_all_flows_async(options: FlowLiveUpdaterOptions) -> dict[str, _ Update all flows. """ await ensure_all_flows_built_async() - async def _update_flow(fl: Flow) -> _engine.IndexUpdateInfo: + async def _update_flow(name: str, fl: Flow) -> tuple[str, _engine.IndexUpdateInfo]: async with FlowLiveUpdater(fl, options) as updater: await updater.wait_async() - return updater.update_stats() + return (name, updater.update_stats()) fls = flows() - all_stats = await asyncio.gather(*(_update_flow(fl) for fl in fls)) - return {fl.name: stats for fl, stats in zip(fls, all_stats)} + all_stats = await asyncio.gather(*(_update_flow(name, fl) for (name, fl) in fls.items())) + return dict(all_stats) _transient_flow_name_builder = _NameBuilder() class TransientFlow: diff --git a/python/cocoindex/lib.py b/python/cocoindex/lib.py index c83627f7..1ef426ad 100644 --- a/python/cocoindex/lib.py +++ b/python/cocoindex/lib.py @@ -15,6 +15,7 @@ def init(settings: setting.Settings): """Initialize the cocoindex library.""" _engine.init(dump_engine_object(settings)) + setting.set_app_namespace(settings.app_namespace) def start_server(settings: setting.ServerSettings): diff --git a/python/cocoindex/setting.py b/python/cocoindex/setting.py index 00c8545a..bbdb87fd 100644 --- a/python/cocoindex/setting.py +++ b/python/cocoindex/setting.py @@ -6,6 +6,25 @@ from typing import Callable, Self, Any, overload from dataclasses import dataclass +_app_namespace: str = '' + +def get_app_namespace(*, trailing_delimiter: str | None = None) -> str: + """Get the application namespace. Append the `trailing_delimiter` if not empty.""" + if _app_namespace == '' or trailing_delimiter is None: + return _app_namespace + return f'{_app_namespace}{trailing_delimiter}' + +def split_app_namespace(full_name: str, delimiter: str) -> tuple[str, str]: + """Split the full name into the application namespace and the rest.""" + parts = full_name.split(delimiter, 1) + if len(parts) == 1: + return '', parts[0] + return (parts[0], parts[1]) + +def set_app_namespace(app_namespace: str): + """Set the application namespace.""" + global _app_namespace # pylint: disable=global-statement + _app_namespace = app_namespace @dataclass class DatabaseConnectionSpec: @@ -30,6 +49,7 @@ def _load_field(target: dict[str, Any], name: str, env_name: str, required: bool class Settings: """Settings for the cocoindex library.""" database: DatabaseConnectionSpec + app_namespace: str @classmethod def from_env(cls) -> Self: @@ -40,7 +60,10 @@ def from_env(cls) -> Self: _load_field(db_kwargs, "user", "COCOINDEX_DATABASE_USER") _load_field(db_kwargs, "password", "COCOINDEX_DATABASE_PASSWORD") database = DatabaseConnectionSpec(**db_kwargs) - return cls(database=database) + + app_namespace = os.getenv("COCOINDEX_APP_NAMESPACE", '') + + return cls(database=database, app_namespace=app_namespace) @dataclass class ServerSettings: diff --git a/python/cocoindex/setup.py b/python/cocoindex/setup.py index 0e0d20c1..9c8e5e62 100644 --- a/python/cocoindex/setup.py +++ b/python/cocoindex/setup.py @@ -1,4 +1,5 @@ from . import flow +from . import setting from . import _engine def sync_setup() -> _engine.SetupStatus: @@ -7,10 +8,15 @@ def sync_setup() -> _engine.SetupStatus: def drop_setup(flow_names: list[str]) -> _engine.SetupStatus: flow.ensure_all_flows_built() - return _engine.drop_setup(flow_names) + return _engine.drop_setup([flow.get_full_flow_name(name) for name in flow_names]) def flow_names_with_setup() -> list[str]: - return _engine.flow_names_with_setup() + result = [] + for name in _engine.flow_names_with_setup(): + app_namespace, name = setting.split_app_namespace(name, '.') + if app_namespace == setting.get_app_namespace(): + result.append(name) + return result def apply_setup_changes(setup_status: _engine.SetupStatus): _engine.apply_setup_changes(setup_status) diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index 25738c52..1aaa0a0e 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -5,11 +5,10 @@ use serde::{Deserialize, Serialize}; use sqlx::PgPool; pub fn default_tracking_table_name(flow_name: &str) -> String { - let sanitized_name = flow_name - .chars() - .map(|c| if c.is_alphanumeric() { c } else { '_' }) - .collect::(); - format!("{}__cocoindex_tracking", sanitized_name) + format!( + "{}__cocoindex_tracking", + utils::db::sanitize_identifier(flow_name) + ) } pub const CURRENT_TRACKING_TABLE_VERSION: i32 = 1; diff --git a/src/ops/storages/postgres.rs b/src/ops/storages/postgres.rs index 6e630a20..4b6c8d86 100644 --- a/src/ops/storages/postgres.rs +++ b/src/ops/storages/postgres.rs @@ -11,8 +11,8 @@ use futures::FutureExt; use indexmap::{IndexMap, IndexSet}; use itertools::Itertools; use serde::Serialize; -use sqlx::postgres::types::PgRange; use sqlx::postgres::PgRow; +use sqlx::postgres::types::PgRange; use sqlx::{PgPool, Row}; use std::ops::Bound; use uuid::Uuid; @@ -934,10 +934,12 @@ impl StorageFactoryBase for Factory { .map(|d| { let table_id = TableId { database: d.spec.database.clone(), - table_name: d - .spec - .table_name - .unwrap_or_else(|| format!("{}__{}", context.flow_instance_name, d.name)), + table_name: d.spec.table_name.unwrap_or_else(|| { + utils::db::sanitize_identifier(&format!( + "{}__{}", + context.flow_instance_name, d.name + )) + }), }; let setup_state = SetupState::new( &table_id, diff --git a/src/prelude.rs b/src/prelude.rs index 0ae8d03a..1edc74d2 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -3,15 +3,15 @@ pub(crate) use anyhow::{Context, Result}; pub(crate) use async_trait::async_trait; pub(crate) use chrono::{DateTime, Utc}; +pub(crate) use futures::{FutureExt, StreamExt}; pub(crate) use futures::{ future::{BoxFuture, Shared}, prelude::*, stream::BoxStream, }; -pub(crate) use futures::{FutureExt, StreamExt}; pub(crate) use indexmap::{IndexMap, IndexSet}; pub(crate) use itertools::Itertools; -pub(crate) use serde::{de::DeserializeOwned, Deserialize, Serialize}; +pub(crate) use serde::{Deserialize, Serialize, de::DeserializeOwned}; pub(crate) use std::any::Any; pub(crate) use std::borrow::Cow; pub(crate) use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; @@ -21,11 +21,11 @@ pub(crate) use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock, Weak}; pub(crate) use crate::base::{self, schema, spec, value}; pub(crate) use crate::builder::{self, plan}; pub(crate) use crate::execution; -pub(crate) use crate::lib_context::{get_lib_context, get_runtime, FlowContext, LibContext}; +pub(crate) use crate::lib_context::{FlowContext, LibContext, get_lib_context, get_runtime}; pub(crate) use crate::ops::interface; pub(crate) use crate::service::error::ApiError; pub(crate) use crate::setup::AuthRegistry; -pub(crate) use crate::utils::retryable; +pub(crate) use crate::utils::{self, retryable}; pub(crate) use crate::{api_bail, api_error}; diff --git a/src/utils/db.rs b/src/utils/db.rs index 033702d2..0c643c2d 100644 --- a/src/utils/db.rs +++ b/src/utils/db.rs @@ -31,3 +31,15 @@ pub enum WriteAction { Insert, Update, } + +pub fn sanitize_identifier(s: &str) -> String { + let mut result = String::new(); + for c in s.chars() { + if c.is_alphanumeric() || c == '_' { + result.push(c); + } else { + result.push_str("__"); + } + } + result +}