-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutilities.py
177 lines (132 loc) · 4.7 KB
/
utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#! /usr/bin/env python3
'''
Functions and classes for use in motif extraction.
'''
import enum
import sys
class Format(enum.Enum):
FASTA = enum.auto()
PREALIGNED = enum.auto()
UNALIGNED = enum.auto()
def _detect_format(ifile):
'''
Detect the format of the input file.
Args:
ifile (str): The path to the input file.
Returns:
Format
'''
with open(ifile, 'r') as fh:
first = fh.readline().rstrip()
if first.startswith('>'):
return Format.FASTA
fh.seek(0)
# Non-FASTA files
length = len(first)
res = first[int(length / 2)]
for line in fh:
line = line.rstrip()
if len(line) != length or line[int(len(line) / 2)] != res:
return Format.UNALIGNED
return Format.PREALIGNED
def parse_fasta(fh):
'''
Parse the input fasta file handle to extract the sequences.
Args:
fh (file)
'''
seq = ''
for line in fh:
if line.startswith('>') and seq:
yield seq
seq = ''
if not line.startswith('>'):
seq += line.rstrip('\n')
def extract_seqs(ifile, central_res, length, is_bg=False, write=True):
'''
Process the given input file to return the aligned sequences.
Args:
ifile (str): The path to the input file.
central_res (str): The amino acid residue on which to align the
sequences.
length (int): The total length of each amino acid sequence.
is_bg (bool, optional): A boolean flag indicating whether the input
corresponds to background data.
write (bool, optional): If True, write the aligned sequences to a file.
Returns:
list: The aligned sequences.
'''
form = _detect_format(ifile)
needs_alignment = form == Format.FASTA or form == Format.UNALIGNED
if length is None and needs_alignment:
sys.exit('The length parameter is required for FASTA or unaligned '
'input files')
if needs_alignment:
if length % 2 == 0:
sys.exit('The sequence length must be an odd number')
with open(ifile, 'r') as fh:
if form == Format.FASTA:
if is_bg:
seqs = generate_bg(parse_fasta(fh), central_res, length)
else:
seqs = [align_sequence(s, central_res, length)
for s in parse_fasta(fh)]
elif form == Format.UNALIGNED:
seqs = [align_sequence(s.rstrip(), central_res, length)
for s in fh]
else:
seqs = [s.rstrip() for s in fh]
seqs = [s for s in list(set(seqs)) if s]
if write and needs_alignment:
with open(ifile + '.aligned', 'w') as fh:
fh.write('\n'.join(seqs))
return seqs
def generate_bg(seqs, central_res, length):
'''
Generate the background sequences given full input sequences.
'''
hlen = int(0.5 * length)
bg = []
for seq in seqs:
indices = [i for i, res in enumerate(seq) if res == central_res]
inner_seqs = [seq[max(idx - hlen, 0):min(idx + hlen + 1, len(seq))]
for idx in indices]
bg.extend([align_sequence(s, central_res, length) for s in inner_seqs])
return [s for s in bg if s]
def align_sequence(seq, central_res, length, extra_char='X'):
'''
Align the given sequence on the central residue.
Args:
seq (str): The sequence to be aligned.
central_res (str): The central residue.
length (int): The desired length of each sequence.
extra_char (str, optional): The character to be used to make up
the sequence length.
Returns:
str: The aligned sequence
'''
if length is None:
sys.exit('Cannot align sequence without specifying the length')
hlen = int(0.5 * length)
res_pos = [ii for ii, res in enumerate(seq) if res == central_res]
if hlen in res_pos:
split_index = hlen
else:
# Find the instance of central_res closest to hlen
min_dist, split_index = None, None
for index in res_pos:
dist = abs(hlen - index)
if min_dist is None or dist < min_dist:
min_dist, split_index = dist, index
if split_index is None:
return ''
left, right = seq[:split_index], seq[split_index+1:]
if len(left) < hlen:
left = extra_char * (hlen - len(left)) + left
elif len(left) > hlen:
left = left[len(left) - hlen:]
if len(right) < hlen:
right += extra_char * (hlen - len(right))
elif len(right) > hlen:
right = right[:hlen]
return central_res.join([left, right])