Skip to content

Commit dba7f02

Browse files
authored
Feat: BigQuery service account impersonation auth method (#3940)
1 parent d8c9b66 commit dba7f02

File tree

4 files changed

+28
-5
lines changed

4 files changed

+28
-5
lines changed

docs/integrations/engines/bigquery.md

+8-3
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,15 @@ This creates a gateway named `bigquery` and makes it your project's default gate
7171

7272
It uses the [`oauth` authentication method](#authentication-methods), which does not specify a username or other information directly in the connection configuration. Other authentication methods are [described below](#authentication-methods).
7373

74-
In BigQuery, navigate to the dashboard and select the BigQuery project your SQLMesh project will use. From the Google Cloud dashboard, use the arrow to open the pop-up menu:
74+
In BigQuery, navigate to the dashboard and select the BigQuery project your SQLMesh project will use. From the Google Cloud dashboard, use the arrow to open the pop-up menu:
7575

7676
![BigQuery Dashboard](./bigquery/bigquery-1.png)
7777

78-
Now we can identify the project ID needed in the `config.yaml` gateway specification above. Select the project that you want to work with; the project ID that you need to add to your yaml file is the ID label from the pop-up menu.
78+
Now we can identify the project ID needed in the `config.yaml` gateway specification above. Select the project that you want to work with; the project ID that you need to add to your yaml file is the ID label from the pop-up menu.
7979

8080
![BigQuery Dashboard: selecting your project](./bigquery/bigquery-2.png)
8181

82-
For this guide, the Docs-Demo is the one we will use; thus, the project ID for this example is `healthy-life-440919-s0`.
82+
For this guide, the Docs-Demo is the one we will use; thus, the project ID for this example is `healthy-life-440919-s0`.
8383

8484
## Usage
8585

@@ -158,6 +158,7 @@ pip install "sqlmesh[bigquery]"
158158
| `client_secret` | OAuth 2.0 client secret | string | N |
159159
| `token_uri` | OAuth 2.0 authorization server's token endpoint URI | string | N |
160160
| `scopes` | The scopes used to obtain authorization | list | N |
161+
| `impersonated_service_account` | The service account to impersonate | string | N |
161162
| `job_creation_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to be created. | int | N |
162163
| `job_execution_timeout_seconds` | The maximum amount of time, in seconds, to wait for the underlying job to complete. | int | N |
163164
| `job_retries` | The number of times to retry the underlying job if it fails. (Default: `1`) | int | N |
@@ -227,6 +228,10 @@ sqlmesh_airflow = SQLMeshAirflow(
227228
- Related Credential Configuration:
228229
- `keyfile_json` (Required)
229230
- `scopes` (Optional)
231+
- [service-account-impersonation](https://google-auth.readthedocs.io/en/latest/reference/google.auth.impersonated_credentials.html)
232+
- Related Credential Configuration:
233+
- `impersonated_service_account` (Required)
234+
- `scopes` (Optional)
230235

231236
## Permissions Required
232237
With any of the above connection methods, ensure these BigQuery permissions are enabled to allow SQLMesh to work correctly.

sqlmesh/core/config/connection.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,7 @@ class BigQueryConnectionMethod(str, Enum):
817817
OAUTH_SECRETS = "oauth-secrets"
818818
SERVICE_ACCOUNT = "service-account"
819819
SERVICE_ACCOUNT_JSON = "service-account-json"
820+
SERVICE_ACCOUNT_IMPERSONATION = "service-account-impersonation"
820821

821822

822823
class BigQueryPriority(str, Enum):
@@ -861,8 +862,9 @@ class BigQueryConnectionConfig(ConnectionConfig):
861862
client_secret: t.Optional[str] = None
862863
token_uri: t.Optional[str] = None
863864
scopes: t.Tuple[str, ...] = ("https://www.googleapis.com/auth/bigquery",)
864-
job_creation_timeout_seconds: t.Optional[int] = None
865+
impersonated_service_account: t.Optional[str] = None
865866
# Extra Engine Config
867+
job_creation_timeout_seconds: t.Optional[int] = None
866868
job_execution_timeout_seconds: t.Optional[int] = None
867869
job_retries: t.Optional[int] = 1
868870
job_retry_deadline_seconds: t.Optional[int] = None
@@ -924,6 +926,16 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
924926
creds = service_account.Credentials.from_service_account_info(
925927
self.keyfile_json, scopes=self.scopes
926928
)
929+
elif self.method == BigQueryConnectionMethod.SERVICE_ACCOUNT_IMPERSONATION:
930+
from google.auth import impersonated_credentials
931+
932+
default_creds, _ = google.auth.default()
933+
934+
creds = impersonated_credentials.Credentials(
935+
source_credentials=default_creds,
936+
target_principal=self.impersonated_service_account,
937+
target_scopes=self.scopes,
938+
)
927939
elif self.method == BigQueryConnectionMethod.OAUTH_SECRETS:
928940
creds = credentials.Credentials(
929941
token=self.token,

sqlmesh/dbt/target.py

+6
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,8 @@ class BigQueryConfig(TargetConfig):
508508
client_secret: The BigQuery client secret
509509
token_uri: The BigQuery token URI
510510
scopes: The BigQuery scopes
511+
impersonated_service_account: The service account to impersonate
512+
job_creation_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to be created
511513
job_execution_timeout_seconds: The maximum amount of time, in seconds, to wait for the underlying job to complete
512514
timeout_seconds: Alias for job_execution_timeout_seconds
513515
job_retries: The number of times to retry the underlying job if it fails
@@ -536,6 +538,8 @@ class BigQueryConfig(TargetConfig):
536538
"https://www.googleapis.com/auth/cloud-platform",
537539
"https://www.googleapis.com/auth/drive",
538540
)
541+
impersonated_service_account: t.Optional[str] = None
542+
job_creation_timeout_seconds: t.Optional[int] = None
539543
job_execution_timeout_seconds: t.Optional[int] = None
540544
timeout_seconds: t.Optional[int] = None # To support legacy config
541545
job_retries: t.Optional[int] = None
@@ -596,6 +600,8 @@ def to_sqlmesh(self, **kwargs: t.Any) -> ConnectionConfig:
596600
client_secret=self.client_secret,
597601
token_uri=self.token_uri,
598602
scopes=self.scopes,
603+
impersonated_service_account=self.impersonated_service_account,
604+
job_creation_timeout_seconds=self.job_creation_timeout_seconds,
599605
job_execution_timeout_seconds=job_execution_timeout_seconds,
600606
job_retries=job_retries,
601607
job_retry_deadline_seconds=self.job_retry_deadline_seconds,

tests/cli/test_cli.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -952,7 +952,7 @@ def test_plan_dlt(runner, tmp_path):
952952
def test_init_project_dialects(tmp_path):
953953
dialect_to_config = {
954954
"redshift": "# concurrent_tasks: 4\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # user: \n # password: \n # database: \n # host: \n # port: \n # source_address: \n # unix_sock: \n # ssl: \n # sslmode: \n # timeout: \n # tcp_keepalive: \n # application_name: \n # preferred_role: \n # principal_arn: \n # credentials_provider: \n # region: \n # cluster_identifier: \n # iam: \n # is_serverless: \n # serverless_acct_id: \n # serverless_work_group: \n # enable_merge: ",
955-
"bigquery": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # method: oauth\n # project: \n # execution_project: \n # quota_project: \n # location: \n # keyfile: \n # keyfile_json: \n # token: \n # refresh_token: \n # client_id: \n # client_secret: \n # token_uri: \n # scopes: \n # job_creation_timeout_seconds: \n # job_execution_timeout_seconds: \n # job_retries: 1\n # job_retry_deadline_seconds: \n # priority: \n # maximum_bytes_billed: ",
955+
"bigquery": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # method: oauth\n # project: \n # execution_project: \n # quota_project: \n # location: \n # keyfile: \n # keyfile_json: \n # token: \n # refresh_token: \n # client_id: \n # client_secret: \n # token_uri: \n # scopes: \n # impersonated_service_account: \n # job_creation_timeout_seconds: \n # job_execution_timeout_seconds: \n # job_retries: 1\n # job_retry_deadline_seconds: \n # priority: \n # maximum_bytes_billed: ",
956956
"snowflake": "account: \n # concurrent_tasks: 4\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # user: \n # password: \n # warehouse: \n # database: \n # role: \n # authenticator: \n # token: \n # application: Tobiko_SQLMesh\n # private_key: \n # private_key_path: \n # private_key_passphrase: \n # session_parameters: ",
957957
"databricks": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # server_hostname: \n # http_path: \n # access_token: \n # auth_type: \n # oauth_client_id: \n # oauth_client_secret: \n # catalog: \n # http_headers: \n # session_configuration: \n # databricks_connect_server_hostname: \n # databricks_connect_access_token: \n # databricks_connect_cluster_id: \n # databricks_connect_use_serverless: False\n # force_databricks_connect: False\n # disable_databricks_connect: False\n # disable_spark_session: False",
958958
"postgres": "host: \n user: \n password: \n port: \n database: \n # concurrent_tasks: 4\n # register_comments: True\n # pre_ping: True\n # pretty_sql: False\n # keepalives_idle: \n # connect_timeout: 10\n # role: \n # sslmode: ",

0 commit comments

Comments
 (0)