# vim: ft=python
import os

from hesiod import ( load_final_summary, find_sequencing_summary, find_summary,
                     get_common_prefix, dump_yaml, load_yaml )
# Rules to filter, compress and combine the original files.
# These rules are designed to be included in Snakefile.main and will not run standalone.
# Note that these rules are bypassed if the rundata directory is missing, allowing us to repeat QC
# without errors relating to missing files.
# Compress the file discovered by find_sequencing_summary() and rename it, matching the
# base of the FASTQ and BAM files. Note the original name is preserved in the GZIP header
# and can be revealed by 'gunzip -Nlv {output.gz}'.
rule gzip_sequencing_summary:
output:
gz = "{cell}/{fullid}_sequencing_summary.txt.gz",
md5 = "md5sums/{cell}/{fullid}_sequencing_summary.txt.gz.md5"
input: lambda wc: [find_sequencing_summary(EXPDIR, wc.cell)]
threads: 2
shell:
r"""{PIGZ} -v -p{threads} -Nc {input} >{output.gz}
( cd "$(dirname {output.gz:q})" && md5sum -- "$(basename {output.gz:q})" ) > {output.md5:q}
"""
localrules: convert_final_summary, copy_report
rule convert_final_summary:
output:
yaml = "{cell}/cell_final_summary.yaml",
input: lambda wc: find_summary('final_summary.txt', EXPDIR, wc.cell)
run:
dump_yaml(load_final_summary(str(input)), str(output.yaml))
# This needs to work for both the HTML and the PDF reports. 'cp -T' ensures the
# destination is always treated as a plain file, never as a directory to copy into.
rule copy_report:
output:
pdf = "{cell}/{fullid}_report.{extn}"
input: lambda wc: find_summary(f"report.{wc.extn}", EXPDIR, wc.cell, allow_missing=True) or []
wildcard_constraints:
extn = r"pdf|html",
run:
if input:
shell("cp -T {input} {output.pdf}")
else:
shell("touch {output}")
# Copy and checksum a pod5 file:
# * merging is no longer needed, as of May 2024, but we should check the pod5 integrity
# * md5sum is invoked in such a way that it keeps the full path out of the .md5 file, as usual
# * barcode=. should work with this even though it's a bit sus.
# * making the input 'ancient' is an attempt to speed up the DAG build but I'm not sure it helps
# (it does with my hacked Snakemake!)
def i_copy_md5sum_pod5(wc):
"""Input func for a single pod5 file to be copied and checksummed.
The only fiddly bit is the barcode.
"""
    # This is not much use here:
    #all_in_dir = SC[wc.cell][wc.barcode][f"pod5{wc._pfs}"]
if wc.barcode == ".":
res = f"{EXPDIR}/{wc.cell}/pod5{wc._pfs}/{wc.pod5file}.pod5"
else:
res = f"{EXPDIR}/{wc.cell}/pod5{wc._pfs}/{wc.barcode}/{wc.pod5file}.pod5"
return [ ancient(res) ]
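# For example (values hypothetical), with cell="C1", _pfs="_pass", pod5file="batch0":
#   barcode="."         -> "<EXPDIR>/C1/pod5_pass/batch0.pod5"
#   barcode="barcode01" -> "<EXPDIR>/C1/pod5_pass/barcode01/batch0.pod5"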
# 'pod5 view' seems a reasonable way to check pod5 files for basic consistency
rule copy_md5sum_pod5:
output:
pod5 = "{cell}/pod5_{barcode}{_pfs}/{pod5file}.pod5",
md5 = temp("md5sums/{cell}/pod5_{barcode}{_pfs}/{pod5file}.pod5.md5")
input:
i_copy_md5sum_pod5
params:
pod5_base = "pod5_{barcode}{_pfs}/{pod5file}.pod5"
shell:
r"""cp --no-preserve=all {input} {output.pod5}
( cd {wildcards.cell} && md5sum -- {params.pod5_base} ) > {output.md5}.tmp
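            # Read the copy back with 'pod5 view' as an integrity check; the checksum
            # is only moved from .tmp to its final name once that succeeds.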
pod5 view -I -o /dev/null {output.pod5}
mv {output.md5}.tmp {output.md5}
"""
localrules: merge_pod5_md5sums
# The copy_pod5 rule in Snakefile.main drives execution of the rule below, which runs
# per barcode (and within that for pass+fail) and in turn drives the rule above, which
# actually copies and checksums the files.
def i_merge_pod5_md5sums(wildcards):
    """List the expected .md5 files for all of the POD5 files within a single directory.
    """
pod5_dir = f"{wildcards.cell}/pod5_{wildcards.barcode}{wildcards._pfs}"
# What are the input pod5 files for this barcode?
    # For output, we collapse the directory path slightly but keep the file names.
all_files = SC[wildcards.cell][wildcards.barcode][f'pod5{wildcards._pfs}']
res = []
for af in all_files:
basename = os.path.basename(af)
res.append(f"md5sums/{pod5_dir}/{basename}.md5")
return res
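# For example (values hypothetical), an input "<EXPDIR>/C1/pod5_pass/barcode01/batch0.pod5"
# listed in SC becomes the target "md5sums/C1/pod5_barcode01_pass/batch0.pod5.md5".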
rule merge_pod5_md5sums:
output: "md5sums/{cell}/pod5_{barcode}{_pfs}/all_pod5.md5"
input: i_merge_pod5_md5sums
run:
# Cos sys.stderr gets silenced in sub-jobs:
logger.quiet.discard('all')
        # Compile the per-pod5-file md5s into one file per barcode.
# Could do this with shell("cat ...") but there may be a lot of files.
lines_written = 0
with open(str(output), 'x') as ofh:
out_dir = os.path.dirname(str(output))
print(f"Writing pod5 md5 for {out_dir}")
for c in input:
with open(str(c)) as ifh:
for md5line in ifh:
ofh.write(md5line)
lines_written += 1
# Check that the count of md5 lines matches
        assert lines_written == len(input), \
            f"wrote {lines_written} md5 lines but expected {len(input)}"
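# The merged all_pod5.md5 thus has one line per pod5 file, with paths relative to the
# cell directory, e.g. "pod5_barcode01_pass/batch0.pod5" (hypothetical), matching how
# copy_md5sum_pod5 invoked md5sum.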
# These next rules concatenate, compress and checksum the FASTQ. The FASTQ files are
# smaller, so one final file per barcode and pass/fail is OK.
# The name for the file is as per doc/filename_convention.txt but this rule doesn't care.
# I've broken out the rule that makes _fastq.list files so I can request the list without
# making the actual merged file. The basic FASTQ merge step doesn't do any batching.
localrules: list_fastq_or_bam
rule list_fastq_or_bam:
priority: 100
output:
fofn = r"{cell}/{fullid}_{barcode}_{pf}_{extn}.list"
input:
infiles = lambda wc: [ ancient(f"{EXPDIR}/{f}")
for f in SC[wc.cell][wc.barcode].get(f"{wc.extn}_{wc.pf}",()) ]
wildcard_constraints:
extn = r"fastq|fastq\.gz|bam",
    run:
        # Just write all the input filenames to the output file. Some Snakemake
        # versions omit the 'infiles' attribute entirely when the list is empty,
        # so use getattr() and fall back to writing an empty list.
        with open(output.fofn, "w") as fh:
            for fname in getattr(input, "infiles", ()):
                print(fname, file=fh)
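# A hedged usage example (names hypothetical) - request just the listing without
# triggering the merge itself:
#   snakemake -j1 "C1/LIB123_barcode01_pass_fastq.list"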
rule concat_gzip_md5sum_bam:
priority: 100
output:
bam = "{cell}/{fullid}_{barcode}_{pf}.bam",
md5 = "md5sums/{cell}/{fullid}_{barcode}_{pf}.bam.md5",
input:
fofn = "{cell}/{fullid}_{barcode}_{pf}_bam.list",
threads: 4
resources:
mem_mb = 12000,
n_cpus = 4,
run:
# Samtools 'cat' is a *lot* faster than merging the BAM files.
        # Also 'samtools cat -b' accepts a file of filenames to merge, so we don't need xargs.
if os.stat(str(input.fofn)).st_size:
shell(r"{TOOLBOX} samtools cat -@ {threads} -b {input.fofn} -o {output.bam}")
else:
# We're merging nothing. But Snakemake must have her output file.
shell("touch {output.bam}")
# md5sum
shell(r"( cd $(dirname {output.bam}) && md5sum $(basename {output.bam}) ) > {output.md5}")
rule concat_gzip_md5sum_fastq:
priority: 100
output:
gz = "{cell}/{fullid}_{barcode}_{pf}.fastq.gz",
md5 = "md5sums/{cell}/{fullid}_{barcode}_{pf}.fastq.gz.md5",
counts = "counts/{cell}/{fullid}_{barcode}_{pf}.fastq.count",
input:
fofn = "{cell}/{fullid}_{barcode}_{pf}_fastq.list",
fofn_gz = "{cell}/{fullid}_{barcode}_{pf}_fastq.gz.list",
threads: 8
resources:
mem_mb = 24000,
n_cpus = 8,
run:
# Rely on xargs to deal with way more files than could fit on the command line
# and zip them all into one.
shell(r"xargs -rd '\n' cat <{input.fofn} | {PIGZ} -p{threads} -c > {output.gz}")
        # Append any already-gzipped files. Normally there will only be one type or the
        # other, but we do support both. Note that we just concatenate the zipped files,
        # not decompressing and recompressing, but we do want to verify integrity first,
        # and this version should do so in a cache-friendly way (each file is tested and
        # then catted while still in the page cache).
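        # ('exit 255' makes xargs abort at the first corrupt file rather than carrying
        # on and appending data after a failure.)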
shell(r"""xargs -n 1 -rd '\n' sh -c '{PIGZ} -t "$@" >&2 || exit 255 ; cat "$@"' - <{input.fofn_gz} >> {output.gz}""")
# Base counter
shell(r"{PIGZ} -cd {output.gz} | fq_base_counter.awk -r fn=`basename {output.gz} .gz` > {output.counts}")
shell(r"( cd $(dirname {output.gz}) && md5sum $(basename {output.gz}) ) > {output.md5}")
# Remove the fastq_pass_tmp directory which should normally be empty of files if
# everything worked.
# This now has to be 'rm -r', which could be bad because Snakemake runs this hook
# after every run including if I'm just manually recreating a single file.
# Therefore use config['cleanup'] to explicitly trigger cleanup.
#onsuccess:
# if config.get('cleanup'):
# shell("rm -rvf fastq_pass_tmp")
# FIXME - I'm not sure the above is needed for the pod5 version anyway.
# Also remember that cleaning up the files from all cells just because the
# last one ran ok is not necessarily safe, if we then expect to be able to
# resume the failed ones. Maybe I need to clean the specific cells?