Skip to content

Commit ec5f39b

Browse files
authored
Implement evaluate_and_dump and provide a evaluate subcommand for it (#199)
* Expose `ScopeValueBuilder` from `evaluate_source_entry_with_cache()`. * Take cache enablement as an option in `evaluate_source_entry_with_cache` * Add `to_strs()` for `KeyValue`. * Evaluate and write YAML files for exported data. * Expose `evaluate_and_dump()` method to Python. * Put source name into filename. * Expose eval functionality by the CLI. * Bug fix for the dumper to make it work as intended.
1 parent e9faf0d commit ec5f39b

File tree

10 files changed

+397
-44
lines changed

10 files changed

+397
-44
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,4 @@ yup-oauth2 = "12.1.0"
8888
rustls = { version = "0.23.25" }
8989
http-body-util = "0.1.3"
9090
yaml-rust2 = "0.10.0"
91+
urlencoding = "2.1.3"

python/cocoindex/cli.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import click
2+
import datetime
23

34
from . import flow, lib
45
from .setup import check_setup_status, CheckSetupStatusOptions, apply_setup_changes
@@ -52,6 +53,24 @@ def update(flow_name: str | None):
5253
stats = _flow_by_name(flow_name).update()
5354
print(stats)
5455

56+
@cli.command()
57+
@click.argument("flow_name", type=str, required=False)
58+
@click.option(
59+
"-o", "--output-dir", type=str, required=False,
60+
help="The directory to dump the evaluation output to.")
61+
@click.option(
62+
"-c", "--use-cache", is_flag=True, show_default=True, default=True,
63+
help="Use cached evaluation results if available.")
64+
def evaluate(flow_name: str | None, output_dir: str | None, use_cache: bool = True):
65+
"""
66+
Evaluate and dump the flow.
67+
"""
68+
fl = _flow_by_name(flow_name)
69+
if output_dir is None:
70+
output_dir = f"eval_{fl.name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}"
71+
options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=use_cache)
72+
fl.evaluate_and_dump(options)
73+
5574
_default_server_settings = lib.ServerSettings.from_env()
5675

5776
@cli.command()

python/cocoindex/flow.py

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from typing import Any, Callable, Sequence, TypeVar, get_origin
1010
from threading import Lock
1111
from enum import Enum
12+
from dataclasses import dataclass
1213

1314
from . import _engine
1415
from . import vector
@@ -61,18 +62,18 @@ def _create_data_slice(
6162
def _spec_kind(spec: Any) -> str:
6263
return spec.__class__.__name__
6364

64-
def _spec_value_dump(v: Any) -> Any:
65-
"""Recursively dump a spec object and its nested attributes to a dictionary."""
65+
def _dump_engine_object(v: Any) -> Any:
66+
"""Recursively dump an object for engine. Engine side uses `Pythonzized` to catch."""
6667
if isinstance(v, type) or get_origin(v) is not None:
6768
return encode_enriched_type(v)
6869
elif isinstance(v, Enum):
6970
return v.value
7071
elif hasattr(v, '__dict__'):
71-
return {k: _spec_value_dump(v) for k, v in v.__dict__.items()}
72+
return {k: _dump_engine_object(v) for k, v in v.__dict__.items()}
7273
elif isinstance(v, (list, tuple)):
73-
return [_spec_value_dump(item) for item in v]
74+
return [_dump_engine_object(item) for item in v]
7475
elif isinstance(v, dict):
75-
return {k: _spec_value_dump(v) for k, v in v.items()}
76+
return {k: _dump_engine_object(v) for k, v in v.items()}
7677
return v
7778

7879
T = TypeVar('T')
@@ -177,7 +178,7 @@ def transform(self, fn_spec: op.FunctionSpec, *args, **kwargs) -> DataSlice:
177178
lambda target_scope, name:
178179
flow_builder_state.engine_flow_builder.transform(
179180
_spec_kind(fn_spec),
180-
_spec_value_dump(fn_spec),
181+
_dump_engine_object(fn_spec),
181182
transform_args,
182183
target_scope,
183184
flow_builder_state.field_name_builder.build_name(
@@ -267,7 +268,7 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *,
267268
{"field_name": field_name, "metric": metric.value}
268269
for field_name, metric in vector_index]
269270
self._flow_builder_state.engine_flow_builder.export(
270-
name, _spec_kind(target_spec), _spec_value_dump(target_spec),
271+
name, _spec_kind(target_spec), _dump_engine_object(target_spec),
271272
index_options, self._engine_data_collector)
272273

273274

@@ -316,13 +317,20 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSli
316317
self._state,
317318
lambda target_scope, name: self._state.engine_flow_builder.add_source(
318319
_spec_kind(spec),
319-
_spec_value_dump(spec),
320+
_dump_engine_object(spec),
320321
target_scope,
321322
self._state.field_name_builder.build_name(
322323
name, prefix=_to_snake_case(_spec_kind(spec))+'_'),
323324
),
324325
name
325326
)
327+
@dataclass
328+
class EvaluateAndDumpOptions:
329+
"""
330+
Options for evaluating and dumping a flow.
331+
"""
332+
output_dir: str
333+
use_cache: bool = True
326334

327335
class Flow:
328336
"""
@@ -348,20 +356,32 @@ def __str__(self):
348356
def __repr__(self):
349357
return repr(self._lazy_engine_flow())
350358

359+
@property
360+
def name(self) -> str:
361+
"""
362+
Get the name of the flow.
363+
"""
364+
return self._lazy_engine_flow().name()
365+
351366
def update(self):
352367
"""
353368
Update the index defined by the flow.
354369
Once the function returns, the index is fresh up to the moment when the function is called.
355370
"""
356371
return self._lazy_engine_flow().update()
357372

373+
def evaluate_and_dump(self, options: EvaluateAndDumpOptions):
374+
"""
375+
Evaluate and dump the flow.
376+
"""
377+
return self._lazy_engine_flow().evaluate_and_dump(_dump_engine_object(options))
378+
358379
def internal_flow(self) -> _engine.Flow:
359380
"""
360381
Get the engine flow.
361382
"""
362383
return self._lazy_engine_flow()
363384

364-
365385
def _create_lazy_flow(name: str | None, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow:
366386
"""
367387
Create a flow without really building it yet.

src/base/value.rs

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,24 @@ impl KeyValue {
195195
Ok(result)
196196
}
197197

198+
fn parts_to_strs(&self, output: &mut Vec<String>) {
199+
match self {
200+
KeyValue::Bytes(v) => output.push(BASE64_STANDARD.encode(v)),
201+
KeyValue::Str(v) => output.push(v.to_string()),
202+
KeyValue::Bool(v) => output.push(v.to_string()),
203+
KeyValue::Int64(v) => output.push(v.to_string()),
204+
KeyValue::Range(v) => {
205+
output.push(v.start.to_string());
206+
output.push(v.end.to_string());
207+
}
208+
KeyValue::Struct(v) => {
209+
for part in v {
210+
part.parts_to_strs(output);
211+
}
212+
}
213+
}
214+
}
215+
198216
pub fn from_strs(value: impl IntoIterator<Item = String>, schema: &ValueType) -> Result<Self> {
199217
let mut values_iter = value.into_iter();
200218
let result = Self::parts_from_str(&mut values_iter, schema)?;
@@ -204,6 +222,12 @@ impl KeyValue {
204222
Ok(result)
205223
}
206224

225+
pub fn to_strs(&self) -> Vec<String> {
226+
let mut output = Vec::with_capacity(self.num_parts());
227+
self.parts_to_strs(&mut output);
228+
output
229+
}
230+
207231
pub fn kind_str(&self) -> &'static str {
208232
match self {
209233
KeyValue::Bytes(_) => "bytes",
@@ -256,6 +280,14 @@ impl KeyValue {
256280
_ => anyhow::bail!("expected struct value, but got {}", self.kind_str()),
257281
}
258282
}
283+
284+
pub fn num_parts(&self) -> usize {
285+
match self {
286+
KeyValue::Range(_) => 2,
287+
KeyValue::Struct(v) => v.iter().map(|v| v.num_parts()).sum(),
288+
_ => 1,
289+
}
290+
}
259291
}
260292

261293
#[derive(Debug, Clone)]
@@ -877,15 +909,15 @@ impl Serialize for TypedValue<'_> {
877909
(_, Value::Null) => serializer.serialize_none(),
878910
(ValueType::Basic(_), v) => v.serialize(serializer),
879911
(ValueType::Struct(s), Value::Struct(field_values)) => TypedFieldsValue {
880-
schema: s,
912+
schema: &s.fields,
881913
values_iter: field_values.fields.iter(),
882914
}
883915
.serialize(serializer),
884916
(ValueType::Collection(c), Value::Collection(rows) | Value::List(rows)) => {
885917
let mut seq = serializer.serialize_seq(Some(rows.len()))?;
886918
for row in rows {
887919
seq.serialize_element(&TypedFieldsValue {
888-
schema: &c.row,
920+
schema: &c.row.fields,
889921
values_iter: row.fields.iter(),
890922
})?;
891923
}
@@ -895,7 +927,7 @@ impl Serialize for TypedValue<'_> {
895927
let mut seq = serializer.serialize_seq(Some(rows.len()))?;
896928
for (k, v) in rows {
897929
seq.serialize_element(&TypedFieldsValue {
898-
schema: &c.row,
930+
schema: &c.row.fields,
899931
values_iter: std::iter::once(&Value::from(k.clone()))
900932
.chain(v.fields.iter()),
901933
})?;
@@ -911,15 +943,15 @@ impl Serialize for TypedValue<'_> {
911943
}
912944

913945
pub struct TypedFieldsValue<'a, I: Iterator<Item = &'a Value> + Clone> {
914-
schema: &'a StructSchema,
915-
values_iter: I,
946+
pub schema: &'a [FieldSchema],
947+
pub values_iter: I,
916948
}
917949

918950
impl<'a, I: Iterator<Item = &'a Value> + Clone> Serialize for TypedFieldsValue<'a, I> {
919951
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
920-
let mut map = serializer.serialize_map(Some(self.schema.fields.len()))?;
952+
let mut map = serializer.serialize_map(Some(self.schema.len()))?;
921953
let values_iter = self.values_iter.clone();
922-
for (field, value) in self.schema.fields.iter().zip(values_iter) {
954+
for (field, value) in self.schema.iter().zip(values_iter) {
923955
map.serialize_entry(
924956
&field.name,
925957
&TypedValue {

src/builder/plan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ pub struct AnalyzedExportOp {
104104
pub query_target: Option<Arc<dyn QueryTarget>>,
105105
pub primary_key_def: AnalyzedPrimaryKeyDef,
106106
pub primary_key_type: ValueType,
107+
/// idx for value fields - excluding the primary key field.
107108
pub value_fields: Vec<u32>,
108109
}
109110

0 commit comments

Comments
 (0)