Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support non-vectorized managed function #1373

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
15 changes: 15 additions & 0 deletions bigframes/_config/experiment_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class ExperimentOptions:
def __init__(self):
    # Opt-in feature flags for experimental BigFrames capabilities;
    # all default to off.
    self._semantic_operators: bool = False
    self._blob: bool = False
    # Managed (BigQuery Python UDF) function support.
    self._udf: bool = False

@property
def semantic_operators(self) -> bool:
Expand Down Expand Up @@ -53,3 +54,17 @@ def blob(self, value: bool):
)
warnings.warn(msg, category=bfe.PreviewWarning)
self._blob = value

@property
def udf(self) -> bool:
    """Whether experimental managed-function (udf) support is enabled."""
    return self._udf

@udf.setter
def udf(self, value: bool):
    # Surface a preview warning every time the flag is switched on.
    if value is True:
        warnings.warn(
            "BigFrames managed function (udf) is still under experiments. "
            "It may not work and subject to change in the future.",
            category=bfe.PreviewWarning,
        )
    self._udf = value
8 changes: 5 additions & 3 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3995,9 +3995,11 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
msg = "axis=1 scenario is in preview."
warnings.warn(msg, category=bfe.PreviewWarning)

# Check if the function is a remote function
if not hasattr(func, "bigframes_remote_function"):
raise ValueError("For axis=1 a remote function must be used.")
# Check if the function is a remote function.
if not hasattr(func, "bigframes_remote_function") and not hasattr(
func, "bigframes_function"
):
raise ValueError("For axis=1 a bigframes function must be used.")

is_row_processor = getattr(func, "is_row_processor")
if is_row_processor:
Expand Down
155 changes: 117 additions & 38 deletions bigframes/functions/_function_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,39 +55,77 @@


class FunctionClient:
# Wait time (in seconds) for an IAM binding to take effect after creation
# Wait time (in seconds) for an IAM binding to take effect after creation.
_iam_wait_seconds = 120

# TODO(b/392707725): Convert all necessary parameters for cloud function
# deployment into method parameters.
def __init__(
    self,
    gcp_project_id,
    bq_location,
    bq_dataset,
    bq_client,
    bq_connection_id,
    bq_connection_manager,
    cloud_function_region=None,
    cloud_functions_client=None,
    cloud_function_service_account=None,
    cloud_function_kms_key_name=None,
    cloud_function_docker_repository=None,
    *,
    session: Session,
):
    """Stash BigQuery and (optional) Cloud Functions configuration.

    The BigQuery-related arguments are used by every function flavor;
    the ``cloud_function_*`` arguments are only needed when deploying
    remote functions backed by Cloud Functions and may be left as None
    otherwise.
    """
    # Settings required for all BigQuery function flavors.
    self._gcp_project_id = gcp_project_id
    self._bq_location = bq_location
    self._bq_dataset = bq_dataset
    self._bq_client = bq_client
    self._bq_connection_id = bq_connection_id
    self._bq_connection_manager = bq_connection_manager
    self._session = session

    # Optional settings, consumed only by remote functions.
    self._cloud_function_region = cloud_function_region
    self._cloud_functions_client = cloud_functions_client
    self._cloud_function_service_account = cloud_function_service_account
    self._cloud_function_kms_key_name = cloud_function_kms_key_name
    self._cloud_function_docker_repository = cloud_function_docker_repository

def _create_bq_connection(self) -> None:
    """Create the BQ connection (with the "run.invoker" role argument)
    when a connection manager is configured; no-op otherwise."""
    manager = self._bq_connection_manager
    if not manager:
        return
    manager.create_bq_connection(
        self._gcp_project_id,
        self._bq_location,
        self._bq_connection_id,
        "run.invoker",
    )

def _ensure_dataset_exists(self) -> None:
    """Create the configured BQ dataset if it does not already exist."""
    dataset_ref = bigquery.DatasetReference.from_string(
        self._bq_dataset, default_project=self._gcp_project_id
    )
    dataset = bigquery.Dataset(dataset_ref)
    dataset.location = self._bq_location
    try:
        # Probing via get_dataset does not need bigquery.datasets.create
        # IAM permission, so users without it can still work with a
        # pre-existing dataset.
        self._bq_client.get_dataset(dataset)
    except google.api_core.exceptions.NotFound:
        # Creating the dataset does require bigquery.datasets.create.
        self._bq_client.create_dataset(dataset, exists_ok=True)

def _create_bq_function(self, create_function_ddl: str) -> None:
    """Execute a CREATE FUNCTION DDL statement via the session's client."""
    # TODO(swast): plumb through the original, user-facing api_name.
    job_config = bigquery.QueryJobConfig()
    _, query_job = bigframes.session._io.bigquery.start_query_with_client(
        self._session.bqclient,
        create_function_ddl,
        job_config=job_config,
    )
    logger.info(f"Created bigframes function {query_job.ddl_target_routine}")

def create_bq_remote_function(
self,
Expand All @@ -101,13 +139,7 @@ def create_bq_remote_function(
):
"""Create a BigQuery remote function given the artifacts of a user defined
function and the http endpoint of a corresponding cloud function."""
if self._bq_connection_manager:
self._bq_connection_manager.create_bq_connection(
self._gcp_project_id,
self._bq_location,
self._bq_connection_id,
"run.invoker",
)
self._create_bq_connection()

# Create BQ function
# https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2
Expand Down Expand Up @@ -144,31 +176,78 @@ def create_bq_remote_function(

logger.info(f"Creating BQ remote function: {create_function_ddl}")

# Make sure the dataset exists. I.e. if it doesn't exist, go ahead and
# create it
dataset = bigquery.Dataset(
bigquery.DatasetReference.from_string(
self._bq_dataset, default_project=self._gcp_project_id
)
)
dataset.location = self._bq_location
try:
# This check does not require bigquery.datasets.create IAM
# permission. So, if the data set already exists, then user can work
# without having that permission.
self._bq_client.get_dataset(dataset)
except google.api_core.exceptions.NotFound:
# This requires bigquery.datasets.create IAM permission
self._bq_client.create_dataset(dataset, exists_ok=True)
self._ensure_dataset_exists()
self._create_bq_function(create_function_ddl)

# TODO(swast): plumb through the original, user-facing api_name.
_, query_job = bigframes.session._io.bigquery.start_query_with_client(
self._session.bqclient,
create_function_ddl,
job_config=bigquery.QueryJobConfig(),
def create_bq_managed_function(
    self,
    func,
    input_types,
    output_type,
    language,
    runtime_version,
    bq_function_name,
    packages,
):
    """Create a BigQuery managed function wrapping ``func``.

    The user function is serialized with cloudpickle and embedded in an
    inline Python code block deployed as the routine body, so the
    server-side runtime only needs ``cloudpickle`` to reconstruct it.

    Args:
        func: user's Python function to deploy.
        input_types: BQ SQL types, positionally matching func's arguments.
        output_type: BQ SQL return type of the function.
        language: routine language for the DDL (e.g. ``PYTHON``).
        runtime_version: managed runtime version to execute under.
        bq_function_name: name of the routine to create.
        packages: optional list of packages for the runtime;
            "cloudpickle" is added automatically when missing. The
            caller's list is not mutated.
    """
    # NOTE(review): a BQ connection may not be mandatory for managed
    # functions — confirm whether this step can be skipped.
    self._create_bq_connection()

    import cloudpickle

    pickled = cloudpickle.dumps(func)

    # Inline code deployed with the routine: unpickle the user function
    # once at load time and forward every call to it.
    code_block = f"""\
import cloudpickle

udf = cloudpickle.loads({pickled})

def managed_func(*args, **kwargs):
    return udf(*args, **kwargs)
"""

    # Build the SQL argument list "name type, ..." from the function's
    # own argument names.
    bq_function_args = []
    bq_function_return_type = output_type

    input_args = inspect.getargs(func.__code__).args
    # We expect the input type annotations to be 1:1 with the input args.
    for name, type_ in zip(input_args, input_types):
        bq_function_args.append(f"{name} {type_}")

    managed_function_options = {
        "runtime_version": runtime_version,
        "entry_point": "managed_func",
    }

    managed_function_options_str = ", ".join(
        f'{key}="{val}"' if isinstance(val, str) else f"{key}={val}"
        for key, val in managed_function_options.items()
        if val is not None
    )

    # Guarantee cloudpickle is available in the runtime. Copy first so
    # the caller's ``packages`` list is never mutated in place (the
    # previous ``packages += [...]`` leaked the addition to the caller).
    packages = list(packages) if packages else []
    if "cloudpickle" not in packages:
        packages.append("cloudpickle")
    managed_function_options_str = (
        f"{managed_function_options_str}, packages={packages}"
    )

    persistent_func_id = (
        f"`{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}"
    )
    create_function_ddl = f"""
CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)})
RETURNS {bq_function_return_type}
LANGUAGE {language}
OPTIONS ({managed_function_options_str})
AS r'''
{code_block}
'''
"""

    self._ensure_dataset_exists()
    self._create_bq_function(create_function_ddl)

def get_cloud_function_fully_qualified_parent(self):
"Get the fully qualilfied parent for a cloud function."
Expand Down
Loading