Skip to content

Commit 804c664

Browse files
committed
refactor(python-sdk): make async APIs clear
1 parent 6fb0f65 commit 804c664

File tree

4 files changed

+48
-30
lines changed

4 files changed

+48
-30
lines changed

examples/gdrive_text_embedding/main.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ def gdrive_text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope:
5353
default_similarity_metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)
5454

5555
@cocoindex.main_fn()
56-
async def _run():
56+
def _run():
5757
# Use a `FlowLiveUpdater` to keep the flow data updated.
58-
async with cocoindex.FlowLiveUpdater(gdrive_text_embedding_flow):
58+
with cocoindex.FlowLiveUpdater(gdrive_text_embedding_flow):
5959
# Run queries in a loop to demonstrate the query capabilities.
6060
while True:
6161
try:
@@ -74,4 +74,4 @@ async def _run():
7474

7575
if __name__ == "__main__":
7676
load_dotenv(override=True)
77-
asyncio.run(_run())
77+
_run()

python/cocoindex/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from . import functions, query, sources, storages, cli
55
from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def
66
from .flow import EvaluateAndDumpOptions, GeneratedField
7-
from .flow import update_all_flows, FlowLiveUpdater, FlowLiveUpdaterOptions
7+
from .flow import update_all_flows_async, FlowLiveUpdater, FlowLiveUpdaterOptions
88
from .llm import LlmSpec, LlmApiType
99
from .index import VectorSimilarityMetric, VectorIndexDef, IndexOptions
1010
from .auth_registry import AuthEntryReference, add_auth_entry, ref_auth_entry

python/cocoindex/cli.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import asyncio
21
import click
32
import datetime
43

@@ -7,7 +6,6 @@
76

87
from . import flow, lib, setting
98
from .setup import sync_setup, drop_setup, flow_names_with_setup, apply_setup_changes
10-
from .runtime import execution_context
119

1210
@click.group()
1311
def cli():
@@ -136,13 +134,12 @@ def update(flow_name: str | None, live: bool, quiet: bool):
136134
Update the index to reflect the latest data from data sources.
137135
"""
138136
options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet)
139-
async def _update():
140-
if flow_name is None:
141-
await flow.update_all_flows(options)
142-
else:
143-
updater = await flow.FlowLiveUpdater.create(_flow_by_name(flow_name), options)
144-
await updater.wait()
145-
execution_context.run(_update())
137+
if flow_name is None:
138+
return flow.update_all_flows(options)
139+
else:
140+
updater = flow.FlowLiveUpdater(_flow_by_name(flow_name), options)
141+
updater.wait()
142+
return updater.update_stats()
146143

147144
@cli.command()
148145
@click.argument("flow_name", type=str, required=False)
@@ -217,7 +214,7 @@ def server(address: str | None, live_update: bool, quiet: bool, cors_origin: str
217214

218215
if live_update:
219216
options = flow.FlowLiveUpdaterOptions(live_mode=True, print_stats=not quiet)
220-
execution_context.run(flow.update_all_flows(options))
217+
flow.update_all_flows(options)
221218
if COCOINDEX_HOST in cors_origins:
222219
click.echo(f"Open CocoInsight at: {COCOINDEX_HOST}/cocoinsight")
223220
input("Press Enter to stop...")

python/cocoindex/flow.py

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -394,12 +394,13 @@ def __init__(self, arg: Flow | _engine.FlowLiveUpdater, options: FlowLiveUpdater
394394
arg.internal_flow(), dump_engine_object(options or FlowLiveUpdaterOptions())))
395395

396396
@staticmethod
397-
async def create(fl: Flow, options: FlowLiveUpdaterOptions | None = None) -> FlowLiveUpdater:
397+
async def create_async(fl: Flow, options: FlowLiveUpdaterOptions | None = None) -> FlowLiveUpdater:
398398
"""
399399
Create a live updater for a flow.
400+
Similar to the constructor, but for async usage.
400401
"""
401402
engine_live_updater = await _engine.FlowLiveUpdater.create(
402-
await fl.ainternal_flow(),
403+
await fl.internal_flow_async(),
403404
dump_engine_object(options or FlowLiveUpdaterOptions()))
404405
return FlowLiveUpdater(engine_live_updater)
405406

@@ -408,21 +409,28 @@ def __enter__(self) -> FlowLiveUpdater:
408409

409410
def __exit__(self, exc_type, exc_value, traceback):
410411
self.abort()
411-
execution_context.run(self.wait())
412+
execution_context.run(self.wait_async())
412413

413414
async def __aenter__(self) -> FlowLiveUpdater:
414415
return self
415416

416417
async def __aexit__(self, exc_type, exc_value, traceback):
417418
self.abort()
418-
await self.wait()
419+
await self.wait_async()
419420

420-
async def wait(self) -> None:
421+
def wait(self) -> None:
421422
"""
422423
Wait for the live updater to finish.
423424
"""
425+
execution_context.run(self.wait_async())
426+
427+
async def wait_async(self) -> None:
428+
"""
429+
Wait for the live updater to finish. Async version.
430+
"""
424431
await self._engine_live_updater.wait()
425432

433+
426434
def abort(self) -> None:
427435
"""
428436
Abort the live updater.
@@ -500,13 +508,20 @@ def name(self) -> str:
500508
"""
501509
return self._lazy_engine_flow().name()
502510

503-
async def update(self) -> _engine.IndexUpdateInfo:
511+
def update(self) -> _engine.IndexUpdateInfo:
512+
"""
513+
Update the index defined by the flow.
514+
Once the function returns, the index is fresh up to the moment when the function is called.
515+
"""
516+
return execution_context.run(self.update_async())
517+
518+
async def update_async(self) -> _engine.IndexUpdateInfo:
504519
"""
505520
Update the index defined by the flow.
506-
Once the function returns, the indice is fresh up to the moment when the function is called.
521+
Once the function returns, the index is fresh up to the moment when the function is called.
507522
"""
508-
updater = await FlowLiveUpdater.create(self, FlowLiveUpdaterOptions(live_mode=False))
509-
await updater.wait()
523+
updater = await FlowLiveUpdater.create_async(self, FlowLiveUpdaterOptions(live_mode=False))
524+
await updater.wait_async()
510525
return updater.update_stats()
511526

512527
def evaluate_and_dump(self, options: EvaluateAndDumpOptions):
@@ -521,7 +536,7 @@ def internal_flow(self) -> _engine.Flow:
521536
"""
522537
return self._lazy_engine_flow()
523538

524-
async def ainternal_flow(self) -> _engine.Flow:
539+
async def internal_flow_async(self) -> _engine.Flow:
525540
"""
526541
Get the engine flow. The async version.
527542
"""
@@ -587,21 +602,27 @@ def ensure_all_flows_built() -> None:
587602
for fl in flows():
588603
fl.internal_flow()
589604

590-
async def aensure_all_flows_built() -> None:
605+
async def ensure_all_flows_built_async() -> None:
591606
"""
592607
Ensure all flows are built.
593608
"""
594609
for fl in flows():
595-
await fl.ainternal_flow()
610+
await fl.internal_flow_async()
611+
612+
def update_all_flows(options: FlowLiveUpdaterOptions) -> dict[str, _engine.IndexUpdateInfo]:
613+
"""
614+
Update all flows.
615+
"""
616+
return execution_context.run(update_all_flows_async(options))
596617

597-
async def update_all_flows(options: FlowLiveUpdaterOptions) -> dict[str, _engine.IndexUpdateInfo]:
618+
async def update_all_flows_async(options: FlowLiveUpdaterOptions) -> dict[str, _engine.IndexUpdateInfo]:
598619
"""
599620
Update all flows.
600621
"""
601-
await aensure_all_flows_built()
622+
await ensure_all_flows_built_async()
602623
async def _update_flow(fl: Flow) -> _engine.IndexUpdateInfo:
603-
updater = await FlowLiveUpdater.create(fl, options)
604-
await updater.wait()
624+
updater = await FlowLiveUpdater.create_async(fl, options)
625+
await updater.wait_async()
605626
return updater.update_stats()
606627
fls = flows()
607628
all_stats = await asyncio.gather(*(_update_flow(fl) for fl in fls))

0 commit comments

Comments
 (0)