Skip to content

Commit ae1abe3

Browse files
authored
A bunch of bug fixes / cleanups for cache feature. (#13)
* `s/enable_caching/enable_cache/` * Support eneabling cache in Python SDK. * Fix validation logic for `behavior_version`
1 parent 295d66e commit ae1abe3

File tree

6 files changed

+45
-12
lines changed

6 files changed

+45
-12
lines changed

python/cocoindex/op.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,14 @@ def __call__(self, spec_json: str, *args, **kwargs):
6262

6363
_gpu_dispatch_lock = Lock()
6464

65-
def executor_class(gpu: bool = False) -> Callable[[type], type]:
65+
def executor_class(gpu: bool = False, cache: bool = False, behavior_version: int | None = None) -> Callable[[type], type]:
6666
"""
6767
Decorate a class to provide an executor for an op.
6868
6969
Args:
7070
gpu: Whether the executor will be executed on GPU.
71+
cache: Whether the executor will be cached.
72+
behavior_version: The behavior version of the executor. Cache will be invalidated if it changes. Must be provided if `cache` is True.
7173
"""
7274

7375
def _inner(cls: type[Executor]) -> type:
@@ -87,7 +89,15 @@ def _inner(cls: type[Executor]) -> type:
8789
expected_return = sig.return_annotation
8890

8991
cls_type: type = cls
90-
class _WrappedClass(cls_type):
92+
93+
class _Fallback:
94+
def enable_cache(self):
95+
return cache
96+
97+
def behavior_version(self):
98+
return behavior_version
99+
100+
class _WrappedClass(cls_type, _Fallback):
91101
def __init__(self, spec):
92102
super().__init__()
93103
self.spec = spec

src/builder/analyzer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -706,7 +706,7 @@ impl<'a> AnalyzerContext<'a> {
706706
})?;
707707
let behavior_version = executor.behavior_version();
708708
let function_exec_info = AnalyzedFunctionExecInfo {
709-
enable_caching: executor.enable_caching(),
709+
enable_cache: executor.enable_cache(),
710710
behavior_version,
711711
fingerprinter: Fingerprinter::default()
712712
.with(&reactive_op.name)?
@@ -715,8 +715,8 @@ impl<'a> AnalyzerContext<'a> {
715715
.with(&output_type.without_attrs())?,
716716
output_type: output_type.typ.clone(),
717717
};
718-
if function_exec_info.enable_caching
719-
&& function_exec_info.behavior_version.is_some()
718+
if function_exec_info.enable_cache
719+
&& function_exec_info.behavior_version.is_none()
720720
{
721721
api_bail!(
722722
"When caching is enabled, behavior version must be specified for transform op: {}",

src/builder/plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ pub struct AnalyzedSourceOp {
6464
}
6565

6666
pub struct AnalyzedFunctionExecInfo {
67-
pub enable_caching: bool,
67+
pub enable_cache: bool,
6868
pub behavior_version: Option<u32>,
6969

7070
/// Fingerprinter of the function's behavior.

src/execution/evaluator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ async fn evaluate_op_scope(
306306
let input_values = assemble_input_values(&op.inputs, scoped_entries);
307307
let output_value = if let Some(cache) = op
308308
.function_exec_info
309-
.enable_caching
309+
.enable_cache
310310
.then_some(cache)
311311
.flatten()
312312
{

src/ops/interface.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ pub trait SimpleFunctionExecutor: Send + Sync {
4040
/// Evaluate the operation.
4141
async fn evaluate(&self, args: Vec<Value>) -> Result<Value>;
4242

43-
fn enable_caching(&self) -> bool {
43+
fn enable_cache(&self) -> bool {
4444
false
4545
}
4646

47-
/// Must be Some if `enable_caching` is true.
47+
/// Must be Some if `enable_cache` is true.
4848
/// If it changes, the cache will be invalidated.
4949
fn behavior_version(&self) -> Option<u32> {
5050
None

src/ops/py_factory.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ struct PyFunctionExecutor {
149149
num_positional_args: usize,
150150
kw_args_names: Vec<Py<PyString>>,
151151
result_type: schema::EnrichedValueType,
152+
153+
enable_cache: bool,
154+
behavior_version: Option<u32>,
152155
}
153156

154157
#[async_trait]
@@ -193,6 +196,14 @@ impl SimpleFunctionExecutor for Arc<PyFunctionExecutor> {
193196
})
194197
.await
195198
}
199+
200+
fn enable_cache(&self) -> bool {
201+
self.enable_cache
202+
}
203+
204+
fn behavior_version(&self) -> Option<u32> {
205+
self.behavior_version
206+
}
196207
}
197208

198209
pub(crate) struct PyFunctionFactory {
@@ -251,15 +262,27 @@ impl SimpleFunctionFactory for PyFunctionFactory {
251262

252263
let executor_fut = {
253264
let result_type = result_type.clone();
254-
async move {
255-
Python::with_gil(|py| executor.call_method(py, "prepare", (), None))?;
265+
unblock(move || {
266+
let (enable_cache, behavior_version) =
267+
Python::with_gil(|py| -> anyhow::Result<_> {
268+
executor.call_method(py, "prepare", (), None)?;
269+
let enable_cache = executor
270+
.call_method(py, "enable_cache", (), None)?
271+
.extract::<bool>(py)?;
272+
let behavior_version = executor
273+
.call_method(py, "behavior_version", (), None)?
274+
.extract::<Option<u32>>(py)?;
275+
Ok((enable_cache, behavior_version))
276+
})?;
256277
Ok(Box::new(Arc::new(PyFunctionExecutor {
257278
py_function_executor: executor,
258279
num_positional_args,
259280
kw_args_names,
260281
result_type,
282+
enable_cache,
283+
behavior_version,
261284
})) as Box<dyn SimpleFunctionExecutor>)
262-
}
285+
})
263286
};
264287

265288
Ok((result_type, executor_fut.boxed()))

0 commit comments

Comments
 (0)