#!/usr/bin/env python3
import os, sys, re
import datetime
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from illuminatus.SampleSheetReader import SampleSheetReader
from illuminatus.RunInfoXMLParser import RunInfoXMLParser
from illuminatus.RunParametersXMLParser import RunParametersXMLParser
from illuminatus.Formatters import pct, fmt_time
from illuminatus.yaml import load_yaml, dump_yaml, ParserError
# Project links can be set by an environment var, presumably in environ.sh
PROJECT_PAGE_URL = os.environ.get('PROJECT_PAGE_URL', "http://foo.example.com/")
try:
    if PROJECT_PAGE_URL.format('test') == PROJECT_PAGE_URL:
        PROJECT_PAGE_URL += '{}'
except Exception:
    print(f"The environment variable PROJECT_PAGE_URL={PROJECT_PAGE_URL} is not a valid format string.",
          file = sys.stderr)
    raise
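
# For illustration, a hypothetical setting in environ.sh might be:
#   export PROJECT_PAGE_URL='https://lims.example.com/projects/{}'
# so that PROJECT_PAGE_URL.format('10000AA') yields a direct link to that project's page.
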
# Non-pools may either be called 'NoPool' or ''. Other names may be added here.
NON_POOLS = ['NoPool', 'None', '']

def parse_args(*args):
    description = """This script is part of the Illuminatus pipeline.

It gathers an overview of the run by parsing the SampleSheet.csv, RunParameters.xml
and RunInfo.xml in the current directory, and by looking for proper project names
in pipeline/project_names.yaml, and for basemasks in pipeline/output/demultiplexing.

Output may be in YAML, MQC, TSV or text format. MQC is suitable for MultiQC custom
content - http://multiqc.info/docs/#custom-content. YAML may be re-loaded and re-presented
as any format.
"""
    # Note that summarize_for_overview.py now obtains much of its information from the YAML
    # output by this script. Possibly that functionality should be folded in here? One
    # reason not to do that is that a few things are always checked dynamically by that script,
    # but the shtick of this script is that any output format can be created purely from the
    # YAML, and the YAML depends only on the sample sheet and run metadata files.
    a = ArgumentParser( description = description,
                        formatter_class = ArgumentDefaultsHelpFormatter )
    a.add_argument("--from_yml",
                    help="Get all the info from the supplied YAML file, not by" +
                         " scanning the directory." )
    a.add_argument("--yml",
                    help="Output in YAML format to the specified file (- for stdout)." )
    a.add_argument("--mqc",
                    help="Output for MultiQC to the specified file (- for stdout)." )
    a.add_argument("--txt",
                    help="Output in text format to the specified file (- for stdout)." )
    a.add_argument("--tsv",
                    help="Output in TSV format to the specified file (- for stdout)." )
    a.add_argument("--add_in_yaml", nargs="*",
                    help="Add columns by sucking in extra bits of YAML. Items must be" +
                         " of the form key=file, where key is one of [wd, yield, b2f].")
    a.add_argument("run_dir", nargs='?', default='.',
                    help="Supply a directory to scan, if not the current directory.")

    return a.parse_args(*args)
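
# Typical invocations (illustrative paths; the wrapper scripts calling this vary):
#   summarize_lane_contents.py --yml pipeline/sample_summary.yml /path/to/run_dir
#   summarize_lane_contents.py --from_yml pipeline/sample_summary.yml --txt -
# The second form re-renders previously saved YAML without re-scanning the run directory.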

def main(args):
    """Basic gist - build the data structure in memory, then serialize it as
       requested.
    """
    # Sanity check that some output mode is active.
    if not any([args.yml, args.mqc, args.txt, args.tsv]):
        exit("No output specified. Nothing to do.")

    # See where we want to get our info.
    try:
        if args.from_yml:
            data_struct = load_yaml( sys.stdin if args.from_yml == '-'
                                     else args.from_yml )
        else:
            data_struct = scan_for_info(args.run_dir)
    except FileNotFoundError as e:
        exit(f"Error summarizing run.\n{e}")

    # See about extra stuff that we learn as processing goes on.
    for ai in (args.add_in_yaml or []):
        try:
            key, filename = ai.split('=', 1)
            if key not in ["wd", "yield", "b2f"]:
                exit("Key for add_in_yaml must be wd, b2f or yield.")
            if not filename:
                # An empty value can be an artifact of the way Snakemake calls this script.
                continue
            data_struct['add_in_' + key] = load_yaml(filename)
        except ValueError:
            exit(f"Error parsing {ai} as add_in_yaml.")

    # See where we want to put it...
    for dest, formatter in [ ( args.yml, dump_yaml ),
                             ( args.mqc, output_mqc ),
                             ( args.txt, output_txt ),
                             ( args.tsv, output_tsv ) ]:
        if dest:
            if dest == '-':
                formatter(data_struct, fh=sys.stdout)
            else:
                with open(dest, 'w') as ofh:
                    formatter(data_struct, fh=ofh)
    # DONE!

def output_mqc(rids, fh):
    """This also happens to be YAML but is specifically for display
       in MultiQC. The filename should end in _mqc.yaml (not .yml) in
       order to be picked up.
    """
    # Decide if this is a MiSeq (non-patterned flowcell). This will determine
    # exactly what is displayed.
    is_patterned_flowcell = not ( rids['Instrument'].startswith('hiseq2500') or
                                  rids['Instrument'].startswith('miseq') )

    mqc_out = dict(
        id = 'lane_summary',
        section_name = 'Lane Summary',
        description = 'Content of lanes in the run',
        plot_type = 'table',
        pconfig = { 'title': '', 'sortRows': True, 'no_beeswarm': True },
        data = {},
        headers = {},
    )

    # My understanding was that pconfig needed to be a list of
    # singleton dicts as { col_id: { conf_1: 'foo', conf_2: 'bar' } }:
    #   for colnum, col in enumerate(["Lane", "Project", "Pool/Library", "Loaded (pmol)"]):
    #       mqc_out['pconfig'].append( { 'col_{:02}'.format(colnum): dict() } )
    # Nope - apparently not. Had to read the source...
    # 'headers' needs to be a dict of { col_id: { title: ..., format: ... } }
    table_headers = ["Lane", "Project", "Pool/Library", "Num Indexes", "Loaded (pmol)"]
    table_formats = ["", "{:s}", "{:s}", "{:,}", "{:s}", ]
    table_desc = [ None, None,
                   "Summary of lane contents. See per-lane pages for a full list.",
                   "Number of samples, or 0 for a single unindexed sample.",
                   None, ]

    # We'll always add the density column, but will hide it later for patterned flowcells.
    if 'add_in_yield' in rids:
        table_headers.extend(["Aligned PhiX (%)", "Density", "Clusters PF", "PF (%)", "Q30 (%)", "Yield GB"])
        table_formats.extend(["{:.3f}", "{:,.1f}", "{:,}", "{:.3f}", "{:.3f}", "{:.3f}"])
        table_desc.extend( ["Percentage of PhiX according to InterOp",
                            "Raw cluster density according to InterOp",
                            "Count of clusters/wells passing filter",
                            "Percent of clusters/wells passing filter",
                            "Percent of bases being Q30 or more",
                            "Yield in Gigabases"] )

    # Also tack a grand total onto the description line above the table,
    # unless we have the more accurate b2f values available.
    if 'add_in_yield' in rids and 'add_in_b2f' not in rids:
        yield_totals = [ rids['add_in_yield'][f"lane{lane['LaneNumber']}"]['Totals']
                         for lane in rids['Lanes'] ]
        mqc_out['description'] += ", with {:,} of {:,} clusters passing filter, according to InterOp ({:.3f}%)".format(
                sum(t['reads_pf'] for t in yield_totals),
                sum(t['reads'] for t in yield_totals),
                pct( sum(t['reads_pf'] for t in yield_totals), sum(t['reads'] for t in yield_totals) ) )

    if 'add_in_wd' in rids:
        table_headers.extend(["Well Dups (%)"])
        table_formats.extend(["{:.2f}"])
        table_desc.extend( ["Average well dups (raw figure from count_well_dups) over the lane"] )

    if 'add_in_b2f' in rids:
        table_headers.extend(["Barcode Balance"])
        table_formats.extend(["{:.4f}"])
        table_desc.extend( ["Barcode balance expressed in terms of CV (from bcl2fastq)"] )

        # Tack a grand total onto the description line of the table, using the
        # bcl2fastq values, which are more accurate than those from InterOp.
        yield_totals = [ rids['add_in_b2f'][int(lane['LaneNumber'])] for lane in rids['Lanes'] ]
        grand_total_raw = sum(t.get('Total Reads Raw', 0) for t in yield_totals)
        grand_total_pf = sum(t.get('Assigned Reads', 0) + t.get('Unassigned Reads PF', 0) for t in yield_totals)
        mqc_out['description'] += ", with {:,} of {:,} clusters passing filter, according to bcl2fastq ({:.3f}%)".format(
                grand_total_pf,
                grand_total_raw,
                pct( grand_total_pf, grand_total_raw ) )

    # Here we tweak the settings for our table columns.
    # col1_header is actually col0_header!
    mqc_out['pconfig']['col1_header'] = table_headers[0]
    for colnum, col in list(enumerate(table_headers))[1:]:
        column_settings = dict(title=col, format=table_formats[colnum])

        # This is a bit of a hack, but if the header contains a '%' symbol set min and max
        # accordingly. Also add the description.
        if '%' in col: column_settings.update(min=0, max=100)
        if 'Barcode Balance' in col: column_settings.update(min=0, max=1)
        if 'Density' in col and is_patterned_flowcell: column_settings.update(hidden=True)
        if table_desc[colnum]: column_settings.update(description=table_desc[colnum])

        mqc_out['headers'][f"col_{colnum:02}"] = column_settings

    # As a special case, force the Pool/Library column to be treated as text.
    # I might be asked to make the full list of libs appear in the popup, but let's
    # not second-guess that. The same goes for the Project column, as there may be many projects.
    mqc_out['headers']['col_01']['textcell'] = True  # Project
    mqc_out['headers']['col_02']['textcell'] = True  # Pool/Library

    for lane in rids['Lanes']:
        # Logic here is just copied from output_tsv, but we also want the total num_indexes
        # like in output_txt.
        # First put all the pools in one dict (not partitioned by project).
        pools_union = dict_union(lane['Contents'].values())
        num_indexes = 0 if lane.get('Unindexed') else sum(len(v) for v in pools_union.values())
        contents_str = ', '.join( squish_project_content(pools_union, 20) )

        dd = mqc_out['data'][f"Lane {lane['LaneNumber']}"] = dict(
                col_01 = ','.join( sorted(lane['Contents']) ),
                col_02 = contents_str,
                col_03 = num_indexes,
                col_04 = lane['Loading'].get('pmol', 'unknown') )

        if 'add_in_yield' in rids:
            # was: table_headers.extend(["Clusters PF", "Q30 (%)", "Yield"])
            # now: table_headers.extend(["Clusters PF", "PF (%)", "Q30 (%)", "Yield GB"])
            lane_yield_info = rids['add_in_yield'][f"lane{lane['LaneNumber']}"]['Totals']
            dd['col_05'] = lane_yield_info.get('percent_aligned', 'unknown')
            dd['col_06'] = lane_yield_info['density']
            dd['col_07'] = lane_yield_info['reads_pf']
            dd['col_08'] = pct(lane_yield_info['reads_pf'], lane_yield_info['reads'])
            dd['col_09'] = lane_yield_info['percent_gt_q30']
            dd['col_10'] = lane_yield_info['yield_g']

        if 'add_in_wd' in rids:
            # table_headers.extend(["Well Dups (%)"])
            # See at which index in the table this header has ended up...
            dd_col, = [ k for k, v in mqc_out['headers'].items() if v['title'].startswith("Well Dups") ]
            # Get the relevant dict from the YAML data file, which is indexed by lane and surface.
            lane_wd_info = rids['add_in_wd'][f"{lane['LaneNumber']}"]['mean']
            # Add the raw value for now - could choose v1 or v2 instead?
            dd[dd_col] = lane_wd_info['raw']

        if 'add_in_b2f' in rids:
            # table_headers.extend(["Barcode Balance"])
            dd_col, = [ k for k, v in mqc_out['headers'].items() if v['title'].startswith("Barcode Balance") ]
            lane_b2f_totals = rids['add_in_b2f'][int(lane['LaneNumber'])]
            # If all the entries are blank, does MultiQC hide the column for me, or do I need
            # to do that myself? Or do I even want to?
            if 'Barcode Balance' in lane_b2f_totals:
                dd[dd_col] = lane_b2f_totals['Barcode Balance']

            # If b2f data is provided, use the more accurate yield numbers for 'reads_pf',
            # overwriting those from InterOp.
            if 'add_in_yield' in rids:
                dd['col_07'] = lane_b2f_totals.get('Assigned Reads', 0) + lane_b2f_totals.get('Unassigned Reads PF', 0)

    dump_yaml(mqc_out, fh=fh)
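
# For reference, the emitted _mqc.yaml looks roughly like this (abridged sketch
# with hypothetical values):
#   id: lane_summary
#   section_name: Lane Summary
#   plot_type: table
#   pconfig: { title: '', sortRows: true, no_beeswarm: true, col1_header: Lane }
#   headers: { col_01: {title: Project, ...}, col_02: {title: Pool/Library, ...}, ... }
#   data:
#     Lane 1: { col_01: '10000', col_02: 'Pool1', col_03: 8, col_04: unknown }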

def scan_for_info(run_dir):
    """Hoovers up the info and builds a data structure which can
       be serialized to YAML or converted to the various output formats.
    """
    try:
        # File must be valid YAML or empty (which loads as None).
        pnfile = os.path.join(run_dir, "pipeline/project_names.yaml")
        project_names = load_yaml(pnfile) or dict()
    except FileNotFoundError:
        # No matter, we can go without it.
        project_names = dict()

    # Load both the RunInfo.xml and (a little later) the SampleSheet.csv
    ri_xml = RunInfoXMLParser(run_dir)

    # Build the run info data structure (rids). First just inherit the info
    # from ri_xml (RunId, Instrument, Flowcell, ...)
    rids = ri_xml.run_info.copy()

    # We need this to reliably get the NovaSeq flowcell type.
    # Also we now care about the experiment name, which is here and lets us link to BaseSpace.
    try:
        run_params = RunParametersXMLParser( run_dir ).run_parameters
        if 'Flowcell Type' in run_params:
            rids['FCType'] = run_params['Flowcell Type']
        rids['ExperimentName'] = run_params.get('Experiment Name')

        # This 'Start Time' comes from file timestamps. RunDate on the NovaSeq also
        # gives a timestamp, but not on the MiSeq, even post-upgrade. And I don't
        # trust the MiSeq clock in any case.
        rids['RunStartTimeStamp'] = run_params.get('Start Time')
        rids['RunStartTime'] = fmt_time(rids['RunStartTimeStamp'])

        rids['Chemistry'] = get_chemistry(run_params, rids['Instrument'])
    except Exception:
        # Not to worry, we can do without all of this.
        pass

    # Reads are pairs of (length, is_index?).
    rids['CyclesAsList'] = [ (ri_xml.read_and_length[i], ri_xml.read_and_indexed[i] == 'Y')
                             for i in
                             sorted(ri_xml.read_and_length.keys(), key=int) ]
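    # e.g. a 2x150 paired-end run with dual 8-base indexes would come out as:
    #   [(150, False), (8, True), (8, True), (150, False)]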

    # Which file is actually providing the SampleSheet?
    try:
        rids['SampleSheet'] = os.path.basename(os.readlink(f"{run_dir}/SampleSheet.csv"))
    except OSError:
        # Weird - maybe not a link?
        rids['SampleSheet'] = "SampleSheet.csv"

    try:
        ss_csv = SampleSheetReader(run_dir + "/SampleSheet.csv")
    except Exception:
        # We can live without this if the sample sheet is invalid.
        ss_csv = None

    # When is this report being made?
    rids['ReportDateTime'] = fmt_time()

    # Slice the sample sheet by lane.
    rids['Lanes'] = []
    rids['ProjectInfo'] = {}
    if ss_csv:
        # Snag the 'real' experiment name.
        rids['ExperimentSS'] = ss_csv.headers.get('Experiment Name')

        # Translate all the project numbers to names in one go.
        # If you try to feed this script an old 2500 sample sheet, this is where it will fail.
        assert 'sampleproject' not in ss_csv.column_mapping, \
            "A sampleproject (without the underscore) column was found. Is this an old 2500 SampleSheet?"

        # Filter project_names to just the ones in the ss_csv. They should match already,
        # but don't depend on it.
        rids['ProjectInfo'] = { n: project_names.get(n) for n in
                                set([ line[ss_csv.column_mapping['sample_project']]
                                      for line in ss_csv.samplesheet_data ]) }

        # NOTE - if a samplesheet has no 'lane' column then we shouldn't really be processing it,
        # but as far as bcl2fastq is concerned this just means all lanes are identical, so for
        # the purposes of this script I'll go with that.
        if 'lane' in ss_csv.column_mapping:
            ss_lanes = [ line[ss_csv.column_mapping['lane']] for line in ss_csv.samplesheet_data ]
        else:
            ss_lanes = [ str(x + 1) for x in range(int(rids['LaneCount'])) ]

        for lanenum in sorted(set(ss_lanes)):
            thislane = {'LaneNumber': lanenum}

            # See if there is a Basemask. This breaks the original idea of this script as showing
            # pre-demultiplexing data, but we really want to see the BaseMask in the e-mail text
            # when the final mail is sent.
            thislane['BaseMask'] = get_lane_basemask(run_dir, lanenum)

            # Add lane loading. In reality we probably need to get all lanes in one fetch,
            # but here's a placeholder.
            thislane['Loading'] = get_lane_loading(rids['Flowcell'])

            lines_for_lane = [ line for line in ss_csv.samplesheet_data
                               if 'lane' not in ss_csv.column_mapping or
                                  line[ss_csv.column_mapping['lane']] == lanenum ]
            thislane['Contents'] = summarize_lane( lines_for_lane, ss_csv.column_mapping )

            # If the lane contains a single sample, is that one barcode or is it unindexed?
            # We'd like to report which.
            if len(lines_for_lane) == 1:
                index_lengths = ss_csv.get_index_lengths_by_lane()[lanenum]
                # It's unindexed if there are no indices or if they contain only N's.
                thislane['Unindexed'] = not any( index_lengths )
            else:
                thislane['Unindexed'] = False

            rids['Lanes'].append(thislane)

    return rids

def get_lane_basemask(run_dir, lanenum):
    """Code is copied from summarize_post_bcl2fastq.py, but I want to keep these
       functionalities separate.
    """
    # I could also look in pipeline/output/QC/bc_check for the read1 basemask
    # to get this a little earlier, but I don't think it's worth it.
    try:
        with open(os.path.join( run_dir, "pipeline/output/demultiplexing",
                                f"lane{lanenum}/bcl2fastq.opts" )) as vfh:
            for aline in vfh:
                mo = re.match(r"--use-bases-mask +(.*)", aline.rstrip())
                if mo:
                    # The mask might begin with "{lane}:", in which case lop that off.
                    bm = mo.group(1).strip("'\"")
                    if bm.startswith(f"{lanenum}:"):
                        bm = bm[2:]
                    return bm
    except FileNotFoundError:
        # Fine.
        return None

    # If no matching line was found.
    return None
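
# For example, if lane1/bcl2fastq.opts contained the (hypothetical) line:
#   --use-bases-mask '1:Y150n,I8,I8,Y150n'
# then get_lane_basemask(run_dir, '1') would return 'Y150n,I8,I8,Y150n'.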

def summarize_lane(lane_lines, column_mapping):
    """Given a list of sample sheet lines, summarize what they contain, returning
       a dict of { project: { pool: [ list of libs ] } }
       The caller is presumed to have filtered the lines by lane already.
    """
    # Make a dict of dicts keyed on all the projects seen.
    res = dict()
    for line in lane_lines:
        sample_project = line[column_mapping['sample_project']]
        sample_id = line[column_mapping['sample_id']]

        # Pool and library should be combined in the sample_id, so split on the
        # first '__' only.
        if '__' in sample_id:
            sample_pool, sample_lib = sample_id.split('__', 1)
        else:
            sample_pool, sample_lib = '', sample_id

        # I used to set 'NoPool' to '' at this point, but it turned out to be a bad idea.
        # Avoid use of defaultdict as it gums up YAML serialization. This is equivalent.
        res.setdefault(sample_project, dict()).setdefault(sample_pool, []).append(sample_lib)
    return res
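
# A minimal sketch of the transformation, with hypothetical rows and a
# column_mapping of the kind SampleSheetReader provides:
#   lines = [ ['1', 'Pool1__lib1', '10000'],
#             ['1', 'Pool1__lib2', '10000'] ]
#   summarize_lane(lines, {'lane': 0, 'sample_id': 1, 'sample_project': 2})
#   == { '10000': { 'Pool1': ['lib1', 'lib2'] } }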

def output_txt(rids, fh):
    def p(*a): print(*a, file=fh)

    # Show the pipeline version.
    p( "Illuminatus {} [{}@{}:{}]".format(
            os.environ.get("ILLUMINATUS_VERSION", "[unknown version]"),
            os.environ.get("USER", "[unknown user]"),
            os.environ.get("HOSTNAME", "[unknown host]"),
            os.path.abspath(os.path.dirname(__file__)) ) )
    p( "" )

    # Basic metadata, followed by a per-lane summary.
    expname_from_xml = rids.get('ExperimentName') or 'unknown'
    expname_from_ss = rids.get('ExperimentSS')

    p( f"Run ID: {rids['RunId']}" )
    if expname_from_ss and expname_from_ss != expname_from_xml:
        # We have conflicting names for this experiment.
        p( f"Experiment: {expname_from_xml} ({expname_from_ss})" )
    else:
        # We have one experiment name.
        p( f"Experiment: {expname_from_xml}" )
    p( f"Instrument: {rids['Instrument']}" )
    p( f"Flowcell Type: {rids.get('FCType', 'unknown')}" )  # May be missing if the YAML file is old.
    p( f"Read length: {rids['Cycles']}" )
    p( f"Active SampleSheet: SampleSheet.csv -> {rids['SampleSheet']}" )
    p( "" )

    # Summarize each lane.
    prn = rids['ProjectInfo']
    for lane in rids['Lanes']:
        # If we have info on the BaseMask, add it.
        if lane.get('BaseMask'):
            p( f"Lane {lane['LaneNumber']} with --use-bases-mask={lane['BaseMask']}:" )
        else:
            p( f"Lane {lane['LaneNumber']}:" )

        for project, pools in sorted(lane['Contents'].items()):
            # pools will be a dict of { poolname: [ library, ... ] }
            # Special case for PhiX.
            if project == 'ControlLane' and any(pools == {np: ['PhiX']} for np in NON_POOLS):
                p( " - PhiX" )
            else:
                if prn.get(project):
                    prn_name = prn[project].get('name', project)
                    prn_url = prn[project].get('url', '[no link]')
                else:
                    prn_name = project
                    prn_url = '[no link]'

                contents_str = ' '.join(squish_project_content(pools))
                contents_label = 'Libraries' if set(pools).issubset(NON_POOLS) else \
                                 'Contents'  if not set(pools).isdisjoint(NON_POOLS) else \
                                 'Pool'      if len(pools) == 1 else 'Pools'
                num_indexes = 0 if lane.get('Unindexed') else sum( len(p) for p in pools.values() )

                p( " - Project {p} -- {cl} {l} -- Indexes {ni}".format(
                        p = prn_name,
                        l = contents_str,
                        cl = contents_label,
                        ni = num_indexes ) )
                p( " - See {link}".format(link = prn_url) )

def output_tsv(rids, fh):
    """TSV table for the run report.
    """
    def p(*a): print('\t'.join(a), file=fh)

    # Headers
    p("Lane", "Project", "Pool/Library", "Loaded (pmol)", "Loaded PhiX (%)")

    for lane in rids['Lanes']:
        # This time, squish content for all projects together when listing the pools.
        # If there are more than 5 things in the lane, abbreviate the list. Users can always
        # look at the detailed table.
        pools_union = dict_union(lane['Contents'].values())
        contents_str = ','.join( squish_project_content(pools_union, 5) )

        p( lane['LaneNumber'],
           ','.join( sorted(lane['Contents']) ),
           contents_str,
           lane['Loading'].get('pmol', 'unknown'),
           lane['Loading'].get('phix', 'unknown') )

def dict_union(list_of_dicts):
    """Given a list of dicts, combine them together.
       If two dicts share a key, concatenate (or sum) the values.
    """
    # I tried the funky-looking one-liner comprehension:
    #   { k: v for d in list_of_dicts for k, v in d.items() }
    # But that just takes the last value if there is a clash.
    res = dict()
    for d in list_of_dicts:
        for k, v in d.items():
            if k in res:
                # Use + rather than += so we never mutate the lists inside the input dicts.
                res[k] = res[k] + v
            else:
                res[k] = v
    return res
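
# e.g. dict_union([ {'PoolA': ['lib1']},
#                   {'PoolA': ['lib2'], 'NoPool': ['lib3']} ])
#      == {'PoolA': ['lib1', 'lib2'], 'NoPool': ['lib3']}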

def squish_project_content(dict_of_pools, maxlen=0):
    """Given a dict taken from rids['Lanes'][n]['Contents'] -- ie. a dict of { pool: content_list } --
       returns a human-readable list of contents.
    """
    all_pools = sorted([ p for p in dict_of_pools if p not in NON_POOLS ])
    non_pooled_libs = sorted([ p for np in NON_POOLS for p in dict_of_pools.get(np, []) ])

    # Prune those lists.
    if maxlen and len(all_pools) > maxlen:
        all_pools[maxlen-1:] = [ 'plus {} more pools'.format(len(all_pools) + 1 - maxlen) ]
    if maxlen and len(non_pooled_libs) > maxlen:
        non_pooled_libs[maxlen-1:] = [ 'plus {} more libraries'.format(len(non_pooled_libs) + 1 - maxlen) ]

    # Now return the lot.
    return all_pools + non_pooled_libs
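
# e.g. squish_project_content({'Pool1': ['lib1'], 'NoPool': ['lib2', 'lib3']})
#      == ['Pool1', 'lib2', 'lib3']
# and, with abbreviation,
#      squish_project_content({'NoPool': ['lib1', 'lib2', 'lib3']}, maxlen=2)
#      == ['lib1', 'plus 2 more libraries']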

def get_lane_loading(flowcell):
    """A placeholder. At some point this will query the LIMS for lane loading info -
       ie. pmol loaded and PhiX %.
       We'll probably need to mock it out in the test cases.
    """
    return dict()

def get_chemistry(run_params, instrument):
    """Get the 'Consumable Version' from run_params and interpret it.
       At present this tells us whether the NovaSeq chemistry is 1.0 or 1.5.
    """
    con_vers = run_params.get('Consumable Version')
    if not con_vers:
        return None

    con_note = "unknown"
    if instrument.startswith('novaseq_'):
        if con_vers == '1':
            con_note = "chemistry 1.0"
        elif con_vers == '3':
            con_note = "chemistry 1.5; revcomp index2"

    return f"SCV{con_vers} ({con_note})"
if __name__ == "__main__":
main(parse_args())