Skip to content

feat(app-namespace): support app namespace #498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/docs_to_knowledge_graph/.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Postgres database address for cocoindex
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex
COCOINDEX_APP_NAMESPACE=Dev0
6 changes: 3 additions & 3 deletions python/cocoindex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def show(flow_name: str | None, color: bool, verbose: bool):

console.print()
table = Table(
title=f"Schema for Flow: {flow.name}",
title=f"Schema for Flow: {flow.full_name}",
show_header=True,
header_style="bold magenta"
)
Expand Down Expand Up @@ -108,7 +108,7 @@ def drop(flow_name: tuple[str, ...], drop_all: bool):
if drop_all:
flow_names = flow_names_with_setup()
elif len(flow_name) == 0:
flow_names = [fl.name for fl in flow.flows()]
flow_names = flow.flow_names()
else:
flow_names = list(flow_name)
setup_status = drop_setup(flow_names)
Expand Down Expand Up @@ -160,7 +160,7 @@ def evaluate(flow_name: str | None, output_dir: str | None, cache: bool = True):
"""
fl = _flow_by_name(flow_name)
if output_dir is None:
output_dir = f"eval_{fl.name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}"
output_dir = f"eval_{setting.get_app_namespace(trailing_delimiter='_')}{flow_name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}"
options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=cache)
fl.evaluate_and_dump(options)

Expand Down
34 changes: 21 additions & 13 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from . import _engine
from . import index
from . import op
from . import setting
from .convert import dump_engine_object
from .typing import encode_enriched_type
from .runtime import execution_context
Expand Down Expand Up @@ -310,7 +311,7 @@ class _FlowBuilderState:

def __init__(self, /, name: str | None = None):
flow_name = _flow_name_builder.build_name(name, prefix="_flow_")
self.engine_flow_builder = _engine.FlowBuilder(flow_name)
self.engine_flow_builder = _engine.FlowBuilder(get_full_flow_name(flow_name))
self.field_name_builder = _NameBuilder()

def get_data_slice(self, v: Any) -> _engine.DataSlice:
Expand Down Expand Up @@ -481,7 +482,7 @@ def _render_spec(self, verbose: bool = False) -> Tree:
Render the flow spec as a styled rich Tree with hierarchical structure.
"""
spec = self._get_spec(verbose=verbose)
tree = Tree(f"Flow: {self.name}", style="cyan")
tree = Tree(f"Flow: {self.full_name}", style="cyan")

def build_tree(label: str, lines: list):
node = Tree(label, style="bold magenta" if lines else "cyan")
Expand All @@ -508,9 +509,9 @@ def __repr__(self):
return repr(self._lazy_engine_flow())

@property
def name(self) -> str:
def full_name(self) -> str:
"""
Get the name of the flow.
Get the full name of the flow.
"""
return self._lazy_engine_flow().name()

Expand Down Expand Up @@ -566,8 +567,16 @@ def _create_engine_flow() -> _engine.Flow:
_flows_lock = Lock()
_flows: dict[str, Flow] = {}

def get_full_flow_name(name: str) -> str:
    """
    Get the full name of a flow.

    The full name is whatever `setting.get_app_namespace(trailing_delimiter='.')`
    returns, immediately followed by `name`.

    Args:
        name: The flow name without any application namespace.

    Returns:
        The namespace-prefixed flow name.
    """
    namespace_prefix = setting.get_app_namespace(trailing_delimiter='.')
    return f"{namespace_prefix}{name}"

def add_flow_def(name: str, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow:
"""Add a flow definition to the cocoindex library."""
if not all(c.isalnum() or c == '_' for c in name):
raise ValueError(f"Flow name '{name}' contains invalid characters. Only alphanumeric characters and underscores are allowed.")
with _flows_lock:
if name in _flows:
raise KeyError(f"Flow with name {name} already exists")
Expand All @@ -587,12 +596,12 @@ def flow_names() -> list[str]:
with _flows_lock:
return list(_flows.keys())

def flows() -> list[Flow]:
def flows() -> dict[str, Flow]:
"""
Get all flows.
"""
with _flows_lock:
return list(_flows.values())
return dict(_flows)

def flow_by_name(name: str) -> Flow:
"""
Expand All @@ -605,14 +614,13 @@ def ensure_all_flows_built() -> None:
"""
Ensure all flows are built.
"""
for fl in flows():
fl.internal_flow()
execution_context.run(ensure_all_flows_built_async())

async def ensure_all_flows_built_async() -> None:
"""
Ensure all flows are built.
"""
for fl in flows():
for fl in flows().values():
await fl.internal_flow_async()

def update_all_flows(options: FlowLiveUpdaterOptions) -> dict[str, _engine.IndexUpdateInfo]:
Expand All @@ -626,13 +634,13 @@ async def update_all_flows_async(options: FlowLiveUpdaterOptions) -> dict[str, _
Update all flows.
"""
await ensure_all_flows_built_async()
async def _update_flow(fl: Flow) -> _engine.IndexUpdateInfo:
async def _update_flow(name: str, fl: Flow) -> tuple[str, _engine.IndexUpdateInfo]:
async with FlowLiveUpdater(fl, options) as updater:
await updater.wait_async()
return updater.update_stats()
return (name, updater.update_stats())
fls = flows()
all_stats = await asyncio.gather(*(_update_flow(fl) for fl in fls))
return {fl.name: stats for fl, stats in zip(fls, all_stats)}
all_stats = await asyncio.gather(*(_update_flow(name, fl) for (name, fl) in fls.items()))
return dict(all_stats)

_transient_flow_name_builder = _NameBuilder()
class TransientFlow:
Expand Down
1 change: 1 addition & 0 deletions python/cocoindex/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
def init(settings: setting.Settings) -> None:
    """
    Initialize the cocoindex library.

    Passes the dumped `settings` object to the engine first, then records
    `settings.app_namespace` through `setting.set_app_namespace`.

    Args:
        settings: The library settings to initialize with.
    """
    _engine.init(dump_engine_object(settings))
    setting.set_app_namespace(settings.app_namespace)


def start_server(settings: setting.ServerSettings):
Expand Down
25 changes: 24 additions & 1 deletion python/cocoindex/setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,25 @@
from typing import Callable, Self, Any, overload
from dataclasses import dataclass

_app_namespace: str = ''

def get_app_namespace(*, trailing_delimiter: str | None = None) -> str:
"""Get the application namespace. Append the `trailing_delimiter` if not empty."""
if _app_namespace == '' or trailing_delimiter is None:
return _app_namespace
return f'{_app_namespace}{trailing_delimiter}'

def split_app_namespace(full_name: str, delimiter: str) -> tuple[str, str]:
    """
    Split the full name into the application namespace and the rest.

    Only the first occurrence of `delimiter` is used as the split point; any
    later occurrences stay in the second element.

    Args:
        full_name: The name to split, e.g. "Dev0.my_flow".
        delimiter: The separator between the namespace and the rest.

    Returns:
        A `(namespace, rest)` tuple. When `delimiter` does not occur in
        `full_name`, the namespace is '' and the rest is `full_name`.

    Raises:
        ValueError: If `delimiter` is an empty string.
    """
    namespace, found, rest = full_name.partition(delimiter)
    if not found:
        # No delimiter present: the whole name belongs to the empty namespace.
        return '', full_name
    return namespace, rest

def set_app_namespace(app_namespace: str) -> None:
    """
    Set the application namespace.

    Rebinds the module-level `_app_namespace` variable to the given value.

    Args:
        app_namespace: The namespace to store.
    """
    global _app_namespace  # pylint: disable=global-statement
    _app_namespace = app_namespace

@dataclass
class DatabaseConnectionSpec:
Expand All @@ -30,6 +49,7 @@ def _load_field(target: dict[str, Any], name: str, env_name: str, required: bool
class Settings:
"""Settings for the cocoindex library."""
database: DatabaseConnectionSpec
app_namespace: str

@classmethod
def from_env(cls) -> Self:
Expand All @@ -40,7 +60,10 @@ def from_env(cls) -> Self:
_load_field(db_kwargs, "user", "COCOINDEX_DATABASE_USER")
_load_field(db_kwargs, "password", "COCOINDEX_DATABASE_PASSWORD")
database = DatabaseConnectionSpec(**db_kwargs)
return cls(database=database)

app_namespace = os.getenv("COCOINDEX_APP_NAMESPACE", '')

return cls(database=database, app_namespace=app_namespace)

@dataclass
class ServerSettings:
Expand Down
10 changes: 8 additions & 2 deletions python/cocoindex/setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from . import flow
from . import setting
from . import _engine

def sync_setup() -> _engine.SetupStatus:
Expand All @@ -7,10 +8,15 @@ def sync_setup() -> _engine.SetupStatus:

def drop_setup(flow_names: list[str]) -> _engine.SetupStatus:
flow.ensure_all_flows_built()
return _engine.drop_setup(flow_names)
return _engine.drop_setup([flow.get_full_flow_name(name) for name in flow_names])

def flow_names_with_setup() -> list[str]:
return _engine.flow_names_with_setup()
result = []
for name in _engine.flow_names_with_setup():
app_namespace, name = setting.split_app_namespace(name, '.')
if app_namespace == setting.get_app_namespace():
result.append(name)
return result

def apply_setup_changes(setup_status: _engine.SetupStatus):
_engine.apply_setup_changes(setup_status)
9 changes: 4 additions & 5 deletions src/execution/db_tracking_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use serde::{Deserialize, Serialize};
use sqlx::PgPool;

pub fn default_tracking_table_name(flow_name: &str) -> String {
let sanitized_name = flow_name
.chars()
.map(|c| if c.is_alphanumeric() { c } else { '_' })
.collect::<String>();
format!("{}__cocoindex_tracking", sanitized_name)
format!(
"{}__cocoindex_tracking",
utils::db::sanitize_identifier(flow_name)
)
}

pub const CURRENT_TRACKING_TABLE_VERSION: i32 = 1;
Expand Down
12 changes: 7 additions & 5 deletions src/ops/storages/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use futures::FutureExt;
use indexmap::{IndexMap, IndexSet};
use itertools::Itertools;
use serde::Serialize;
use sqlx::postgres::types::PgRange;
use sqlx::postgres::PgRow;
use sqlx::postgres::types::PgRange;
use sqlx::{PgPool, Row};
use std::ops::Bound;
use uuid::Uuid;
Expand Down Expand Up @@ -934,10 +934,12 @@ impl StorageFactoryBase for Factory {
.map(|d| {
let table_id = TableId {
database: d.spec.database.clone(),
table_name: d
.spec
.table_name
.unwrap_or_else(|| format!("{}__{}", context.flow_instance_name, d.name)),
table_name: d.spec.table_name.unwrap_or_else(|| {
utils::db::sanitize_identifier(&format!(
"{}__{}",
context.flow_instance_name, d.name
))
}),
};
let setup_state = SetupState::new(
&table_id,
Expand Down
8 changes: 4 additions & 4 deletions src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
pub(crate) use anyhow::{Context, Result};
pub(crate) use async_trait::async_trait;
pub(crate) use chrono::{DateTime, Utc};
pub(crate) use futures::{FutureExt, StreamExt};
pub(crate) use futures::{
future::{BoxFuture, Shared},
prelude::*,
stream::BoxStream,
};
pub(crate) use futures::{FutureExt, StreamExt};
pub(crate) use indexmap::{IndexMap, IndexSet};
pub(crate) use itertools::Itertools;
pub(crate) use serde::{de::DeserializeOwned, Deserialize, Serialize};
pub(crate) use serde::{Deserialize, Serialize, de::DeserializeOwned};
pub(crate) use std::any::Any;
pub(crate) use std::borrow::Cow;
pub(crate) use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
Expand All @@ -21,11 +21,11 @@ pub(crate) use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock, Weak};
pub(crate) use crate::base::{self, schema, spec, value};
pub(crate) use crate::builder::{self, plan};
pub(crate) use crate::execution;
pub(crate) use crate::lib_context::{get_lib_context, get_runtime, FlowContext, LibContext};
pub(crate) use crate::lib_context::{FlowContext, LibContext, get_lib_context, get_runtime};
pub(crate) use crate::ops::interface;
pub(crate) use crate::service::error::ApiError;
pub(crate) use crate::setup::AuthRegistry;
pub(crate) use crate::utils::retryable;
pub(crate) use crate::utils::{self, retryable};

pub(crate) use crate::{api_bail, api_error};

Expand Down
12 changes: 12 additions & 0 deletions src/utils/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,15 @@ pub enum WriteAction {
Insert,
Update,
}

/// Builds an identifier from `s` by keeping every alphanumeric character and
/// every underscore as-is, and replacing each other character with a double
/// underscore (`__`).
pub fn sanitize_identifier(s: &str) -> String {
    s.chars().fold(String::new(), |mut sanitized, ch| {
        if ch == '_' || ch.is_alphanumeric() {
            sanitized.push(ch);
        } else {
            sanitized.push_str("__");
        }
        sanitized
    })
}
Loading