import time
import numpy as np
import pomegranate

# `a`, `bond_dimension`, and `n_iter` are assumed to be defined by the surrounding script.
X = a[0]
X = X.astype(int)

# Create HMM: one column of D hidden states per sequence position,
# each with a randomly initialised discrete emission distribution over the d symbols.
D = bond_dimension
N = X.shape[1]
d = np.max(X) + 1
list_of_states = []
for i in range(N):
    list_of_states.append([])
    for u in range(bond_dimension):
        dictionary = dict()
        for l in range(d):
            dictionary[str(l)] = np.random.rand()
        list_of_states[i].append(pomegranate.State(pomegranate.DiscreteDistribution(dictionary)))

model = pomegranate.HiddenMarkovModel()
# Fully connect consecutive columns of states with random transition weights.
for i in range(N - 1):
    for d in range(D):
        for d2 in range(D):
            model.add_transition(list_of_states[i][d], list_of_states[i + 1][d2], np.random.rand())
for d in range(D):
    model.add_transition(model.start, list_of_states[0][d], np.random.rand())
for d in range(D):
    model.add_transition(list_of_states[N - 1][d], model.end, np.random.rand())
model.bake()

# Train HMM with Baum-Welch on the string-encoded training sequences.
begin = time.time()
sequencetrain = [[str(i) for i in v] for v in X]
np.random.seed()
model.fit(sequencetrain, algorithm='baum-welch', stop_threshold=1e-50,
          min_iterations=1000, max_iterations=n_iter)
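# A minimal follow-up sketch, not part of the original snippet: once the model
# has been fit, the trained HMM can score and decode a sequence.  The sequence
# below reuses the same string encoding as the training data; everything else
# is an assumption for illustration.
sequence = [str(i) for i in X[0]]
log_likelihood = model.log_probability(sequence)        # sequence log-likelihood
viterbi_logp, viterbi_path = model.viterbi(sequence)    # most likely hidden-state path
print(log_likelihood, [state.name for _, state in viterbi_path])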
# Imports inferred from the snippet's usage of pomegranate classes.
from pomegranate import HiddenMarkovModel as Model, State, DiscreteDistribution


def get_suffix_matcher_hmm(pattern):
    model = Model(name="Suffix Matcher HMM Model")
    # Insert states emit any base uniformly.
    insert_distribution = DiscreteDistribution({'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25})

    insert_states = []
    match_states = []
    delete_states = []
    hmm_name = 'suffix'
    for i in range(len(pattern) + 1):
        insert_states.append(State(insert_distribution, name='I%s_%s' % (i, hmm_name)))

    # Match states strongly favour the pattern base at each position.
    for i in range(len(pattern)):
        distribution_map = dict({'A': 0.01, 'C': 0.01, 'G': 0.01, 'T': 0.01})
        distribution_map[pattern[i]] = 0.97
        match_states.append(State(DiscreteDistribution(distribution_map), name='M%s_%s' % (str(i + 1), hmm_name)))

    # Delete states are silent: they carry no emission distribution.
    for i in range(len(pattern)):
        delete_states.append(State(None, name='D%s_%s' % (str(i + 1), hmm_name)))
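# A small hedged illustration, not from the original: for an assumed pattern
# 'ACG', the first match state built above would emit with the distribution
# below, while insert states stay uniform over the four bases.
example_match = DiscreteDistribution({'A': 0.97, 'C': 0.01, 'G': 0.01, 'T': 0.01})
print(example_match.probability('A'))   # 0.97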
def get_constant_number_of_repeats_matcher_hmm(patterns, copies, vpaths):
    model = Model(name="Repeating Pattern Matcher HMM Model")

    if vpaths:
        # Derive profile-HMM parameters from a multiple alignment of observed repeats.
        alignment = get_multiple_alignment_of_repeats_from_reads(vpaths)
        transitions, emissions = build_profile_hmm_pseudocounts_for_alignment(settings.MAX_ERROR_RATE, alignment)
    else:
        transitions, emissions = build_profile_hmm_for_repeats(patterns, settings.MAX_ERROR_RATE)
    matches = [m for m in emissions.keys() if m.startswith('M')]

    last_end = None
    for repeat in range(copies):
        # One block of insert/match/delete states per repeat copy.
        insert_states = []
        match_states = []
        delete_states = []
        for i in range(len(matches) + 1):
            insert_distribution = DiscreteDistribution(emissions['I%s' % i])
            insert_states.append(State(insert_distribution, name='I%s_%s' % (i, repeat)))
def get_prefix_matcher_hmm(pattern):
    model = Model(name="Prefix Matcher HMM Model")
    # Insert states emit any base uniformly.
    insert_distribution = DiscreteDistribution({'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25})

    insert_states = []
    match_states = []
    delete_states = []
    hmm_name = 'prefix'
    for i in range(len(pattern) + 1):
        insert_states.append(State(insert_distribution, name='I%s_%s' % (i, hmm_name)))

    # Match states strongly favour the pattern base at each position.
    for i in range(len(pattern)):
        distribution_map = dict({'A': 0.01, 'C': 0.01, 'G': 0.01, 'T': 0.01})
        distribution_map[pattern[i]] = 0.97
        match_states.append(State(DiscreteDistribution(distribution_map), name='M%s_%s' % (str(i + 1), hmm_name)))

    # Delete states are silent: they carry no emission distribution.
    for i in range(len(pattern)):
        delete_states.append(State(None, name='D%s_%s' % (str(i + 1), hmm_name)))
def load_segmentation_model(modeldata):
    model = HiddenMarkovModel('model')

    states = {}
    for s in modeldata:
        if len(s['emission']) == 1:
            # Single Gaussian emission: (mu, sigma).
            emission = NormalDistribution(*s['emission'][0][:2])
        else:
            # Gaussian mixture emission: entries are (mu, sigma, weight).
            weights = np.array([w for _, _, w in s['emission']])
            dists = [NormalDistribution(mu, sigma)
                     for mu, sigma, _ in s['emission']]
            emission = GeneralMixtureModel(dists, weights=weights)
        state = State(emission, name=s['name'])

        states[s['name']] = state
        model.add_state(state)

        if 'start_prob' in s:
            model.add_transition(model.start, state, s['start_prob'])
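# A hedged sketch of the `modeldata` layout the loader above expects: a list of
# per-state dicts, each with a 'name', an 'emission' list of (mu, sigma) or
# (mu, sigma, weight) tuples, and an optional 'start_prob'.  The values are
# invented for illustration; the snippet is truncated before the inter-state
# transitions are added and the model is baked.
modeldata = [
    {'name': 'background', 'emission': [(0.0, 1.0)], 'start_prob': 0.9},
    {'name': 'signal', 'emission': [(2.0, 0.5, 0.6), (4.0, 1.0, 0.4)], 'start_prob': 0.1},
]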
def update_hmm(self):
    num_states = self.num_states
    start_prob = self.start_prob
    num_emissions = self.num_emissions

    hmm = HiddenMarkovModel('hmm')
    # One discrete emission distribution per hidden state, over integer symbols.
    dist = [DiscreteDistribution(dict(zip(range(num_emissions), self.emissions[i]))) for i in range(num_states)]
    states = [State(dist[i], 's' + str(i).zfill(2)) for i in range(num_states)]
    hmm.add_states(states)

    # Start probabilities and the dense state-to-state transition matrix.
    for i in range(num_states):
        s_i = states[i]
        hmm.add_transition(hmm.start, s_i, start_prob[i])
        for j in range(num_states):
            s_j = states[j]
            p = self.transitions[i, j]
            hmm.add_transition(s_i, s_j, p)

    self.hmm = hmm
    self.hmm.bake()
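# Hedged usage sketch (the object name and the observation sequence are
# placeholders, not from the original source): after update_hmm() the baked
# model can score and decode a sequence of integer emission symbols.
obj.update_hmm()
print(obj.hmm.log_probability([0, 1, 1, 2]))
print(obj.hmm.predict([0, 1, 1, 2]))   # decoded hidden-state indices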
def oriHMMParams(self):
    """
    Set initial parameters for the Hidden Markov Model (HMM).

    Attributes
    ----------
    HMMParams : dict
        Has 3 keys: "A", the state transition matrix; "B", the emission
        probabilities, specifying the parameters (means, variances, weights)
        of the Gaussian mixture distributions for each hidden state; and
        "pi", the hidden state weights. This dict will be updated after the
        learning procedure.
    """
    hmm = HiddenMarkovModel()
    # GMM emissions for 4 hidden states:
    # 0--start, 1--downstream, 2--upstream, 3--end
    numdists = 3  # Three-distribution Gaussian mixtures
    var = 7.5 / (numdists - 1)
    means = [[], [], [], []]
    for i in range(numdists):
        means[3].append(i * 7.5 / (numdists - 1) + 2.5)
        means[2].append(i * 7.5 / (numdists - 1))
        means[1].append(-i * 7.5 / (numdists - 1))
        means[0].append(-i * 7.5 / (numdists - 1) - 2.5)
    states = []
    for i, m in enumerate(means):
        tmp = []
        for j in m:
            tmp.append(NormalDistribution(j, var))
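# Hedged sketch of the step the truncated loop above appears to be heading
# toward: wrapping each list of NormalDistributions in an equal-weight
# GeneralMixtureModel and a named State (the weights and naming are
# assumptions).  This line would sit inside the `for i, m in enumerate(means)` loop.
states.append(State(GeneralMixtureModel(tmp, weights=np.ones(numdists) / numdists), name=str(i)))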
def get_vntr_matcher_hmm(self, read_length):
    """Try to load a trained HMM for this VNTR.
    If there is no trained HMM, build one and store it for later use.
    """
    logging.info('Using read length %s' % read_length)
    copies = self.get_copies_for_hmm(read_length)

    base_name = str(self.reference_vntr.id) + '_' + str(read_length) + '.json'
    stored_hmm_file = settings.TRAINED_HMMS_DIR + base_name
    if settings.USE_TRAINED_HMMS and os.path.isfile(stored_hmm_file):
        # Reuse the previously trained and stored model.
        model = Model()
        model = model.from_json(stored_hmm_file)
        return model

    flanking_region_size = read_length
    vntr_matcher = self.build_vntr_matcher_hmm(copies, flanking_region_size)

    if settings.USE_TRAINED_HMMS:
        # Cache the newly built model as JSON for later runs.
        json_str = vntr_matcher.to_json()
        with open(stored_hmm_file, 'w') as outfile:
            outfile.write(json_str)
    return vntr_matcher
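# Hedged usage sketch (the genotyper object and read sequence are placeholders,
# not from the original source): the returned matcher scores reads with
# pomegranate's standard Viterbi call.
hmm = genotyper.get_vntr_matcher_hmm(read_length=150)
viterbi_logp, _ = hmm.viterbi(list(read_sequence))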
def make_hmm_model(emission_mat, transition_probs):
    model = pomegranate.HiddenMarkovModel('ndf')
    # Row 1 of emission_mat holds the ictal emission probabilities, row 0 the baseline ones.
    ictal_emissions = {i: emission_mat[1, i] for i in range(emission_mat.shape[1])}
    baseline_emissions = {i: emission_mat[0, i] for i in range(emission_mat.shape[1])}
    ictal = pomegranate.State(pomegranate.DiscreteDistribution(ictal_emissions), name='1')
    baseline = pomegranate.State(pomegranate.DiscreteDistribution(baseline_emissions), name='0')
    model.add_state(ictal)
    model.add_state(baseline)
    # Start probabilities out of the silent start state sum to 1.
    model.add_transition(model.start, ictal, 0.05)
    model.add_transition(model.start, baseline, 0.95)
    model.add_transition(baseline, baseline, transition_probs[0, 0])
    model.add_transition(baseline, ictal, transition_probs[0, 1])
    model.add_transition(ictal, ictal, transition_probs[1, 1])
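# Hedged usage sketch: a 2-state-by-3-symbol emission matrix and a 2x2
# transition matrix, both invented for illustration.  The function above is
# truncated; the ictal-to-baseline transition and model.bake() presumably
# follow before the model can be used for decoding.
emission_mat = np.array([[0.7, 0.2, 0.1],    # baseline emission probabilities
                         [0.1, 0.3, 0.6]])   # ictal emission probabilities
transition_probs = np.array([[0.99, 0.01],
                             [0.05, 0.95]])
model = make_hmm_model(emission_mat, transition_probs)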