From 7739f41989c5d0effbaf66070c6b94b6c3840506 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 9 Jan 2024 14:18:54 -0600 Subject: [PATCH] feat: use faster query_and_wait method from google-cloud-bigquery when available fix unit tests fix python 3.7 fix python 3.7 fix python 3.7 fix python 3.7 fix wait_timeout units boost test coverage remove dead code boost a little more coverage --- pandas_gbq/features.py | 10 ++- pandas_gbq/gbq.py | 44 +++++++--- pandas_gbq/query.py | 75 ++++++++++++----- tests/system/test_gbq.py | 27 ------ tests/unit/test_context.py | 42 +++++++++- tests/unit/test_features.py | 17 ++++ tests/unit/test_gbq.py | 161 +++++++++++++++++++++++++++++------- tests/unit/test_query.py | 124 +++++++++++++++++++++++++++ 8 files changed, 411 insertions(+), 89 deletions(-) diff --git a/pandas_gbq/features.py b/pandas_gbq/features.py index d2fc33cb..45a43c55 100644 --- a/pandas_gbq/features.py +++ b/pandas_gbq/features.py @@ -4,8 +4,9 @@ """Module for checking dependency versions and supported features.""" -# https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md +# https://github.com/googleapis/python-bigquery/blob/main/CHANGELOG.md BIGQUERY_MINIMUM_VERSION = "3.3.5" +BIGQUERY_QUERY_AND_WAIT_VERSION = "3.14.0" PANDAS_VERBOSITY_DEPRECATION_VERSION = "0.23.0" PANDAS_BOOLEAN_DTYPE_VERSION = "1.0.0" PANDAS_PARQUET_LOSSLESS_TIMESTAMP_VERSION = "1.1.0" @@ -45,6 +46,13 @@ def bigquery_try_import(self): return google.cloud.bigquery + @property + def bigquery_has_query_and_wait(self): + import packaging.version + + min_version = packaging.version.parse(BIGQUERY_QUERY_AND_WAIT_VERSION) + return self.bigquery_installed_version >= min_version + @property def pandas_installed_version(self): import pandas diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index c54cf592..f93999a9 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -351,12 +351,17 @@ def process_http_error(ex): # See `BigQuery Troubleshooting Errors # `__ - if "cancelled" in ex.message: + message = ( + ex.message.casefold() + if hasattr(ex, "message") and ex.message is not None + else "" + ) + if "cancelled" in message: raise QueryTimeout("Reason: {0}".format(ex)) - elif "Provided Schema does not match" in ex.message: + elif "schema does not match" in message: error_message = ex.errors[0]["message"] raise InvalidSchema(f"Reason: {error_message}") - elif "Already Exists: Table" in ex.message: + elif "already exists: table" in message: error_message = ex.errors[0]["message"] raise TableCreationError(f"Reason: {error_message}") else: @@ -410,16 +415,29 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs): self._start_timer() job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict) - rows_iter = pandas_gbq.query.query_and_wait( - self, - self.client, - query, - location=self.location, - project_id=self.project_id, - job_config=job_config, - max_results=max_results, - timeout_ms=timeout_ms, - ) + + if FEATURES.bigquery_has_query_and_wait: + rows_iter = pandas_gbq.query.query_and_wait_via_client_library( + self, + self.client, + query, + location=self.location, + project_id=self.project_id, + job_config=job_config, + max_results=max_results, + timeout_ms=timeout_ms, + ) + else: + rows_iter = pandas_gbq.query.query_and_wait( + self, + self.client, + query, + location=self.location, + project_id=self.project_id, + job_config=job_config, + max_results=max_results, + timeout_ms=timeout_ms, + ) dtypes = kwargs.get("dtypes") return self._download_results( diff --git a/pandas_gbq/query.py b/pandas_gbq/query.py index 35d55920..0b7036f7 100644 --- a/pandas_gbq/query.py +++ b/pandas_gbq/query.py @@ -5,9 +5,11 @@ from __future__ import annotations import concurrent.futures +import functools import logging from typing import Optional +import google.auth.exceptions from google.cloud import bigquery import pandas_gbq.exceptions @@ -78,6 +80,26 @@ def _wait_for_query_job( connector.process_http_error(ex) +def try_query(connector, query_fn): + try: + logger.debug("Requesting query... ") + return query_fn() + except concurrent.futures.TimeoutError as ex: + raise pandas_gbq.exceptions.QueryTimeout("Reason: {0}".format(ex)) + except (google.auth.exceptions.RefreshError, ValueError) as ex: + if connector.private_key: + raise pandas_gbq.exceptions.AccessDenied( + f"The service account credentials are not valid: {ex}" + ) + else: + raise pandas_gbq.exceptions.AccessDenied( + "The credentials have been revoked or expired, " + f"please re-run the application to re-authorize: {ex}" + ) + except connector.http_error as ex: + connector.process_http_error(ex) + + def query_and_wait( connector, client: bigquery.Client, @@ -122,29 +144,17 @@ def query_and_wait( Result iterator from which we can download the results in the desired format (pandas.DataFrame). """ - from google.auth.exceptions import RefreshError - - try: - logger.debug("Requesting query... ") - query_reply = client.query( + query_reply = try_query( + connector, + functools.partial( + client.query, query, job_config=job_config, location=location, project=project_id, - ) - logger.debug("Query running...") - except (RefreshError, ValueError) as ex: - if connector.private_key: - raise pandas_gbq.exceptions.AccessDenied( - f"The service account credentials are not valid: {ex}" - ) - else: - raise pandas_gbq.exceptions.AccessDenied( - "The credentials have been revoked or expired, " - f"please re-run the application to re-authorize: {ex}" - ) - except connector.http_error as ex: - connector.process_http_error(ex) + ), + ) + logger.debug("Query running...") job_id = query_reply.job_id logger.debug("Job ID: %s" % job_id) @@ -173,3 +183,30 @@ def query_and_wait( return query_reply.result(max_results=max_results) except connector.http_error as ex: connector.process_http_error(ex) + + +def query_and_wait_via_client_library( + connector, + client: bigquery.Client, + query: str, + *, + job_config: bigquery.QueryJobConfig, + location: Optional[str], + project_id: Optional[str], + max_results: Optional[int], + timeout_ms: Optional[int], +): + rows_iter = try_query( + connector, + functools.partial( + client.query_and_wait, + query, + job_config=job_config, + location=location, + project=project_id, + max_results=max_results, + wait_timeout=timeout_ms / 1000.0 if timeout_ms else None, + ), + ) + logger.debug("Query done.\n") + return rows_iter diff --git a/tests/system/test_gbq.py b/tests/system/test_gbq.py index 7afa4ae9..c024b558 100644 --- a/tests/system/test_gbq.py +++ b/tests/system/test_gbq.py @@ -426,33 +426,6 @@ def test_query_inside_configuration(self, project_id): ) tm.assert_frame_equal(df, DataFrame({"valid_string": ["PI"]})) - def test_configuration_without_query(self, project_id): - sql_statement = "SELECT 1" - config = { - "copy": { - "sourceTable": { - "projectId": project_id, - "datasetId": "publicdata:samples", - "tableId": "wikipedia", - }, - "destinationTable": { - "projectId": project_id, - "datasetId": "publicdata:samples", - "tableId": "wikipedia_copied", - }, - } - } - # Test that only 'query' configurations are supported - # nor 'copy','load','extract' - with pytest.raises(ValueError): - gbq.read_gbq( - sql_statement, - project_id=project_id, - credentials=self.credentials, - configuration=config, - dialect="legacy", - ) - def test_configuration_raises_value_error_with_multiple_config(self, project_id): sql_statement = "SELECT 1" config = { diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py index 1cf420f0..6b6ce6a0 100644 --- a/tests/unit/test_context.py +++ b/tests/unit/test_context.py @@ -8,6 +8,7 @@ import google.cloud.bigquery import google.cloud.bigquery.table +import packaging.version import pytest @@ -55,8 +56,15 @@ def test_read_gbq_should_save_credentials(mock_get_credentials): mock_get_credentials.assert_not_called() -def test_read_gbq_should_use_dialect(mock_bigquery_client): +def test_read_gbq_should_use_dialect_with_query(monkeypatch, mock_bigquery_client): import pandas_gbq + import pandas_gbq.features + + monkeypatch.setattr( + pandas_gbq.features.FEATURES, + "_bigquery_installed_version", + packaging.version.parse(pandas_gbq.features.BIGQUERY_MINIMUM_VERSION), + ) assert pandas_gbq.context.dialect is None pandas_gbq.context.dialect = "legacy" @@ -71,3 +79,35 @@ def test_read_gbq_should_use_dialect(mock_bigquery_client): _, kwargs = mock_bigquery_client.query.call_args assert not kwargs["job_config"].use_legacy_sql pandas_gbq.context.dialect = None # Reset the global state. + + +def test_read_gbq_should_use_dialect_with_query_and_wait( + monkeypatch, mock_bigquery_client +): + if not hasattr(mock_bigquery_client, "query_and_wait"): + pytest.skip( + f"google-cloud-bigquery {google.cloud.bigquery.__version__} does not have query_and_wait" + ) + + import pandas_gbq + import pandas_gbq.features + + monkeypatch.setattr( + pandas_gbq.features.FEATURES, + "_bigquery_installed_version", + packaging.version.parse(pandas_gbq.features.BIGQUERY_QUERY_AND_WAIT_VERSION), + ) + + assert pandas_gbq.context.dialect is None + pandas_gbq.context.dialect = "legacy" + pandas_gbq.read_gbq("SELECT 1") + + _, kwargs = mock_bigquery_client.query_and_wait.call_args + assert kwargs["job_config"].use_legacy_sql + + pandas_gbq.context.dialect = "standard" + pandas_gbq.read_gbq("SELECT 1") + + _, kwargs = mock_bigquery_client.query_and_wait.call_args + assert not kwargs["job_config"].use_legacy_sql + pandas_gbq.context.dialect = None # Reset the global state. diff --git a/tests/unit/test_features.py b/tests/unit/test_features.py index 8f17c78f..947cb8ae 100644 --- a/tests/unit/test_features.py +++ b/tests/unit/test_features.py @@ -13,6 +13,23 @@ def fresh_bigquery_version(monkeypatch): monkeypatch.setattr(FEATURES, "_pandas_installed_version", None) +@pytest.mark.parametrize( + ["bigquery_version", "expected"], + [ + ("1.99.100", False), + ("2.99.999", False), + ("3.13.11", False), + ("3.14.0", True), + ("4.999.999", True), + ], +) +def test_bigquery_has_query_and_wait(monkeypatch, bigquery_version, expected): + import google.cloud.bigquery + + monkeypatch.setattr(google.cloud.bigquery, "__version__", bigquery_version) + assert FEATURES.bigquery_has_query_and_wait == expected + + @pytest.mark.parametrize( ["pandas_version", "expected"], [ diff --git a/tests/unit/test_gbq.py b/tests/unit/test_gbq.py index 423589b1..ff8508c9 100644 --- a/tests/unit/test_gbq.py +++ b/tests/unit/test_gbq.py @@ -9,12 +9,15 @@ from unittest import mock import google.api_core.exceptions +import google.cloud.bigquery import numpy +import packaging.version import pandas from pandas import DataFrame import pytest from pandas_gbq import gbq +import pandas_gbq.features from pandas_gbq.features import FEATURES @@ -40,29 +43,40 @@ def mock_query_job(): return mock_query -@pytest.fixture(autouse=True) -def default_bigquery_client(mock_bigquery_client, mock_query_job): +@pytest.fixture +def mock_row_iterator(mock_bigquery_client): mock_rows = mock.create_autospec(google.cloud.bigquery.table.RowIterator) mock_rows.total_rows = 1 - mock_rows.__iter__.return_value = [(1,)] - mock_query_job.result.return_value = mock_rows - mock_bigquery_client.list_rows.return_value = mock_rows - mock_bigquery_client.query.return_value = mock_query_job # Mock out SELECT 1 query results. def generate_schema(): - query = ( - mock_bigquery_client.query.call_args[0][0] - if mock_bigquery_client.query.call_args - else "" - ) + if mock_bigquery_client.query.call_args: + query = mock_bigquery_client.query.call_args[0][0] + elif ( + hasattr(mock_bigquery_client, "query_and_wait") + and mock_bigquery_client.query_and_wait.call_args + ): + query = mock_bigquery_client.query_and_wait.call_args[0][0] + else: + query = "" if query == "SELECT 1 AS int_col": return [google.cloud.bigquery.SchemaField("int_col", "INTEGER")] else: return [google.cloud.bigquery.SchemaField("_f0", "INTEGER")] type(mock_rows).schema = mock.PropertyMock(side_effect=generate_schema) + return mock_rows + + +@pytest.fixture(autouse=True) +def default_bigquery_client(mock_bigquery_client, mock_query_job, mock_row_iterator): + mock_query_job.result.return_value = mock_row_iterator + mock_bigquery_client.list_rows.return_value = mock_row_iterator + mock_bigquery_client.query.return_value = mock_query_job + + if hasattr(mock_bigquery_client, "query_and_wait"): + mock_bigquery_client.query_and_wait.return_value = mock_row_iterator # Mock out get_table. def get_table(table_ref_or_id, **kwargs): @@ -441,15 +455,51 @@ def test_read_gbq_with_no_project_id_given_should_fail(monkeypatch): gbq.read_gbq("SELECT 1", dialect="standard") -def test_read_gbq_with_inferred_project_id(mock_bigquery_client): +def test_read_gbq_with_inferred_project_id_with_query( + monkeypatch, mock_bigquery_client, mock_query_job +): + monkeypatch.setattr( + pandas_gbq.features.FEATURES, + "_bigquery_installed_version", + packaging.version.parse(pandas_gbq.features.BIGQUERY_MINIMUM_VERSION), + ) + type(mock_query_job).cache_hit = mock.PropertyMock(return_value=False) + type(mock_query_job).total_bytes_billed = mock.PropertyMock(return_value=10_000_000) + type(mock_query_job).total_bytes_processed = mock.PropertyMock(return_value=12345) + df = gbq.read_gbq("SELECT 1", dialect="standard") assert df is not None mock_bigquery_client.query.assert_called_once() -def test_read_gbq_with_inferred_project_id_from_service_account_credentials( - mock_bigquery_client, mock_service_account_credentials +def test_read_gbq_with_inferred_project_id_with_query_and_wait( + monkeypatch, mock_bigquery_client +): + if not hasattr(mock_bigquery_client, "query_and_wait"): + pytest.skip( + f"google-cloud-bigquery {google.cloud.bigquery.__version__} does not have query_and_wait" + ) + + monkeypatch.setattr( + pandas_gbq.features.FEATURES, + "_bigquery_installed_version", + packaging.version.parse(pandas_gbq.features.BIGQUERY_QUERY_AND_WAIT_VERSION), + ) + + df = gbq.read_gbq("SELECT 1", dialect="standard") + assert df is not None + mock_bigquery_client.query_and_wait.assert_called_once() + + +def test_read_gbq_with_inferred_project_id_from_service_account_credentials_with_query( + monkeypatch, mock_bigquery_client, mock_service_account_credentials ): + monkeypatch.setattr( + pandas_gbq.features.FEATURES, + "_bigquery_installed_version", + packaging.version.parse(pandas_gbq.features.BIGQUERY_MINIMUM_VERSION), + ) + mock_service_account_credentials.project_id = "service_account_project_id" df = gbq.read_gbq( "SELECT 1", @@ -465,6 +515,37 @@ def test_read_gbq_with_inferred_project_id_from_service_account_credentials( ) +def test_read_gbq_with_inferred_project_id_from_service_account_credentials_with_query_and_wait( + monkeypatch, mock_bigquery_client, mock_service_account_credentials +): + if not hasattr(mock_bigquery_client, "query_and_wait"): + pytest.skip( + f"google-cloud-bigquery {google.cloud.bigquery.__version__} does not have query_and_wait" + ) + + monkeypatch.setattr( + pandas_gbq.features.FEATURES, + "_bigquery_installed_version", + packaging.version.parse(pandas_gbq.features.BIGQUERY_QUERY_AND_WAIT_VERSION), + ) + + mock_service_account_credentials.project_id = "service_account_project_id" + df = gbq.read_gbq( + "SELECT 1", + dialect="standard", + credentials=mock_service_account_credentials, + ) + assert df is not None + mock_bigquery_client.query_and_wait.assert_called_once_with( + "SELECT 1", + job_config=mock.ANY, + location=None, + project="service_account_project_id", + max_results=None, + wait_timeout=None, + ) + + def test_read_gbq_without_inferred_project_id_from_compute_engine_credentials( mock_compute_engine_credentials, ): @@ -481,12 +562,42 @@ def test_read_gbq_with_max_results_zero(monkeypatch): assert df is None -def test_read_gbq_with_max_results_ten(monkeypatch, mock_query_job): +def test_read_gbq_with_max_results_ten_query(monkeypatch, mock_query_job): + monkeypatch.setattr( + pandas_gbq.features.FEATURES, + "_bigquery_installed_version", + packaging.version.parse(pandas_gbq.features.BIGQUERY_MINIMUM_VERSION), + ) df = gbq.read_gbq("SELECT 1", dialect="standard", max_results=10) assert df is not None mock_query_job.result.assert_called_with(max_results=10) +def test_read_gbq_with_max_results_ten_query_and_wait( + monkeypatch, mock_bigquery_client +): + if not hasattr(mock_bigquery_client, "query_and_wait"): + pytest.skip( + f"google-cloud-bigquery {google.cloud.bigquery.__version__} does not have query_and_wait" + ) + + monkeypatch.setattr( + pandas_gbq.features.FEATURES, + "_bigquery_installed_version", + packaging.version.parse(pandas_gbq.features.BIGQUERY_QUERY_AND_WAIT_VERSION), + ) + df = gbq.read_gbq("SELECT 1", dialect="standard", max_results=10) + assert df is not None + mock_bigquery_client.query_and_wait.assert_called_with( + "SELECT 1", + job_config=mock.ANY, + location=mock.ANY, + project=mock.ANY, + max_results=10, + wait_timeout=None, + ) + + @pytest.mark.parametrize(["verbose"], [(True,), (False,)]) def test_read_gbq_with_verbose_new_pandas_warns_deprecation(monkeypatch, verbose): monkeypatch.setattr( @@ -509,8 +620,6 @@ def test_read_gbq_wo_verbose_w_new_pandas_no_warnings(monkeypatch, recwarn): def test_read_gbq_with_old_bq_raises_importerror(monkeypatch): - import google.cloud.bigquery - monkeypatch.setattr(google.cloud.bigquery, "__version__", "0.27.0") monkeypatch.setattr(FEATURES, "_bigquery_installed_version", None) with pytest.raises(ImportError, match="google-cloud-bigquery"): @@ -665,7 +774,7 @@ def test_load_modifies_schema(mock_bigquery_client): assert new_schema == new_schema_cp -def test_read_gbq_passes_dtypes(mock_bigquery_client, mock_service_account_credentials): +def test_read_gbq_passes_dtypes(mock_service_account_credentials, mock_row_iterator): mock_service_account_credentials.project_id = "service_account_project_id" df = gbq.read_gbq( "SELECT 1 AS int_col", @@ -675,14 +784,13 @@ def test_read_gbq_passes_dtypes(mock_bigquery_client, mock_service_account_crede ) assert df is not None - mock_list_rows = mock_bigquery_client.list_rows("dest", max_results=100) - - _, to_dataframe_kwargs = mock_list_rows.to_dataframe.call_args + _, to_dataframe_kwargs = mock_row_iterator.to_dataframe.call_args assert to_dataframe_kwargs["dtypes"] == {"int_col": "my-custom-dtype"} def test_read_gbq_use_bqstorage_api( - mock_bigquery_client, mock_service_account_credentials + mock_service_account_credentials, + mock_row_iterator, ): mock_service_account_credentials.project_id = "service_account_project_id" df = gbq.read_gbq( @@ -693,15 +801,14 @@ def test_read_gbq_use_bqstorage_api( ) assert df is not None - mock_list_rows = mock_bigquery_client.list_rows("dest", max_results=100) - mock_list_rows.to_dataframe.assert_called_once_with( + mock_row_iterator.to_dataframe.assert_called_once_with( create_bqstorage_client=True, dtypes=mock.ANY, progress_bar_type=mock.ANY, ) -def test_read_gbq_calls_tqdm(mock_bigquery_client, mock_service_account_credentials): +def test_read_gbq_calls_tqdm(mock_service_account_credentials, mock_row_iterator): mock_service_account_credentials.project_id = "service_account_project_id" df = gbq.read_gbq( "SELECT 1", @@ -711,9 +818,7 @@ def test_read_gbq_calls_tqdm(mock_bigquery_client, mock_service_account_credenti ) assert df is not None - mock_list_rows = mock_bigquery_client.list_rows("dest", max_results=100) - - _, to_dataframe_kwargs = mock_list_rows.to_dataframe.call_args + _, to_dataframe_kwargs = mock_row_iterator.to_dataframe.call_args assert to_dataframe_kwargs["progress_bar_type"] == "foobar" diff --git a/tests/unit/test_query.py b/tests/unit/test_query.py index b9a64535..5b631634 100644 --- a/tests/unit/test_query.py +++ b/tests/unit/test_query.py @@ -9,6 +9,8 @@ from unittest import mock import freezegun +import google.api_core.exceptions +import google.auth.exceptions import google.cloud.bigquery import pytest @@ -21,6 +23,128 @@ def _make_connector(project_id: str = "some-project", **kwargs): return pandas_gbq.gbq.GbqConnector(project_id, **kwargs) +def test_query_and_wait_via_client_library_apierror_raises_genericgbqexception( + mock_bigquery_client, +): + if not hasattr(mock_bigquery_client, "query_and_wait"): + pytest.skip( + f"google-cloud-bigquery {google.cloud.bigquery.__version__} does not have query_and_wait" + ) + + connector = _make_connector() + connector.client = mock_bigquery_client + mock_bigquery_client.query_and_wait.side_effect = ( + google.api_core.exceptions.GoogleAPIError() + ) + + with pytest.raises(pandas_gbq.exceptions.GenericGBQException): + module_under_test.query_and_wait_via_client_library( + connector, + mock_bigquery_client, + "SELECT 1", + job_config=google.cloud.bigquery.QueryJobConfig(), + location=None, + project_id=None, + max_results=None, + timeout_ms=None, + ) + + +def test_query_and_wait_via_client_library_refresherror_raises_accessdenied_service_account( + mock_bigquery_client, +): + if not hasattr(mock_bigquery_client, "query_and_wait"): + pytest.skip( + f"google-cloud-bigquery {google.cloud.bigquery.__version__} does not have query_and_wait" + ) + + connector = _make_connector() + connector.private_key = "abc" + connector.client = mock_bigquery_client + mock_bigquery_client.query_and_wait.side_effect = ( + google.auth.exceptions.RefreshError() + ) + + with pytest.raises(pandas_gbq.exceptions.AccessDenied, match="service account"): + module_under_test.query_and_wait_via_client_library( + connector, + mock_bigquery_client, + "SELECT 1", + job_config=google.cloud.bigquery.QueryJobConfig(), + location=None, + project_id=None, + max_results=None, + timeout_ms=None, + ) + + +def test_query_and_wait_via_client_library_refresherror_raises_accessdenied_user_credentials( + mock_bigquery_client, +): + if not hasattr(mock_bigquery_client, "query_and_wait"): + pytest.skip( + f"google-cloud-bigquery {google.cloud.bigquery.__version__} does not have query_and_wait" + ) + + connector = _make_connector() + connector.client = mock_bigquery_client + mock_bigquery_client.query_and_wait.side_effect = ( + google.auth.exceptions.RefreshError() + ) + + with pytest.raises(pandas_gbq.exceptions.AccessDenied, match="revoked or expired"): + module_under_test.query_and_wait_via_client_library( + connector, + mock_bigquery_client, + "SELECT 1", + job_config=google.cloud.bigquery.QueryJobConfig(), + location=None, + project_id=None, + max_results=None, + timeout_ms=None, + ) + + +def test_query_and_wait_via_client_library_timeout_raises_querytimeout( + mock_bigquery_client, +): + if not hasattr(mock_bigquery_client, "query_and_wait"): + pytest.skip( + f"google-cloud-bigquery {google.cloud.bigquery.__version__} does not have query_and_wait" + ) + + connector = _make_connector() + connector.client = mock_bigquery_client + connector.start = datetime.datetime(2020, 1, 1).timestamp() + + mock_bigquery_client.query_and_wait.side_effect = concurrent.futures.TimeoutError( + "fake timeout" + ) + + with freezegun.freeze_time( + "2020-01-01 00:00:00", auto_tick_seconds=15 + ), pytest.raises(pandas_gbq.exceptions.QueryTimeout): + module_under_test.query_and_wait_via_client_library( + connector, + mock_bigquery_client, + "SELECT 1", + job_config=google.cloud.bigquery.QueryJobConfig(), + location="EU", + project_id="test-query-and-wait", + max_results=123, + timeout_ms=500, + ) + + mock_bigquery_client.query_and_wait.assert_called_with( + "SELECT 1", + job_config=mock.ANY, + location="EU", + project="test-query-and-wait", + max_results=123, + wait_timeout=0.5, + ) + + @pytest.mark.parametrize( ["size_in_bytes", "formatted_text"], [