2
2
from dataikuapi .utils import DataikuException
3
3
from safe_logger import SafeLogger
4
4
from rest_api_client import RestAPIClient
5
- from dku_utils import get_dku_key_values , get_endpoint_parameters , parse_keys_for_json , get_value_from_path
5
+ from dku_utils import (
6
+ get_dku_key_values , get_endpoint_parameters ,
7
+ parse_keys_for_json , get_value_from_path , get_secure_credentials , decode_csv_data
8
+ )
6
9
from dku_constants import DKUConstants
7
10
import json
8
11
@@ -17,9 +20,10 @@ def __init__(self, config, plugin_config):
17
20
logger .info ('API-Connect plugin connector v{}' .format (DKUConstants .PLUGIN_VERSION ))
18
21
logger .info ("config={}" .format (logger .filter_secrets (config )))
19
22
endpoint_parameters = get_endpoint_parameters (config )
23
+ secure_credentials = get_secure_credentials (config )
20
24
credential = config .get ("credential" , {})
21
25
custom_key_values = get_dku_key_values (config .get ("custom_key_values" , {}))
22
- self .client = RestAPIClient (credential , endpoint_parameters , custom_key_values )
26
+ self .client = RestAPIClient (credential , secure_credentials , endpoint_parameters , custom_key_values )
23
27
extraction_key = endpoint_parameters .get ("extraction_key" , None )
24
28
self .extraction_key = extraction_key or ''
25
29
self .extraction_path = self .extraction_key .split ('.' )
@@ -49,9 +53,14 @@ def generate_rows(self, dataset_schema=None, dataset_partitioning=None,
49
53
record_count += len (data )
50
54
for row in data :
51
55
yield self .format_output (row , metadata )
52
- else :
56
+ elif isinstance ( data , dict ) :
53
57
record_count += 1
54
58
yield self .format_output (data , metadata )
59
+ else :
60
+ data = decode_csv_data (data )
61
+ record_count += len (data )
62
+ for row in data :
63
+ yield self .format_output (row , metadata )
55
64
if is_records_limit and record_count >= records_limit :
56
65
break
57
66
0 commit comments