diff --git a/.params.config b/.params.config
deleted file mode 100644
index 02ab360..0000000
--- a/.params.config
+++ /dev/null
@@ -1 +0,0 @@
-workDir = '$WORK_PATH'
\ No newline at end of file
diff --git a/README.md b/README.md
index 62b2dbe..adef841 100644
--- a/README.md
+++ b/README.md
@@ -2,12 +2,16 @@
 [![Nextflow](https://img.shields.io/badge/nextflow%20DSL2-%E2%89%A522.10.6-23aa62.svg)](https://www.nextflow.io/)
 [![run with conda](https://img.shields.io/badge/run%20with-conda-3EB049?labelColor=000000&logo=anaconda)](https://docs.conda.io/en/latest/)
+[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
+
 [//]: # ([![run with singularity](https://img.shields.io/badge/run%20with-singularity-1d355c.svg?labelColor=000000)](https://sylabs.io/docs/))
 
 ## Introduction
 
-A pipeline to calculate number of file downloads in PRIDE archive
+A Nextflow pipeline that computes file download statistics from the log files saved in the EBI infrastructure.
+The statistics show how files and projects are used and support decision-making.
+
 
 ## Usage
 
diff --git a/documentation/docs/index.md b/documentation/docs/index.md
index 2450dff..ddff659 100644
--- a/documentation/docs/index.md
+++ b/documentation/docs/index.md
@@ -1,7 +1,29 @@
 # File Download Statistics Workflow Documentation
 
 ## Overview
-This Nextflow workflow facilitates the automated extraction, transformation, and analysis of file download logs, culminating in statistical reporting and database updates. The pipeline sequentially retrieves log files, processes them into Parquet format, consolidates datasets, performs analytical computations, and disseminates the results for further usage.
+
+The EBI infrastructure keeps file download logs for every service it runs.
+This workflow computes download statistics from those logs to show how files and projects are used and to support decision-making.
+
+## Log Files
+
+Log files are stored in the EBI infrastructure as gzip-compressed (`.gz`) files.
+Each log file follows this format:
+
+![log_file_format.png](assets/log_file_format.png)
+
+Log files are originally stored in `lts` and are copied to our `nobackup` storage for processing.
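+
+As a quick sanity check before a run, the copied logs can be counted by hand (the path below is hypothetical; the workflow itself discovers files the same way, by matching `*.tsv.gz` recursively):
+
+```bash
+# Recursively count the gzipped TSV log files copied to nobackup storage
+find /nobackup/file-download-logs -name "*.tsv.gz" | wc -l
+```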
+
+## Workflow
+
+![log_file_parser.png](assets/log_file_parser.png)
 
 ## Parameters
 
 | Parameter | Description |
diff --git a/filedownloadstat/__init__.py b/filedownloadstat/__init__.py
index e69de29..f5928b2 100644
--- a/filedownloadstat/__init__.py
+++ b/filedownloadstat/__init__.py
@@ -0,0 +1 @@
+# from .main import main
\ No newline at end of file
diff --git a/filedownloadstat/main.py b/filedownloadstat/filedownloadstat.py
similarity index 91%
rename from filedownloadstat/main.py
rename to filedownloadstat/filedownloadstat.py
index e065c0d..51dd59f 100644
--- a/filedownloadstat/main.py
+++ b/filedownloadstat/filedownloadstat.py
@@ -1,12 +1,11 @@
 import os
-
 import click
-from log_file_stat import LogFileStat
-from file_download_stat import FileDownloadStat
+from log_file_analyzer import LogFileAnalyzer
+from log_file_util import FileUtil
+from parquet_analyzer import ParquetAnalyzer
 from parquet_reader import ParquetReader
-from stat_parquet import StatParquet
-from file_util import FileUtil
+from report_stat import ReportStat
 
 
 @click.command("get_log_files",
@@ -111,7 +110,7 @@ def process_log_file(tsvfilepath, output_parquet, resource: str, complete: str,
     type=str
 )
 def run_log_file_stat(file: str, output: str):
-    log_file_stat = LogFileStat()
+    log_file_stat = LogFileAnalyzer()
     log_file_stat.run_log_file_stat(file, output)
 
 
 @click.command(
@@ -145,7 +144,7 @@ def read_parquet_files(file: str):
 )
 def merge_parquet_files(input_dir, output_parquet):
-    stat_parquet = StatParquet()
+    stat_parquet = ParquetAnalyzer()
 
     result = stat_parquet.merge_parquet_files(input_dir, output_parquet)
@@ -184,7 +183,7 @@ def analyze_parquet_files(
         project_level_yearly_download_counts,
         project_level_top_download_counts,
         all_data):
-    stat_parquet = StatParquet()
+    stat_parquet = ParquetAnalyzer()
     result = stat_parquet.analyze_parquet_files(
         output_parquet,
         project_level_download_counts,
@@ -244,28 +243,28 @@ def run_file_download_stat(file: str, output: str, report_template: str, baseurl
     # Convert the comma-separated string to a list of integers
     skipped_years_list = list(map(int, skipped_years.split(","))) if skipped_years else []
 
-    file_download_stat = FileDownloadStat()
+    file_download_stat = ReportStat()
     file_download_stat.run_file_download_stat(file, output, report_template, baseurl, report_copy_filepath, skipped_years_list)
 
 
 @click.group()
-def cli():
+def main():
     pass
 
 
 # =============== Features Used ===============
-cli.add_command(get_log_files)
-cli.add_command(run_log_file_stat)
-cli.add_command(process_log_file)
-cli.add_command(merge_parquet_files)
-cli.add_command(analyze_parquet_files)
-cli.add_command(run_file_download_stat)
+main.add_command(get_log_files)
+main.add_command(run_log_file_stat)
+main.add_command(process_log_file)
+main.add_command(merge_parquet_files)
+main.add_command(analyze_parquet_files)
+main.add_command(run_file_download_stat)
 
 # =============== Additional Features ===============
-cli.add_command(read_parquet_files)
+main.add_command(read_parquet_files)
 
 
 if __name__ == "__main__":
-    cli()
+    main()
diff --git a/filedownloadstat/log_file_stat.py b/filedownloadstat/log_file_analyzer.py
similarity index 71%
rename from filedownloadstat/log_file_stat.py
rename to filedownloadstat/log_file_analyzer.py
index fa2fd0e..c4a3aaf 100644
--- a/filedownloadstat/log_file_stat.py
+++ b/filedownloadstat/log_file_analyzer.py
@@ -2,7 +2,7 @@
 import pandas as pd
 
 
-class LogFileStat:
+class LogFileAnalyzer:
 
     @staticmethod
     def log_file_size_distribution(file):
@@ -41,20 +41,6 @@ def plot_violin_for_protocols(file_list: str):
         fig.write_html("file_size_violin_by_protocol.html")
         print("Violin plot written to file_size_violin_by_protocol.html")
 
-    # @staticmethod
-    # def number_of_lines_vs_file_size(file):
-    #     """
-    #     Analyze the relationship between file size and number of lines.
-    #     For example, determine if larger files generally have more lines.
-    #     """
-    #     # Load data into a DataFrame
-    #     data = pd.read_csv(file, sep="\t", header=None, names=["file_path", "filename", "size", "lines"])
-    #
-    #     # Create scatter plot for number of lines vs file size
-    #     fig = px.scatter(data, x="size", y="lines", hover_name="filename",
-    #                      title="Number of Lines vs File Size",
-    #                      labels={"size": "File Size (Bytes)", "lines": "Number of Lines"})
-    #     fig.write_html("number_of_lines_vs_file_size.html")
 
     @staticmethod
     def run_log_file_stat(file, output):
@@ -62,8 +48,8 @@ def run_log_file_stat(file, output):
         Run the log file statistics generation and save the visualizations in an HTML output file.
         """
         # Generate the visualizations
-        LogFileStat.log_file_size_distribution(file)
-        LogFileStat.plot_violin_for_protocols(file)
+        LogFileAnalyzer.log_file_size_distribution(file)
+        LogFileAnalyzer.plot_violin_for_protocols(file)
 
         # Combine the HTML files
         with open(output, "w") as f:
diff --git a/filedownloadstat/log_parser.py b/filedownloadstat/log_file_parser.py
similarity index 99%
rename from filedownloadstat/log_parser.py
rename to filedownloadstat/log_file_parser.py
index c23bbce..11454f2 100644
--- a/filedownloadstat/log_parser.py
+++ b/filedownloadstat/log_file_parser.py
@@ -7,7 +7,7 @@
 warnings.filterwarnings("ignore", category=FutureWarning, module="dask.dataframe")
 
 
-class LogParser:
+class LogFileParser:
     """
     Class to parse the log file into parquet format
     """
diff --git a/filedownloadstat/file_util.py b/filedownloadstat/log_file_util.py
similarity index 83%
rename from filedownloadstat/file_util.py
rename to filedownloadstat/log_file_util.py
index db61cb0..16c34b5 100644
--- a/filedownloadstat/file_util.py
+++ b/filedownloadstat/log_file_util.py
@@ -2,7 +2,8 @@
 import os
 import sys
 from pathlib import Path
-from log_parser import LogParser
+
+from log_file_parser import LogFileParser
 from parquet_writer import ParquetWriter
 
 
@@ -21,20 +22,6 @@ def get_file_paths(self, root_dir):
         root_path = Path(root_dir)
         return [str(file) for file in root_path.rglob("*.tsv.gz")]
 
-    # def count_lines_in_gz(self, file_path):
-    #     """
-    #     Count the number of lines in a gzipped file.
-    #     :param file_path: Path to the gzipped file.
-    #     :return: Number of lines.
-    #     """
-    #     line_count = 0
-    #     try:
-    #         with gzip.open(file_path, "rt", encoding="utf-8") as gz_file:
-    #             for _ in gz_file:
-    #                 line_count += 1
-    #     except Exception as e:
-    #         print(f"Error counting lines in {file_path}: {e}")
-    #     return line_count
 
     def process_access_methods(self, root_directory: str, file_paths_list: str, protocols: list, public_list: list):
         """
@@ -86,7 +73,7 @@ def process_log_file(self, file_path: str, parquet_output_file: str, resource_li
 
         print(f"Parsing file and writing output to {parquet_output_file}")
 
-        lp = LogParser(file_path, resource_list, completeness_list, accession_pattern)
+        lp = LogFileParser(file_path, resource_list, completeness_list, accession_pattern)
         writer = ParquetWriter(parquet_path=parquet_output_file, write_strategy='batch', batch_size=batch_size)
 
         for batch in lp.parse_gzipped_tsv(batch_size):
diff --git a/filedownloadstat/stat_parquet.py b/filedownloadstat/parquet_analyzer.py
similarity index 99%
rename from filedownloadstat/stat_parquet.py
rename to filedownloadstat/parquet_analyzer.py
index f9c0f70..0c487c8 100644
--- a/filedownloadstat/stat_parquet.py
+++ b/filedownloadstat/parquet_analyzer.py
@@ -1,7 +1,7 @@
 import os
 import dask.dataframe as dd
 
-class StatParquet:
+class ParquetAnalyzer:
 
     def __init__(self):
         pass
diff --git a/filedownloadstat/file_download_stat.py b/filedownloadstat/report_stat.py
similarity index 93%
rename from filedownloadstat/file_download_stat.py
rename to filedownloadstat/report_stat.py
index 4b07fa9..67b24a6 100644
--- a/filedownloadstat/file_download_stat.py
+++ b/filedownloadstat/report_stat.py
@@ -1,15 +1,10 @@
-import os
 from pathlib import Path
 
-from stattypes.project_stat import ProjectStat
-from stattypes.regional_stat import RegionalStat
-from stattypes.trends_stat import TrendsStat
-from stattypes.user_stat import UserStat
-from report import Report
+from stat_types import ProjectStat, RegionalStat, TrendsStat, UserStat
+from report_util import Report
 import pandas as pd
 
-
-class FileDownloadStat:
+class ReportStat:
 
     @staticmethod
     def project_stat(df: pd.DataFrame, baseurl: str):
@@ -116,10 +111,10 @@ def run_file_download_stat(file, output, report_template, baseurl: str, report_c
         # Filter out rows where 'year' is in skipped_years_list
         df = df[~df["year"].isin(skipped_years_list)]
 
-        FileDownloadStat.project_stat(df, baseurl)
-        FileDownloadStat.trends_stat(df)
-        FileDownloadStat.regional_stats(df)
-        FileDownloadStat.user_stats(df)
+        ReportStat.project_stat(df, baseurl)
+        ReportStat.trends_stat(df)
+        ReportStat.regional_stats(df)
+        ReportStat.user_stats(df)
 
         template_path = Path(__file__).resolve().parent.parent / "template" / report_template
 
diff --git a/filedownloadstat/report.py b/filedownloadstat/report_util.py
similarity index 100%
rename from filedownloadstat/report.py
rename to filedownloadstat/report_util.py
diff --git a/filedownloadstat/stat_types/__init__.py b/filedownloadstat/stat_types/__init__.py
new file mode 100644
index 0000000..130d56b
--- /dev/null
+++ b/filedownloadstat/stat_types/__init__.py
@@ -0,0 +1,4 @@
+from .user_stat import UserStat
+from .project_stat import ProjectStat
+from .regional_stat import RegionalStat
+from .trends_stat import TrendsStat
\ No newline at end of file
diff --git a/filedownloadstat/stattypes/project_stat.py b/filedownloadstat/stat_types/project_stat.py
similarity index 100%
rename from filedownloadstat/stattypes/project_stat.py
rename to filedownloadstat/stat_types/project_stat.py
diff --git a/filedownloadstat/stattypes/regional_stat.py b/filedownloadstat/stat_types/regional_stat.py
similarity index 100%
rename from filedownloadstat/stattypes/regional_stat.py
rename to filedownloadstat/stat_types/regional_stat.py
diff --git a/filedownloadstat/stattypes/__init__.py b/filedownloadstat/stattypes/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/main.nf b/main.nf
index ea43525..abe43b8 100644
--- a/main.nf
+++ b/main.nf
@@ -30,6 +30,7 @@ LaunchDir         : $workflow.launchDir
 projectDir        : $workflow.projectDir
 workDir           : $workflow.workDir
 RunName           : $workflow.runName
+Profile           : $workflow.profile
 NextFlow version  : $nextflow.version
 Nextflow location : ${params.nextflow_location}
 Date              : ${new java.util.Date()}
@@ -62,7 +63,7 @@ process get_log_files {
 
     script:
     """
-    python3 ${workflow.projectDir}/filedownloadstat/main.py get_log_files \
+    python3 ${workflow.projectDir}/filedownloadstat/filedownloadstat.py get_log_files \
        --root_dir $root_dir \
        --output "file_list.txt" \
        --protocols "${params.protocols.join(',')}" \
@@ -82,7 +83,7 @@ process run_log_file_stat{
 
     script:
     """
-    python3 ${workflow.projectDir}/filedownloadstat/main.py run_log_file_stat \
+    python3 ${workflow.projectDir}/filedownloadstat/filedownloadstat.py run_log_file_stat \
        --file ${file_paths} \
        --output "log_file_statistics.html"
    """
@@ -104,7 +105,7 @@ process process_log_file {
     """
     # Extract a unique identifier from the log file name
     filename=\$(basename ${file_path} .log.tsv.gz)
-    python3 ${workflow.projectDir}/filedownloadstat/main.py process_log_file \
+    python3 ${workflow.projectDir}/filedownloadstat/filedownloadstat.py process_log_file \
        -f ${file_path} \
        -o "\${filename}.parquet" \
        -r "${params.resource_identifiers.join(",")}" \
@@ -131,7 +132,7 @@ process merge_parquet_files {
     # Write the file paths to a temporary file, because otherwise Argument list(file list) will be too long
     echo "${all_parquet_files.join('\n')}" > all_parquet_files_list.txt
 
-    python3 ${workflow.projectDir}/filedownloadstat/main.py merge_parquet_files \
+    python3 ${workflow.projectDir}/filedownloadstat/filedownloadstat.py merge_parquet_files \
        --input_dir all_parquet_files_list.txt \
        --output_parquet "output_parquet"
    """
@@ -139,7 +140,8 @@
 }
 
 process analyze_parquet_files {
-    label 'process_high'
+    label 'process_low'
+    label 'error_retry_medium'
 
     input:
     val output_parquet
@@ -153,7 +155,7 @@
 
     script:
     """
-    python3 ${workflow.projectDir}/filedownloadstat/main.py analyze_parquet_files \
+    python3 ${workflow.projectDir}/filedownloadstat/filedownloadstat.py analyze_parquet_files \
        --output_parquet ${output_parquet} \
        --project_level_download_counts project_level_download_counts.json \
        --file_level_download_counts file_level_download_counts.json \
@@ -165,7 +167,8 @@
 
 process run_file_download_stat {
-    label 'process_high'
+    label 'process_low'
+    label 'error_retry_medium'
 
     input:
     path all_data // Input the file generated by analyze_parquet_files
@@ -175,7 +178,7 @@ process run_file_download_stat {
 
     script:
     """
-    python3 ${workflow.projectDir}/filedownloadstat/main.py run_file_download_stat \
+    python3 ${workflow.projectDir}/filedownloadstat/filedownloadstat.py run_file_download_stat \
        --file ${all_data} \
        --output "file_download_stat.html" \
        --report_template ${params.report_template} \
@@ -185,7 +188,6 @@
    """
 }
 
-
 process update_project_download_counts {
 
     label 'error_retry'
@@ -302,12 +304,6 @@ workflow {
 
     // Step 3: Analyze Parquet files
     analyze_parquet_files(merge_parquet_files.out.output_parquet)
 
-    // Debug: View individual outputs
-    analyze_parquet_files.out.project_level_download_counts.view()
-    analyze_parquet_files.out.file_level_download_counts.view()
-    analyze_parquet_files.out.project_level_yearly_download_counts.view()
-    analyze_parquet_files.out.project_level_top_download_counts.view()
-
     // Step 4: Generate Statistics for file downloads
     run_file_download_stat(analyze_parquet_files.out.all_data)
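
With this change, every process calls the renamed CLI entry point `filedownloadstat/filedownloadstat.py` (previously `filedownloadstat/main.py`), whose click group is now `main` (previously `cli`). As a rough sketch, one renamed sub-command can be invoked by hand exactly as the `merge_parquet_files` script block above does (this assumes a file list like the `all_parquet_files_list.txt` that the process writes):

```bash
# Call the renamed entry point directly; the flags match the click command definition
python3 filedownloadstat/filedownloadstat.py merge_parquet_files \
    --input_dir all_parquet_files_list.txt \
    --output_parquet "output_parquet"
```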