Snakefile.blob
# vim: ft=python
# Rules to make BLOB plots and summary tables.
# These rules are designed to be included in Snakefile.main and will not run standalone.
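# (In other words, Snakefile.main is expected to do 'include: "Snakefile.blob"'. Helpers used
# below but not defined here - label_for_part(), dump_yaml(), cellname_to_base(), SC, SC_DATA,
# TOOLBOX and PIGZ - are assumed to be provided by that file or its own includes.)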
from hesiod import slurp_file
# Blob plotting was originally copied from SMRTino
# BLAST 10k sequences in 40 chunks, chopping long sequences to 4096 bases
BLOB_SUBSAMPLE = int(config.get('blob_subsample', 10000))
BLOB_CHUNKS = int(config.get('blob_chunks', 40))
BLOB_CHOP = int(config.get('blob_chop', 4096))
BLOB_LEVELS = config.get('blob_levels', "phylum order species".split())
BLAST_SCRIPT = config.get('blast_script', "blast_nt")
# For testing, make blobs of all three passing outputs. Probably we just want "pass" in the
# final version. See also label_for_part() in the main Snakefile.
BLOB_PARTS = config.get('blob_parts', ["pass"])
# Snakemake won't let us set a config item to [], so...
if BLOB_PARTS == "none" or BLOB_PARTS == ["none"]:
BLOB_PARTS = []
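# As an illustration only (assuming the standard --configfile mechanism), the defaults above
# could be overridden with a YAML file like this - note that blob_levels needs to arrive as a
# real list, not a space-separated string:
#
#   blob_subsample: 5000
#   blob_chunks: 20
#   blob_levels: [phylum, genus]
#   blob_parts: none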
# This is how the plots get passed into compile_cell_info.py.
# The rule below serves as the driver by depending on the 6 (3?) blob plots, plus a thumbnail
# for each, and arranges the plots into 2 (1?) rows of 3 columns as we wish to
# display them.
# We also depend on the CSV outputs of parse_blob_table, which will be rendered to
# markdown within the make_report script. These are grouped by project, not by cell, so
# we need a separate rule below.
localrules: per_cell_blob_plots, per_project_blob_tables, fasta_numseqs, \
parse_blob_table, chop_chunk
wildcard_constraints:
chunk = r"part_[0-9]+",
n = r"[0-9]+",
# Basic FASTA sequence counter...
# we need a special case for empty files here - these simply have 0 sequences.
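# Note: Snakemake normally runs shell blocks under bash strict mode ('set -euo pipefail'), so
# if grep matches nothing the pipeline as a whole fails and we fall through to the
# empty-file check, which writes the 0.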
rule fasta_numseqs:
output: "{foo}.fasta.numseqs"
input: "{foo}.fasta"
shell:
r'''if ! grep -o '^>' {input} | wc -l > {output} ; then
[ ! -s {input} ] && echo 0 > {output}
fi
'''
rule per_cell_blob_plots:
output: "blob/{base}_{barcode}_plots.yaml"
input:
png = lambda wc: expand( "blob/{base}_{bc}_{pf}.{taxlevel}.{extn}{thumb}.png",
base = wc.base,
bc = wc.barcode,
pf = BLOB_PARTS,
taxlevel = BLOB_LEVELS,
extn = "cov0 read_cov.cov0".split(),
thumb = ['.__thumb', ''] ),
sample = lambda wc: expand( "blob/{base}_{bc}_{pf}+sub{ss}.fasta.numseqs",
base = wc.base,
bc = wc.barcode,
pf = BLOB_PARTS,
ss = [BLOB_SUBSAMPLE] ),
run:
# We want to know how big the subsample actually was, as it may be < BLOB_SUBSAMPLE,
# so check the FASTA, then make a dict of {part: seq_count} for this cell.
wc = wildcards
if not BLOB_PARTS:
counts = {}
else:
counts = { part: slurp_file(f)[0]
for part, f in zip(BLOB_PARTS, input.sample) }
# I need to emit the plots in order in pairs. Unfortunately expand() won't quite
# cut it here in preserving order but I can make a nested list comprehension.
# Group by BLOB_PARTS with an appropriate title.
plots = [ dict(title = 'Taxonomy for {pf} reads ({c} sequences) by {l}'.format(
pf = label_for_part(pf, wc.barcode),
c = counts[pf],
l = ', '.join(BLOB_LEVELS) ),
barcode = wc.barcode,
pf = pf,
subsample = counts[pf],
taxlevels = ', '.join(BLOB_LEVELS),
has_data = (str(counts[pf]) != '0'),
files = [ [ "{basebase}_{bc}_{pf}.{taxlevel}.{extn}.png".format(
basebase = os.path.basename(wc.base),
bc = wc.barcode,
pf = pf,
taxlevel = taxlevel,
extn = extn )
for taxlevel in BLOB_LEVELS ]
for extn in "read_cov.cov0 cov0".split() ]
) for pf in BLOB_PARTS ]
dump_yaml(plots, str(output))
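# For reference, the YAML written above ends up as a list with one entry per part, roughly
# like this (values are illustrative only):
#
#   - title: "Taxonomy for passed reads (10000 sequences) by phylum, order, species"
#     barcode: "barcode01"
#     pf: "pass"
#     subsample: '10000'
#     taxlevels: "phylum, order, species"
#     has_data: true
#     files:
#       - [<one read_cov.cov0.png per tax level>]
#       - [<one cov0.png per tax level>]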
# See above. Now we want to make a CSV table per project summarizing the main taxa found
# in the blob database.
# TODO - work out if this should be split by barcode? Prob not.
# Note - the "{x,$}" is a hack to foil Snakemake which tries to stat all the inputs for
# the rule even if we don't even try to generate this output.
rule per_project_blob_tables:
output: "blob/blobstats_by_project.yaml{x,$}"
input:
tsv = expand("blob/blobstats.{project}.{pf}.{taxlevel}.tsv",
project = SC_DATA['cells_per_project'],
pf = BLOB_PARTS,
taxlevel = BLOB_LEVELS ),
run:
# List ALL the tables of blob stats to show per project
res = dict()
for p in SC_DATA['cells_per_project']:
# For each project p make a list of tables, first by {pf}, then by tax level.
tsv_list = res[p] = list()
for pf in BLOB_PARTS:
for tl in BLOB_LEVELS:
# Recreate the filename. I should just be able to go through input.tsv
# in order but I'm not 100% sure.
tsv = f"blobstats.{p}.{pf}.{tl}.tsv"
assert "blob/" + tsv in [str(s) for s in input.tsv]
# At present we have one table per sample, and the barcodes (if any) appear as separate rows.
tsv_list.append( dict( title = "BLAST hit percentages for {pf} reads by {taxlevel}".format(
pf = label_for_part(pf),
taxlevel = tl ),
tsv = tsv ) )
# And this gives us the data structure we need.
dump_yaml(res, str(output))
rule parse_blob_table:
output: "blob/blobstats.{project}.{pf}.{taxlevel}.tsv"
input:
lambda wc: [ f"blob/{cellname_to_base(cell)}_{barcode}_{wc.pf}.{wc.taxlevel}.blobplot.stats.txt"
for cell in SC_DATA['cells_per_project'][wc.project]
for barcode in SC[cell] ]
params:
pct_limit = 1.0,
label = 'Cell'
shell:
"parse_blob_table.py -l {params.label:q} -t -o {output} -c {params.pct_limit} {input}"
# Convert to FASTA, subsample, and munge the headers.
# seqtk seq -ACU == to-fasta, no-comments, uppercase
# Also, because pigz and/or 'seqtk seq' may well be killed by SIGPIPE, we have to ignore pipe
# failures, even though this may mask other problems.
# Note: I was thinking of adding "sed '/^>/!s/U/T/g'" to convert U's to T's, but apparently
# BLAST copes fine with the U bases.
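# Also note: 'seqtk sample' with a plain integer argument keeps (at most) that many reads,
# chosen with a fixed default seed, so re-runs should give the same subsample. If the input
# has fewer reads than {n} everything passes through, which is why we count the FASTA afterwards.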
rule fastq_to_subsampled_fasta:
output: "blob/{foo}_{pf}+sub{n}.fasta"
input: "{foo}_{pf}.fastq.gz"
threads: 2
shell:
r"""set +o pipefail
{PIGZ} -p{threads} -d -c {input} | \
{TOOLBOX} seqtk seq -ACU - | \
{TOOLBOX} seqtk sample - {wildcards.n} | \
sed -e 's,/,_,g' > {output}
"""
# Makes a .complexity file for our FASTA file
# {foo} will be blob/{cell}/{ci[Experiment]}_{ci[Library]}_{ci[CellID]}_{pf}+sub{ss}.fasta
rule fasta_to_complexity:
output: "blob/{foo}.complexity"
input: "blob/{foo}.fasta"
params:
level = 10
shell:
"{TOOLBOX} dustmasker -level {params.level} -in {input} -outfmt fasta 2>/dev/null | count_dust.py > {output}"
# Split the FASTA into (at most) BLOB_CHUNKS chunks. The number may be less than BLOB_CHUNKS so this
# is a checkpoint rule, and merge_blast_reports then responds to the variable number of outputs. Note
# this will even 'split' a completely empty file if you ask it to, making zero chunk files, an
# empty parts directory and an empty output.list.
checkpoint split_fasta_in_chunks:
output:
list = "blob/{foo}.fasta_parts_list",
parts = temp(directory("blob/{foo}.fasta_parts")),
input: "blob/{foo}.fasta"
params:
chunksize = BLOB_SUBSAMPLE // BLOB_CHUNKS
shell:
"""mkdir {output.parts}
touch {output.parts}/list
awk 'BEGIN {{n_seq=0;n_file=0;}} \
/^>/ {{if(n_seq%{params.chunksize}==0){{ \
file=sprintf("{output.parts}/part_%04d.fasta", n_file); n_file++; \
print file >> "{output.parts}/list"; \
}} \
print >> file; n_seq++; next; \
}} \
{{ print >> file; }}' {input}
mv {output.parts}/list {output.list}
"""
# Combine all the BLAST report chunks (however many there turned out to be) into one.
# I'm filtering out repeated rows to reduce the size of the BLOB DB - there can
# be a _lot_ of repeats so this is worth running on the cluster.
# The input may also be empty, but that's OK - it still works!
def i_merge_blast_reports(wildcards):
"""Return a list of BLAST reports to be merged based upon how many chunks
were outputted by split_fasta_in_chunks.
"""
chunks_list_file = checkpoints.split_fasta_in_chunks.get(**wildcards).output.list
with open(chunks_list_file) as fh:
fasta_chunks = [ l.rstrip('\n') for l in fh ]
# Munge the list of FASTA chunks to get the list of required BLAST chunks
return dict( bparts =
[ re.sub(r'\.fasta_parts/', '.blast_parts/',
re.sub(r'\.fasta$', '.bpart', c))
for c in fasta_chunks ] )
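# A small note on the mechanics: because the input function returns a dict, the rule below
# wraps it in unpack() so the reports can be referenced as {input.bparts} in the shell command.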
rule merge_blast_reports:
output: "blob/{foo}.blast"
input: unpack(i_merge_blast_reports)
shell:
'export LC_ALL=C ; ( for i in {input.bparts} ; do sort -u -k1,2 "$i" ; done ) > {output}'
# BLAST a chunk. Note the 'blast_nt' wrapper determines the actual database to search,
# but you can specify an alternate wrapper, which need not be in the TOOLBOX,
# in config['blast_script']
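# Purely as a sketch, a hypothetical alternative wrapper (not part of this repo) only needs
# to accept standard blastn arguments and supply the database itself, something like:
#
#   #!/bin/bash
#   exec blastn -db /path/to/some_nt_copy "$@"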
rule blast_chunk:
output: temp("blob/{foo}.blast_parts/{chunk}.bpart")
input: f"blob/{{foo}}.fasta_parts/{{chunk}}+chop{BLOB_CHOP}.fasta"
threads: 6
resources:
mem_mb = 24000,
n_cpus = 8,
params:
evalue = '1e-50',
outfmt = '6 qseqid staxid bitscore'
shell:
"""{TOOLBOX} {BLAST_SCRIPT} -query {input} -outfmt {params.outfmt:q} \
-evalue {params.evalue:q} -max_target_seqs 1 -out {output}.tmp -num_threads {threads}
mv {output}.tmp {output}
"""
rule chop_chunk:
output: temp("blob/{foo}.fasta_parts/{chunk}+chop{n}.fasta")
input: "blob/{foo}.fasta_parts/{chunk}.fasta"
shell:
"""awk -v chop={wildcards.n} \
'{{print $1~/^>/ ? $0 : substr($0,1,chop)}}' {input} > {output}
"""
# Makes a blob db per FASTA, using the complexity file as a COV file.
# Here {foo} is {cell}/{ci[Experiment]}_{ci[Library]}_{ci[CellID]}_{pf}, as in the rules above.
# If reads_sample is empty this will generate an empty file.
rule blob_db:
output:
json = "blob/{foo}.blobDB.json",
input:
blast_results = f"blob/{{foo}}+sub{BLOB_SUBSAMPLE}.blast",
reads_sample = f"blob/{{foo}}+sub{BLOB_SUBSAMPLE}.fasta",
cov = f"blob/{{foo}}+sub{BLOB_SUBSAMPLE}.complexity"
shadow: 'minimal'
resources:
mem_mb = 12000,
n_cpus = 2,
shell:
r'''if [ ! -s {input.reads_sample} ] ; then touch {output.json} ; exit 0 ; fi
mkdir blob_tmp
{TOOLBOX} blobtools create -i {input.reads_sample} -o blob_tmp/tmp \
-t {input.blast_results} -c {input.cov}
ls -l blob_tmp
mv blob_tmp/tmp.blobDB.json {output.json}
'''
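# Note on shadow: 'minimal' runs the job in a private scratch directory (with the declared
# inputs linked in), so the blob_tmp/ working directory is discarded automatically and only
# the declared output gets moved back into place.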
# Run the blob plotting command once per set per tax level. This produces a single
# stats file plus two PNG plots, each with a thumbnail.
# If the blobDB.json is empty, make placeholder "No data" images instead.
rule blob_plot_png:
output:
plotc = ["blob/{foo}.{taxlevel}.cov0.png", "blob/{foo}.{taxlevel}.cov0.__thumb.png"],
plotr = ["blob/{foo}.{taxlevel}.read_cov.cov0.png", "blob/{foo}.{taxlevel}.read_cov.cov0.__thumb.png"],
stats = "blob/{foo}.{taxlevel}.blobplot.stats.txt"
input:
json = "blob/{foo}.blobDB.json"
params:
maxsize = "1750x1750",
thumbsize = "320x320"
shadow: 'minimal'
resources:
mem_mb = 12000,
n_cpus = 2,
shell:
r'''mkdir blob_tmp
if [ -s {input.json} ] ; then
export BLOB_COVERAGE_LABEL=Non-Dustiness
{TOOLBOX} blobtools plot -i {input.json} -o blob_tmp/ --dustplot --sort_first no-hit,other,undef -r {wildcards.taxlevel}
ls blob_tmp
mv blob_tmp/tmp.*.stats.txt {output.stats}
{TOOLBOX} convert blob_tmp/tmp.*.{wildcards.taxlevel}.*.blobplot.cov0.png \
-resize {params.maxsize}'>' {output.plotc[0]}
{TOOLBOX} convert blob_tmp/tmp.*.{wildcards.taxlevel}.*.blobplot.read_cov.cov0.png \
-resize {params.maxsize}'>' {output.plotr[0]}
else
echo "No data" > {output.stats}
{TOOLBOX} gm_label.sh {params.thumbsize} "No data to plot" {output.plotc[0]}
{TOOLBOX} gm_label.sh {params.thumbsize} "No data to plot" {output.plotr[0]}
fi
{TOOLBOX} convert {output.plotc[0]} -resize {params.thumbsize}'>' {output.plotc[1]}
{TOOLBOX} convert {output.plotr[0]} -resize {params.thumbsize}'>' {output.plotr[1]}
'''