-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathsample_genration.py
144 lines (110 loc) · 5.52 KB
/
sample_genration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import sys
import os
import numpy as np
import librosa
import multiprocessing as mp
from util import LRU
from cnnmodel.feature_extraction import mfcc_extraction
from cnnmodel.feature_extraction import non_mfcc_extraction
# Target clip duration in seconds. Every phoneme signal is centre-trimmed or
# zero-padded to this length before MFCC extraction.
# Per the original author's note: with a frame width of 0.025 s and a stride of
# 0.010 s, a 0.115 s clip yields 10 frames, i.e. (0.115 - 0.025) / 0.010 + 1.
OPTIMAL_DURATION = 0.115
class Phoneme:
    """One row of the alignment file: a single phoneme clip of a spoken word.

    The class intentionally defines neither ``__eq__`` nor ``__hash__``:
    instances are used as feature-cache keys elsewhere in this file, which
    relies on the default identity-based hashing.

    Attributes:
        path: location of the clip, appended to the wav root when loading.
        id_: identifier that, together with ``word``, groups phonemes.
        word: the word this phoneme belongs to.
        phoneme: phoneme symbol; a trailing digit marks it as a vowel and is
            used as the sample label.
    """

    def __init__(self, path, id_, word, phoneme):
        self.path, self.id_ = path, id_
        self.word, self.phoneme = word, phoneme
class SampleExtraction:
    """Generate per-vowel training samples from phoneme-level wav clips.

    The alignment file is a tab separated listing with four columns per line
    (``path``, ``id``, ``word``, ``phoneme``). Consecutive lines sharing the
    same ``(id, word)`` form one word. For every vowel of a word (a phoneme
    whose symbol ends in a digit) two ``.npy`` files are written under
    ``out_dir/<label>/``, where ``<label>`` is that trailing digit:

    * ``*_mfcc.npy``  - MFCC features of the previous, current and next vowel
      stacked along axis 0.
    * ``*_other.npy`` - non-MFCC features of the same three vowels
      concatenated into one vector.

    Words are processed in parallel on a ``multiprocessing`` pool.
    """

    def __init__(self, wav_root, alignment_file, out_dir):
        """Store the paths, create the label directories and start the pool.

        Args:
            wav_root: folder containing the wav files split into phonemes.
            alignment_file: tab separated file with the phoneme information.
            out_dir: folder under which the ``.npy`` files are generated.
        """
        self.wav_root = wav_root
        self.alignment_file = alignment_file
        self.out_dir = out_dir
        # Create the directories before the pool so that a failure here does
        # not leave worker processes behind.
        self.make_directories()
        self.pool = mp.Pool(mp.cpu_count())

    def make_directories(self):
        """Create one output sub-directory per label ('0', '1' and '2')."""
        for label in ('0', '1', '2'):
            os.makedirs(os.path.join(self.out_dir, label), exist_ok=True)
        print('Created directories for each label in path: {}'.format(self.out_dir))

    @staticmethod
    def _center_fit(signal, target_len):
        """Return ``signal`` centre-trimmed or zero-padded to ``target_len``."""
        excess = len(signal) - target_len
        if excess == 0:
            return signal
        # Floor division keeps the split identical for positive and negative
        # excess: the two parts always sum to abs(excess).
        left = abs(excess // 2)
        right = abs(excess) - left
        if excess > 0:
            # right >= 1 whenever excess > 0, so -right is a valid end index.
            return signal[left:-right]
        return np.concatenate([np.zeros(left), signal, np.zeros(right)], axis=0)

    def get_phoneme_features(self, index, n, vowel_phonemes, features_cache):
        """Return ``(mfcc_features, non_mfcc_features)`` for one vowel.

        Args:
            index: position of the vowel in ``vowel_phonemes``; may be out of
                range for the neighbours of the first and last vowel.
            n: number of vowels in the word.
            vowel_phonemes: the vowels of the current word.
            features_cache: mapping-like cache keyed by ``Phoneme`` object.

        Returns:
            A pair of arrays. For an out-of-range ``index`` these are all-zero
            placeholders of shape ``(1, 13, 30)`` and ``(6,)``.
        """
        if index < 0 or index >= n:
            return np.zeros(shape=(1, 13, 30), dtype=np.float32), np.zeros(6, dtype=np.float32)

        phoneme = vowel_phonemes[index]
        if phoneme not in features_cache:
            # Plain concatenation on purpose: the path comes from the
            # alignment file and is always interpreted relative to wav_root.
            signal, samplerate = librosa.load(self.wav_root + '/' + phoneme.path, sr=None)
            signal_mfcc = self._center_fit(signal, int(samplerate * OPTIMAL_DURATION))

            # MFCC extraction is expected to return a (13, 30) matrix; a
            # leading channel axis is added to make it (1, 13, 30).
            mfcc_features = mfcc_extraction.get_mfcc(signal_mfcc, samplerate)
            mfcc_features = mfcc_features[np.newaxis, :, :]

            # Non-MFCC features are computed on the full, unpadded signal and
            # are expected to be a vector of shape (6,).
            non_mfcc_features = non_mfcc_extraction.get_non_mfcc(signal, samplerate)
            features_cache[phoneme] = (mfcc_features, non_mfcc_features)

        return features_cache[phoneme]

    def generate_samples(self, vowel_phonemes):
        """Write the ``.npy`` sample files for every vowel of one word.

        Args:
            vowel_phonemes: the vowels of a single word, in order. An empty
                list is a no-op (a word without vowels produces no samples).
        """
        n = len(vowel_phonemes)
        if n == 0:
            # Nothing to write; also avoids indexing vowel_phonemes[0] below.
            return

        # Each vowel is needed as predecessor, anchor and successor, so a
        # small cache avoids reloading the same clip three times.
        features_cache = LRU(size=5)

        for i, phoneme in enumerate(vowel_phonemes):
            label = phoneme.phoneme[-1]

            pre_mfcc, pre_non_mfcc = self.get_phoneme_features(i - 1, n, vowel_phonemes, features_cache)
            anchor_mfcc, anchor_non_mfcc = self.get_phoneme_features(i, n, vowel_phonemes, features_cache)
            suc_mfcc, suc_non_mfcc = self.get_phoneme_features(i + 1, n, vowel_phonemes, features_cache)

            mfcc_tensor = np.concatenate([pre_mfcc, anchor_mfcc, suc_mfcc], axis=0)
            non_mfcc_vector = np.concatenate([pre_non_mfcc, anchor_non_mfcc, suc_non_mfcc], axis=0)

            file_name = phoneme.id_ + '_' + phoneme.word + '_' + phoneme.phoneme
            sample_base = self.out_dir + '/' + label + '/' + file_name
            np.save(sample_base + '_mfcc.npy', mfcc_tensor)
            np.save(sample_base + '_other.npy', non_mfcc_vector)

        print('finished writing {} samples for id: {}, word: {}'.
              format(n, vowel_phonemes[0].id_, vowel_phonemes[0].word))

    def _submit_word(self, vowel_phonemes):
        """Queue one word's vowels on the pool; skip words without vowels."""
        if not vowel_phonemes:
            return
        first = vowel_phonemes[0]

        def report_failure(error, id_=first.id_, word=first.word):
            # Runs in the parent process; without it a worker exception would
            # be dropped silently because the async result is never read.
            print('failed to generate samples for id: {}, word: {}: {!r}'.
                  format(id_, word, error), file=sys.stderr)

        self.pool.apply_async(self.generate_samples, args=[vowel_phonemes],
                              error_callback=report_failure)

    def extract_features(self):
        """Read the alignment file and generate samples for every word.

        Lines are grouped into words by consecutive ``(id, word)`` pairs and
        each word's vowels are handed to the pool. The pool is always closed
        and joined before returning, even when parsing fails, so it cannot be
        reused afterwards.

        Raises:
            OSError: if the alignment file cannot be opened.
            ValueError: if a non-blank line does not have exactly four
                tab-separated fields.
        """
        try:
            with open(self.alignment_file, 'r') as phoneme_alignment_file:
                current_word = None
                curr_vowels = []
                for line in phoneme_alignment_file:
                    # Strip only the line terminator: slicing off the last
                    # character would corrupt a final line with no newline.
                    line = line.rstrip('\r\n')
                    if not line:
                        continue
                    path, id_, word, phoneme = line.split('\t')

                    if current_word is not None and current_word != (id_, word):
                        # New word encountered: flush the previous word.
                        self._submit_word(curr_vowels)
                        curr_vowels = []
                    current_word = (id_, word)

                    # A trailing digit marks a vowel.
                    if phoneme[-1:].isnumeric():
                        curr_vowels.append(Phoneme(path, id_, word, phoneme))

                # Flush the last word of the file.
                self._submit_word(curr_vowels)
        finally:
            # join() blocks until every queued word has been processed.
            self.pool.close()
            self.pool.join()

    def __getstate__(self):
        """Return the instance state without the pool.

        The bound method ``generate_samples`` is pickled when it is sent to a
        worker, and a pool object cannot be pickled.
        """
        state = self.__dict__.copy()
        state.pop('pool', None)
        return state

    def __setstate__(self, state):
        """Restore the instance state (the pool is absent in workers)."""
        self.__dict__.update(state)
def main(wav_root, alignment_file, out_dir):
    """Build a SampleExtraction for the given paths and run the extraction."""
    SampleExtraction(wav_root, alignment_file, out_dir).extract_features()
if __name__ == '__main__':
    # The script needs three command line arguments:
    # 1. root path of the folder with wav files split into phonemes
    # 2. tab separated file with phoneme info
    # 3. output path where npy files will be generated
    # Any further arguments are ignored.
    if len(sys.argv) < 4:
        # Exit with a usage message instead of an IndexError traceback.
        sys.exit('usage: {} <wav_root> <alignment_file> <out_dir>'.format(sys.argv[0]))
    main(sys.argv[1], sys.argv[2], sys.argv[3])