Commit 5aea935

[COST-5623] Add support for GCP daily flow (#539)
* Add support for GCP daily flow
* update comment
* empty line
* lint
* lint 2
* Add unit test
* ups
1 parent e1abe83 commit 5aea935

File tree

5 files changed: +189 -141 lines changed

nise/__init__.py (+1 -1)

@@ -1,4 +1,4 @@
-__version__ = "4.7.1"
+__version__ = "4.7.2"
 
 
 VERSION = __version__.split(".")

nise/__main__.py (+8)

@@ -242,6 +242,14 @@ def add_gcp_parser_args(parser):
         dest="daily_report",
         help="GCP daily report activation",
     )
+
+    parser.add_argument(
+        "--gcp-daily-flow",
+        action="store_true",
+        required=False,
+        dest="gcp_daily_flow",
+        help="additional GCP day to day ingest",
+    )
     parser.add_argument(
         "-etag", "--gcp-etag", metavar="GCP_ETAG", dest="gcp_etag", required=False, help="The etag in the filename"
     )
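
The new flag is a plain store_true switch, so gcp_daily_flow defaults to False and existing invocations keep their behavior. A minimal sketch of how the option parses, assuming a standalone argparse parser (hypothetical here; nise registers the argument on its main parser via add_gcp_parser_args):

    import argparse

    # Standalone parser for illustration only; in nise this argument is
    # added by add_gcp_parser_args(parser).
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--gcp-daily-flow",
        action="store_true",  # absent -> False, present -> True
        required=False,
        dest="gcp_daily_flow",
        help="additional GCP day to day ingest",
    )

    print(parser.parse_args([]).gcp_daily_flow)                    # False
    print(parser.parse_args(["--gcp-daily-flow"]).gcp_daily_flow)  # True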

nise/report.py (+9 -1)

@@ -1206,6 +1206,7 @@ def _gcp_bigquery_process(
     start_date, end_date, currency, projects, generators, options, gcp_bucket_name, gcp_dataset_name, gcp_table_name
 ):
     resource_level = options.get("gcp_resource_level", False)
+    gcp_daily_flow = options.get("gcp_daily_flow", False)
     data = []
     for project in projects:
         num_gens = len(generators)
@@ -1239,7 +1240,14 @@
             gcp_table_name = f"gcp_billing_export_resource_{etag}"
         else:
             gcp_table_name = f"gcp_billing_export_{etag}"
-        gcp_bucket_to_dataset(gcp_bucket_name, output_file_name, gcp_dataset_name, gcp_table_name, resource_level)
+        gcp_bucket_to_dataset(
+            gcp_bucket_name,
+            output_file_name,
+            gcp_dataset_name,
+            gcp_table_name,
+            resource_level,
+            gcp_daily_flow=gcp_daily_flow,
+        )
 
     return monthly_files
 
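
Both options are read with dict.get and a False default, so the report flow stays backward compatible: callers that never set gcp_daily_flow get the original truncate-and-recreate load. A small illustrative check, assuming a hand-built options dict standing in for nise's parsed CLI options (hypothetical):

    # Hypothetical options dict; nise builds this from the parsed CLI arguments.
    options = {"gcp_resource_level": False}

    # Mirrors _gcp_bigquery_process: a missing key falls back to False.
    resource_level = options.get("gcp_resource_level", False)
    gcp_daily_flow = options.get("gcp_daily_flow", False)

    assert gcp_daily_flow is False  # default path: full truncate-and-recreate load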

nise/upload.py (+157 -139)

@@ -128,7 +128,9 @@ def upload_to_gcp_storage(bucket_name, source_file_name, destination_blob_name):
     return uploaded
 
 
-def gcp_bucket_to_dataset(gcp_bucket_name, file_name, dataset_name, table_name, resource_level=False):
+def gcp_bucket_to_dataset(
+    gcp_bucket_name, file_name, dataset_name, table_name, resource_level=False, gcp_daily_flow=False
+):
     """
     Create a gcp dataset from a file stored in a bucket.
 
@@ -137,6 +139,8 @@ def gcp_bucket_to_dataset(gcp_bucket_name, file_name, dataset_name, table_name,
         file_name (String): The name of the file stored in GCP
         dataset_name (String): name for the created dataset in GCP
         table_name (String): name for the created dataset in GCP
+        resource_level (Boolean): indicates whether to generate a resource level report
+        gcp_daily_flow (Boolean): indicates if the data are ingested as part of the day-to-day flow
 
     Returns:
         (Boolean): True if the dataset was created
@@ -156,156 +160,165 @@ def gcp_bucket_to_dataset(gcp_bucket_name, file_name, dataset_name, table_name,
         project_name = bigquery_client.project
         dataset_id = f"{project_name}.{dataset_name}"
         dataset = bigquery.Dataset(dataset_id)
+        table_id = f"{project_name}.{dataset_name}.{table_name}"
 
-        # delete dataset (does not error if it doesn't exist) and create fresh one
-        bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
-        dataset = bigquery_client.create_dataset(dataset)
+        if gcp_daily_flow:
+            # create the job config for daily flow - i.e., appending data
+            job_config = bigquery.LoadJobConfig(
+                source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
+                write_disposition="WRITE_APPEND",
+                time_partitioning=bigquery.TimePartitioning(),  # may be not needed?
+            )
+            log_message = f"Dataset {dataset_name} updated in GCP bigquery under the table name {table_name}."
 
-        table_id = f"{project_name}.{dataset_name}.{table_name}"
+        else:
+            # delete dataset (does not error if it doesn't exist) and create fresh one
+            bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
+            dataset = bigquery_client.create_dataset(dataset)
 
-        # Build schema
-        schema = [
-            {"name": "billing_account_id", "type": "STRING", "mode": "NULLABLE"},
-            {
-                "name": "service",
-                "type": "RECORD",
-                "fields": [
-                    {"name": "id", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "description", "type": "STRING", "mode": "NULLABLE"},
-                ],
-                "mode": "NULLABLE",
-            },
-            {
-                "name": "sku",
-                "type": "RECORD",
-                "fields": [
-                    {"name": "id", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "description", "type": "STRING", "mode": "NULLABLE"},
-                ],
-                "mode": "NULLABLE",
-            },
-            {"name": "usage_start_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
-            {"name": "usage_end_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
-            {
-                "name": "project",
-                "type": "RECORD",
-                "fields": [
-                    {"name": "id", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "number", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
-                    {
-                        "name": "labels",
-                        "type": "RECORD",
-                        "fields": [
-                            {"name": "key", "type": "STRING", "mode": "NULLABLE"},
-                            {"name": "value", "type": "STRING", "mode": "NULLABLE"},
-                        ],
-                        "mode": "REPEATED",
-                    },
-                    {"name": "ancestry_numbers", "type": "STRING", "mode": "NULLABLE"},
-                ],
-                "mode": "NULLABLE",
-            },
-            {
-                "name": "labels",
-                "type": "RECORD",
-                "fields": [
-                    {"name": "key", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "value", "type": "STRING", "mode": "NULLABLE"},
-                ],
-                "mode": "REPEATED",
-            },
-            {
-                "name": "system_labels",
-                "type": "RECORD",
-                "fields": [
-                    {"name": "key", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "value", "type": "STRING", "mode": "NULLABLE"},
-                ],
-                "mode": "REPEATED",
-            },
-            {
-                "name": "location",
-                "type": "RECORD",
-                "fields": [
-                    {"name": "location", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "country", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "region", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "zone", "type": "STRING", "mode": "NULLABLE"},
-                ],
-                "mode": "NULLABLE",
-            },
-            {"name": "export_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
-            {"name": "cost", "type": "FLOAT", "mode": "NULLABLE"},
-            {"name": "currency", "type": "STRING", "mode": "NULLABLE"},
-            {"name": "currency_conversion_rate", "type": "FLOAT", "mode": "NULLABLE"},
-            {
-                "name": "usage",
-                "type": "RECORD",
-                "fields": [
-                    {"name": "amount", "type": "FLOAT", "mode": "NULLABLE"},
-                    {"name": "unit", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "amount_in_pricing_units", "type": "FLOAT", "mode": "NULLABLE"},
-                    {"name": "pricing_unit", "type": "STRING", "mode": "NULLABLE"},
-                ],
-                "mode": "NULLABLE",
-            },
-            {
-                "name": "credits",
-                "type": "RECORD",
-                "fields": [
-                    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "amount", "type": "FLOAT", "mode": "NULLABLE"},
-                    {"name": "full_name", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "id", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "type", "type": "STRING", "mode": "NULLABLE"},
-                ],
-                "mode": "REPEATED",
-            },
-            {
-                "name": "invoice",
-                "type": "RECORD",
-                "fields": [{"name": "month", "type": "STRING", "mode": "NULLABLE"}],
-                "mode": "NULLABLE",
-            },
-            {"name": "cost_type", "type": "STRING", "mode": "NULLABLE"},
-            {
-                "name": "adjustment_info",
-                "type": "RECORD",
-                "fields": [
-                    {"name": "id", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "description", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "mode", "type": "STRING", "mode": "NULLABLE"},
-                    {"name": "type", "type": "STRING", "mode": "NULLABLE"},
-                ],
-                "mode": "NULLABLE",
-            },
-        ]
-
-        # Add resource to schema if required
-        if resource_level:
-            schema += [
+            # Build schema
+            schema = [
+                {"name": "billing_account_id", "type": "STRING", "mode": "NULLABLE"},
+                {
+                    "name": "service",
+                    "type": "RECORD",
+                    "fields": [
+                        {"name": "id", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "description", "type": "STRING", "mode": "NULLABLE"},
+                    ],
+                    "mode": "NULLABLE",
+                },
+                {
+                    "name": "sku",
+                    "type": "RECORD",
+                    "fields": [
+                        {"name": "id", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "description", "type": "STRING", "mode": "NULLABLE"},
+                    ],
+                    "mode": "NULLABLE",
+                },
+                {"name": "usage_start_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
+                {"name": "usage_end_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
+                {
+                    "name": "project",
+                    "type": "RECORD",
+                    "fields": [
+                        {"name": "id", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "number", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+                        {
+                            "name": "labels",
+                            "type": "RECORD",
+                            "fields": [
+                                {"name": "key", "type": "STRING", "mode": "NULLABLE"},
+                                {"name": "value", "type": "STRING", "mode": "NULLABLE"},
+                            ],
+                            "mode": "REPEATED",
+                        },
+                        {"name": "ancestry_numbers", "type": "STRING", "mode": "NULLABLE"},
+                    ],
+                    "mode": "NULLABLE",
+                },
+                {
+                    "name": "labels",
+                    "type": "RECORD",
+                    "fields": [
+                        {"name": "key", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "value", "type": "STRING", "mode": "NULLABLE"},
+                    ],
+                    "mode": "REPEATED",
+                },
+                {
+                    "name": "system_labels",
+                    "type": "RECORD",
+                    "fields": [
+                        {"name": "key", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "value", "type": "STRING", "mode": "NULLABLE"},
+                    ],
+                    "mode": "REPEATED",
+                },
+                {
+                    "name": "location",
+                    "type": "RECORD",
+                    "fields": [
+                        {"name": "location", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "country", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "region", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "zone", "type": "STRING", "mode": "NULLABLE"},
+                    ],
+                    "mode": "NULLABLE",
+                },
+                {"name": "export_time", "type": "TIMESTAMP", "mode": "NULLABLE"},
+                {"name": "cost", "type": "FLOAT", "mode": "NULLABLE"},
+                {"name": "currency", "type": "STRING", "mode": "NULLABLE"},
+                {"name": "currency_conversion_rate", "type": "FLOAT", "mode": "NULLABLE"},
                 {
-                    "name": "resource",
+                    "name": "usage",
+                    "type": "RECORD",
+                    "fields": [
+                        {"name": "amount", "type": "FLOAT", "mode": "NULLABLE"},
+                        {"name": "unit", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "amount_in_pricing_units", "type": "FLOAT", "mode": "NULLABLE"},
+                        {"name": "pricing_unit", "type": "STRING", "mode": "NULLABLE"},
+                    ],
+                    "mode": "NULLABLE",
+                },
+                {
+                    "name": "credits",
                     "type": "RECORD",
                     "fields": [
                         {"name": "name", "type": "STRING", "mode": "NULLABLE"},
-                        {"name": "global_name", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "amount", "type": "FLOAT", "mode": "NULLABLE"},
+                        {"name": "full_name", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "id", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "type", "type": "STRING", "mode": "NULLABLE"},
+                    ],
+                    "mode": "REPEATED",
+                },
+                {
+                    "name": "invoice",
+                    "type": "RECORD",
+                    "fields": [{"name": "month", "type": "STRING", "mode": "NULLABLE"}],
+                    "mode": "NULLABLE",
+                },
+                {"name": "cost_type", "type": "STRING", "mode": "NULLABLE"},
+                {
+                    "name": "adjustment_info",
+                    "type": "RECORD",
+                    "fields": [
+                        {"name": "id", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "description", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "mode", "type": "STRING", "mode": "NULLABLE"},
+                        {"name": "type", "type": "STRING", "mode": "NULLABLE"},
                     ],
                     "mode": "NULLABLE",
-                }
+                },
             ]
 
-        # creates the job config with specifics
-        job_config = bigquery.LoadJobConfig(
-            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
-            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
-            time_partitioning=bigquery.TimePartitioning(),
-            schema=schema,
-        )
+            # Add resource to schema if required
+            if resource_level:
+                schema += [
+                    {
+                        "name": "resource",
+                        "type": "RECORD",
+                        "fields": [
+                            {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+                            {"name": "global_name", "type": "STRING", "mode": "NULLABLE"},
+                        ],
+                        "mode": "NULLABLE",
+                    }
+                ]
+
+            # creates the job config with specifics
+            job_config = bigquery.LoadJobConfig(
+                write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
+                source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
+                time_partitioning=bigquery.TimePartitioning(),
+                schema=schema,
+            )
+            log_message = f"Dataset {dataset_name} created in GCP bigquery under the table name {table_name}."
 
         uri = f"gs://{gcp_bucket_name}/{file_name}"
-
         load_job = bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config)
 
         # waits for the job to finish, will raise an exception if it doesnt work
@@ -316,16 +329,21 @@ def gcp_bucket_to_dataset(gcp_bucket_name, file_name, dataset_name, table_name,
         bucket = storage_client.bucket(gcp_bucket_name)
         blob = bucket.blob(file_name)
         blob.delete()
+
         # Our downloader downloads by the paritiontime, however the default partitiontime is the date
         # the data is uploaded to bigquery. Therefore, everything goes into one single day. The load
         # job config does not let you upload to the _PARTITIONTIME because it is a prebuild column in
         # bigquery. However, we do have permission to update it.
+        # TODO there is likely a bug on koku side for month boundary - update this to set different
+        # partition time for some items in a day to investigate it - e.g., by using usage_end_time instead
+        # of usage_start_time, or by using export_time (would have to be adjusted first)
         partition_date_sql = f"""
        UPDATE `{table_id}` SET _PARTITIONTIME=CAST(DATE_TRUNC(DATE(usage_start_time), DAY) AS timestamp) WHERE 1=1;
         """
         bigquery_client.query(partition_date_sql)
 
-        LOG.info(f"Dataset {dataset_name} created in GCP bigquery under the table name {table_name}.")
+        LOG.info(log_message)
+
     except GoogleCloudError as upload_err:
         LOG.error(upload_err)
         uploaded = False