Skip to content

Commit ef7eb8f

Browse files
authored
Feat(cloud): Add check feature for Cloud connectors (#562)
1 parent 7e65ab3 commit ef7eb8f

File tree

5 files changed

+372
-41
lines changed

5 files changed

+372
-41
lines changed

airbyte/_util/api_util.py

+144-19
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414
from __future__ import annotations
1515

1616
import json
17-
from typing import TYPE_CHECKING, Any
17+
from typing import TYPE_CHECKING, Any, Literal
1818

1919
import airbyte_api
20+
import requests
2021
from airbyte_api import api, models
2122

2223
from airbyte.exceptions import (
@@ -26,6 +27,11 @@
2627
AirbyteMultipleResourcesError,
2728
PyAirbyteInputError,
2829
)
30+
from airbyte.secrets.base import SecretString
31+
32+
33+
if TYPE_CHECKING:
34+
from collections.abc import Callable
2935

3036

3137
if TYPE_CHECKING:
@@ -35,19 +41,38 @@
3541
DestinationConfiguration,
3642
)
3743

38-
from airbyte.secrets.base import SecretString
39-
4044

4145
JOB_WAIT_INTERVAL_SECS = 2.0
4246
JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60 # 1 hour
4347
CLOUD_API_ROOT = "https://api.airbyte.com/v1"
48+
"""The Airbyte Cloud API root URL.
49+
50+
This is the root URL for the Airbyte Cloud API. It is used to interact with the Airbyte Cloud API
51+
and is the default API root for the `CloudWorkspace` class.
52+
- https://reference.airbyte.com/reference/getting-started
53+
"""
54+
CLOUD_CONFIG_API_ROOT = "https://cloud.airbyte.com/api/v1"
55+
"""Internal-Use API Root, aka Airbyte "Config API".
56+
57+
Documentation:
58+
- https://docs.airbyte.com/api-documentation#configuration-api-deprecated
59+
- https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml
60+
"""
4461

4562

4663
def status_ok(status_code: int) -> bool:
4764
"""Check if a status code is OK."""
4865
return status_code >= 200 and status_code < 300 # noqa: PLR2004 # allow inline magic numbers
4966

5067

68+
def get_config_api_root(api_root: str) -> str:
69+
"""Get the configuration API root from the main API root."""
70+
if api_root == CLOUD_API_ROOT:
71+
return CLOUD_CONFIG_API_ROOT
72+
73+
raise NotImplementedError("Configuration API root not implemented for this API root.")
74+
75+
5176
def get_airbyte_server_instance(
5277
*,
5378
api_root: str,
@@ -78,7 +103,7 @@ def get_workspace(
78103
client_id: SecretString,
79104
client_secret: SecretString,
80105
) -> models.WorkspaceResponse:
81-
"""Get a connection."""
106+
"""Get a workspace object."""
82107
airbyte_instance = get_airbyte_server_instance(
83108
api_root=api_root,
84109
client_id=client_id,
@@ -113,7 +138,7 @@ def list_connections(
113138
name: str | None = None,
114139
name_filter: Callable[[str], bool] | None = None,
115140
) -> list[models.ConnectionResponse]:
116-
"""Get a connection."""
141+
"""List connections."""
117142
if name and name_filter:
118143
raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")
119144

@@ -155,7 +180,7 @@ def list_workspaces(
155180
name: str | None = None,
156181
name_filter: Callable[[str], bool] | None = None,
157182
) -> list[models.WorkspaceResponse]:
158-
"""Get a connection."""
183+
"""List workspaces."""
159184
if name and name_filter:
160185
raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")
161186

@@ -196,7 +221,7 @@ def list_sources(
196221
name: str | None = None,
197222
name_filter: Callable[[str], bool] | None = None,
198223
) -> list[models.SourceResponse]:
199-
"""Get a connection."""
224+
"""List sources."""
200225
if name and name_filter:
201226
raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")
202227

@@ -234,7 +259,7 @@ def list_destinations(
234259
name: str | None = None,
235260
name_filter: Callable[[str], bool] | None = None,
236261
) -> list[models.DestinationResponse]:
237-
"""Get a connection."""
262+
"""List destinations."""
238263
if name and name_filter:
239264
raise PyAirbyteInputError(message="You can provide name or name_filter, but not both.")
240265

@@ -720,16 +745,116 @@ def delete_connection(
720745
)
721746

722747

723-
# Not yet implemented
748+
# Functions for leveraging the Airbyte Config API (may not be supported or stable)
749+
750+
751+
def get_bearer_token(
752+
*,
753+
client_id: SecretString,
754+
client_secret: SecretString,
755+
api_root: str = CLOUD_API_ROOT,
756+
) -> SecretString:
757+
"""Get a bearer token.
758+
759+
https://reference.airbyte.com/reference/createaccesstoken
760+
761+
"""
762+
response = requests.post(
763+
url=api_root + "/applications/token",
764+
headers={
765+
"content-type": "application/json",
766+
"accept": "application/json",
767+
},
768+
json={
769+
"client_id": client_id,
770+
"client_secret": client_secret,
771+
},
772+
)
773+
if not status_ok(response.status_code):
774+
response.raise_for_status()
775+
776+
return SecretString(response.json()["access_token"])
777+
778+
779+
def _make_config_api_request(
780+
*,
781+
api_root: str,
782+
path: str,
783+
json: dict[str, Any],
784+
client_id: SecretString,
785+
client_secret: SecretString,
786+
) -> dict[str, Any]:
787+
config_api_root = get_config_api_root(api_root)
788+
bearer_token = get_bearer_token(
789+
client_id=client_id,
790+
client_secret=client_secret,
791+
api_root=api_root,
792+
)
793+
headers: dict[str, Any] = {
794+
"Content-Type": "application/json",
795+
"Authorization": f"Bearer {bearer_token}",
796+
"User-Agent": "PyAirbyte Client",
797+
}
798+
response = requests.request(
799+
method="POST",
800+
url=config_api_root + path,
801+
headers=headers,
802+
json=json,
803+
)
804+
if not status_ok(response.status_code):
805+
try:
806+
response.raise_for_status()
807+
except requests.HTTPError as ex:
808+
raise AirbyteError(
809+
context={
810+
"url": response.request.url,
811+
"body": response.request.body,
812+
"response": response.__dict__,
813+
},
814+
) from ex
815+
816+
return response.json()
817+
818+
819+
def check_connector(
820+
*,
821+
actor_id: str,
822+
connector_type: Literal["source", "destination"],
823+
client_id: SecretString,
824+
client_secret: SecretString,
825+
workspace_id: str | None = None,
826+
api_root: str = CLOUD_API_ROOT,
827+
) -> tuple[bool, str | None]:
828+
"""Check a source.
829+
830+
Raises an exception if the check fails. Uses one of these endpoints:
831+
832+
- /v1/sources/check_connection: https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1409
833+
- /v1/destinations/check_connection: https://github.com/airbytehq/airbyte-platform-internal/blob/10bb92e1745a282e785eedfcbed1ba72654c4e4e/oss/airbyte-api/server-api/src/main/openapi/config.yaml#L1995
834+
"""
835+
_ = workspace_id # Not used (yet)
836+
837+
json_result = _make_config_api_request(
838+
path=f"/{connector_type}s/check_connection",
839+
json={
840+
f"{connector_type}Id": actor_id,
841+
},
842+
api_root=api_root,
843+
client_id=client_id,
844+
client_secret=client_secret,
845+
)
846+
result, message = json_result.get("status"), json_result.get("message")
847+
848+
if result == "succeeded":
849+
return True, None
724850

851+
if result == "failed":
852+
return False, message
725853

726-
# def check_source(
727-
# source_id: str,
728-
# *,
729-
# api_root: str,
730-
# api_key: str,
731-
# workspace_id: str | None = None,
732-
# ) -> api.SourceCheckResponse:
733-
# """Check a source."""
734-
# _ = source_id, workspace_id, api_root, api_key
735-
# raise NotImplementedError
854+
raise AirbyteError(
855+
context={
856+
"actor_id": actor_id,
857+
"connector_type": connector_type,
858+
"response": json_result,
859+
},
860+
)

airbyte/cloud/__init__.py

+2-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
66
## Examples
77
8-
### Basic Usage Example:
8+
### Basic Sync Example:
99
1010
```python
1111
import airbyte as ab
@@ -47,12 +47,7 @@
4747
for record in dataset:
4848
print(record)
4949
```
50-
51-
ℹ️ **Experimental Features**
52-
53-
You can use the `airbyte.cloud.experimental` module to access experimental features.
54-
These additional features are subject to change and may not be available in all environments.
55-
""" # noqa: RUF002 # Allow emoji
50+
"""
5651

5752
from __future__ import annotations
5853

airbyte/cloud/connectors.py

+98-1
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,84 @@
11
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2-
"""Cloud connectors module for working with Cloud sources and destinations."""
2+
"""Cloud connectors module for working with Cloud sources and destinations.
3+
4+
This module provides classes for working with Cloud sources and destinations. Rather
5+
than creating `CloudConnector` objects directly, it is recommended to use the
6+
`airbyte.cloud.workspaces` module to create and manage cloud connector objects.
7+
8+
Classes:
9+
- `CloudConnector`: A cloud connector object.
10+
- `CloudSource`: A cloud source object.
11+
- `CloudDestination`: A cloud destination object.
12+
13+
## Usage Examples
14+
15+
Obtain a cloud source object and run a `check` on it:
16+
17+
```python
18+
from airbyte.cloud import CloudWorkspace
19+
20+
workspace = CloudWorkspace(
21+
workspace_id="...",
22+
client_id="...",
23+
client_secret="...",
24+
)
25+
26+
# Get the cloud source object
27+
cloud_source = workspace.get_source("...")
28+
29+
# Check the source configuration and credentials
30+
check_result = cloud_source.check()
31+
if check_result:
32+
# Truthy if the check was successful
33+
print("Check successful")
34+
else:
35+
# Stringify the check result to get the error message
36+
print(f"Check failed: {check_result}")
37+
```
38+
"""
339

440
from __future__ import annotations
541

642
import abc
43+
from dataclasses import dataclass
744
from typing import TYPE_CHECKING, ClassVar, Literal
845

46+
from airbyte._util import api_util
47+
948

1049
if TYPE_CHECKING:
1150
from airbyte.cloud.workspaces import CloudWorkspace
1251

1352

53+
@dataclass
54+
class CheckResult:
55+
"""A cloud check result object."""
56+
57+
success: bool
58+
"""Whether the check result is valid."""
59+
60+
error_message: str | None = None
61+
"""None if the check was successful. Otherwise the failure message from the check result."""
62+
63+
internal_error: str | None = None
64+
"""None if the check was able to be run. Otherwise, this will describe the internal failure."""
65+
66+
def __bool__(self) -> bool:
67+
"""Truthy when check was successful."""
68+
return self.success
69+
70+
def __str__(self) -> str:
71+
"""Get a string representation of the check result."""
72+
return "Success" if self.success else f"Failed: {self.error_message}"
73+
74+
def __repr__(self) -> str:
75+
"""Get a string representation of the check result."""
76+
return (
77+
f"CheckResult(success={self.success}, "
78+
f"error_message={self.error_message or self.internal_error})"
79+
)
80+
81+
1482
class CloudConnector(abc.ABC):
1583
"""A cloud connector is a deployed source or destination on Airbyte Cloud.
1684
@@ -43,6 +111,35 @@ def permanently_delete(self) -> None:
43111
else:
44112
self.workspace.permanently_delete_destination(self.connector_id)
45113

114+
def check(
115+
self,
116+
*,
117+
raise_on_error: bool = True,
118+
) -> CheckResult:
119+
"""Check the connector.
120+
121+
Returns:
122+
A `CheckResult` object containing the result. The object is truthy if the check was
123+
successful and falsy otherwise. The error message is available in the `error_message`
124+
or by converting the object to a string.
125+
"""
126+
result = api_util.check_connector(
127+
workspace_id=self.workspace.workspace_id,
128+
connector_type=self.connector_type,
129+
actor_id=self.connector_id,
130+
api_root=self.workspace.api_root,
131+
client_id=self.workspace.client_id,
132+
client_secret=self.workspace.client_secret,
133+
)
134+
check_result = CheckResult(
135+
success=result[0],
136+
error_message=result[1],
137+
)
138+
if raise_on_error and not check_result:
139+
raise ValueError(f"Check failed: {check_result}")
140+
141+
return check_result
142+
46143

47144
class CloudSource(CloudConnector):
48145
"""A cloud source is a source that is deployed on Airbyte Cloud."""

0 commit comments

Comments
 (0)