Merge pull request #6 from zyxware/develop
Add the wrapper scripts to avoid sampling
vimaljoseph authored May 28, 2024
2 parents 0f32944 + f26c19b commit fad01b4
Showing 5 changed files with 344 additions and 18 deletions.
23 changes: 22 additions & 1 deletion README.md
@@ -20,6 +20,7 @@ This is a Free Software licenced under GNU GPL v2.0 or above. Please see [What i
- `oauth2client`
- `pyyaml`
- `argparse`
- `pandas`

## Setup and Installation

@@ -36,7 +37,7 @@ This is a Free Software licenced under GNU GPL v2.0 or above. Please see [What i
```sh
python -m venv venv
source venv/bin/activate
pip install google-api-python-client oauth2client pyyaml argparse
pip install google-api-python-client oauth2client pyyaml argparse pandas
```
3. **Service Account and API Key:**
@@ -114,6 +115,26 @@ python3 analytics_reporter.py --report_id 1 --start 2023-01-01 --end 2023-01-31
The script will automatically resume downloading from the last saved progress.
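The helpers `load_progress` and `save_progress` (imported from `utils`) handle this bookkeeping; their implementation is not shown in this change, so the file layout below is only an assumption used to illustrate the idea of persisting the paging state between runs:
```python
import json
import os

def load_progress(progress_file):
    """Return previously saved progress, or an empty dict on a fresh run (assumed JSON layout)."""
    if not os.path.exists(progress_file):
        return {}
    with open(progress_file) as fh:
        return json.load(fh)

def save_progress(progress_file, next_page_token, records_downloaded):
    """Persist the paging state so an interrupted download can resume from the same page."""
    with open(progress_file, "w") as fh:
        json.dump({"next_page_token": next_page_token,
                   "records_downloaded": records_downloaded}, fh)
```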
### Avoid Sampling
The Google Analytics Reporting API may return sampled data when a query covers a very large date range or a very large number of records. To address this, the wrapper script `ua_backup.py` accepts the same arguments as `analytics_reporter.py` and splits the date range into smaller chunks based on the `--report_level` argument. The available options are 'day', 'week', 'month', and 'year'.
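The splitting itself is done inside `ua_backup.py`, which is part of this change but not rendered above. As a rough sketch only (the helper name is hypothetical, and the fixed-length 'month'/'year' windows are an assumption; the real script may align chunks to calendar boundaries):
```python
from datetime import date, timedelta

# Rough window sizes in days; the real script may align chunks to calendar boundaries.
WINDOW_DAYS = {"day": 1, "week": 7, "month": 31, "year": 366}

def split_date_range(start, end, report_level="month"):
    """Yield (chunk_start, chunk_end) date pairs covering [start, end]."""
    step = WINDOW_DAYS[report_level]
    current = start
    while current <= end:
        chunk_end = min(current + timedelta(days=step - 1), end)
        yield current, chunk_end
        current = chunk_end + timedelta(days=1)

# Example: daily chunks for the first three days of January
for chunk_start, chunk_end in split_date_range(date(2023, 1, 1), date(2023, 1, 3), "day"):
    print(chunk_start.isoformat(), chunk_end.isoformat())
```
Presumably each chunk is then run as a separate report (the new `--sequence` argument and the per-sequence CSV names in `analytics_reporter.py` support this), and `merge_reports.py` later combines the pieces.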
**Example:**
```sh
python3 ua_backup.py --report_id 1 --start 2020-01-01 --end 2023-01-31 --report_level day
```
This will run the query for each day and store the results as separate CSV files in the output folder. The script `merge_reports.py` can be used to merge the individual CSV files into a single CSV file per report.
```sh
python3 merge_reports.py output/123423_ua-property full_report
```
You will get the merged CSV report in the `full_report` folder.
**Note:** The system uses `ua-backup-execution.log` to keep track of the last executed step so that it can resume if an error occurs. It also uses `quota_exceeded.log` to record whether the quota was exceeded, and `<view-id>_progress.log` to track the progress of individual reports. To start a fresh run from the beginning, remove these log files.
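As an illustration of how the wrapper might consult these logs when resuming (the log format and helper names below are assumptions for illustration, not taken from `ua_backup.py`):
```python
import os

EXECUTION_LOG = "ua-backup-execution.log"  # assumed format: one completed chunk label per line

def load_completed_chunks(log_path=EXECUTION_LOG):
    """Return the set of chunk labels already finished; an empty set means a fresh run."""
    if not os.path.exists(log_path):
        return set()
    with open(log_path) as fh:
        return {line.strip() for line in fh if line.strip()}

def mark_chunk_completed(label, log_path=EXECUTION_LOG):
    """Append a finished chunk so an interrupted run can pick up where it left off."""
    with open(log_path, "a") as fh:
        fh.write(label + "\n")

# Hypothetical resume loop: skip chunks that were already logged
completed = load_completed_chunks()
for chunk in ["2020-01-01_2020-01-01", "2020-01-02_2020-01-02"]:
    if chunk in completed:
        continue
    # ... invoke analytics_reporter.py for this chunk here ...
    mark_chunk_completed(chunk)
```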
### Debugging
To restart the download, try removing the `<view-id>_progress.log` file and the generated CSV files. Check `settings.yml` and `reports_config.yml` and make sure the values are correct. Also refer to `settings.yml.default` and `reports_config.yml.example`.
77 changes: 64 additions & 13 deletions analytics_reporter.py
@@ -13,6 +13,7 @@
import traceback
import json
import logging
from datetime import datetime
from ga_data_fetcher import get_data
from utils import format_date, write_to_csv, append_to_csv, clear_csv_file, clean_name, load_progress, save_progress

@@ -38,16 +39,50 @@ def signal_handler(sig, frame):
logging.info("You pressed Ctrl+C! Waiting for the current request to complete...")
interrupted = True

def construct_output_file(property_name, view_id, report_id, report_name):
def construct_log_file(view_id, report_name, sequence=None, log_type="progress"):
"""Construct the log file name based on provided parameters."""
log_file_name = f"{view_id}_{log_type}.log"
return log_file_name

def log_sampling_info(output_dir, view_id, report_name, sampling_info, sequence=None):
"""Log sampling information to a log file."""
sampling_log_file = os.path.join(output_dir, construct_log_file(view_id, report_name, sequence, "sampling"))
with open(sampling_log_file, 'a') as log_file:
log_file.write(f"Sampling Info for View ID {view_id}, Report: {report_name}:\n")
log_file.write(f" Is Sampled: {sampling_info['is_sampled']}\n")
log_file.write(f" Samples Read Counts: {sampling_info['samples_read_counts']}\n")
log_file.write(f" Sampling Space Sizes: {sampling_info['sampling_space_sizes']}\n")
log_file.write("\n")

def construct_output_file(property_name, view_id, report_id, report_name, sequence=None):
"""Construct the output file name based on provided parameters."""
property_name_clean = clean_name(property_name) if property_name else ""
report_name_clean = clean_name(report_name)

output_dir = f"output/{view_id}_{property_name_clean}"
os.makedirs(output_dir, exist_ok=True)

if property_name_clean:
return f"{property_name_clean}_{view_id}_{report_id}_{report_name_clean}_report.csv"
if sequence:
return f"{output_dir}/{property_name_clean}_{view_id}_{report_id}_{report_name_clean}_report_{sequence}.csv"
else:
return f"{output_dir}/{property_name_clean}_{view_id}_{report_id}_{report_name_clean}_report.csv"
else:
return f"{view_id}_{report_id}_{report_name_clean}_report.csv"
if sequence:
return f"{output_dir}/{view_id}_{report_id}_{report_name_clean}_report_{sequence}.csv"
else:
return f"{output_dir}/{view_id}_{report_id}_{report_name_clean}_report.csv"

def log_quota_exceeded(view_id):
"""Log the date and time when quota is exceeded."""
quota_log_file = "quota_exceeded.log"
with open(quota_log_file, 'a') as log_file:
log_file.write(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")


def generate_report(report_config, start_date, end_date, api_key, view_id, output_file):
def generate_report(report_config, start_date, end_date, api_key, view_id, report_name, output_file, sequence=None):
"""Generate report based on provided configuration."""
global interrupted
success = False
@@ -59,11 +94,12 @@ def generate_report(report_config, start_date, end_date, api_key, view_id, outpu

dimensions = report_config['dimensions']
metrics = report_config['metrics']
page_size = report_config.get('page_size', 1000) # Default to 1000 if not specified
page_size = report_config.get('page_size', 5000) # Default to 5000 if not specified
sampling_level = report_config.get('sampling_level', 'DEFAULT') # Default to 'DEFAULT' if not specified
metrics_filter = report_config.get('metrics_filter', False)

progress_file = f"{view_id}_progress.log"
output_dir = os.path.dirname(output_file)
progress_file = os.path.join(output_dir, construct_log_file(view_id, report_name, sequence, "progress"))
progress_data = load_progress(progress_file)

total_records_downloaded = 0
@@ -95,7 +131,17 @@ def generate_report(report_config, start_date, end_date, api_key, view_id, outpu
break

try:
data, next_page_token = get_data(api_key, view_id, dimensions, metrics, start_date, end_date, format_date, page_size, next_page_token, sampling_level, metrics_filter)
data, next_page_token, sampling_info, quota_exceeded = get_data(api_key, view_id, dimensions, metrics, start_date, end_date, format_date, page_size, next_page_token, sampling_level, metrics_filter)
if quota_exceeded:
log_quota_exceeded(view_id)
break
if sampling_info['is_sampled']:
logging.info("Data is sampled.")
logging.info(f"Sampling Read Counts: {sampling_info['samples_read_counts']}")
logging.info(f"Sample Space Sizes:, {sampling_info['sampling_space_sizes']}")
log_sampling_info(output_dir, view_id, report_name, sampling_info, sequence)
else:
logging.info("Data is not sampled.")
if not data:
logging.info(f"No data available to download for {output_file}")
break
@@ -116,6 +162,10 @@ def generate_report(report_config, start_date, end_date, api_key, view_id, outpu
if not next_page_token or interrupted:
break

except ValueError as e:
logging.error(f"ValueError: {e}")
logging.error(traceback.format_exc())
break # Exit the loop on any error and save progress
except Exception as e:
logging.error(f"An error occurred while fetching data: {e}")
logging.error(traceback.format_exc())
@@ -130,16 +180,16 @@ def generate_report(report_config, start_date, end_date, api_key, view_id, outpu
if success and data:
logging.info(f"Data available in CSV file: {output_file}")

def generate_all_reports(report_configs, start_date, end_date, api_key, view_id, property_name):
def generate_all_reports(report_configs, start_date, end_date, api_key, view_id, property_name, sequence=None):
"""Generate all reports specified in the configuration."""
property_name_clean = clean_name(property_name) if property_name else ""

for report_config in report_configs:
report_name = report_config['name']
output_file = construct_output_file(property_name, view_id, report_config['id'], report_name)
output_file = construct_output_file(property_name, view_id, report_config['id'], report_name, sequence)

logging.info(f"Generating report for {report_name}")
generate_report(report_config, start_date, end_date, api_key, view_id, output_file)
generate_report(report_config, start_date, end_date, api_key, view_id, report_name, output_file, sequence)
if interrupted:
logging.info("Interrupted! Stopping further report generation.")
break
@@ -153,6 +203,7 @@ def main():
parser.add_argument('-s', '--start', type=str, required=True, help='Start date (YYYY-MM-DD)')
parser.add_argument('-e', '--end', type=str, required=True, help='End date (YYYY-MM-DD)')
parser.add_argument('--settings', type=str, help='Path to settings YAML file')
parser.add_argument('--sequence', type=str, help='Optional sequence prefix for the output file name')
args = parser.parse_args()

settings_file = args.settings if args.settings else "settings.yml"
@@ -172,12 +223,12 @@ def main():
logging.error(f"Report configuration for ID {args.report_id} not found.")
return
report_name = report_config['name']
output_file = construct_output_file(property_name, view_id, args.report_id, report_name)
output_file = construct_output_file(property_name, view_id, args.report_id, report_name, args.sequence)

logging.info(f"Generate report for {report_name}")
generate_report(report_config, args.start, args.end, api_key, view_id, output_file)
generate_report(report_config, args.start, args.end, api_key, view_id, report_name, output_file, args.sequence)
else:
generate_all_reports(report_configs['reports'], args.start, args.end, api_key, view_id, property_name)
generate_all_reports(report_configs['reports'], args.start, args.end, api_key, view_id, property_name, args.sequence)

if __name__ == "__main__":
main()
25 changes: 21 additions & 4 deletions ga_data_fetcher.py
@@ -9,8 +9,9 @@
from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
from googleapiclient.errors import HttpError
import logging

def get_data(api_key, view_id, dimensions, metrics, start_date, end_date, date_formatter, page_size=1000, next_page_token=None, sample_size='DEFAULT', metric_filter=False):
def get_data(api_key, view_id, dimensions, metrics, start_date, end_date, date_formatter, page_size=5000, next_page_token=None, sample_size='DEFAULT', metric_filter=False):
# Initialize service
credentials = ServiceAccountCredentials.from_json_keyfile_name(api_key)
service = build('analyticsreporting', 'v4', credentials=credentials)
@@ -45,6 +46,14 @@ def get_data(api_key, view_id, dimensions, metrics, start_date, end_date, date_f
column_header_entries = report['columnHeader']['dimensions'] + \
[entry['name'] for entry in report['columnHeader']['metricHeader']['metricHeaderEntries']]
rows = report.get('data', {}).get('rows', [])
samples_read_counts = report.get('data', {}).get('samplesReadCounts', [])
sampling_space_sizes = report.get('data', {}).get('samplingSpaceSizes', [])
is_sampled = bool(samples_read_counts and sampling_space_sizes)
sampling_info = {
'is_sampled': is_sampled,
'samples_read_counts': samples_read_counts,
'sampling_space_sizes': sampling_space_sizes
}

for row in rows:
formatted_row = {}
@@ -61,8 +70,16 @@ def get_data(api_key, view_id, dimensions, metrics, start_date, end_date, date_f
# Get the next page token, if any
new_next_page_token = report.get('nextPageToken', None)

return formatted_data, new_next_page_token
return formatted_data, new_next_page_token, sampling_info, False

except HttpError as error:
print(f"Error fetching data: {error}")
return [], None
if error.resp.status == 429:
logging.error("Quota Error: Quota exceeded. Please try again later.")
return [], None, {'is_sampled': False, 'samples_read_counts': [], 'sampling_space_sizes': []}, True
else:
logging.error(f"Error fetching data: {error}")
return [], None, {'is_sampled': False, 'samples_read_counts': [], 'sampling_space_sizes': []}, False
except Exception as e:
logging.error(f"An error occurred: {e}")
return [], None, {'is_sampled': False, 'samples_read_counts': [], 'sampling_space_sizes': []}, False

71 changes: 71 additions & 0 deletions merge_reports.py
@@ -0,0 +1,71 @@
import os
import pandas as pd
from glob import glob
import argparse
import re

def merge_report_files(input_dir, output_dir):
# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Dictionary to hold report file information
report_files = {}

# Regex to match report files
report_file_pattern = re.compile(r'^(.*)_(\d+)_(\d+)_([a-zA-Z-]+)_report_(\d+)\.csv$')

# Find all matching files
for filepath in glob(os.path.join(input_dir, '*.csv')):
filename = os.path.basename(filepath)
match = report_file_pattern.match(filename)
if match:
base_name = match.group(1)
view_id = match.group(2)
report_id = match.group(3)
report_name = match.group(4)
sequence = int(match.group(5))

key = f"{base_name}_{view_id}_{report_id}_{report_name}"
if key not in report_files:
report_files[key] = []
report_files[key].append((sequence, filepath))
else:
print(f"Skipping file {filename}: Filename does not match expected pattern")

# Merge files for each report
with open(os.path.join(output_dir, 'all_reports.log'), 'w') as log_file:
for key, files in report_files.items():
files.sort() # Sort files by sequence number
output_file = os.path.join(output_dir, f"{key}_report_full.csv")
total_records = 0
start_date = None
end_date = None
num_files_merged = len(files)

merged_df = pd.DataFrame()
for seq, filepath in files:
df = pd.read_csv(filepath)
if 'ga:date' in df.columns:
if start_date is None:
start_date = df['ga:date'].iloc[0]
end_date = df['ga:date'].iloc[-1]
num_records = len(df) # Count all rows, no header adjustment
total_records += num_records
merged_df = pd.concat([merged_df, df], ignore_index=True)

# Write the merged dataframe to CSV
merged_df.to_csv(output_file, index=False)

log_file.write(f"{output_file},{start_date or ''},{end_date or ''},{num_files_merged},{total_records}\n")

if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Merge report files into a single file for each report.')
parser.add_argument('input_dir', type=str, help='Path to the input directory containing report files.')
parser.add_argument('output_dir', type=str, nargs='?', default='.', help='Path to the output directory where merged files will be stored. Defaults to current directory.')

args = parser.parse_args()

input_dir = args.input_dir
output_dir = args.output_dir

merge_report_files(input_dir, output_dir)