diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 2c06db49..ba170789 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -718,9 +718,11 @@ async def _build_flow_info_async(self) -> TransformFlowInfo: for (param_name, param), param_type in zip(sig.parameters.items(), self._flow_arg_types): if param.kind not in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY): - raise ValueError(f"Parameter {param_name} is not a parameter can be passed by name") - engine_ds = flow_builder_state.engine_flow_builder.add_direct_input( - param_name, encode_enriched_type(param_type)) + raise ValueError(f"Parameter `{param_name}` is not a parameter can be passed by name") + encoded_type = encode_enriched_type(param_type) + if encoded_type is None: + raise ValueError(f"Parameter `{param_name}` has no type annotation") + engine_ds = flow_builder_state.engine_flow_builder.add_direct_input(param_name, encoded_type) kwargs[param_name] = DataSlice(_DataSliceState(flow_builder_state, engine_ds)) output = self._flow_fn(**kwargs) @@ -780,8 +782,13 @@ def _transform_flow_wrapper(fn: Callable[..., DataSlice[T]]): for (param_name, param) in sig.parameters.items(): if param.kind not in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY): - raise ValueError(f"Parameter {param_name} is not a parameter can be passed by name") - arg_types.append(_get_data_slice_annotation_type(param.annotation)) + raise ValueError(f"Parameter `{param_name}` is not a parameter can be passed by name") + value_type_annotation = _get_data_slice_annotation_type(param.annotation) + if value_type_annotation is None: + raise ValueError( + f"Parameter `{param_name}` for {fn} has no value type annotation. " + "Please use `cocoindex.DataSlice[T]` where T is the type of the value.") + arg_types.append(value_type_annotation) _transform_flow = TransformFlow(fn, arg_types) functools.update_wrapper(_transform_flow, fn)