Skip to content

Commit 60afec5

Browse files
authored
Merge pull request #12 from dataiku/bug/sc-72912-output-json-compatible-with-unnest
sc 72912 output json compatible with unnest
2 parents ee875e5 + 79f8eb8 commit 60afec5

File tree

9 files changed

+160
-33
lines changed

9 files changed

+160
-33
lines changed

custom-recipes/api-connect/recipe.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,13 @@
226226
"type": "BOOLEAN",
227227
"defaultValue": false
228228
},
229+
{
230+
"name": "display_metadata",
231+
"label": "Display metadata",
232+
"description": "Status code, request time...",
233+
"type": "BOOLEAN",
234+
"defaultValue": false
235+
},
229236
{
230237
"name": "timeout",
231238
"label": "Timeout (s)",
@@ -239,6 +246,13 @@
239246
"description": "-1 for no limit",
240247
"type": "INT",
241248
"defaultValue": -1
249+
},
250+
{
251+
"name": "maximum_number_rows",
252+
"label": "Maximum number of rows",
253+
"description": "-1 for no limit",
254+
"type": "INT",
255+
"defaultValue": -1
242256
}
243257
],
244258
"resourceKeys": []

custom-recipes/api-connect/recipe.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ def get_partitioning_keys(id_list, dku_flow_variables):
4040
raise ValueError("There is no parameter column selected.")
4141
parameter_renamings = get_dku_key_values(config.get("parameter_renamings", {}))
4242
custom_key_values = get_dku_key_values(config.get("custom_key_values", {}))
43+
display_metadata = config.get("display_metadata", False)
44+
maximum_number_rows = config.get("maximum_number_rows", -1)
4345
input_parameters_dataset = dataiku.Dataset(input_A_names[0])
4446
partitioning_keys = get_partitioning_keys(input_parameters_dataset, dku_flow_variables)
4547
custom_key_values.update(partitioning_keys)
@@ -51,7 +53,9 @@ def get_partitioning_keys(id_list, dku_flow_variables):
5153
endpoint_parameters,
5254
extraction_key,
5355
parameter_columns,
54-
parameter_renamings
56+
parameter_renamings,
57+
display_metadata,
58+
maximum_number_rows=maximum_number_rows
5559
)
5660
results = recipe_session.process_dataframe(input_parameters_dataframe, is_raw_output)
5761

python-connectors/api-connect_dataset/connector.json

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@
110110
},
111111
{
112112
"name": "extraction_key",
113-
"label": "Key to data array (optional)",
114-
"description": "",
113+
"label": "Path to data array (optional)",
114+
"description": "Dot separated key path",
115115
"defaultValue": null,
116116
"type": "STRING"
117117
},
@@ -186,6 +186,13 @@
186186
"type": "BOOLEAN",
187187
"defaultValue": false
188188
},
189+
{
190+
"name": "display_metadata",
191+
"label": "Display metadata",
192+
"description": "Status code, request time...",
193+
"type": "BOOLEAN",
194+
"defaultValue": false
195+
},
189196
{
190197
"name": "timeout",
191198
"label": "Timeout (s)",

python-connectors/api-connect_dataset/connector.py

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
from dataikuapi.utils import DataikuException
33
from safe_logger import SafeLogger
44
from rest_api_client import RestAPIClient
5-
from dku_utils import get_dku_key_values, get_endpoint_parameters
5+
from dku_utils import get_dku_key_values, get_endpoint_parameters, parse_keys_for_json, get_value_from_path
6+
from dku_constants import DKUConstants
7+
import json
8+
69

710
logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"])
811

@@ -18,11 +21,11 @@ def __init__(self, config, plugin_config):
1821
custom_key_values = get_dku_key_values(config.get("custom_key_values", {}))
1922
self.client = RestAPIClient(credential, endpoint_parameters, custom_key_values)
2023
extraction_key = endpoint_parameters.get("extraction_key", None)
21-
if extraction_key == '':
22-
extraction_key = None
23-
self.extraction_key = extraction_key
24+
self.extraction_key = extraction_key or ''
25+
self.extraction_path = self.extraction_key.split('.')
2426
self.raw_output = endpoint_parameters.get("raw_output", None)
2527
self.maximum_number_rows = config.get("maximum_number_rows", -1)
28+
self.display_metadata = config.get("display_metadata", False)
2629

2730
def get_read_schema(self):
2831
# In this example, we don't specify a schema here, so DSS will infer the schema
@@ -37,25 +40,31 @@ def generate_rows(self, dataset_schema=None, dataset_partitioning=None,
3740
record_count = 0
3841
while self.client.has_more_data():
3942
json_response = self.client.paginated_api_call()
40-
if self.extraction_key is None:
41-
# Todo: check api_response key is free and add something otherwise
42-
if isinstance(json_response, list):
43-
record_count += len(json_response)
44-
for row in json_response:
45-
yield {"api_response": row}
46-
else:
47-
record_count += 1
48-
yield {"api_response": json_response}
43+
metadata = self.client.get_metadata() if self.display_metadata else None
44+
if self.extraction_key:
45+
data = get_value_from_path(json_response, self.extraction_path)
4946
else:
50-
data = json_response.get(self.extraction_key, None)
51-
if data is None:
52-
raise DataikuException("Extraction key '{}' was not found in the incoming data".format(self.extraction_key))
47+
data = json_response
48+
if isinstance(data, list):
5349
record_count += len(data)
54-
for result in data:
55-
yield {"api_response": result} if self.raw_output else result
50+
for row in data:
51+
yield self.format_output(row, metadata)
52+
else:
53+
record_count += 1
54+
yield self.format_output(data, metadata)
5655
if is_records_limit and record_count >= records_limit:
5756
break
5857

58+
def format_output(self, item, metadata=None):
    """Build one output row from an API item, optionally carrying request metadata.

    :param item: one element of the extracted data (or the whole data when it is not a list)
    :param metadata: optional dict of metadata columns to include in the row
    :return: a new dict. With raw output the item is JSON-serialised under
        DKUConstants.API_RESPONSE_KEY; otherwise the item's keys are merged in
        after going through parse_keys_for_json.
    """
    # Work on a copy: generate_rows passes the same metadata dict for every row of
    # a page, so updating it in place would make all rows share one object, leak
    # keys from one row into the next and modify the caller's dict.
    output = dict(metadata) if metadata else {}
    if self.raw_output:
        output[DKUConstants.API_RESPONSE_KEY] = json.dumps(item)
    else:
        output.update(parse_keys_for_json(item))
    return output
67+
5968
def get_writer(self, dataset_schema=None, dataset_partitioning=None,
6069
partition_id=None):
6170
"""

python-lib/dku_constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
class DKUConstants(object):
2+
API_RESPONSE_KEY = "api_response"
23
RAW_BODY_FORMAT = "RAW"
34
FORM_DATA_BODY_FORMAT = "FORM_DATA"

python-lib/dku_utils.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import json
2+
import copy
3+
4+
15
def get_dku_key_values(endpoint_query_string):
    """Collapse an iterable of {"from": ..., "to": ...} entries into a {from: to} dict.

    Entries whose "from" value is missing or falsy are ignored; when the same
    "from" appears several times the last entry wins.
    """
    key_values = {}
    for entry in endpoint_query_string:
        source_key = entry.get("from")
        if source_key:
            key_values[source_key] = entry.get("to")
    return key_values
37

@@ -19,7 +23,30 @@ def get_endpoint_parameters(configuration):
1923
"requests_per_minute",
2024
"pagination_type",
2125
"next_page_url_key",
22-
"top_key", "skip_key"
26+
"top_key", "skip_key", "maximum_number_rows"
2327
]
2428
parameters = {endpoint_parameter: configuration.get(endpoint_parameter) for endpoint_parameter in endpoint_parameters if configuration.get(endpoint_parameter) is not None}
2529
return parameters
30+
31+
32+
def parse_keys_for_json(items):
    """Return a copy of `items` whose values are safe to store in flat columns.

    dict and list values are serialised with json.dumps, None values are
    dropped, and every other value is copied unchanged.
    """
    flattened = {}
    for key in items:
        value = items.get(key)
        if value is None:
            continue
        flattened[key] = json.dumps(value) if isinstance(value, (dict, list)) else value
    return flattened
43+
44+
45+
def get_value_from_path(dictionary, path):
    """Return a copy of the value reached by following `path` through nested dicts.

    :param dictionary: the decoded document to search
    :param path: sequence of keys, outermost first; an empty sequence selects
        the whole document
    :return: a deep copy of the value found at the end of the path
    :raises ValueError: if a key is missing or an intermediate value is not a dict
    """
    node = dictionary
    for key in path:
        # Check the type before the membership test: `key in node` raises
        # TypeError on numbers/None and does substring matching on strings,
        # which would bypass the ValueError below.
        if isinstance(node, dict) and key in node:
            node = node[key]
        else:
            raise ValueError("The extraction path {} was not found in the incoming data".format(path))
    # Copy only the extracted value instead of the whole document up front;
    # the caller still gets an object independent from the input.
    return copy.deepcopy(node)

python-lib/rest_api_client.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def __init__(self, credential, endpoint, custom_key_values={}):
107107
elif body_format in [DKUConstants.FORM_DATA_BODY_FORMAT]:
108108
key_value_body = endpoint.get("key_value_body", {})
109109
self.requests_kwargs.update({"json": get_dku_key_values(key_value_body)})
110+
self.metadata = {}
110111

111112
def set_login(self, credential):
112113
login_type = credential.get("login_type", "no_auth")
@@ -140,22 +141,30 @@ def request(self, method, url, can_raise_exeption=True, **kwargs):
140141
if self.loop_detector.is_stuck_in_loop(url, kwargs.get("params", {}), kwargs.get("headers", {})):
141142
raise RestAPIClientError("The api-connect plugin is stuck in a loop. Please check the pagination parameters.")
142143
try:
144+
request_start_time = time.time()
143145
response = requests.request(method, url, **kwargs)
146+
request_finish_time = time.time()
144147
except Exception as err:
145148
self.pagination.is_last_batch_empty = True
146149
error_message = "Error: {}".format(err)
147150
if can_raise_exeption:
148151
raise RestAPIClientError(error_message)
149152
else:
150153
return {"error": error_message}
154+
self.set_metadata("request_duration", request_finish_time - request_start_time)
151155
self.time_last_request = time.time()
156+
self.set_metadata("status_code", response.status_code)
157+
self.set_metadata("response_headers", "{}".format(response.headers))
152158
if response.status_code >= 400:
153159
error_message = "Error {}: {}".format(response.status_code, response.content)
154160
self.pagination.is_last_batch_empty = True
155161
if can_raise_exeption:
156162
raise RestAPIClientError(error_message)
157163
else:
158164
return {"error": error_message}
165+
if response.status_code in [204]:
166+
self.pagination.update_next_page({}, response.links)
167+
return self.empty_json_response()
159168
json_response = response.json()
160169
self.pagination.update_next_page(json_response, response.links)
161170
return json_response
@@ -167,6 +176,12 @@ def paginated_api_call(self, can_raise_exeption=True):
167176
self.requests_kwargs.update({"params": params})
168177
return self.request(self.http_method, self.pagination.get_next_page_url(), can_raise_exeption, **self.requests_kwargs)
169178

179+
def empty_json_response(self):
    """Return the placeholder body used when a response carries no JSON.

    The placeholder is an empty dict, nested under the extraction key when one
    is set.
    """
    # NOTE(review): extraction_key is not assigned in the visible part of this
    # class - confirm it is set before this method can be reached.
    if self.extraction_key:
        return {self.extraction_key: {}}
    return {}
181+
182+
def set_metadata(self, metadata_name, value):
    """Store `value` in the metadata dict under `metadata_name` prefixed with "dku_"."""
    prefixed_name = "dku_{}".format(metadata_name)
    self.metadata[prefixed_name] = value
184+
170185
@staticmethod
171186
def get_params(endpoint_query_string, keywords):
172187
templated_query_string = get_dku_key_values(endpoint_query_string)
@@ -191,3 +206,6 @@ def enforce_throttling(self):
191206
if time_since_last_resquests < self.time_between_requests:
192207
logger.info("Enforcing {}s throttling".format(self.time_between_requests - time_since_last_resquests))
193208
time.sleep(self.time_between_requests - time_since_last_resquests)
209+
210+
def get_metadata(self):
    """Return the metadata recorded so far.

    A shallow copy is returned so that callers which merge row data into the
    result cannot alter the client's own metadata dict.
    """
    return dict(self.metadata)

python-lib/rest_api_recipe_session.py

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,28 @@
11
from dataikuapi.utils import DataikuException
22
from rest_api_client import RestAPIClient
33
from safe_logger import SafeLogger
4+
from dku_utils import parse_keys_for_json
5+
from dku_constants import DKUConstants
46
import copy
7+
import json
58

69
logger = SafeLogger("api-connect plugin", forbiden_keys=["token", "password"])
710

811

912
class RestApiRecipeSession:
10-
def __init__(self, custom_key_values, credential_parameters, endpoint_parameters, extraction_key, parameter_columns, parameter_renamings):
13+
def __init__(self, custom_key_values, credential_parameters, endpoint_parameters, extraction_key, parameter_columns, parameter_renamings,
             display_metadata=False,
             maximum_number_rows=-1):
    """Store the recipe settings used when processing the input rows.

    :param custom_key_values: stored as-is on the session
    :param credential_parameters: stored as-is on the session
    :param endpoint_parameters: stored as-is on the session
    :param extraction_key: key under which the data rows are looked up in a response
    :param parameter_columns: passed with parameter_renamings to get_column_to_parameter_dict
    :param parameter_renamings: see parameter_columns
    :param display_metadata: whether request metadata is added to the output rows
    :param maximum_number_rows: row limit; values <= 0 disable the limit
    """
    # Output options
    self.display_metadata = display_metadata
    self.maximum_number_rows = maximum_number_rows
    self.is_row_limit = maximum_number_rows > 0

    # Request configuration
    self.custom_key_values = custom_key_values
    self.credential_parameters = credential_parameters
    self.endpoint_parameters = endpoint_parameters
    self.extraction_key = extraction_key
    self.column_to_parameter_dict = self.get_column_to_parameter_dict(parameter_columns, parameter_renamings)

    # State that starts unset
    self.client = None
    self.initial_parameter_columns = None
1826

1927
@staticmethod
2028
def get_column_to_parameter_dict(parameter_columns, parameter_renamings):
@@ -30,6 +38,7 @@ def process_dataframe(self, input_parameters_dataframe, is_raw_output):
3038
results = []
3139
time_last_request = None
3240
for index, input_parameters_row in input_parameters_dataframe.iterrows():
41+
rows_count = 0
3342
self.initial_parameter_columns = {}
3443
for column_name in self.column_to_parameter_dict:
3544
parameter_name = self.column_to_parameter_dict[column_name]
@@ -46,42 +55,64 @@ def process_dataframe(self, input_parameters_dataframe, is_raw_output):
4655
while self.client.has_more_data():
4756
page_results = self.retrieve_next_page(is_raw_output)
4857
results.extend(page_results)
58+
rows_count += len(page_results)
59+
if self.is_row_limit and rows_count >= self.maximum_number_rows:
60+
break
4961
time_last_request = self.client.time_last_request
5062
return results
5163

5264
def retrieve_next_page(self, is_raw_output):
    """Call the next page of the API and convert the response into output rows.

    With an extraction key, the rows found under that key are formatted by
    format_page_rows. Without one, a list response (non-raw mode) yields one
    row per element, and any other response yields a single row. Rows built
    here start from the metadata (when enabled), then receive the response
    data, then the initial parameter columns.

    :param is_raw_output: when true the response is stored as a JSON string
        under DKUConstants.API_RESPONSE_KEY instead of being spread over columns
    :return: list of row dicts for this page
    :raises DataikuException: if the value stored under the extraction key is None
    """
    logger.info("retrieve_next_page: Calling next page")
    json_response = self.client.paginated_api_call(can_raise_exeption=False)
    metadata = self.client.get_metadata() if self.display_metadata else {}

    if self.extraction_key:
        data_rows = json_response.get(self.extraction_key, [json_response])
        if data_rows is None:
            raise DataikuException("Extraction key '{}' was not found in the incoming data".format(self.extraction_key))
        return list(self.format_page_rows(data_rows, is_raw_output, metadata))

    # TODO: check that the api_response key is free, and handle the case where it is not
    if not is_raw_output and isinstance(json_response, list):
        # The API returned an array: one output row per element
        rows = []
        for element in json_response:
            row = copy.deepcopy(metadata)
            row.update(parse_keys_for_json(element))
            row.update(self.initial_parameter_columns)
            rows.append(row)
        return rows

    # Everything else produces exactly one row
    single_row = copy.deepcopy(metadata)
    if is_raw_output:
        if is_error_message(json_response):
            single_row.update(parse_keys_for_json(json_response))
        else:
            single_row.update({
                DKUConstants.API_RESPONSE_KEY: json.dumps(json_response)
            })
    elif isinstance(json_response, dict):
        single_row.update(parse_keys_for_json(json_response))
    single_row.update(self.initial_parameter_columns)
    return [single_row]
73100

74-
def format_page_rows(self, data_rows, is_raw_output):
101+
def format_page_rows(self, data_rows, is_raw_output, metadata=None):
    """Turn the extracted data rows of one page into output rows.

    Each output row is a deep copy of the initial parameter columns, updated
    with the metadata and then with the row's data.

    :param data_rows: iterable of items extracted from the response
    :param is_raw_output: when true, items that are not error messages are stored
        as a JSON string under DKUConstants.API_RESPONSE_KEY
    :param metadata: optional dict of metadata columns added to every row
    :return: list of row dicts
    """
    shared_columns = metadata or {}
    formatted_rows = []
    for data_row in data_rows:
        output_row = copy.deepcopy(self.initial_parameter_columns)
        output_row.update(shared_columns)
        if is_raw_output and not is_error_message(data_row):
            payload = {
                DKUConstants.API_RESPONSE_KEY: json.dumps(data_row)
            }
        else:
            # Non-raw rows and error messages are spread over columns
            payload = parse_keys_for_json(data_row)
        output_row.update(payload)
        formatted_rows.append(output_row)
    return formatted_rows
87118

tests/python/integration/test_scenario.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,19 @@ def test_run_api_connect_authentication_modes(user_dss_clients):
99

1010
def test_run_api_connect_pagination_modes(user_dss_clients):
    """Run the "PAGINATION" scenario of the test project."""
    scenario_id = "PAGINATION"
    dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id=scenario_id)
12+
13+
14+
def test_run_api_connect_recipes(user_dss_clients):
    """Run the "Recipes" scenario of the test project."""
    scenario_id = "Recipes"
    dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id=scenario_id)
16+
17+
18+
def test_run_api_connect_using_global_variable(user_dss_clients):
    """Run the "UsingGlobalVariable" scenario of the test project."""
    scenario_id = "UsingGlobalVariable"
    dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id=scenario_id)
20+
21+
22+
def test_run_api_connect_array_api(user_dss_clients):
    """Run the "ArrayAPI" scenario of the test project."""
    scenario_id = "ArrayAPI"
    dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id=scenario_id)
24+
25+
26+
def test_run_api_connect_search_path(user_dss_clients):
    """Run the "SearchPath" scenario of the test project."""
    scenario_id = "SearchPath"
    dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id=scenario_id)

0 commit comments

Comments
 (0)