From 33673bca9780d1d5ee5839dfaf0f238e4b5ad9e4 Mon Sep 17 00:00:00 2001 From: Edward Yang <94523015+edoyango@users.noreply.github.com> Date: Tue, 2 May 2023 14:18:39 +1000 Subject: [PATCH 1/9] Initial commit --- LICENSE | 21 +++++++++++++++++++++ README.md | 2 ++ 2 files changed, 23 insertions(+) create mode 100644 LICENSE create mode 100644 README.md diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..1c24a1f --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 WEHI-ResearchComputing + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..489d97d --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# samsa2-nf +Nextflow translation for SAMSA2 metatranscriptomics pipeline. Tailored for running on WEHI Milton. From bc54564392847248ed7fd5a43aab3cfa4519bb93 Mon Sep 17 00:00:00 2001 From: Edward Yang <94523015+edoyango@users.noreply.github.com> Date: Tue, 2 May 2023 14:46:58 +1000 Subject: [PATCH 2/9] Update README.md --- README.md | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 489d97d..0301c91 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,67 @@ # samsa2-nf -Nextflow translation for SAMSA2 metatranscriptomics pipeline. Tailored for running on WEHI Milton. +Nextflow translation for [SAMSA2](https://github.com/transcript/samsa2) metatranscriptomics pipeline. Tailored for running on WEHI Milton. The translation is of the [master](https://github.com/transcript/samsa2/blob/master/bash_scripts/master_script.sh) script in the original pipeline. + +## Usage + +### Setup + +After following the Samsa2 setup instructions, the working directory structure should look like: + +``` +. +├── full_databases +│   ├── New_Bac_Vir_Arc_RefSeq.dmnd +│   ├── New_Bac_Vir_Arc_RefSeq.fa +│   ├── RefSeq_bac.fa +│   ├── subsys_db.dmnd +│   ├── subsys_db.fa +│   └── viral_reference +├── input_files +│   ├── 48E_S68_L001_R1_001.fastq # input files follow *_R{1,2}_* pattern +│   ├── 48E_S68_L001_R2_001.fastq +│   ├── 48F_S69_L001_R1_001.fastq +│   ├── 48F_S69_L001_R2_001.fastq +| ├── ...other input files... +├── programs +│   ├── diamond +│   ├── diamond-linux64.tar.gz +│   ├── diamond_manual.pdf +│   ├── diamond-sse2 +│   ├── pear-0.9.10-linux-x86_64 +│   ├── pear-0.9.10-linux-x86_64.tar.gz +│   ├── sortmerna-2.1 +│   ├── sortmerna-2.1.tar.gz +│   ├── Trimmomatic-0.36 +│   └── Trimmomatic-0.36.zip +└── python_scripts +    ├── DIAMOND_analysis_counter_mp.py # this is a parallel version of DIAMOND_analysis_counter.py +    ├── DIAMOND_analysis_counter.py +    ├── DIAMOND_subsystems_analysis_counter.py +    ├── raw_read_counter.py +    └── subsys_reducer.py +``` + +### Run the pipeline + +```bash +nextflow run samsa2-master.nf +``` + +### Parameters + +``` +nextfow run samsa2-master.nf + --input_files + --python_scripts + --diamond_database + --subsys_database + --output_dir +``` + +## Modifications from original + +`DIAMOND_analysis_counter_mp.py` is modified from the original SAMSA2 pipeline. It's modified to make use of Python's `multiprocessing` module. Currently, this produces identical results to the original in `-O` and `-F` mode (used in the original master script), but not `-R` and `-SO` mode. + +The DIAMOND blastx step has the `-b 12` and `-c 1` [performance options](https://github.com/bbuchfink/diamond/wiki/3.-Command-line-options#memory--performance-options) added. These decrease the number of blocks and decrease the number of chunks from the default. Both of these changes have the net effect of increasing memory usage *substantially*, but reducing run times *substantially*. The maximum observed memory usage is 170GB, which Milton's nodes can easily satisfy, but may be problematic on other hardware. + +The DIAMOND blastx step is also making use of `/vast/scratch/users/$USER/tmp`. This should be parameterised in the future. From 0e940980629d9d8d17f271a37115208a3521aabf Mon Sep 17 00:00:00 2001 From: Edward Yang Date: Tue, 2 May 2023 14:52:57 +1000 Subject: [PATCH 3/9] Added scripts --- DIAMOND_analysis_counter_mp.py | 331 +++++++++++++++++++++++++++++++++ 1 file changed, 331 insertions(+) create mode 100644 DIAMOND_analysis_counter_mp.py diff --git a/DIAMOND_analysis_counter_mp.py b/DIAMOND_analysis_counter_mp.py new file mode 100644 index 0000000..49a3dd3 --- /dev/null +++ b/DIAMOND_analysis_counter_mp.py @@ -0,0 +1,331 @@ +#!/usr/bin/env Python +########################################################################## +# +# Copyright (C) 2015-2016 Sam Westreich +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation; +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +########################################################################## +# +# DIAMOND_analysis_counter.py +# Created 8/16/2016, this version created 1/10/2017 +# Sam Westreich, stwestreich@ucdavis.edu, github.com/transcript +# +# This program parses through the results file from a DIAMOND annotation run +# (in BLAST m8 format) to get the results into something more compressed +# and readable. +# +# Usage: +# +# -I infile specifies the infile (a DIAMOND results file +# in m8 format) +# -D database specifies a reference database to search against +# for results +# -O organism returns organism results +# -F function returns functional results +# -R reference returns reference IDs in results +# -SO specific org creates a separate outfile for results that hit +# a specific organism +# +########################################################################## + +# imports +import operator, sys, time, gzip, re, os, multiprocessing as mp + +# String searching function: +def string_find(usage_term): + for idx, elem in enumerate(sys.argv): + this_elem = elem + next_elem = sys.argv[(idx + 1) % len(sys.argv)] + if elem == usage_term: + return next_elem + +t0 = time.time() + +# checking for an option (organism or function) to be specified +if "-O" not in sys.argv: + if "-F" not in sys.argv: + if "-R" not in sys.argv: + sys.exit("WARNING: need to specify either organism results (with -O flag in command), reference IDs (with -R flag in command), or functional results (with -F flag in command).") + +# loading starting file +if "-I" in sys.argv: + infile_name = string_find("-I") +else: + sys.exit ("WARNING: infile must be specified using '-I' flag.") + +# checking to make sure database is specified +if "-D" in sys.argv: + db_name = string_find("-D") +else: + sys.exit( "No database file indicated; skipping database search step.") + +if "-t" in sys.argv: + threads = string_find("-t") +else: + threads = 1 + +infile = open (infile_name, "r") + +# setting up databases +RefSeq_hit_count_db = {} +unique_seq_db = {} +line_counter = 0 + +# reading through the infile - the DIAMOND results m8 format +print ("\nNow reading through the m8 results infile.") + +for line in infile: + line_counter += 1 + splitline = line.split("\t") + if line_counter % 1000000 == 0: + t99 = time.time() + print (str(line_counter)[:-6] + "M lines processed so far in " + str(t99-t0) + " seconds.") + + unique_seq_db[splitline[0]] = 1 + + try: + RefSeq_hit_count_db[splitline[1]] += 1 + except KeyError: + RefSeq_hit_count_db[splitline[1]] = 1 + continue + +t1 = time.time() + +print ("\nAnalysis of " + infile_name + " complete.") +print ("Number of total lines: " + str(line_counter)) +print ("Number of unique sequences: " + str(len(unique_seq_db))) +print ("Time elapsed: " + str(t1-t0) + " seconds.") + +infile.close() + +# time to search for these in the reference database +db = open (db_name, "r") + +print ("\nStarting database analysis now.") + +t2 = time.time() + +# optional outfile of specific organism results +if "-SO" in sys.argv: + target_org = string_find("-SO") + db_SO_dictionary = {} + +# building a dictionary of the reference database +if "-F" in sys.argv: + db_func_dictionary = {} +if "-O" in sys.argv: + db_org_dictionary = {} +if "-R" in sys.argv: + db_ref_dictionary = {} +db_line_counter = 0 +db_error_counter = 0 + +def processfile(db_name, start=0, end=0, cmdargs = []): + output_db_subdict = {} + + with open(db_name, "r") as dbfh: + dbfh.seek(start) + lines = dbfh.readlines(end-start) + db_line_counter = 0 + db_error_counter = 0 + for line in lines: + if line.startswith(">"): + db_line_counter += 1 + splitline = line.split() + db_id = str(splitline[0])[1:] + if "-F" in cmdargs or "-SO" in cmdargs: + db_entry = line.split("[", 1) + db_entry = db_entry[0].split(" ", 1) + db_entry = db_entry[1][:-1] + if "-O" in cmdargs or "-SO" in cmdargs: + + # organism name + if line.count("[") != 1: + splitline = line.split("[") + db_org = splitline[line.count("[")].strip()[:-1] + if db_org[0].isdigit(): + split_db_org = db_org.split() + try: + if split_db_org[1] == "sp.": + db_org = split_db_org[0] + " " + split_db_org[1] + " " + split_db_org[2] + else: + db_org = split_db_org[1] + " " + split_db_org[2] + except IndexError: + try: + db_org = split_db_org[1] + except IndexError: + db_org = splitline[line.count("[")-1] + if db_org[0].isdigit(): + split_db_org = db_org.split() + db_org = split_db_org[1] + " " + split_db_org[2] + else: + db_org = line.split("[", 1) + db_org = db_org[1].split() + try: + db_org = str(db_org[0]) + " " + str(db_org[1]) + except IndexError: + db_org = line.strip().split("[", 1) + db_org = db_org[1][:-1] + db_error_counter += 1 + + db_org = re.sub('[^a-zA-Z0-9-_*. ]', '', db_org) + + # add to dictionaries + if "-F" in cmdargs: + output_db_subdict[db_id] = db_entry + elif "-O" in cmdargs: + output_db_subdict[db_id] = db_org + elif "-R" in cmdargs: + output_db_subdict[db_id] = db_id + elif "-SO" in cmdargs: + if target_org in db_org: + output_db_subdict[db_id] = db_entry + + # if "-F" in sys.argv: db_func_subdict[db_id] = db_entry + + return output_db_subdict, db_line_counter, db_error_counter + +filesize = os.path.getsize(db_name) +split_size = 100*1024*1024 # 100MB chunks +if filesize > split_size or threads>1 : + pool = mp.Pool(int(threads)) + cursor = 0 + results = [] + with open(db_name, 'r') as dbfh: + for chunk in range(int(filesize / split_size)+1): + if cursor + split_size > filesize: + end = filesize + else: + end = cursor + split_size + + dbfh.seek(end) + dbfh.readline() + end=dbfh.tell() + resulti = pool.apply_async(processfile, args=[db_name, cursor, end, sys.argv]) + results.append(resulti) + cursor=end + pool.close() + pool.join() + + for proc in results: + a,n1,n2 = proc.get() + if "-F" in sys.argv: db_func_dictionary.update(a) + if "-O" in sys.argv: db_org_dictionary.update(a) + if "-R" in sys.argv: db_ref_dictionary.update(a) + db_line_counter += n1 + db_error_counter += n2 + +else: + if "-F" in sys.argv: db_func_dictionary = processfile(db_name, 0, filesize, sys.argv) + if "-O" in sys.argv: db_org_dictionary = processfile(db_name, 0, filesize, sys.argv) + if "-R" in sys.argv: db_ref_dictionary = processfile(db_name, 0, filesize, sys.argv) + +db.close() + +t3 = time.time() + +print ("\nSuccess!") +print ("Time elapsed: " + str(t3-t2) + " seconds.") +print ("Number of lines: " + str(db_line_counter)) +print ("Number of database entries with only genus name (no species): " + str(db_error_counter)) + +# condensing down the identical matches +condensed_RefSeq_hit_db = {} + +for entry in RefSeq_hit_count_db.keys(): + try: + if "-O" in sys.argv: + org = db_org_dictionary[entry] + if "-F" in sys.argv: + org = db_func_dictionary[entry] + if "-R" in sys.argv: + org = org + "\t" + entry + if org in condensed_RefSeq_hit_db.keys(): + condensed_RefSeq_hit_db[org] += RefSeq_hit_count_db[entry] + else: + condensed_RefSeq_hit_db[org] = RefSeq_hit_count_db[entry] + except KeyError: + print ("KeyError:\t" + entry) + continue + +if "-SO" in sys.argv: + condensed_RefSeq_SO_hit_db = {} + + for entry in RefSeq_hit_count_db.keys(): + if entry in db_SO_dictionary.values(): + org = db_SO_dictionary[entry] + if org in condensed_RefSeq_SO_hit_db.keys(): + condensed_RefSeq_SO_hit_db[org] += RefSeq_hit_count_db[entry] + else: + condensed_RefSeq_SO_hit_db[org] = RefSeq_hit_count_db[entry] + +t4 = time.time() + +# dictionary output and summary +print ("\nDictionary database assembled.") +print ("Time elapsed: " + str(t4-t3) + " seconds.") +# print ("Number of errors - IDs not found in reference database: " + str(db_error_counter)) + +if "-O" in sys.argv: + print ("\nTop ten organism matches:") +if "-F" in sys.argv: + print ("\nTop ten function matches:") +if "-R" in sys.argv: + print ("\nTop ten reference IDs:") +for k, v in sorted(condensed_RefSeq_hit_db.items(), key=lambda kv: -kv[1])[:10]: + try: + print (str(v) + "\t" + k ) + except KeyError: + print (str(v) + "\tWARNING: Key not found for " + k) + continue + +# creating the outfiles +if "-O" in sys.argv: + outfile_name = infile_name[:-4] + "_organism.tsv" +if "-F" in sys.argv: + outfile_name = infile_name[:-4] + "_function.tsv" +if "-R" in sys.argv: + outfile_name = infile_name[:-4] + "_referenceIDs.tsv" +if "-SO" in sys.argv: + target_org_outfile = open(infile_name[:-4] + "_" + target_org + ".tsv", "w") + +outfile = open (outfile_name, "w") + +# writing the output +error_counter = 0 +for k, v in sorted(condensed_RefSeq_hit_db.items(), key=lambda kv: -kv[1]): + try: + q = v * 100 / float(line_counter) + outfile.write (str(q) + "\t" + str(v) + "\t" + k + "\n") + except KeyError: + outfile.write (str(q) + "\t" + str(v) + "\tWARNING: Key not found for " + k + "\n") + error_counter += 1 + continue + +# writing the output if optional specific organism flag is active +if "-SO" in sys.argv: + for k, v in sorted(condensed_RefSeq_SO_hit_db.items(), key=lambda kv: -kv[1]): + try: + q = v * 100 / float(line_counter) + target_org_outfile.write (str(q) + "\t" + str(v) + "\t" + k + "\n") + except KeyError: + target_org_outfile.write (str(q) + "\t" + str(v) + "\tWARNING: Key not found for " + k + "\n") + error_counter += 1 + continue + +outfile.close() + +print ("\nAnnotations saved to file: '" + outfile_name + "'.") +print ("Number of errors in writing annotations to outfile: " + str(error_counter)) From 7a6f8058e0e9f60821a7d8a78c5f8608d24a8de4 Mon Sep 17 00:00:00 2001 From: Edward Yang Date: Tue, 2 May 2023 14:53:55 +1000 Subject: [PATCH 4/9] Add master nextflow script --- samsa2-master.nf | 258 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 258 insertions(+) create mode 100644 samsa2-master.nf diff --git a/samsa2-master.nf b/samsa2-master.nf new file mode 100644 index 0000000..98b615f --- /dev/null +++ b/samsa2-master.nf @@ -0,0 +1,258 @@ +#!/usr/bin/env nextflow + +params.input_dir = "$projectDir/input_files" +params.python_scripts = "$projectDir/python_scripts" +params.programs = "$projectDir/programs" +params.diamond_database = "$projectDir/full_databases/New_Bac_Vir_Arc_RefSeq" +refseq_db = "${params.diamond_database}.fa" +params.subsys_database = "$projectDir/full_databases/subsys_db" +params.output_dir = "$projectDir/output_files" +step_1_output_dir = "$params.output_dir/step_1_output" +step_2_output_dir = "$params.output_dir/step_2_output" +step_3_output_dir = "$params.output_dir/step_3_output" +step_4_output_dir = "$params.output_dir/step_4_output" +step_5_output_dir = "$params.output_dir/step_5_output" + +input_files = "${params.input_dir}/*_R{1,2}_*" // match everything else + +// step 1 +process TRIMMOMATIC { + + cpus 6 + memory '12 GB' + publishDir step_1_output_dir, mode: 'copy' + module 'trimmomatic' + + input: + tuple val(sample_id), path(reads) + + output: + tuple val("${sample_id}"), path("${sample_id}.cleaned.forward"), path("${sample_id}.cleaned.reverse") + + script: + """ + trimmomatic PE -phred33 -threads $task.cpus "$reads" ${sample_id}.cleaned.forward ${sample_id}.cleaned.forward_unpaired ${sample_id}.cleaned.reverse ${sample_id}.cleaned.reverse_unpaired SLIDINGWINDOW:4:15 MINLEN:70 + """ + +} + +// step 2 +process PEAR { + cpus 56 + memory '56 GB' + publishDir step_2_output_dir, mode: 'copy' + module 'pear/0.9.11' + + input: + tuple val(sample_id), path(reads_cleaned_forward), path(reads_cleaned_reverse) + + output: + tuple val("$sample_id"), path("${sample_id}.merged.assembled.fastq"), path("${sample_id}.merged.discarded.fastq"), path("${sample_id}.merged.unassembled.forward.fastq"), path("${sample_id}.merged.unassembled.reverse.fastq"), path("${sample_id}.merged.assembled.fastq.ribosomes.log") + + script: + """ + pear -f $reads_cleaned_forward -r $reads_cleaned_reverse -j $task.cpus -o ${sample_id}.merged 2>&1 | tee ${sample_id}.merged.assembled.fastq.ribosomes.log + """ +} + +// step 2.9 +process RAWREADCOUNT { + + cpus 1 + memory '4 GB' + publishDir step_2_output_dir, mode: 'copy' + + input: + path(input_forward_reads) + + output: + path 'raw_counts.txt' + + script: + """ + for infile in $input_forward_reads; do python $params.python_scripts/raw_read_counter.py -I \$infile -O raw_counts.txt; done + """ + +} + +process SORTMERNA { + + cpus 24 + memory '12 GB' + publishDir step_3_output_dir, mode: 'copy' + + input: + tuple val(sample_id), path(assembled_fastq) + + output: + tuple val(sample_id), path("${sample_id}.merged.ribodepleted.fastq") + + script: + """ + ${params.programs}/sortmerna-2.1/sortmerna \ + -a $task.cpus \ + --ref ${params.programs}/sortmerna-2.1/rRNA_databases/silva-bac-16s-id90.fasta,${params.programs}/sortmerna-2.1/index/silva-bac-16s-db \ + --reads $assembled_fastq \ + --aligned ${assembled_fastq}.ribosomes \ + --other ${sample_id}.merged.ribodepleted \ + --fastx \ + --log \ + -v + """ +} + +process DIAMOND_REFSEQ { + + cpus 70 + memory '170 GB' + publishDir "${step_4_output_dir}/RefSeq_results", mode: 'copy' + + input: + tuple val(sample_id), path(ribodepleted_fastq) + + output: + tuple val(sample_id), path("${sample_id}.merged.RefSeq_annotated"), path("${ribodepleted_fastq}.RefSeq.daa") + + script: + """ + ${params.programs}/diamond blastx --db $params.diamond_database -q $ribodepleted_fastq -a ${ribodepleted_fastq}.RefSeq -t /vast/scratch/users/\$USER/tmp -k 1 -p $task.cpus -b 12 -c 1 + ${params.programs}/diamond view --daa ${ribodepleted_fastq}.RefSeq.daa -o ${sample_id}.merged.RefSeq_annotated -f tab -p $task.cpus + """ + +} + +process DIAMOND_SUBSYS { + + cpus 24 + memory '170 GB' + publishDir "${step_4_output_dir}/Subsystems_results", mode: 'copy' + + input: + tuple val(sample_id), path(ribodepleted_fastq) + + output: + tuple val(sample_id), path("${sample_id}.merged.Subsys_annotated"), path("${ribodepleted_fastq}.Subsys.daa") + + script: + """ + ${params.programs}/diamond blastx --db $params.subsys_database -q $ribodepleted_fastq -a ${ribodepleted_fastq}.Subsys -t /vast/scratch/users/\$USER/tmp -k 1 -p $task.cpus -b 12 -c 1 + ${params.programs}/diamond view --daa ${ribodepleted_fastq}.Subsys.daa -o ${sample_id}.merged.Subsys_annotated -f tab -p $task.cpus + """ + + +} + +process REFSEQ_ANALYSISCOUNTER_FUNC { + + cpus 46 + memory '64 GB' + publishDir "${step_5_output_dir}/RefSeq_results/func_results", mode: 'copy' + + input: + tuple val(sample_id), path(refseq_annotated) + + output: + tuple val(sample_id), path("${sample_id}.merged.*.tsv") + + script: + """ + python ${params.python_scripts}/DIAMOND_analysis_counter_mp.py -I ${refseq_annotated} -D $refseq_db -F -t $task.cpus + """ +} + +process REFSEQ_ANALYSISCOUNTER_ORG { + + cpus 46 + memory '64 GB' + publishDir "${step_5_output_dir}/RefSeq_results/org_results", mode: 'copy' + + input: + tuple val(sample_id), path(refseq_annotated) + + output: + tuple val(sample_id), path("${sample_id}.merged.*.tsv") + + script: + """ + python ${params.python_scripts}/DIAMOND_analysis_counter_mp.py -I ${refseq_annotated} -D $refseq_db -O -t $task.cpus + """ +} + +process SUBSYS_ANALYSIS_COUNTER { + + cpus 2 + memory '64 GB' + publishDir "${step_5_output_dir}/Subsystems_results/receipts", pattern: '*.receipt', mode: 'copy' + + input: + tuple val(sample_id), path(subsys_annotated) + + output: + tuple val(sample_id), path('*.hierarchy'), path('*.receipt') + + script: + """ + python ${params.python_scripts}/DIAMOND_subsystems_analysis_counter.py \ + -I $subsys_annotated \ + -D ${params.subsys_database}.fa \ + -O ${subsys_annotated}.hierarchy \ + -P ${subsys_annotated}.receipt + """ + +} + +process SUBSYS_REDUCER { + + cpus 2 + memory '64 GB' + publishDir "${step_5_output_dir}/Subsystems_results", mode: 'copy' + + input: + tuple val(sample_id), path(subsys_annotated_hierarchy) + + output: + tuple val(sample_id), path('*.reduced') + + script: + """ + python ${params.python_scripts}/subsys_reducer.py -I $subsys_annotated_hierarchy + """ +} + +// process R { + +// } + +workflow { + + + infiles_ch = Channel.fromPath(input_files) + + Channel + .fromFilePairs(input_files, checkIfExists: true) + .set { read_pairs_ch } + + trim_ch = TRIMMOMATIC(read_pairs_ch) + + pear_ch = PEAR(trim_ch) + + forward_reads_ch = trim_ch.map{it[1]}.collect(flat: false) + raw_counts_ch = RAWREADCOUNT(forward_reads_ch) + + sortmerna_input_ch = pear_ch.map{[it[0],it[1]]} + sortmerna_ch = SORTMERNA(sortmerna_input_ch) + + // REFSEQ + diamond_refseq_ch = DIAMOND_REFSEQ(sortmerna_ch) + + refseq_analysiscounter_input_ch = diamond_refseq_ch.map{[it[0],it[1]]} + refseq_analysiscounter_func_ch = REFSEQ_ANALYSISCOUNTER_FUNC(refseq_analysiscounter_input_ch) + refseq_analysiscounter_org_ch = REFSEQ_ANALYSISCOUNTER_ORG(refseq_analysiscounter_input_ch) + + // SUBSYS + diamond_subsys_ch = DIAMOND_SUBSYS(sortmerna_ch) + subsys_analysiscounter_input_ch = diamond_subsys_ch.map{[it[0],it[1]]} + subsys_analysiscounter_ch = SUBSYS_ANALYSIS_COUNTER(subsys_analysiscounter_input_ch) + subsys_reducer_input_ch = subsys_analysiscounter_ch.map{[it[0],it[1]]} + subsys_reducer_ch = SUBSYS_REDUCER(subsys_reducer_input_ch) +} From 871ac1dfcb3114edddc4d7ef60cd0497891d16e1 Mon Sep 17 00:00:00 2001 From: Edward Yang <94523015+edoyango@users.noreply.github.com> Date: Tue, 2 May 2023 15:04:16 +1000 Subject: [PATCH 5/9] Add comments on output files --- README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 0301c91..5320817 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ After following the Samsa2 setup instructions, the working directory structure s    ├── raw_read_counter.py    └── subsys_reducer.py ``` +*NOTE* that `samsa2-master.nf` makes use of `DIAMOND_analysis_counter_mp.py` in this repository (not the original `DIAMOND_analysis_counter.py`). See the modifications section below. ### Run the pipeline @@ -60,8 +61,14 @@ nextfow run samsa2-master.nf ## Modifications from original -`DIAMOND_analysis_counter_mp.py` is modified from the original SAMSA2 pipeline. It's modified to make use of Python's `multiprocessing` module. Currently, this produces identical results to the original in `-O` and `-F` mode (used in the original master script), but not `-R` and `-SO` mode. +`DIAMOND_analysis_counter_mp.py` is modified from the original SAMSA2 pipeline. It's modified to make use of Python's `multiprocessing` module. Number of processes is set with `-t ` option. Currently, this produces identical results to the original in `-O` and `-F` mode (used in the original master script), but not `-R` and `-SO` mode. The DIAMOND blastx step has the `-b 12` and `-c 1` [performance options](https://github.com/bbuchfink/diamond/wiki/3.-Command-line-options#memory--performance-options) added. These decrease the number of blocks and decrease the number of chunks from the default. Both of these changes have the net effect of increasing memory usage *substantially*, but reducing run times *substantially*. The maximum observed memory usage is 170GB, which Milton's nodes can easily satisfy, but may be problematic on other hardware. The DIAMOND blastx step is also making use of `/vast/scratch/users/$USER/tmp`. This should be parameterised in the future. + +## Output files + +The output files by default will be placed in `output_files` with the `step_{1..5}_output` folders which should contain the same results as the original master script. + +*NOTE* as Nextflow on WEHI's Milton HPC is configured to use the VAST Scratch as the working directory, to get around the 14-day deletion policy,`samsa2-master.nf` *copies* the results from the working directories to the `output_files` folder. From 9a2dcd40592c7ebbfe820fc06272d08f4cdbf299 Mon Sep 17 00:00:00 2001 From: Edward Yang <94523015+edoyango@users.noreply.github.com> Date: Tue, 2 May 2023 15:08:20 +1000 Subject: [PATCH 6/9] Add exit if passed an unimplemented option --- DIAMOND_analysis_counter_mp.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/DIAMOND_analysis_counter_mp.py b/DIAMOND_analysis_counter_mp.py index 49a3dd3..e7a77a2 100644 --- a/DIAMOND_analysis_counter_mp.py +++ b/DIAMOND_analysis_counter_mp.py @@ -22,6 +22,10 @@ # Created 8/16/2016, this version created 1/10/2017 # Sam Westreich, stwestreich@ucdavis.edu, github.com/transcript # +# DIAMON_analysis_counter_mp.py +# Created 10/03/2023, this version create 02/05/2023 +# Edward Yang, yang.e@wehi.edu.au, github.com/edoyango +# # This program parses through the results file from a DIAMOND annotation run # (in BLAST m8 format) to get the results into something more compressed # and readable. @@ -37,6 +41,7 @@ # -R reference returns reference IDs in results # -SO specific org creates a separate outfile for results that hit # a specific organism +# -t number of processes specifies the number of processes to use # ########################################################################## @@ -75,6 +80,9 @@ def string_find(usage_term): threads = string_find("-t") else: threads = 1 + +if "-SO" in sys.argv or "-R" in sys.argv: + sys.exit("This functionality hasn't been implemented yet.") infile = open (infile_name, "r") From c3f3af088aa0b81e777131c54dcad0c201647395 Mon Sep 17 00:00:00 2001 From: Edward Yang <94523015+edoyango@users.noreply.github.com> Date: Wed, 3 May 2023 09:05:09 +1000 Subject: [PATCH 7/9] Fix nextflow typo in README.md Co-authored-by: Michael Milton --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 5320817..88e78c3 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ nextflow run samsa2-master.nf ### Parameters ``` -nextfow run samsa2-master.nf +nextflow run samsa2-master.nf --input_files --python_scripts --diamond_database From 3c376e3e62c4c9efd032284be090ea3f5a3e703c Mon Sep 17 00:00:00 2001 From: Edward Yang <94523015+edoyango@users.noreply.github.com> Date: Wed, 3 May 2023 09:05:28 +1000 Subject: [PATCH 8/9] Use correct python interpreter Co-authored-by: Michael Milton --- DIAMOND_analysis_counter_mp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DIAMOND_analysis_counter_mp.py b/DIAMOND_analysis_counter_mp.py index e7a77a2..5aa5ce6 100644 --- a/DIAMOND_analysis_counter_mp.py +++ b/DIAMOND_analysis_counter_mp.py @@ -1,4 +1,4 @@ -#!/usr/bin/env Python +#!/usr/bin/env python3 ########################################################################## # # Copyright (C) 2015-2016 Sam Westreich From 590654eb66a6c6b7ebd6aeff4cdf0985951a4ce4 Mon Sep 17 00:00:00 2001 From: Edward Yang <94523015+edoyango@users.noreply.github.com> Date: Wed, 3 May 2023 09:15:09 +1000 Subject: [PATCH 9/9] Make use of equals instead of set. Co-authored-by: Michael Milton --- samsa2-master.nf | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/samsa2-master.nf b/samsa2-master.nf index 98b615f..8f82ed5 100644 --- a/samsa2-master.nf +++ b/samsa2-master.nf @@ -228,9 +228,7 @@ workflow { infiles_ch = Channel.fromPath(input_files) - Channel - .fromFilePairs(input_files, checkIfExists: true) - .set { read_pairs_ch } + read_pairs_ch = Channel.fromFilePairs(input_files, checkIfExists: true) trim_ch = TRIMMOMATIC(read_pairs_ch)