feat: support non vectorized managed function #1373
I'll review the test code in the next batch
def wrapper(func):
    nonlocal input_types, output_type

    if not callable(func):
Can lines 808-837 be put in a common function?
Yeah, there is a TODO on top of the wrapper. I'll use another PR to do it later if you agree.
I see, let's create an issue for tracking
ssets can be located through the following properties set in the
object:

`bigframes_managed_function` - The bigquery managed function
We should document bigframes_bigquery_function (related to the other comment)
@@ -570,11 +647,12 @@ def try_delattr(attr):
func.bigframes_cloud_function = (
    remote_function_client.get_cloud_function_fully_qualified_name(cf_name)
)
func.bigframes_remote_function = (
func.bigframes_function = (
[nit] I think for clarity we should call the new attribute "bigframes_bigquery_function".
# TODO(jialuo): Deprecate the "bigframes_remote_function" attribute.
# We have some tests using pre-defined remote_function that were
# defined based on "bigframes_remote_function" instead of
# "bigframes_bigquery_function". So we need to fix those pre-defined
Let's chat offline about which tests need the logic here to depend on both attributes. If possible, we should rely on the new attribute and keep the older attribute only for backward compatibility.
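One way to read the suggestion above is a single lookup that prefers the new attribute and only falls back to the old one. The sketch below is an illustration under assumptions, not the PR's code: the helper name `get_bigquery_function_name` is hypothetical, and only the two attribute names come from this thread.

```python
def get_bigquery_function_name(func):
    """Hypothetical helper: resolve the deployed function name from a decorated callable.

    Prefers "bigframes_bigquery_function" and falls back to the legacy
    "bigframes_remote_function" attribute only when the new one is absent.
    """
    name = getattr(func, "bigframes_bigquery_function", None)
    if name is None:
        # Legacy path, kept only for backward compatibility.
        name = getattr(func, "bigframes_remote_function", None)
    return name
```

With a helper like this, callers would not need to know which attribute a pre-defined function carries, which could make the later deprecation a one-place change.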
    is_row_processor,
):
    """Create a BigQuery managed function."""
    self._create_bq_connection()
A connection is not mandatory for a managed function.
    ibis_signature.output_type
),
language="python",
runtime_version="python-3.11",
We should pick this up from the environment instead of hard-coding it.
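A minimal sketch of what "pick this up from the environment" could look like, assuming the runtime string keeps the `python-<major>.<minor>` shape seen in the diff. The helper name `default_runtime_version` is hypothetical, and whether every local interpreter version is accepted as a managed function runtime is not established in this thread.

```python
import sys


def default_runtime_version(version_info=sys.version_info) -> str:
    """Hypothetical helper: build a runtime string such as "python-3.11".

    Uses the major and minor components of the given interpreter version,
    defaulting to the interpreter that is running this code.
    """
    return f"python-{version_info[0]}.{version_info[1]}"
```

The result could then replace the hard-coded literal, for example `runtime_version=default_runtime_version()`, possibly with a check against the runtimes the service supports.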
managed_function_options = {
    "runtime_version": runtime_version,
    "entry_point": "managed_func",
[nit] maybe call it "bigframes_handler"
udf = cloudpickle.loads({pickled})

def managed_func(*args, **kwargs):
I think kwargs is redundant here; we can just use args.
self._try_delattr(func, "is_row_processor")
self._try_delattr(func, "ibis_node")

bq_function_name = name if name else func.__name__
Let's not use func.__name__: multiple users using a common name with entirely different code could end up overwriting each other. See how provision_bq_remote_function determines the name of the BQ function from the hash of the user code + dependencies.
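To illustrate the hash-based naming idea, here is a rough sketch. It is not how `provision_bq_remote_function` is implemented; the helper `hashed_function_name`, its `packages` parameter, and the `bigframes_` prefix are assumptions made for the example. It also hashes only the function's bytecode and constants, so closures and referenced globals are ignored, and functions containing nested code objects may not hash stably across processes.

```python
import hashlib


def hashed_function_name(func, packages=(), prefix="bigframes_"):
    """Hypothetical helper: derive a name from the user code and its dependencies."""
    hasher = hashlib.sha256()
    code = func.__code__
    # The bytecode and constants stand in for "the user code" in this sketch.
    hasher.update(code.co_code)
    hasher.update(repr(code.co_consts).encode("utf-8"))
    # Sort the dependencies so their order does not change the name, and
    # separate entries so that ("ab", "c") differs from ("a", "bc").
    for package in sorted(packages):
        hasher.update(b"\0")
        hasher.update(package.encode("utf-8"))
    return prefix + hasher.hexdigest()
```

Under this scheme two users who both call their function `foo` but write different bodies get different names, while redeploying identical code with identical dependencies yields the same name.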
@pytest.fixture(scope="module")
def bq_cf_connection() -> str:
Since the connection is optional for a managed UDF, let's run the tests without one. We can have one or two separate tests in the large tests that specifically exercise an explicit connection.
pd_int64_col = scalars_pandas_df["int64_col"]
pd_int64_col_filter = pd_int64_col.notnull()
pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x)
No need to use an independent lambda; .apply(square) would work on a pandas Series. (If you found such usage elsewhere, it was probably written before the remote function could be applied to a scalar directly: the op in line 62.)
b/391680147