Commit cdd7923

Convert Ovens to use a Future API, and rename Ovens to BatchExecutor
This PR also adds a @deprecated wrapper, and changes where version info is stored for vcap/ and vcap_utils/
1 parent 17e0083 commit cdd7923
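
The new deprecation module itself does not appear in the diff below; only its export in vcap/vcap/__init__.py does. As a hypothetical sketch of what a @deprecated wrapper along these lines typically looks like (the actual vcap.deprecation implementation may differ):

import functools
import warnings


def deprecated(message: str = ""):
    """Mark a callable as deprecated, emitting a warning when it is used."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # stacklevel=2 points the warning at the caller, not the wrapper
            warnings.warn(f"{func.__name__} is deprecated. {message}",
                          category=DeprecationWarning, stacklevel=2)
            return func(*args, **kwargs)
        return wrapper
    return decorator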

File tree

17 files changed: +292 −87 lines changed


.github/workflows/test.yml

+2-2
@@ -4,7 +4,7 @@ on:
   pull_request:
     branches: [ master ]
   push:
-    branches: [ master ]
+
 
 jobs:
   test:
@@ -33,7 +33,7 @@ jobs:
          path: ${{ steps.pip-cache.outputs.dir }}
          key: ${{ runner.os }}-${{ hashFiles('**/setup.py') }}-pip-cache
          restore-keys: |
-           ${{ runner.os }}-pip-cache
+           ${{ runner.os }}-pip-cache
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip

docs/backends.rst

+8-7
@@ -34,19 +34,20 @@ amount of round-trips the video frames take between devices.
 
 If you wish to use batching in your capsule, you may call the ``send_to_batch``
 method in ``process_frame`` instead of doing analysis in that method directly.
-The ``send_to_batch`` method collects one or more input objects into a list and
-routinely calls your backend's ``batch_predict`` method with this list. As a
-result, users of ``send_to_batch`` must override the ``batch_predict`` method
-in addition to the other required methods.
+The ``send_to_batch`` method sends the input to a ``BatchExecutor``, which collects
+inference requests for this capsule from different streams. Then, the
+``BatchExecutor`` routinely calls your backend's ``batch_predict`` method with a
+list of the collected inputs. As a result, users of ``send_to_batch`` must
+override the ``batch_predict`` method in addition to the other required methods.
 
 The ``send_to_batch`` method is asynchronous. Instead of immediately returning
-analysis results, it returns a ``queue.Queue`` where the result will be provided.
+analysis results, it returns a ``concurrent.futures.Future`` where the result will be provided.
 Simple batching capsules may call ``send_to_batch``, then immediately call
-``get`` to block for the result.
+``result`` to block for the result.
 
 .. code-block:: python
 
-    result = self.send_to_batch(frame).get()
+    result = self.send_to_batch(frame).result()
 
 An argument of any type may be provided to ``send_to_batch``, as the argument
 will be passed in a list to ``batch_predict`` without modification. In many
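
Taken together, the documented flow looks roughly like the sketch below. It is illustrative, not code from this commit: MyBackend is a made-up capsule backend, and the model call in batch_predict is a stand-in.

from typing import Any, List

from vcap import BaseBackend


class MyBackend(BaseBackend):
    def process_frame(self, frame, detection_node, options, state):
        # Non-blocking: send_to_batch returns a concurrent.futures.Future
        future = self.send_to_batch(frame)
        # Block until the BatchExecutor has run batch_predict on this input
        prediction = future.result()
        return prediction

    def batch_predict(self, input_data_list: List[Any]) -> List[Any]:
        # Called by the BatchExecutor with inputs collected across streams.
        # self.model is a hypothetical stand-in for the capsule's real model.
        return [self.model(input_data) for input_data in input_data_list]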

tests/test_batch_executor.py

+127
@@ -0,0 +1,127 @@
import random
from concurrent.futures import Future
from typing import Any, Generator, List, Tuple

import pytest

from vcap.batch_executor import BatchExecutor, _Request


@pytest.fixture()
def batch_executor():
    """To use this fixture, replace batch_executor.batch_fn with your own
    batch function."""

    def batch_fn(inputs):
        raise NotImplementedError

    batch_executor = BatchExecutor(batch_fn=batch_fn)
    yield batch_executor
    batch_executor.close()


def batch_fn_base(inputs: List[int], raises: bool) \
        -> Generator[Any, None, None]:
    """Process results and yield them as they are processed.

    This function is to be used as a base for other test cases for batch_fn
    variants.

    :param inputs: A list of inputs
    :param raises: If True, raises an error on the 5th input. If False,
        no exception will be raised.
    """
    for i in inputs:
        if i == 5 and raises:
            # This occurs on the 5th input if raises=True.
            # This is used to test BatchExecutor's handling of exceptions
            raise RuntimeError("Oh no, a batch_fn error has occurred!")
        yield i * 100


def batch_fn_returns_generator(inputs: List[int]) \
        -> Generator[Any, None, None]:
    return (o for o in batch_fn_base(inputs, raises=False))


def batch_fn_returns_generator_raises(inputs: List[int]) \
        -> Generator[Any, None, None]:
    return (o for o in batch_fn_base(inputs, raises=True))


def batch_fn_returns_list(inputs: List[int]) -> List[Any]:
    """Process results and return them at the end, as a list."""
    return list(batch_fn_base(inputs, raises=False))


def batch_fn_returns_list_raises(inputs: List[int]) -> List[Any]:
    return list(batch_fn_base(inputs, raises=True))


@pytest.mark.parametrize(
    argnames=["batch_fn", "expect_partial_results"],
    argvalues=[
        (batch_fn_returns_generator_raises, True),
        (batch_fn_returns_list_raises, False)
    ]
)
def test_exceptions_during_batch_fn(
        batch_executor, batch_fn, expect_partial_results):
    """Test that BatchExecutor catches exceptions that occur in the batch_fn
    and propagates them through the requests' Future objects.

    If an exception occurs after processing some of the batch, the expectation
    is that the unprocessed inputs of the batch will get an exception
    set (expect_partial_results=True). If the exception happens before
    receiving any results, all future objects should have exceptions set.
    """
    batch_executor.batch_fn = batch_fn
    request_batch = [
        _Request(
            future=Future(),
            input_data=i)
        for i in range(10)
    ]
    batch_executor._on_requests_ready(request_batch)
    for i, request in enumerate(request_batch):
        if expect_partial_results and i < 5:
            result = request.future.result(timeout=5)
            assert result == request.input_data * 100, \
                "The result for this future doesn't match the input that " \
                "was supposed to have been routed to it!"
        else:
            with pytest.raises(RuntimeError):
                request.future.result(timeout=5)


@pytest.mark.parametrize(
    argnames=["batch_fn"],
    argvalues=[
        (batch_fn_returns_generator,),
        (batch_fn_returns_list,)
    ]
)
def test_relevant_input_outputs_match(batch_executor, batch_fn):
    """Test that the output for any given input is routed to the correct
    Future object."""
    batch_executor.batch_fn = batch_fn

    # Submit input values in a random order
    request_inputs = list(range(10000))
    random.seed("vcap? More like vgood")
    random.shuffle(request_inputs)

    # Submit inputs to the BatchExecutor and keep track of their futures
    inputs_and_futures: List[Tuple[int, Future]] = []
    for input_data in request_inputs:
        future = batch_executor.submit(input_data)
        inputs_and_futures.append((input_data, future))

    # Verify that all outputs are the expected ones for their respective input
    for input_data, future in inputs_and_futures:
        result = future.result(timeout=5)
        assert result == input_data * 100, \
            "The result for this future doesn't match the input that " \
            "was supposed to have been routed to it!"

    assert batch_executor.total_imgs_in_pipeline == 0
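
For reference, the tests above construct _Request objects directly with future= and input_data= keywords. Inferred from that usage alone (the real definition lives in vcap/batch_executor.py and may differ), _Request pairs each queued input with the Future that will carry its result:

from concurrent.futures import Future
from typing import Any, NamedTuple


class _Request(NamedTuple):
    """One queued inference request: an input and its pending result."""
    future: Future
    input_data: Any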

vcap/examples/classifier_gait_example/backend.py

+1-1
@@ -20,7 +20,7 @@ def process_frame(self, frame: np.ndarray,
                 .pad_percent(top=10, bottom=10, left=10, right=10)
                 .apply(frame))
 
-        prediction = self.send_to_batch(crop).get()
+        prediction = self.send_to_batch(crop).result()
 
         detection_node.attributes[config.category] = prediction.name
         detection_node.extra_data[config.extra_data] = prediction.confidence

vcap/examples/detector_person_example/backend.py

+1-1
@@ -24,7 +24,7 @@ def process_frame(self, frame: np.ndarray,
             max_height=max_frame_side_length)
         frame = clamp.apply()
 
-        predictions = self.send_to_batch(frame).get()
+        predictions = self.send_to_batch(frame).result()
 
         results = []

vcap/setup.py

+7-2
@@ -1,18 +1,23 @@
 #!/usr/bin/env python3
 import os
+from pathlib import Path
 
 from setuptools import setup, find_namespace_packages
 
+# Get package version/metadata
+about = {}
+exec(Path("vcap/version.py").read_text(), about)
+
 test_packages = ["pytest", "mock"]
 
 PRE_RELEASE_SUFFIX = os.environ.get("PRE_RELEASE_SUFFIX", "")
 
 setup(
     name='vcap',
     description="A library for creating OpenVisionCapsules in Python",
-    author="Dilili Labs",
+    author="Aotu.ai",
     packages=find_namespace_packages(include=["vcap*"]),
-    version="0.2.5" + PRE_RELEASE_SUFFIX,
+    version=about["__version__"] + PRE_RELEASE_SUFFIX,
 
     install_requires=[
         "pycryptodomex==3.9.7",

vcap/vcap/__init__.py

+2
@@ -1,3 +1,5 @@
+from .version import __version__
+from .deprecation import deprecated
 from .capsule import BaseCapsule
 from .stream_state import BaseStreamState
 from .backend import BaseBackend

vcap/vcap/backend.py

+10-10
@@ -1,11 +1,11 @@
 import abc
-from queue import Queue
-from typing import Any, Dict, List, Union
+from concurrent.futures import Future
+from typing import Any, Dict, List
 
 import numpy as np
 
+from vcap.batch_executor import BatchExecutor
 from vcap.node_description import DETECTION_NODE_TYPE
-from vcap.ovens import Oven
 from vcap.options import OPTION_TYPE
 from vcap.stream_state import BaseStreamState
 
@@ -16,18 +16,18 @@ class BaseBackend(abc.ABC):
     """
 
     def __init__(self):
-        self._oven = Oven(self.batch_predict)
+        self._batch_executor = BatchExecutor(self.batch_predict)
 
-    def send_to_batch(self, input_data: Any) -> Queue:
+    def send_to_batch(self, input_data: Any) -> Future:
         """Sends the given object to the batch_predict method for processing.
         This call does not block. Instead, the result will be provided on the
-        returned queue. The batch_predict method must be overridden on the
+        returned Future. The batch_predict method must be overridden on the
         backend this method is being called on.
 
         :param input_data: The input object to send to batch_predict
-        :return: A queue where results will be stored
+        :return: A Future where results will be stored
         """
-        return self._oven.submit(input_data)
+        return self._batch_executor.submit(input_data)
 
     @property
     def workload(self) -> float:
@@ -36,7 +36,7 @@ def workload(self) -> float:
         is intended to give the scheduler the ability to pick the least busy
         backend.
         """
-        return self._oven.total_imgs_in_pipeline
+        return self._batch_executor.total_imgs_in_pipeline
 
     @abc.abstractmethod
     def process_frame(self,
@@ -90,4 +90,4 @@ def close(self) -> None:
         The backend will stop receiving frames before this method is
         called, and will not receive frames again.
         """
-        self._oven.close()
+        self._batch_executor.close()
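
One practical gain from returning a concurrent.futures.Future instead of a queue.Queue: callers can wait with a timeout, and exceptions raised inside batch_predict propagate to whoever calls result(), as the tests above verify. A usage sketch from inside a backend's process_frame (the timeout value is arbitrary):

import concurrent.futures

# `frame` would come from process_frame's arguments
future = self.send_to_batch(frame)
try:
    # result() re-raises any exception that occurred in batch_predict
    prediction = future.result(timeout=5.0)
except concurrent.futures.TimeoutError:
    prediction = None  # the batch was not processed in time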
