Skip to content

Commit bd4953c

Browse files
authored
Merge pull request #11 from dataiku/feature/sc-68121-implement-rfc5988-pagination
[sc-68121] implement rfc5988 pagination
2 parents 0a785c3 + 518eefa commit bd4953c

File tree

12 files changed

+188
-37
lines changed

12 files changed

+188
-37
lines changed

CHANGELOG.md

+9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
# Changelog
22

3+
4+
## [Version 1.0.3](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.0.3) - Bugfix and feature release - 2021-11-23
5+
6+
- Fixes error raised on HTTP 204 status codes
7+
- Adds requests performance indicator to output datasets
8+
- Data extraction key is replaced by a path
9+
- Fixes JSON formatting issues
10+
- Implements RFC5988 for pagination
11+
312
## [Version 1.0.2](https://github.com/dataiku/dss-plugin-api-connect/releases/tag/v1.0.2) - Bugfix release - 2021-05-25
413

514
- Fixed recipe ignoring the selected http_method

custom-recipes/api-connect/recipe.json

+14
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,13 @@
226226
"type": "BOOLEAN",
227227
"defaultValue": false
228228
},
229+
{
230+
"name": "display_metadata",
231+
"label": "Display metadata",
232+
"description": "Status code, request time...",
233+
"type": "BOOLEAN",
234+
"defaultValue": false
235+
},
229236
{
230237
"name": "timeout",
231238
"label": "Timeout (s)",
@@ -239,6 +246,13 @@
239246
"description": "-1 for no limit",
240247
"type": "INT",
241248
"defaultValue": -1
249+
},
250+
{
251+
"name": "maximum_number_rows",
252+
"label": "Maximum number of rows",
253+
"description": "-1 for no limit",
254+
"type": "INT",
255+
"defaultValue": -1
242256
}
243257
],
244258
"resourceKeys": []

custom-recipes/api-connect/recipe.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ def get_partitioning_keys(id_list, dku_flow_variables):
4040
raise ValueError("There is no parameter column selected.")
4141
parameter_renamings = get_dku_key_values(config.get("parameter_renamings", {}))
4242
custom_key_values = get_dku_key_values(config.get("custom_key_values", {}))
43+
display_metadata = config.get("display_metadata", False)
44+
maximum_number_rows = config.get("maximum_number_rows", -1)
4345
input_parameters_dataset = dataiku.Dataset(input_A_names[0])
4446
partitioning_keys = get_partitioning_keys(input_parameters_dataset, dku_flow_variables)
4547
custom_key_values.update(partitioning_keys)
@@ -51,7 +53,9 @@ def get_partitioning_keys(id_list, dku_flow_variables):
5153
endpoint_parameters,
5254
extraction_key,
5355
parameter_columns,
54-
parameter_renamings
56+
parameter_renamings,
57+
display_metadata,
58+
maximum_number_rows=maximum_number_rows
5559
)
5660
results = recipe_session.process_dataframe(input_parameters_dataframe, is_raw_output)
5761

plugin.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"id": "api-connect",
3-
"version": "1.0.2",
3+
"version": "1.0.3",
44
"meta": {
55
"label": "API Connect",
66
"description": "Retrieve data from any REST API",

python-connectors/api-connect_dataset/connector.json

+16-2
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@
110110
},
111111
{
112112
"name": "extraction_key",
113-
"label": "Key to data array (optional)",
114-
"description": "",
113+
"label": "Path to data array (optional)",
114+
"description": "Dot separated key path",
115115
"defaultValue": null,
116116
"type": "STRING"
117117
},
@@ -186,6 +186,13 @@
186186
"type": "BOOLEAN",
187187
"defaultValue": false
188188
},
189+
{
190+
"name": "display_metadata",
191+
"label": "Display metadata",
192+
"description": "Status code, request time...",
193+
"type": "BOOLEAN",
194+
"defaultValue": false
195+
},
189196
{
190197
"name": "timeout",
191198
"label": "Timeout (s)",
@@ -199,6 +206,13 @@
199206
"description": "-1 for no limit",
200207
"type": "INT",
201208
"defaultValue": -1
209+
},
210+
{
211+
"name": "maximum_number_rows",
212+
"label": "Maximum number of rows",
213+
"description": "-1 for no limit",
214+
"type": "INT",
215+
"defaultValue": -1
202216
}
203217
]
204218
}

python-connectors/api-connect_dataset/connector.py

+31-19
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
from dataikuapi.utils import DataikuException
33
from safe_logger import SafeLogger
44
from rest_api_client import RestAPIClient
5-
from dku_utils import get_dku_key_values, get_endpoint_parameters
5+
from dku_utils import get_dku_key_values, get_endpoint_parameters, parse_keys_for_json, get_value_from_path
6+
from dku_constants import DKUConstants
7+
import json
8+
69

710
logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"])
811

@@ -18,10 +21,11 @@ def __init__(self, config, plugin_config):
1821
custom_key_values = get_dku_key_values(config.get("custom_key_values", {}))
1922
self.client = RestAPIClient(credential, endpoint_parameters, custom_key_values)
2023
extraction_key = endpoint_parameters.get("extraction_key", None)
21-
if extraction_key == '':
22-
extraction_key = None
23-
self.extraction_key = extraction_key
24+
self.extraction_key = extraction_key or ''
25+
self.extraction_path = self.extraction_key.split('.')
2426
self.raw_output = endpoint_parameters.get("raw_output", None)
27+
self.maximum_number_rows = config.get("maximum_number_rows", -1)
28+
self.display_metadata = config.get("display_metadata", False)
2529

2630
def get_read_schema(self):
2731
# In this example, we don't specify a schema here, so DSS will infer the schema
@@ -30,29 +34,37 @@ def get_read_schema(self):
3034

3135
def generate_rows(self, dataset_schema=None, dataset_partitioning=None,
3236
partition_id=None, records_limit=-1):
33-
is_records_limit = records_limit > 0
37+
is_records_limit = (records_limit > 0) or (self.maximum_number_rows > 0)
38+
if self.maximum_number_rows > 0:
39+
records_limit = self.maximum_number_rows
3440
record_count = 0
3541
while self.client.has_more_data():
3642
json_response = self.client.paginated_api_call()
37-
if self.extraction_key is None:
38-
# Todo: check api_response key is free and add something overwise
39-
if isinstance(json_response, list):
40-
record_count += len(json_response)
41-
for row in json_response:
42-
yield {"api_response": row}
43-
else:
44-
record_count += 1
45-
yield {"api_response": json_response}
43+
metadata = self.client.get_metadata() if self.display_metadata else None
44+
if self.extraction_key:
45+
data = get_value_from_path(json_response, self.extraction_path)
4646
else:
47-
data = json_response.get(self.extraction_key, None)
48-
if data is None:
49-
raise DataikuException("Extraction key '{}' was not found in the incoming data".format(self.extraction_key))
47+
data = json_response
48+
if isinstance(data, list):
5049
record_count += len(data)
51-
for result in data:
52-
yield {"api_response": result} if self.raw_output else result
50+
for row in data:
51+
yield self.format_output(row, metadata)
52+
else:
53+
record_count += 1
54+
yield self.format_output(data, metadata)
5355
if is_records_limit and record_count >= records_limit:
5456
break
5557

58+
def format_output(self, item, metadata=None):
    """Build one output row from an API item and optional request metadata.

    Args:
        item: One element extracted from the API response.
        metadata: Optional dict of request metadata to prepend to the row
            (``generate_rows`` passes the client's metadata when
            ``display_metadata`` is enabled, otherwise ``None``).

    Returns:
        A new dict. With ``self.raw_output`` set, the item is stored as a
        JSON string under ``DKUConstants.API_RESPONSE_KEY``; otherwise the
        item's keys are merged in via ``parse_keys_for_json``.
    """
    # Work on a copy: ``metadata`` is shared across every row of a page, so
    # updating it in place would leak keys from one row into the next and
    # pollute the caller's dict.
    output = dict(metadata) if metadata else {}
    if self.raw_output:
        output[DKUConstants.API_RESPONSE_KEY] = json.dumps(item)
    else:
        output.update(parse_keys_for_json(item))
    return output
67+
5668
def get_writer(self, dataset_schema=None, dataset_partitioning=None,
5769
partition_id=None):
5870
"""

python-lib/dku_constants.py

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
class DKUConstants(object):
2+
API_RESPONSE_KEY = "api_response"
23
RAW_BODY_FORMAT = "RAW"
34
FORM_DATA_BODY_FORMAT = "FORM_DATA"

python-lib/dku_utils.py

+28-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import json
2+
import copy
3+
4+
15
def get_dku_key_values(endpoint_query_string):
26
return {key_value.get("from"): key_value.get("to") for key_value in endpoint_query_string if key_value.get("from")}
37

@@ -19,7 +23,30 @@ def get_endpoint_parameters(configuration):
1923
"requests_per_minute",
2024
"pagination_type",
2125
"next_page_url_key",
22-
"top_key", "skip_key"
26+
"top_key", "skip_key", "maximum_number_rows"
2327
]
2428
parameters = {endpoint_parameter: configuration.get(endpoint_parameter) for endpoint_parameter in endpoint_parameters if configuration.get(endpoint_parameter) is not None}
2529
return parameters
30+
31+
32+
def parse_keys_for_json(items):
    """Flatten the first level of a dict so every value fits in a dataset cell.

    Args:
        items: Dict holding one decoded JSON object.

    Returns:
        A new dict where nested dicts and lists are serialised to JSON
        strings, keys whose value is ``None`` are dropped, and every other
        value is kept as is.
    """
    ret = {}
    for key, value in items.items():
        if value is None:
            # Missing values are omitted rather than emitted as nulls.
            continue
        if isinstance(value, (dict, list)):
            value = json.dumps(value)
        ret[key] = value
    return ret
43+
44+
45+
def get_value_from_path(dictionary, path):
    """Return the value reached by following *path* through nested dicts.

    Args:
        dictionary: Decoded JSON document to search.
        path: Sequence of keys, outermost first.

    Returns:
        A deep copy of the value at the end of the path, so the caller can
        mutate it without touching *dictionary*. An empty path yields a copy
        of the whole document.

    Raises:
        ValueError: If a key is missing or an intermediate value is not a
            dict.
    """
    node = dictionary
    for key in path:
        # Test the type first: ``key in node`` raises TypeError on scalars
        # and None, and means element/substring membership on lists/strings.
        if isinstance(node, dict) and key in node:
            node = node[key]
        else:
            raise ValueError("The extraction path {} was not found in the incoming data".format(path))
    # Copy only the extracted node instead of the entire response.
    return copy.deepcopy(node)

python-lib/pagination.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,15 @@ def reset_paging(self, counting_key=None, url=None):
5050
def set_counting_key(self, counting_key):
5151
self.counting_key = counting_key
5252

53-
def update_next_page(self, data):
53+
def update_next_page(self, data, response_links=None):
54+
response_links = response_links or {}
55+
next_link = response_links.get('next', {})
56+
next_page_url = next_link.get("url")
5457
self.is_first_batch = False
5558
self.counter += 1
5659
self.next_page_number = self.next_page_number + 1
60+
if next_page_url:
61+
self.next_page_url = next_page_url
5762
if isinstance(data, list):
5863
batch_size = len(data)
5964
self.records_to_skip = self.records_to_skip + batch_size

python-lib/rest_api_client.py

+19-1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def __init__(self, credential, endpoint, custom_key_values={}):
107107
elif body_format in [DKUConstants.FORM_DATA_BODY_FORMAT]:
108108
key_value_body = endpoint.get("key_value_body", {})
109109
self.requests_kwargs.update({"json": get_dku_key_values(key_value_body)})
110+
self.metadata = {}
110111

111112
def set_login(self, credential):
112113
login_type = credential.get("login_type", "no_auth")
@@ -140,24 +141,32 @@ def request(self, method, url, can_raise_exeption=True, **kwargs):
140141
if self.loop_detector.is_stuck_in_loop(url, kwargs.get("params", {}), kwargs.get("headers", {})):
141142
raise RestAPIClientError("The api-connect plugin is stuck in a loop. Please check the pagination parameters.")
142143
try:
144+
request_start_time = time.time()
143145
response = requests.request(method, url, **kwargs)
146+
request_finish_time = time.time()
144147
except Exception as err:
145148
self.pagination.is_last_batch_empty = True
146149
error_message = "Error: {}".format(err)
147150
if can_raise_exeption:
148151
raise RestAPIClientError(error_message)
149152
else:
150153
return {"error": error_message}
154+
self.set_metadata("request_duration", request_finish_time - request_start_time)
151155
self.time_last_request = time.time()
156+
self.set_metadata("status_code", response.status_code)
157+
self.set_metadata("response_headers", "{}".format(response.headers))
152158
if response.status_code >= 400:
153159
error_message = "Error {}: {}".format(response.status_code, response.content)
154160
self.pagination.is_last_batch_empty = True
155161
if can_raise_exeption:
156162
raise RestAPIClientError(error_message)
157163
else:
158164
return {"error": error_message}
165+
if response.status_code in [204]:
166+
self.pagination.update_next_page({}, response.links)
167+
return self.empty_json_response()
159168
json_response = response.json()
160-
self.pagination.update_next_page(json_response)
169+
self.pagination.update_next_page(json_response, response.links)
161170
return json_response
162171

163172
def paginated_api_call(self, can_raise_exeption=True):
@@ -167,6 +176,12 @@ def paginated_api_call(self, can_raise_exeption=True):
167176
self.requests_kwargs.update({"params": params})
168177
return self.request(self.http_method, self.pagination.get_next_page_url(), can_raise_exeption, **self.requests_kwargs)
169178

179+
def empty_json_response(self):
    """Return a placeholder body for responses that carry no content.

    Returns:
        ``{}`` when no extraction key is configured. Otherwise an empty dict
        nested under each segment of ``self.extraction_key``, e.g.
        ``"a.b"`` gives ``{"a": {"b": {}}}``.
    """
    # NOTE(review): assumes self.extraction_key holds the same dot-separated
    # path that the connector splits on '.' - confirm against __init__.
    response = {}
    if self.extraction_key:
        # Wrap from the innermost segment outwards so a path lookup on the
        # placeholder succeeds; a single-segment key still gives {key: {}}.
        for key in reversed(self.extraction_key.split('.')):
            response = {key: response}
    return response
181+
182+
def set_metadata(self, metadata_name, value):
183+
self.metadata["dku_{}".format(metadata_name)] = value
184+
170185
@staticmethod
171186
def get_params(endpoint_query_string, keywords):
172187
templated_query_string = get_dku_key_values(endpoint_query_string)
@@ -191,3 +206,6 @@ def enforce_throttling(self):
191206
if time_since_last_resquests < self.time_between_requests:
192207
logger.info("Enforcing {}s throttling".format(self.time_between_requests - time_since_last_resquests))
193208
time.sleep(self.time_between_requests - time_since_last_resquests)
209+
210+
def get_metadata(self):
    """Return a snapshot of the metadata recorded for the latest request.

    Returns:
        A shallow copy of ``self.metadata``, so callers that add keys to the
        result do not alter the client's own state.
    """
    return dict(self.metadata)

0 commit comments

Comments
 (0)