Commit: code refactoring
sureshhewabi committed Feb 14, 2025
1 parent c2389f9 commit 7d71f02
Showing 16 changed files with 69 additions and 83 deletions.
1 change: 0 additions & 1 deletion .params.config

This file was deleted.

6 changes: 5 additions & 1 deletion README.md
@@ -2,12 +2,16 @@

[![Nextflow](https://img.shields.io/badge/nextflow%20DSL2-%E2%89%A522.10.6-23aa62.svg)](https://www.nextflow.io/)
[![run with conda](https://img.shields.io/badge/run%20with-conda-3EB049?labelColor=000000&logo=anaconda)](https://docs.conda.io/en/latest/)
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)


[//]: # ([![run with singularity](https://img.shields.io/badge/run%20with-singularity-1d355c.svg?labelColor=000000)](https://sylabs.io/docs/))

## Introduction

A pipeline to calculate number of file downloads in PRIDE archive
A Nextflow pipeline that computes statistics on file downloads from the log files stored in the EBI infrastructure.
This helps to understand how files and projects are used and supports decision-making.


## Usage

17 changes: 16 additions & 1 deletion documentation/docs/index.md
@@ -1,7 +1,22 @@
# File Download Statistics Workflow Documentation

## Overview
This Nextflow workflow automates the extraction, transformation, and analysis of file download logs, ending in statistical reports and database updates. The pipeline retrieves log files, converts them to Parquet, consolidates the datasets, computes download statistics, and publishes the results for further use.

The EBI infrastructure keeps file download logs for every service it runs.
This workflow derives download statistics from those logs to show how files and projects are used and to support decision-making.
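For orientation, each stage corresponds to a subcommand of the click entry point in `filedownloadstat/filedownloadstat.py` (renamed from `main.py` in this commit). A minimal, illustrative way to list the stages, assuming the pipeline's Python dependencies are installed and the repository root is the working directory:

```python
# Illustrative sketch: print the pipeline stages exposed as click subcommands.
# Assumes python3 is available and the repository root is the working directory.
import subprocess

subprocess.run(
    ["python3", "filedownloadstat/filedownloadstat.py", "--help"],
    check=True,
)
```

Each Nextflow process in `main.nf` wraps one of these subcommands.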

## Log Files

Log files are kept in the EBI infrastructure as gzipped (`.gz`) files and follow this layout:

![log_file_format.png](assets/log_file_format.png)

Log files are originally stored in `lts` and are copied to our `nobackup` storage for processing.
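For reference, a minimal sketch of streaming one of these gzipped, tab-separated logs in Python (the file name is a placeholder and printing rows is illustrative; the real parsing is done by `LogFileParser`):

```python
# Minimal sketch: stream a gzipped TSV access log line by line.
# "example.log.tsv.gz" is a placeholder path, not a real EBI log file.
import csv
import gzip

with gzip.open("example.log.tsv.gz", "rt", encoding="utf-8") as handle:
    for row in csv.reader(handle, delimiter="\t"):
        # Each row describes one download event; the pipeline batches such
        # rows and writes them to Parquet for downstream analysis.
        print(row)
```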

## Workflow

![log_file_parser.png](assets/log_file_parser.png)

## Parameters
| Parameter | Description |
1 change: 1 addition & 0 deletions filedownloadstat/__init__.py
@@ -0,0 +1 @@
# from .main import main
35 changes: 17 additions & 18 deletions filedownloadstat/main.py → filedownloadstat/filedownloadstat.py
@@ -1,12 +1,11 @@
import os

import click

from log_file_stat import LogFileStat
from file_download_stat import FileDownloadStat
from log_file_analyzer import LogFileAnalyzer
from log_file_util import FileUtil
from parquet_analyzer import ParquetAnalyzer
from parquet_reader import ParquetReader
from stat_parquet import StatParquet
from file_util import FileUtil
from report_stat import ReportStat


@click.command("get_log_files",
@@ -111,7 +110,7 @@ def process_log_file(tsvfilepath, output_parquet, resource: str, complete: str,
type=str
)
def run_log_file_stat(file: str, output: str):
log_file_stat = LogFileStat()
log_file_stat = LogFileAnalyzer()
log_file_stat.run_log_file_stat(file, output)

@click.command(
@@ -145,7 +144,7 @@ def read_parquet_files(file: str):
)

def merge_parquet_files(input_dir, output_parquet):
stat_parquet = StatParquet()
stat_parquet = ParquetAnalyzer()
result = stat_parquet.merge_parquet_files(input_dir, output_parquet)


@@ -184,7 +183,7 @@ def analyze_parquet_files(
project_level_yearly_download_counts,
project_level_top_download_counts,
all_data):
stat_parquet = StatParquet()
stat_parquet = ParquetAnalyzer()
result = stat_parquet.analyze_parquet_files(
output_parquet,
project_level_download_counts,
@@ -244,28 +243,28 @@ def run_file_download_stat(file: str, output: str, report_template: str, baseurl
# Convert the comma-separated string to a list of integers
skipped_years_list = list(map(int, skipped_years.split(","))) if skipped_years else []

file_download_stat = FileDownloadStat()
file_download_stat = ReportStat()
file_download_stat.run_file_download_stat(file, output, report_template, baseurl, report_copy_filepath,
skipped_years_list)


@click.group()
def cli():
def main():
pass


# =============== Features Used ===============

cli.add_command(get_log_files)
cli.add_command(run_log_file_stat)
cli.add_command(process_log_file)
cli.add_command(merge_parquet_files)
cli.add_command(analyze_parquet_files)
cli.add_command(run_file_download_stat)
main.add_command(get_log_files)
main.add_command(run_log_file_stat)
main.add_command(process_log_file)
main.add_command(merge_parquet_files)
main.add_command(analyze_parquet_files)
main.add_command(run_file_download_stat)

# =============== Additional Features ===============

cli.add_command(read_parquet_files)
main.add_command(read_parquet_files)

if __name__ == "__main__":
cli()
main()
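For context (not part of this commit): with the group renamed from `cli` to `main`, a quick check that the subcommands are still registered could use click's test runner. The `sys.path` tweak is an assumption that mirrors how running the file as a script puts its directory on the import path.

```python
# Illustrative sanity check for the renamed click group, run from the repo root.
# Assumes the pipeline's Python dependencies are installed.
import sys

sys.path.insert(0, "filedownloadstat")  # lets the module's flat imports resolve

from click.testing import CliRunner
from filedownloadstat import main  # filedownloadstat/filedownloadstat.py

result = CliRunner().invoke(main, ["--help"])
print(result.output)  # should list get_log_files, process_log_file, merge_parquet_files, ...
```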
@@ -2,7 +2,7 @@
import pandas as pd


class LogFileStat:
class LogFileAnalyzer:

@staticmethod
def log_file_size_distribution(file):
@@ -41,29 +41,15 @@ def plot_violin_for_protocols(file_list: str):
fig.write_html("file_size_violin_by_protocol.html")
print("Violin plot written to file_size_violin_by_protocol.html")

# @staticmethod
# def number_of_lines_vs_file_size(file):
# """
# Analyze the relationship between file size and number of lines.
# For example, determine if larger files generally have more lines.
# """
# # Load data into a DataFrame
# data = pd.read_csv(file, sep="\t", header=None, names=["file_path", "filename", "size", "lines"])
#
# # Create scatter plot for number of lines vs file size
# fig = px.scatter(data, x="size", y="lines", hover_name="filename",
# title="Number of Lines vs File Size",
# labels={"size": "File Size (Bytes)", "lines": "Number of Lines"})
# fig.write_html("number_of_lines_vs_file_size.html")

@staticmethod
def run_log_file_stat(file, output):
"""
Run the log file statistics generation and save the visualizations in an HTML output file.
"""
# Generate the visualizations
LogFileStat.log_file_size_distribution(file)
LogFileStat.plot_violin_for_protocols(file)
LogFileAnalyzer.log_file_size_distribution(file)
LogFileAnalyzer.plot_violin_for_protocols(file)

# Combine the HTML files
with open(output, "w") as f:
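The tail of `run_log_file_stat` is truncated in this view; one way the "combine the HTML files" step could look (illustrative only; the distribution page's file name is assumed, while the violin page's name appears in the code above):

```python
# Illustrative sketch: concatenate the generated plotly pages into the single
# report requested via --output (log_file_statistics.html in main.nf).
# "file_size_distribution.html" is an assumed name, not taken from the diff.
def combine_html(fragments: list[str], output: str) -> None:
    with open(output, "w", encoding="utf-8") as out:
        for fragment in fragments:
            with open(fragment, encoding="utf-8") as handle:
                out.write(handle.read())

combine_html(
    ["file_size_distribution.html", "file_size_violin_by_protocol.html"],
    "log_file_statistics.html",
)
```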
@@ -7,7 +7,7 @@
warnings.filterwarnings("ignore", category=FutureWarning, module="dask.dataframe")


class LogParser:
class LogFileParser:
"""
Class to parse the log file into parquet format
"""
@@ -2,7 +2,8 @@
import os
import sys
from pathlib import Path
from log_parser import LogParser

from log_file_parser import LogFileParser
from parquet_writer import ParquetWriter


@@ -21,20 +22,6 @@ def get_file_paths(self, root_dir):
root_path = Path(root_dir)
return [str(file) for file in root_path.rglob("*.tsv.gz")]

# def count_lines_in_gz(self, file_path):
# """
# Count the number of lines in a gzipped file.
# :param file_path: Path to the gzipped file.
# :return: Number of lines.
# """
# line_count = 0
# try:
# with gzip.open(file_path, "rt", encoding="utf-8") as gz_file:
# for _ in gz_file:
# line_count += 1
# except Exception as e:
# print(f"Error counting lines in {file_path}: {e}")
# return line_count

def process_access_methods(self, root_directory: str, file_paths_list: str, protocols: list, public_list: list):
"""
@@ -86,7 +73,7 @@ def process_log_file(self, file_path: str, parquet_output_file: str, resource_li

print(f"Parsing file and writing output to {parquet_output_file}")

lp = LogParser(file_path, resource_list, completeness_list, accession_pattern)
lp = LogFileParser(file_path, resource_list, completeness_list, accession_pattern)
writer = ParquetWriter(parquet_path=parquet_output_file, write_strategy='batch', batch_size=batch_size)

for batch in lp.parse_gzipped_tsv(batch_size):
@@ -1,7 +1,7 @@
import os
import dask.dataframe as dd

class StatParquet:
class ParquetAnalyzer:
def __init__(self):
pass

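The class body is truncated here; as a rough sketch of what a dask-based merge could look like (illustrative; the real `merge_parquet_files` implementation is not shown in this diff), using the path-list file that the `merge_parquet_files` process in `main.nf` writes:

```python
# Illustrative sketch only: read the list of per-log Parquet paths, load them
# lazily with dask, and write one consolidated Parquet dataset.
import dask.dataframe as dd


def merge_parquet_sketch(path_list_file: str, output_parquet: str) -> None:
    with open(path_list_file, encoding="utf-8") as handle:
        paths = [line.strip() for line in handle if line.strip()]
    ddf = dd.read_parquet(paths)    # one lazy frame across all per-log files
    ddf.to_parquet(output_parquet)  # writes a directory of part files


merge_parquet_sketch("all_parquet_files_list.txt", "output_parquet")
```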
@@ -1,15 +1,10 @@
import os
from pathlib import Path

from stattypes.project_stat import ProjectStat
from stattypes.regional_stat import RegionalStat
from stattypes.trends_stat import TrendsStat
from stattypes.user_stat import UserStat
from report import Report
from stat_types import ProjectStat, RegionalStat, TrendsStat, UserStat
from report_util import Report
import pandas as pd


class FileDownloadStat:
class ReportStat:

@staticmethod
def project_stat(df: pd.DataFrame, baseurl: str):
@@ -116,10 +111,10 @@ def run_file_download_stat(file, output, report_template, baseurl: str, report_c
# Filter out rows where 'year' is in skipped_years_list
df = df[~df["year"].isin(skipped_years_list)]

FileDownloadStat.project_stat(df, baseurl)
FileDownloadStat.trends_stat(df)
FileDownloadStat.regional_stats(df)
FileDownloadStat.user_stats(df)
ReportStat.project_stat(df, baseurl)
ReportStat.trends_stat(df)
ReportStat.regional_stats(df)
ReportStat.user_stats(df)

template_path = Path(__file__).resolve().parent.parent / "template" / report_template

File renamed without changes.
4 changes: 4 additions & 0 deletions filedownloadstat/stat_types/__init__.py
@@ -0,0 +1,4 @@
from .user_stat import UserStat
from .project_stat import ProjectStat
from .regional_stat import RegionalStat
from .trends_stat import TrendsStat
File renamed without changes.
File renamed without changes.
Empty file.
26 changes: 11 additions & 15 deletions main.nf
@@ -30,6 +30,7 @@ LaunchDir : $workflow.launchDir
projectDir : $workflow.projectDir
workDir : $workflow.workDir
RunName : $workflow.runName
Profile : $workflow.profile
NextFlow version : $nextflow.version
Nextflow location : ${params.nextflow_location}
Date : ${new java.util.Date()}
@@ -62,7 +63,7 @@ process get_log_files {

script:
"""
python3 ${workflow.projectDir}/filedownloadstat/main.py get_log_files \
python3 ${workflow.projectDir}/filedownloadstat/filedownloadstat.py get_log_files \
--root_dir $root_dir \
--output "file_list.txt" \
--protocols "${params.protocols.join(',')}" \
@@ -82,7 +83,7 @@ process run_log_file_stat{

script:
"""
python3 ${workflow.projectDir}/filedownloadstat/main.py run_log_file_stat \
python3 ${workflow.projectDir}/filedownloadstat/filedownloadstat.py run_log_file_stat \
--file ${file_paths} \
--output "log_file_statistics.html"
"""
@@ -104,7 +105,7 @@ process process_log_file {
"""
# Extract a unique identifier from the log file name
filename=\$(basename ${file_path} .log.tsv.gz)
python3 ${workflow.projectDir}/filedownloadstat/main.py process_log_file \
python3 ${workflow.projectDir}/filedownloadstat/filedownloadstat.py process_log_file \
-f ${file_path} \
-o "\${filename}.parquet" \
-r "${params.resource_identifiers.join(",")}" \
@@ -131,15 +132,16 @@ process merge_parquet_files {
# Write the file paths to a temporary file, because otherwise the argument list (file list) would be too long
echo "${all_parquet_files.join('\n')}" > all_parquet_files_list.txt
python3 ${workflow.projectDir}/filedownloadstat/main.py merge_parquet_files \
python3 ${workflow.projectDir}/filedownloadstat/filedownloadstat.py merge_parquet_files \
--input_dir all_parquet_files_list.txt \
--output_parquet "output_parquet"
"""
}

process analyze_parquet_files {

label 'process_high'
label 'process_low'
label 'error_retry_medium'

input:
val output_parquet
@@ -153,7 +155,7 @@ process analyze_parquet_files {

script:
"""
python3 ${workflow.projectDir}/filedownloadstat/main.py analyze_parquet_files \
python3 ${workflow.projectDir}/filedownloadstat/filedownloadstat.py analyze_parquet_files \
--output_parquet ${output_parquet} \
--project_level_download_counts project_level_download_counts.json \
--file_level_download_counts file_level_download_counts.json \
Expand All @@ -165,7 +167,8 @@ process analyze_parquet_files {

process run_file_download_stat {

label 'process_high'
label 'process_low'
label 'error_retry_medium'

input:
path all_data // Input the file generated by analyze_parquet_files
@@ -175,7 +178,7 @@ process run_file_download_stat {

script:
"""
python3 ${workflow.projectDir}/filedownloadstat/main.py run_file_download_stat \
python3 ${workflow.projectDir}/filedownloadstat/filedownloadstat.py run_file_download_stat \
--file ${all_data} \
--output "file_download_stat.html" \
--report_template ${params.report_template} \
Expand All @@ -185,7 +188,6 @@ process run_file_download_stat {
"""
}


process update_project_download_counts {

label 'error_retry'
@@ -302,12 +304,6 @@ workflow {
// Step 3: Analyze Parquet files
analyze_parquet_files(merge_parquet_files.out.output_parquet)

// Debug: View individual outputs
analyze_parquet_files.out.project_level_download_counts.view()
analyze_parquet_files.out.file_level_download_counts.view()
analyze_parquet_files.out.project_level_yearly_download_counts.view()
analyze_parquet_files.out.project_level_top_download_counts.view()

// Step 4: Generate Statistics for file downloads
run_file_download_stat(analyze_parquet_files.out.all_data)

