diff --git a/DIAMOND_analysis_counter_mp.py b/DIAMOND_analysis_counter_mp.py
new file mode 100644
index 0000000..5aa5ce6
--- /dev/null
+++ b/DIAMOND_analysis_counter_mp.py
@@ -0,0 +1,339 @@
+#!/usr/bin/env python3
+##########################################################################
+#
+# Copyright (C) 2015-2016 Sam Westreich
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation;
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+# See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+##########################################################################
+#
+# DIAMOND_analysis_counter.py
+# Created 8/16/2016, this version created 1/10/2017
+# Sam Westreich, stwestreich@ucdavis.edu, github.com/transcript
+#
+# DIAMOND_analysis_counter_mp.py
+# Created 10/03/2023, this version created 02/05/2023
+# Edward Yang, yang.e@wehi.edu.au, github.com/edoyango
+#
+# This program parses the results file from a DIAMOND annotation run
+# (in BLAST m8 format) to get the results into something more compressed
+# and readable.
+#
+# Usage:
+#
+# -I  infile                 specifies the infile (a DIAMOND results file
+#                            in m8 format)
+# -D  database               specifies a reference database to search against
+#                            for results
+# -O  organism               returns organism results
+# -F  function               returns functional results
+# -R  reference              returns reference IDs in results
+# -SO specific org           creates a separate outfile for results that hit
+#                            a specific organism
+# -t  number of processes    specifies the number of processes to use
+#
+##########################################################################
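+
+# Example invocation (file names are hypothetical; -O requests organism
+# counts using 8 worker processes):
+#
+#   python DIAMOND_analysis_counter_mp.py -I sample.RefSeq_annotated -D RefSeq_bac.fa -O -t 8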
+
+# imports
+import operator, sys, time, gzip, re, os, multiprocessing as mp
+
+# String searching function: returns the argument that follows a given flag
+def string_find(usage_term):
+    for idx, elem in enumerate(sys.argv):
+        if elem == usage_term and idx + 1 < len(sys.argv):
+            return sys.argv[idx + 1]
+
+t0 = time.time()
+
+# checking for an option (organism or function) to be specified
+if "-O" not in sys.argv:
+    if "-F" not in sys.argv:
+        if "-R" not in sys.argv:
+            sys.exit("WARNING: need to specify either organism results (with -O flag in command), reference IDs (with -R flag in command), or functional results (with -F flag in command).")
+
+# loading starting file
+if "-I" in sys.argv:
+    infile_name = string_find("-I")
+else:
+    sys.exit("WARNING: infile must be specified using '-I' flag.")
+
+# checking to make sure database is specified
+if "-D" in sys.argv:
+    db_name = string_find("-D")
+else:
+    sys.exit("WARNING: database file must be specified using '-D' flag.")
+
+# number of worker processes (cast to int so the threads > 1 test below works)
+if "-t" in sys.argv:
+    threads = int(string_find("-t"))
+else:
+    threads = 1
+
+if "-SO" in sys.argv or "-R" in sys.argv:
+    sys.exit("This functionality hasn't been implemented yet.")
+
+infile = open(infile_name, "r")
+
+# setting up databases
+RefSeq_hit_count_db = {}
+unique_seq_db = {}
+line_counter = 0
+
+# reading through the infile - the DIAMOND results m8 format
+print("\nNow reading through the m8 results infile.")
+
+for line in infile:
+    line_counter += 1
+    splitline = line.split("\t")
+    if line_counter % 1000000 == 0:
+        t99 = time.time()
+        print(str(line_counter)[:-6] + "M lines processed so far in " + str(t99-t0) + " seconds.")
+
+    unique_seq_db[splitline[0]] = 1
+
+    # tally hits per reference sequence ID (column 2 of the m8 file)
+    try:
+        RefSeq_hit_count_db[splitline[1]] += 1
+    except KeyError:
+        RefSeq_hit_count_db[splitline[1]] = 1
+        continue
+
+t1 = time.time()
+
+print("\nAnalysis of " + infile_name + " complete.")
+print("Number of total lines: " + str(line_counter))
+print("Number of unique sequences: " + str(len(unique_seq_db)))
+print("Time elapsed: " + str(t1-t0) + " seconds.")
+
+infile.close()
+
+# time to search for these in the reference database
+db = open(db_name, "r")
+
+print("\nStarting database analysis now.")
+
+t2 = time.time()
+
+# optional outfile of specific organism results
+if "-SO" in sys.argv:
+    target_org = string_find("-SO")
+    db_SO_dictionary = {}
+
+# building a dictionary of the reference database
+if "-F" in sys.argv:
+    db_func_dictionary = {}
+if "-O" in sys.argv:
+    db_org_dictionary = {}
+if "-R" in sys.argv:
+    db_ref_dictionary = {}
+db_line_counter = 0
+db_error_counter = 0
+
+def processfile(db_name, start=0, end=0, cmdargs=[]):
+    # worker: parses the byte range [start, end) of the reference FASTA and
+    # returns a partial ID -> annotation dictionary plus its own counters
+    output_db_subdict = {}
+
+    with open(db_name, "r") as dbfh:
+        dbfh.seek(start)
+        lines = dbfh.readlines(end-start)
+        db_line_counter = 0
+        db_error_counter = 0
+        for line in lines:
+            if line.startswith(">"):
+                db_line_counter += 1
+                splitline = line.split()
+                db_id = str(splitline[0])[1:]
+                if "-F" in cmdargs or "-SO" in cmdargs:
+                    db_entry = line.split("[", 1)
+                    db_entry = db_entry[0].split(" ", 1)
+                    db_entry = db_entry[1][:-1]
+                if "-O" in cmdargs or "-SO" in cmdargs:
+
+                    # organism name
+                    if line.count("[") != 1:
+                        splitline = line.split("[")
+                        db_org = splitline[line.count("[")].strip()[:-1]
+                        if db_org[0].isdigit():
+                            split_db_org = db_org.split()
+                            try:
+                                if split_db_org[1] == "sp.":
+                                    db_org = split_db_org[0] + " " + split_db_org[1] + " " + split_db_org[2]
+                                else:
+                                    db_org = split_db_org[1] + " " + split_db_org[2]
+                            except IndexError:
+                                try:
+                                    db_org = split_db_org[1]
+                                except IndexError:
+                                    db_org = splitline[line.count("[")-1]
+                                    if db_org[0].isdigit():
+                                        split_db_org = db_org.split()
+                                        db_org = split_db_org[1] + " " + split_db_org[2]
+                    else:
+                        db_org = line.split("[", 1)
+                        db_org = db_org[1].split()
+                        try:
+                            db_org = str(db_org[0]) + " " + str(db_org[1])
+                        except IndexError:
+                            db_org = line.strip().split("[", 1)
+                            db_org = db_org[1][:-1]
+                            db_error_counter += 1
+
+                    db_org = re.sub('[^a-zA-Z0-9-_*. ]', '', db_org)
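+
+                # Worked example (hypothetical header): for the line
+                #   >WP_011111.1 DNA polymerase III subunit beta [Escherichia coli]
+                # db_id is "WP_011111.1"; with -F, db_entry becomes
+                # "DNA polymerase III subunit beta"; with -O, db_org becomes
+                # "Escherichia coli".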
+                # add to dictionaries
+                if "-F" in cmdargs:
+                    output_db_subdict[db_id] = db_entry
+                elif "-O" in cmdargs:
+                    output_db_subdict[db_id] = db_org
+                elif "-R" in cmdargs:
+                    output_db_subdict[db_id] = db_id
+                elif "-SO" in cmdargs:
+                    if target_org in db_org:
+                        output_db_subdict[db_id] = db_entry
+
+    return output_db_subdict, db_line_counter, db_error_counter
+
+filesize = os.path.getsize(db_name)
+split_size = 100*1024*1024  # 100MB chunks
+if filesize > split_size or threads > 1:
+    pool = mp.Pool(threads)
+    cursor = 0
+    results = []
+    with open(db_name, 'r') as dbfh:
+        for chunk in range(int(filesize / split_size)+1):
+            if cursor + split_size > filesize:
+                end = filesize
+            else:
+                end = cursor + split_size
+
+            # push the chunk end to the next line boundary so that no FASTA
+            # header line is split between two workers
+            dbfh.seek(end)
+            dbfh.readline()
+            end = dbfh.tell()
+            resulti = pool.apply_async(processfile, args=[db_name, cursor, end, sys.argv])
+            results.append(resulti)
+            cursor = end
+    pool.close()
+    pool.join()
+
+    # merge each worker's partial dictionary and counters
+    for proc in results:
+        a, n1, n2 = proc.get()
+        if "-F" in sys.argv: db_func_dictionary.update(a)
+        if "-O" in sys.argv: db_org_dictionary.update(a)
+        if "-R" in sys.argv: db_ref_dictionary.update(a)
+        db_line_counter += n1
+        db_error_counter += n2
+
+else:
+    # single-process path; processfile returns a (dict, line count, error count) tuple
+    subdict, db_line_counter, db_error_counter = processfile(db_name, 0, filesize, sys.argv)
+    if "-F" in sys.argv: db_func_dictionary = subdict
+    if "-O" in sys.argv: db_org_dictionary = subdict
+    if "-R" in sys.argv: db_ref_dictionary = subdict
+
+# At this point the chosen dictionary maps RefSeq IDs to annotation strings,
+# e.g. db_org_dictionary might hold {"WP_011111.1": "Escherichia coli"}
+# (hypothetical entry).
+db.close()
+
+t3 = time.time()
+
+print("\nSuccess!")
+print("Time elapsed: " + str(t3-t2) + " seconds.")
+print("Number of lines: " + str(db_line_counter))
+print("Number of database entries with only genus name (no species): " + str(db_error_counter))
+
+# condensing down the identical matches
+condensed_RefSeq_hit_db = {}
+
+for entry in RefSeq_hit_count_db.keys():
+    try:
+        if "-O" in sys.argv:
+            org = db_org_dictionary[entry]
+        if "-F" in sys.argv:
+            org = db_func_dictionary[entry]
+        if "-R" in sys.argv:
+            # (unreachable for now -- the -R mode exits early above)
+            org = org + "\t" + entry
+        if org in condensed_RefSeq_hit_db.keys():
+            condensed_RefSeq_hit_db[org] += RefSeq_hit_count_db[entry]
+        else:
+            condensed_RefSeq_hit_db[org] = RefSeq_hit_count_db[entry]
+    except KeyError:
+        print("KeyError:\t" + entry)
+        continue
+
+if "-SO" in sys.argv:
+    condensed_RefSeq_SO_hit_db = {}
+
+    for entry in RefSeq_hit_count_db.keys():
+        if entry in db_SO_dictionary.values():
+            org = db_SO_dictionary[entry]
+            if org in condensed_RefSeq_SO_hit_db.keys():
+                condensed_RefSeq_SO_hit_db[org] += RefSeq_hit_count_db[entry]
+            else:
+                condensed_RefSeq_SO_hit_db[org] = RefSeq_hit_count_db[entry]
+
+t4 = time.time()
+
+# dictionary output and summary
+print("\nDictionary database assembled.")
+print("Time elapsed: " + str(t4-t3) + " seconds.")
+
+if "-O" in sys.argv:
+    print("\nTop ten organism matches:")
+if "-F" in sys.argv:
+    print("\nTop ten function matches:")
+if "-R" in sys.argv:
+    print("\nTop ten reference IDs:")
+for k, v in sorted(condensed_RefSeq_hit_db.items(), key=lambda kv: -kv[1])[:10]:
+    try:
+        print(str(v) + "\t" + k)
+    except KeyError:
+        print(str(v) + "\tWARNING: Key not found for " + k)
+        continue
+
+# creating the outfiles
+if "-O" in sys.argv:
+    outfile_name = infile_name[:-4] + "_organism.tsv"
+if "-F" in sys.argv:
+    outfile_name = infile_name[:-4] + "_function.tsv"
+if "-R" in sys.argv:
+    outfile_name = infile_name[:-4] + "_referenceIDs.tsv"
+if "-SO" in sys.argv:
+    target_org_outfile = open(infile_name[:-4] + "_" + target_org + ".tsv", "w")
+
+outfile = open(outfile_name, "w")
+
+# writing the output: percentage of total hits, raw count, annotation
+error_counter = 0
+for k, v in sorted(condensed_RefSeq_hit_db.items(), key=lambda kv: -kv[1]):
+    try:
+        q = v * 100 / float(line_counter)
+        outfile.write(str(q) + "\t" + str(v) + "\t" + k + "\n")
+    except KeyError:
+        outfile.write(str(q) + "\t" + str(v) + "\tWARNING: Key not found for " + k + "\n")
+        error_counter += 1
+        continue
+
+# writing the output if optional specific organism flag is active
+if "-SO" in sys.argv:
+    for k, v in sorted(condensed_RefSeq_SO_hit_db.items(), key=lambda kv: -kv[1]):
+        try:
+            q = v * 100 / float(line_counter)
+            target_org_outfile.write(str(q) + "\t" + str(v) + "\t" + k + "\n")
+        except KeyError:
+            target_org_outfile.write(str(q) + "\t" + str(v) + "\tWARNING: Key not found for " + k + "\n")
+            error_counter += 1
+            continue
+
+outfile.close()
+
+print("\nAnnotations saved to file: '" + outfile_name + "'.")
+print("Number of errors in writing annotations to outfile: " + str(error_counter))
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..1c24a1f
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2023 WEHI-ResearchComputing
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..88e78c3
--- /dev/null
+++ b/README.md
@@ -0,0 +1,74 @@
+# samsa2-nf
+
+A Nextflow translation of the [SAMSA2](https://github.com/transcript/samsa2) metatranscriptomics pipeline, tailored for running on WEHI's Milton HPC. It translates the [master script](https://github.com/transcript/samsa2/blob/master/bash_scripts/master_script.sh) of the original pipeline.
+
+## Usage
+
+### Setup
+
+After following the SAMSA2 setup instructions, the working directory structure should look like:
+
+```
+.
+├── full_databases
+│   ├── New_Bac_Vir_Arc_RefSeq.dmnd
+│   ├── New_Bac_Vir_Arc_RefSeq.fa
+│   ├── RefSeq_bac.fa
+│   ├── subsys_db.dmnd
+│   ├── subsys_db.fa
+│   └── viral_reference
+├── input_files
+│   ├── 48E_S68_L001_R1_001.fastq # input files follow the *_R{1,2}_* pattern
+│   ├── 48E_S68_L001_R2_001.fastq
+│   ├── 48F_S69_L001_R1_001.fastq
+│   ├── 48F_S69_L001_R2_001.fastq
+│   ├── ...other input files...
+├── programs
+│   ├── diamond
+│   ├── diamond-linux64.tar.gz
+│   ├── diamond_manual.pdf
+│   ├── diamond-sse2
+│   ├── pear-0.9.10-linux-x86_64
+│   ├── pear-0.9.10-linux-x86_64.tar.gz
+│   ├── sortmerna-2.1
+│   ├── sortmerna-2.1.tar.gz
+│   ├── Trimmomatic-0.36
+│   └── Trimmomatic-0.36.zip
+└── python_scripts
+    ├── DIAMOND_analysis_counter_mp.py # a parallel version of DIAMOND_analysis_counter.py
+    ├── DIAMOND_analysis_counter.py
+    ├── DIAMOND_subsystems_analysis_counter.py
+    ├── raw_read_counter.py
+    └── subsys_reducer.py
+```
+
+*NOTE*: `samsa2-master.nf` uses the `DIAMOND_analysis_counter_mp.py` from this repository (not the original `DIAMOND_analysis_counter.py`). See the modifications section below.
+
+### Run the pipeline
+
+```bash
+nextflow run samsa2-master.nf
+```
+
+### Parameters
+
+```
+nextflow run samsa2-master.nf \
+    --input_dir <input directory> \
+    --python_scripts <python scripts directory> \
+    --programs <programs directory> \
+    --diamond_database <RefSeq database path, without extension> \
+    --subsys_database <subsys database path, without extension> \
+    --output_dir <output directory>
+```
+
+## Modifications from original
+
+`DIAMOND_analysis_counter_mp.py` is modified from the original SAMSA2 script to make use of Python's `multiprocessing` module; the number of worker processes is set with the `-t` option. It currently produces results identical to the original in `-O` and `-F` modes (the modes used by the original master script); the `-R` and `-SO` modes are not yet implemented.
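+
+The core of the change: the reference FASTA is split into roughly 100MB byte
+ranges, each parsed by a separate worker process. A minimal sketch of the
+splitting scheme (simplified; `line_aligned_chunks` is a hypothetical helper
+name, the script itself inlines this logic):
+
+```python
+import os
+
+def line_aligned_chunks(path, chunk_size=100 * 1024 * 1024):
+    """Yield (start, end) byte ranges that begin and end on line boundaries."""
+    filesize = os.path.getsize(path)
+    with open(path, "rb") as fh:  # binary mode keeps seek/tell byte-exact
+        cursor = 0
+        while cursor < filesize:
+            end = min(cursor + chunk_size, filesize)
+            fh.seek(end)
+            fh.readline()  # push the boundary to the end of the current line
+            end = fh.tell()
+            yield cursor, end
+            cursor = end
+```
+
+Aligning boundaries to line ends means no FASTA header line is split between
+two workers; each range is handed to a `multiprocessing.Pool` worker via
+`apply_async`, and the partial dictionaries the workers return are merged with
+`dict.update`.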
+
+The DIAMOND blastx step has the `-b 12` and `-c 1` [performance options](https://github.com/bbuchfink/diamond/wiki/3.-Command-line-options#memory--performance-options) added. `-b 12` increases the sequence block size (fewer, larger blocks) and `-c 1` reduces the number of index chunks from their defaults. The net effect is to increase memory usage *substantially* while reducing run times *substantially*. The maximum observed memory usage is 170GB, which Milton's nodes can easily satisfy but which may be problematic on other hardware.
+
+The DIAMOND blastx step also uses `/vast/scratch/users/$USER/tmp` as its temporary directory. This should be parameterised in the future.
+
+## Output files
+
+By default, output files are placed in `output_files` under the `step_{1..5}_output` folders, which should contain the same results as the original master script.
+
+*NOTE*: because Nextflow on WEHI's Milton HPC is configured to use VAST Scratch as its working directory, `samsa2-master.nf` *copies* results from the working directories into the `output_files` folder to get around VAST Scratch's 14-day deletion policy.
diff --git a/samsa2-master.nf b/samsa2-master.nf
new file mode 100644
index 0000000..8f82ed5
--- /dev/null
+++ b/samsa2-master.nf
@@ -0,0 +1,256 @@
+#!/usr/bin/env nextflow
+
+params.input_dir = "$projectDir/input_files"
+params.python_scripts = "$projectDir/python_scripts"
+params.programs = "$projectDir/programs"
+params.diamond_database = "$projectDir/full_databases/New_Bac_Vir_Arc_RefSeq"
+refseq_db = "${params.diamond_database}.fa"
+params.subsys_database = "$projectDir/full_databases/subsys_db"
+params.output_dir = "$projectDir/output_files"
+step_1_output_dir = "$params.output_dir/step_1_output"
+step_2_output_dir = "$params.output_dir/step_2_output"
+step_3_output_dir = "$params.output_dir/step_3_output"
+step_4_output_dir = "$params.output_dir/step_4_output"
+step_5_output_dir = "$params.output_dir/step_5_output"
+
+input_files = "${params.input_dir}/*_R{1,2}_*" // paired input reads follow the *_R{1,2}_* pattern
+
+// step 1
+process TRIMMOMATIC {
+
+    cpus 6
+    memory '12 GB'
+    publishDir step_1_output_dir, mode: 'copy'
+    module 'trimmomatic'
+
+    input:
+    tuple val(sample_id), path(reads)
+
+    output:
+    tuple val("${sample_id}"), path("${sample_id}.cleaned.forward"), path("${sample_id}.cleaned.reverse")
+
+    script:
+    """
+    trimmomatic PE -phred33 -threads $task.cpus $reads ${sample_id}.cleaned.forward ${sample_id}.cleaned.forward_unpaired ${sample_id}.cleaned.reverse ${sample_id}.cleaned.reverse_unpaired SLIDINGWINDOW:4:15 MINLEN:70
+    """
+
+}
+
+// step 2
+process PEAR {
+
+    cpus 56
+    memory '56 GB'
+    publishDir step_2_output_dir, mode: 'copy'
+    module 'pear/0.9.11'
+
+    input:
+    tuple val(sample_id), path(reads_cleaned_forward), path(reads_cleaned_reverse)
+
+    output:
+    tuple val("$sample_id"), path("${sample_id}.merged.assembled.fastq"), path("${sample_id}.merged.discarded.fastq"), path("${sample_id}.merged.unassembled.forward.fastq"), path("${sample_id}.merged.unassembled.reverse.fastq"), path("${sample_id}.merged.assembled.fastq.ribosomes.log")
+
+    script:
+    """
+    pear -f $reads_cleaned_forward -r $reads_cleaned_reverse -j $task.cpus -o ${sample_id}.merged 2>&1 | tee ${sample_id}.merged.assembled.fastq.ribosomes.log
+    """
+}
+
+// step 2.9
+process RAWREADCOUNT {
+
+    cpus 1
+    memory '4 GB'
+    publishDir step_2_output_dir, mode: 'copy'
+
+    input:
+    path(input_forward_reads)
+
+    output:
+    path 'raw_counts.txt'
+
+    script:
+    """
+    for infile in $input_forward_reads; do
+        python $params.python_scripts/raw_read_counter.py -I \$infile -O raw_counts.txt
+    done
+    """
+
+}
+
+// step 3
+process SORTMERNA {
+
+    cpus 24
+    memory '12 GB'
+    publishDir step_3_output_dir, mode: 'copy'
+
+    input:
+    tuple val(sample_id), path(assembled_fastq)
+
+    output:
+    tuple val(sample_id), path("${sample_id}.merged.ribodepleted.fastq")
+
+    script:
+    """
+    ${params.programs}/sortmerna-2.1/sortmerna \
+        -a $task.cpus \
+        --ref ${params.programs}/sortmerna-2.1/rRNA_databases/silva-bac-16s-id90.fasta,${params.programs}/sortmerna-2.1/index/silva-bac-16s-db \
+        --reads $assembled_fastq \
+        --aligned ${assembled_fastq}.ribosomes \
+        --other ${sample_id}.merged.ribodepleted \
+        --fastx \
+        --log \
+        -v
+    """
+}
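+
+// NOTE: both DIAMOND blastx calls below add the -b 12 -c 1 performance
+// options, trading substantially higher memory use (~170 GB peak observed)
+// for substantially shorter run times; see the README for details.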
+process DIAMOND_REFSEQ {
+
+    cpus 70
+    memory '170 GB'
+    publishDir "${step_4_output_dir}/RefSeq_results", mode: 'copy'
+
+    input:
+    tuple val(sample_id), path(ribodepleted_fastq)
+
+    output:
+    tuple val(sample_id), path("${sample_id}.merged.RefSeq_annotated"), path("${ribodepleted_fastq}.RefSeq.daa")
+
+    script:
+    """
+    ${params.programs}/diamond blastx --db $params.diamond_database -q $ribodepleted_fastq -a ${ribodepleted_fastq}.RefSeq -t /vast/scratch/users/\$USER/tmp -k 1 -p $task.cpus -b 12 -c 1
+    ${params.programs}/diamond view --daa ${ribodepleted_fastq}.RefSeq.daa -o ${sample_id}.merged.RefSeq_annotated -f tab -p $task.cpus
+    """
+
+}
+
+process DIAMOND_SUBSYS {
+
+    cpus 24
+    memory '170 GB'
+    publishDir "${step_4_output_dir}/Subsystems_results", mode: 'copy'
+
+    input:
+    tuple val(sample_id), path(ribodepleted_fastq)
+
+    output:
+    tuple val(sample_id), path("${sample_id}.merged.Subsys_annotated"), path("${ribodepleted_fastq}.Subsys.daa")
+
+    script:
+    """
+    ${params.programs}/diamond blastx --db $params.subsys_database -q $ribodepleted_fastq -a ${ribodepleted_fastq}.Subsys -t /vast/scratch/users/\$USER/tmp -k 1 -p $task.cpus -b 12 -c 1
+    ${params.programs}/diamond view --daa ${ribodepleted_fastq}.Subsys.daa -o ${sample_id}.merged.Subsys_annotated -f tab -p $task.cpus
+    """
+
+}
+
+process REFSEQ_ANALYSISCOUNTER_FUNC {
+
+    cpus 46
+    memory '64 GB'
+    publishDir "${step_5_output_dir}/RefSeq_results/func_results", mode: 'copy'
+
+    input:
+    tuple val(sample_id), path(refseq_annotated)
+
+    output:
+    tuple val(sample_id), path("${sample_id}.merged.*.tsv")
+
+    script:
+    """
+    python ${params.python_scripts}/DIAMOND_analysis_counter_mp.py -I ${refseq_annotated} -D $refseq_db -F -t $task.cpus
+    """
+}
+
+process REFSEQ_ANALYSISCOUNTER_ORG {
+
+    cpus 46
+    memory '64 GB'
+    publishDir "${step_5_output_dir}/RefSeq_results/org_results", mode: 'copy'
+
+    input:
+    tuple val(sample_id), path(refseq_annotated)
+
+    output:
+    tuple val(sample_id), path("${sample_id}.merged.*.tsv")
+
+    script:
+    """
+    python ${params.python_scripts}/DIAMOND_analysis_counter_mp.py -I ${refseq_annotated} -D $refseq_db -O -t $task.cpus
+    """
+}
+
+process SUBSYS_ANALYSIS_COUNTER {
+
+    cpus 2
+    memory '64 GB'
+    publishDir "${step_5_output_dir}/Subsystems_results/receipts", pattern: '*.receipt', mode: 'copy'
+
+    input:
+    tuple val(sample_id), path(subsys_annotated)
+
+    output:
+    tuple val(sample_id), path('*.hierarchy'), path('*.receipt')
+
+    script:
+    """
+    python ${params.python_scripts}/DIAMOND_subsystems_analysis_counter.py \
+        -I $subsys_annotated \
+        -D ${params.subsys_database}.fa \
+        -O ${subsys_annotated}.hierarchy \
+        -P ${subsys_annotated}.receipt
+    """
+
+}
+
+process SUBSYS_REDUCER {
+
+    cpus 2
+    memory '64 GB'
+    publishDir "${step_5_output_dir}/Subsystems_results", mode: 'copy'
+
+    input:
+    tuple val(sample_id), path(subsys_annotated_hierarchy)
+
+    output:
+    tuple val(sample_id), path('*.reduced')
+
+    script:
+    """
+    python ${params.python_scripts}/subsys_reducer.py -I $subsys_annotated_hierarchy
+    """
+}
+
+// placeholder for a future R analysis step
+// process R {
+
+// }
+
+workflow {
+
+    infiles_ch = Channel.fromPath(input_files) // currently unused
+
+    read_pairs_ch = Channel.fromFilePairs(input_files, checkIfExists: true)
+
+    trim_ch = TRIMMOMATIC(read_pairs_ch)
+
+    pear_ch = PEAR(trim_ch)
+
+    // gather all cleaned forward reads so raw_read_counter.py can append
+    // each sample's count to a single raw_counts.txt
+    forward_reads_ch = trim_ch.map{ it[1] }.collect(flat: false)
+    raw_counts_ch = RAWREADCOUNT(forward_reads_ch)
+
+    sortmerna_input_ch = pear_ch.map{ [it[0], it[1]] }
+    sortmerna_ch = SORTMERNA(sortmerna_input_ch)
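+
+    // Each process emits a multi-element tuple; the map{ [it[0], it[1]] }
+    // calls below keep only [sample_id, annotated_file] for the next step.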
+
+    // REFSEQ
+    diamond_refseq_ch = DIAMOND_REFSEQ(sortmerna_ch)
+
+    refseq_analysiscounter_input_ch = diamond_refseq_ch.map{ [it[0], it[1]] }
+    refseq_analysiscounter_func_ch = REFSEQ_ANALYSISCOUNTER_FUNC(refseq_analysiscounter_input_ch)
+    refseq_analysiscounter_org_ch = REFSEQ_ANALYSISCOUNTER_ORG(refseq_analysiscounter_input_ch)
+
+    // SUBSYS
+    diamond_subsys_ch = DIAMOND_SUBSYS(sortmerna_ch)
+    subsys_analysiscounter_input_ch = diamond_subsys_ch.map{ [it[0], it[1]] }
+    subsys_analysiscounter_ch = SUBSYS_ANALYSIS_COUNTER(subsys_analysiscounter_input_ch)
+    subsys_reducer_input_ch = subsys_analysiscounter_ch.map{ [it[0], it[1]] }
+    subsys_reducer_ch = SUBSYS_REDUCER(subsys_reducer_input_ch)
+}