pipeline.py
__author__ = 'alipirani'
""" Import required Python modules """
import argparse
import ConfigParser
import subprocess
import re
import os
import sys
import errno
import glob
from modules.log_modules import *
from logging_subprocess import *
from config_settings import ConfigSectionMap
from modules.coverage import *
from modules.quality import *
from modules.multiqc import *
from modules.screen_contamination import *
from modules.kraken_contamination import *
from datetime import datetime
from argparse import RawTextHelpFormatter
from modules.coverage_depth import *
from modules.kraken_report import *
from modules.mlst import mlst
from modules.summary import *
from modules.amr import *
from modules.mashscreen import *
from modules.trim import *
""" Command line argument parsing """
def parser():
parser = argparse.ArgumentParser(description='QC\'d - Quality control and Contamination Detection Workflow', formatter_class=RawTextHelpFormatter)
required = parser.add_argument_group('Required arguments')
optional = parser.add_argument_group('Optional arguments')
required.add_argument('-samples', action='store', dest="samples", help='A file containing filenames of forward-paired end or single-end reads. One Sample per line')
optional.add_argument('-config', action='store', dest="config", help='Path to Config file, Make sure to check config settings before running pipeline.\nNote: Set Kraken database path under [kraken] config section')
required.add_argument('-dir', action='store', dest="directory", help='Path to Sequencing Reads Data directory. NOTE: Provide full/absolute path.')
required.add_argument('-analysis', action='store', dest="analysis_names", help='comma-seperated analysis names to run\n[Options: coverage,quality,kraken_contamination,kraken_report,coverage_depth,amr,mlst,mashscreen].\nExample: "-analysis coverage,quality" - This will estimate coverage and quality for the samples given in -samples')
required.add_argument('-o', action='store', dest="output_folder", help='Output Folder Path ending with output directory name to save the results.\nCreates a new output directory path if it doesn\'t exist.')
required.add_argument('-type', action='store', dest='type', help='Type of analysis: SE or PE')
optional.add_argument('-cluster', action='store', dest='cluster', help='Run in one of the two modes. \nDefault is local for coverage and fastqc analysis.\nFor all the other analysis Default is cluster. The pipeline prefers cluster mode to generate SLURM/PBS jobs.\nModes: local or cluster')
required.add_argument('-genome_size', action='store', dest='size', help='Estimated Genome Size - to be used for estimating coverage in downsampling steps')
required.add_argument('-prefix', action='store', dest='prefix', help='Use this prefix to save the results files')
optional.add_argument('-reference', action='store', dest='reference', help='Reference genome to use for calculating GATK Depth of coverage.')
optional.add_argument('-downsample', action='store', dest="downsample",
help='yes/no: Downsample Reads data to default depth of 100X')
optional.add_argument('-scheduler', action='store', dest="scheduler",
help='\nType of Scheduler for generating Kraken cluster jobs: PBS, SLURM, LOCAL')
optional.add_argument('-dryrun', action='store_true', dest="dryrun",
help='Perform a trial run without submitting cluster jobs',
required=False)
optional.add_argument('-mlst_db', action='store', dest="mlst_db",
help='Ariba MLST database path',
required=False)
optional.add_argument('-amr_db', action='store', dest="amr_db",
help='Ariba AMR CARD database path',
required=False)
optional.add_argument('-mash_refseq_db', action='store', dest="mash_refseq_db",
help='Mash Refseq database Sketch',
required=False)
return parser
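# Example invocation (illustrative only; the sample sheet, data directory and
# output paths below are hypothetical placeholders):
#
#   python pipeline.py -samples samples.txt -dir /data/fastq_reads \
#       -analysis coverage,quality,kraken_contamination -o /results/run1_qc \
#       -type PE -genome_size 5000000 -prefix run1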
""" Main Pipeline """
def pipeline(args, logger, Config, output_folder, prefix, reference):
keep_logging('\nSTART: Pipeline', 'START: Pipeline', logger, 'info')
""" Check Subroutines and create logger object: Arguments, Input files, Reference Index"""
keep_logging('Checking Dependencies...', 'Checking Dependencies', logger, 'info')
""" Check java availability """
java_check()
""" Check if the input file exists """
with open(args.samples) as fp:
for line in fp:
line = line.strip()
line = args.directory + "/" + line
filenames_array.append(line)
if args.type != "PE":
reverse_raw = "None"
file_exists(line, reverse_raw)
else:
#reverse_raw = args.directory + "/" + reverse_raw
file_exists(line, line)
keep_logging('Total no. of Samples %s' % len(filenames_array), 'Total no. of Samples %s' % len(filenames_array), logger, 'info')
""" Start the pipeline: """
analysis_list = args.analysis_names.split(',')
keep_logging('Running analysis - %s' % args.analysis_names, 'Running analysis - %s' % args.analysis_names,
logger, 'info')
""" Copy filenames to output folder """
cp_cmd = "cp %s %s" % (args.samples, output_folder)
os.system(cp_cmd)
""" Set Default cluster mode"""
if args.cluster:
cluster = args.cluster
else:
cluster = "local"
""" Start Specific analysis based on analysis list """
for analysis in analysis_list:
if analysis == "coverage":
keep_logging("Step: Calculating Coverage...\n", "Calculating Coverage", logger, 'info')
coverage_fastqscan(filenames_array, Config, logger, output_folder, args.type, args.samples, args.size, prefix)
elif analysis == "quality":
keep_logging("Step: Analysing Fastqc Quality...\n", "Analysing Fastqc Quality...", logger, 'info')
fastqc_main_directory = args.output_folder + "/%s_Fastqc" % args.prefix
make_sure_path_exists(fastqc_main_directory)
fastqc_forward_directory = fastqc_main_directory + "/%s_Forward" % args.prefix
make_sure_path_exists(fastqc_forward_directory)
fastqc_reverse_directory = fastqc_main_directory + "/%s_Reverse" % args.prefix
make_sure_path_exists(fastqc_reverse_directory)
Multiqc_reports_directory = args.output_folder + "/%s_Multiqc_reports" % args.prefix
make_sure_path_exists(Multiqc_reports_directory)
quality(filenames_array, Config, logger, output_folder, args.type, args.samples, fastqc_main_directory, fastqc_forward_directory, fastqc_reverse_directory, args.cluster, args.scheduler)
multiqc(fastqc_forward_directory, "%s_Forward_fastqc" % args.prefix, Config, logger, Multiqc_reports_directory, cluster, args.scheduler)
multiqc(fastqc_reverse_directory, "%s_Reverse_fastqc" % args.prefix, Config, logger, Multiqc_reports_directory, cluster, args.scheduler)
elif analysis == "screen_contamination":
keep_logging("Step: Screening Fastq reads against Reference Database...\n", "Screening Fastq reads against Reference Database...", logger, 'info')
fastq_screen_directory = args.output_folder + "/%s_Fastqc_screen" % args.prefix
make_sure_path_exists(fastq_screen_directory)
screen_contamination(filenames_array, Config, logger, output_folder, args.type, args.samples, fastq_screen_directory, cluster)
Multiqc_reports_directory = args.output_folder + "/%s_Multiqc_reports" % args.prefix
make_sure_path_exists(Multiqc_reports_directory)
multiqc(fastq_screen_directory, "%s_Fastq_screen" % args.prefix, Config, logger, Multiqc_reports_directory)
keep_logging('MultiQC Report of FastQC results can be found in - %s\n' % Multiqc_reports_directory, 'MultiQC Report of FastQC results can be found in - %s\n' % Multiqc_reports_directory, logger, 'info')
elif analysis == "kraken_contamination":
keep_logging("Step: Running Kraken on Input reads...\n", "Running Kraken on Input reads...", logger, 'info')
kraken_directory = args.output_folder + "/%s_Kraken_results" % args.prefix
make_sure_path_exists(kraken_directory)
kraken_contamination(filenames_array, Config, logger, output_folder, args.type, args.samples, kraken_directory, cluster, args.downsample, args.scheduler, args.size, args.dryrun)
elif analysis == "kraken_report":
keep_logging("Step: Generating Kraken report on Kraken Results...\n", "Generating Kraken report on Kraken Results...", logger, 'info')
kraken_directory = args.output_folder + "/%s_Kraken_results" % args.prefix
make_sure_path_exists(kraken_directory)
kraken_report(filenames_array, Config, logger, output_folder, args.type, args.samples, kraken_directory, cluster, args.scheduler)
elif analysis == "coverage_depth":
keep_logging("Step: Running Coverage Depth analysis on Input reads...\n", "Running Coverage Depth analysis on Input reads...", logger, 'info')
coverage_depth_directory = args.output_folder + "/%s_Coverage_depth" % args.prefix
make_sure_path_exists(coverage_depth_directory)
coverage_depth_analysis(filenames_array, Config, logger, output_folder, args.type, args.samples, coverage_depth_directory, cluster, reference, args.scheduler)
elif analysis == "mlst":
keep_logging("Step: Running Ariba MLST sequence typing on Input reads...\n", "Running MLST sequence typing on Input reads...", logger, 'info')
if args.mlst_db:
mlstdb = args.mlst_db
else:
mlstdb = ConfigSectionMap("ariba", Config)['mlst_db_path']
keep_logging(
'',
"Using Ariba MLST Database from this path - %s" % mlstdb,
logger, 'debug')
mlst_directory = args.output_folder + "/%s_MLST_results" % args.prefix
make_sure_path_exists(mlst_directory)
mlst(filenames_array, Config, logger, mlst_directory, args.type, args.samples, mlst_directory, cluster, args.scheduler, mlstdb)
elif analysis == "amr":
keep_logging("Step: Running Ariba AMR detection on Input reads...\n", "Running Ariba AMR detection on Input reads...", logger, 'info')
if args.amr_db:
amrdb = args.amr_db
else:
amrdb = ConfigSectionMap("ariba", Config)['amr_db_path']
keep_logging(
'',
"Using Ariba AMR Database from this path - %s" % amrdb,
logger, 'debug')
amr_directory = args.output_folder + "/%s_AMR_results" % args.prefix
make_sure_path_exists(amr_directory)
amr(filenames_array, Config, logger, amr_directory, args.type, args.samples, amr_directory, cluster, args.scheduler, amrdb)
elif analysis == "mashscreen":
keep_logging("Step: Running Mash Screen on Input reads...\n", "Running Mash Screen on Input reads...", logger, 'info')
if args.mash_refseq_db:
mashrefseqdb = args.mash_refseq_db
else:
mashrefseqdb = ConfigSectionMap("mash", Config)['mash_refseq_db']
keep_logging(
'',
"Using Mash RefSeq Database Sketch from this path - %s" % mashrefseqdb,
logger, 'debug')
mashscreen_directory = args.output_folder + "/%s_Mashscreen_results" % args.prefix
make_sure_path_exists(mashscreen_directory)
mashscreen(filenames_array, Config, logger, mashscreen_directory, args.type, args.samples, mashscreen_directory, cluster, args.scheduler, mashrefseqdb)
elif analysis == "summary":
keep_logging('', "Generating Summary report for QC'd analysis - %s" % args.prefix, logger, 'debug')
summary(filenames_array, Config, logger, args.prefix, output_folder)
keep_logging("Summary report - %s/%s_summary.tsv" % (output_folder, prefix), "Summary report - %s/%s_summary.tsv" % (output_folder, prefix), logger, 'info')
elif analysis == "aftertrimmultiqc":
keep_logging("Step: Running After Trimmomatic MultiQC on Input reads...\n", "Running After Trimmomatic MultiQC on Input reads...", logger, 'info')
aftertrimqc_directory = args.output_folder + "/%s_AfterTrimQC_results" % args.prefix
make_sure_path_exists(aftertrimqc_directory)
#aftertrimqc(filenames_array, Config, logger, aftertrimqc_directory, args.type, args.samples, cluster, args.scheduler)
clean_reads(filenames_array, args.type, aftertrimqc_directory, logger, Config, args.scheduler)
multiqc(aftertrimqc_directory, "%s_Forward_fastqc" % args.prefix, Config, logger, aftertrimqc_directory, cluster, args.scheduler)
""" Check Subroutines """
""" Usage Message: """
def usage():
print "Usage: pipeline.py [-h] [-samples FILENAMES] [-dir DIRECTORY of Fastq files] [-config CONFIG] [-analysis ANALYSIS_NAME1,ANALYSIS_NAME2,...] [-o OUTPUT_FOLDER] [-type TYPE]\n"
""" Check Java Availability: """
def java_check():
print "Checking Java Availability....\n"
jd = subprocess.check_output(["java", "-version"], stderr=subprocess.STDOUT)
if len(jd) < 1:
print "Unable to find a java runtime environment. The pipeline requires java 6 or later.\n"
else:
print "Java Availability Check completed ...\n" + jd
""" Make sure input raw reads files exists at given location. """
def file_exists(path1, path2):
if path1:
if not os.path.isfile(path1):
file_basename = os.path.basename(path1)
print "The input file " + file_basename + " does not exists. \nPlease provide another file.\n"
exit()
if path2 != "None":
if not os.path.isfile(path2):
file_basename = os.path.basename(path2)
print "The input file " + file_basename + " does not exists. \nPlease provide another file.\n"
exit()
""" Make sure the output folder exists or create at given path """
def make_sure_path_exists(out_path):
try:
os.makedirs(out_path)
except OSError as exception:
if exception.errno != errno.EEXIST:
print "Errors in output folder path! please change the output path or analysis name\n"
exit()
""" Start of Main Method/Pipeline """
if __name__ == '__main__':
start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
start_time_2 = datetime.now()
log_unique_time = datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
args = parser().parse_args()
global config_file
global filenames_array
global Config
global logger
analysis_string = args.analysis_names.replace(',', '_')
filenames_array = []
# Set config file paths
if args.config:
config_file = args.config
else:
config_file = os.path.dirname(os.path.abspath(__file__)) + "/config"
Config = ConfigParser.ConfigParser()
Config.read(config_file)
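    # Illustrative config layout (section and key names taken from the
    # ConfigSectionMap lookups in this script; all paths are placeholders):
    #
    #   [ariba]
    #   mlst_db_path: /databases/ariba/mlst
    #   amr_db_path: /databases/ariba/card
    #
    #   [mash]
    #   mash_refseq_db: /databases/mash/refseq.msh
    #
    #   [<reference-name>]
    #   ref_path: /databases/references/<reference-name>
    #   ref_name: <reference-name>.fasta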
    logger = generate_logger(args.output_folder, analysis_string, log_unique_time)
    # Normalize the output directory path and make sure it exists
    if args.output_folder != '':
        args.output_folder += '/'
    make_sure_path_exists(args.output_folder)
    # Resolve the reference genome path used to map samples and calculate coverage depth.
    # -reference names a config section whose ref_path/ref_name keys point to the fasta file.
    reference = args.reference
    if "coverage_depth" in args.analysis_names:
        if args.reference:
            reference = ConfigSectionMap(args.reference, Config)['ref_path'] + "/" + ConfigSectionMap(args.reference, Config)['ref_name']
        else:
            print "Please provide a reference genome name, or check the reference genome path in the config file.\n"
            keep_logging('\n\nNo reference genome provided. Exiting\n', '\nNo reference genome provided. Exiting\n', logger, 'error')
            exit()
    # Main workflow: pass the resolved reference path rather than the raw args.reference value
    pipeline(args, logger, Config, args.output_folder, args.prefix, reference)
    keep_logging('End: Pipeline\n', 'End: Pipeline', logger, 'info')
    time_taken = datetime.now() - start_time_2
    keep_logging('Total Time taken: {}'.format(time_taken), 'Total Time taken: {}'.format(time_taken), logger, 'info')