# currently, there are no defaults XXX
kw = dict(
defaults=False,
external_scorer=False, # external scoring function
)
kw.update(keywords)
function = self._distance_method(
method, scale=scale, factor=factor,
restricted_chars=restricted_chars, mode=mode, gop=gop,
restriction=restriction, external_scorer=kw['external_scorer'])
concepts = [concept] if concept else sorted(self.rows)
for c in concepts:
log.info("Analyzing words for concept <{0}>.".format(c))
indices = self.get_list(row=c, flat=True)
matrix = []
for idxA, idxB in util.combinations2(indices):
try:
d = function(idxA, idxB)
except ZeroDivisionError:
log.warning(
"Encountered Zero-Division for the comparison of "
"{0} ({2}) and {1} ({3})".format(
''.join(self[idxA, self._segments]),
''.join(self[idxB, self._segments]),
idxA, idxB
))
d = 100
matrix += [d]
matrix = misc.squareform(matrix)
if not concept:
yield c, indices, matrix
            else:
                yield matrix
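
# Illustrative sketch (not library code): how the condensed list of pairwise
# scores built above relates to the square matrix that gets yielded.  The toy
# indices and scores are invented; the expansion mirrors what misc.squareform
# is assumed to do here.
from itertools import combinations

toy_indices = [10, 11, 12]            # word indices for one concept
toy_scores = [0.2, 0.8, 0.5]          # d(10,11), d(10,12), d(11,12)

n = len(toy_indices)
square = [[0.0] * n for _ in range(n)]
for (a, b), d in zip(combinations(range(n), 2), toy_scores):
    square[a][b] = square[b][a] = d
# square == [[0.0, 0.2, 0.8], [0.2, 0.0, 0.5], [0.8, 0.5, 0.0]]
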
    # check the matrix type to define the evaluation and the log transform
    if matrix_type == 'distances':
        evaluate = lambda x: True if x < threshold else False
        if logs == True:
            logs = lambda x: -np.log2((1 - x) ** 2)
        elif logs == False:
            logs = lambda x: x
elif matrix_type == 'similarities':
evaluate = lambda x: True if x > threshold else False
if logs == True:
logs = lambda x: -np.log(x ** 2)
else:
logs = lambda x: x
else:
raise ValueError(matrix_type)
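
# Minimal numeric sketch (toy values, not library code) of the two log
# transforms selected above; scores that fail `evaluate` are zeroed out by
# the threshold loop further below.
import numpy as np

d = 0.25                       # toy distance score
print(-np.log2((1 - d) ** 2))  # 0.830..., the 'distances' transform
s = 0.9                        # toy similarity score
print(-np.log(s ** 2))         # 0.210..., the 'similarities' transform
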
# check for threshold
if threshold:
for i, j in util.combinations2(range(len(imatrix))):
score = imatrix[i][j]
evaluation = logs(score) if evaluate(score) else 0
imatrix[i][j] = evaluation
imatrix[j][i] = evaluation
# check for self_loops
if add_self_loops == True:
for i in range(len(imatrix)):
imatrix[i][i] = 1
elif add_self_loops == False:
pass
else:
for i in range(len(imatrix)):
imatrix[i][i] = add_self_loops(imatrix[:, i])
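
# Self-contained sketch (toy NumPy matrix, invented values) of the two steps
# above: scores that fail the evaluation are zeroed, and the diagonal is then
# filled either with 1 or, when a callable is passed for the self-loops, with
# a value computed from each column.
import numpy as np

sims = np.array([[0.0, 0.9, 0.2],
                 [0.9, 0.0, 0.7],
                 [0.2, 0.7, 0.0]])
thr = 0.5
for i in range(len(sims)):
    for j in range(i + 1, len(sims)):
        val = sims[i][j] if sims[i][j] > thr else 0   # 'similarities' evaluation
        sims[i][j] = sims[j][i] = val
for i in range(len(sims)):
    sims[i][i] = sims[:, i].mean()                    # callable self-loop example
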
# normalize the matrix
# transform the matrix
matrix = misc.transpose(getattr(msa, 'alm_matrix', msa))
# custom function for tokens2class
tk2k = lambda x: token2class(x, keywords['model'], cldf=keywords['cldf'],
diacritics=keywords['diacritics'], stress=keywords['stress'])
# check for local peaks
if keywords['local']:
if keywords['local'] == 'peaks':
# calculate a local index
peaks = []
for line in matrix:
sim = []
for charA, charB in util.combinations2(line):
if charA not in rcParams['gap_symbol'] \
and charB not in rcParams['gap_symbol']:
sim.append(keywords['model'](
tk2k(charA),
tk2k(charB)))
else:
sim.append(0.0)
peaks.append(sum(sim) / len(sim))
            # get the average and the maximum of the peaks
pmean = sum(peaks) / len(peaks)
pmax = max(peaks)
# exclude those lines from matrix whose average is smaller than pmean
            i = len(matrix) - 1
            for peak in peaks[::-1]:
                # drop rows whose average similarity is below the overall mean
                if peak < pmean:
                    del matrix[i]
                i -= 1
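
# Toy sketch (invented alignment data, plain Python) of the peak filter above:
# keep only those rows of the transposed alignment whose average pairwise
# similarity reaches the overall mean; toy_sim is a stand-in for the sound
# class scorer.
from itertools import combinations

toy_matrix = [['t', 't', 't'], ['-', 'a', 'o'], ['k', 'g', 'k']]
toy_sim = lambda a, b: 1.0 if a == b else 0.0

peaks = []
for line in toy_matrix:
    scores = [toy_sim(a, b) if '-' not in (a, b) else 0.0
              for a, b in combinations(line, 2)]
    peaks.append(sum(scores) / len(scores))
pmean = sum(peaks) / len(peaks)
kept = [line for line, peak in zip(toy_matrix, peaks) if peak >= pmean]
# kept == [['t', 't', 't']]
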
            if cluster_method == 'infomap':
                c = extra.infomap_clustering(threshold, matrix,
                                             revert=True)
elif cluster_method in ['upgma', 'single', 'complete', 'ward']:
c = clustering.flat_cluster(cluster_method,
threshold, matrix,
revert=True)
else:
raise ValueError("No suitable cluster method specified.")
for i, (idx, pos, slc) in enumerate(trace):
C[idx] += [c[i] + k]
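
# Sketch of the bookkeeping above with invented data: `c` plays the role of a
# revert-style clustering result (row index -> local cluster id), `trace`
# records which word (idx) and morpheme position each row belongs to, and k is
# a running offset that keeps cluster ids of different concepts apart.  How k
# is advanced is my assumption; it is not shown in this excerpt.
from collections import defaultdict

C = defaultdict(list)
k = 0
for trace, c in [
        ([(7, 0, slice(0, 2)), (7, 1, slice(2, 4)), (9, 0, slice(0, 3))],
         {0: 0, 1: 1, 2: 0}),                    # first concept
        ([(12, 0, slice(0, 2)), (14, 0, slice(0, 2))],
         {0: 0, 1: 0}),                          # second concept
]:
    for i, (idx, pos, slc) in enumerate(trace):
        C[idx] += [c[i] + k]
    k += max(c.values()) + 1                     # assumed offset update
# C == {7: [0, 1], 9: [0], 12: [2], 14: [2]}
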
if kw['post_processing']:
_g = nx.Graph()
for i, (idx, pos, slc) in enumerate(trace):
_g.add_node((i,idx,pos))
remove_edges = []
for (i, n1), (j, n2) in util.combinations2(enumerate(_g.nodes())):
if C[n1[1]][n1[2]] == C[n2[1]][n2[2]]:
_g.add_edge(n1, n2)
if n1[1] == n2[1]:
# get scores for n1 and n2 with all the rest in
# the matrix to decide for one
sn1, sn2 = 0, 0
                            for row in matrix:
                                sn1 += row[n1[0]]
                                sn2 += row[n2[0]]
sn1 = sn1 / len(matrix)
sn2 = sn2 / len(matrix)
if sn1 <= sn2:
remove_edges += [n2]
else:
remove_edges += [n1]
                for node in remove_edges:
                    for edge in sorted(_g[node]):
                        _g.remove_edge(node, edge)
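
# Small self-contained sketch (toy graph, standard networkx API) of the
# post-processing idea above: nodes that received the same partial cognate id
# are linked, the edges of nodes that lost the tie-break are cut again, and
# the connected components give the final groups.
import networkx as nx

g = nx.Graph()
labels = {'a': 1, 'b': 1, 'c': 2, 'd': 1}        # node -> cluster id (toy)
g.add_nodes_from(labels)
for n1 in labels:
    for n2 in labels:
        if n1 < n2 and labels[n1] == labels[n2]:
            g.add_edge(n1, n2)
for node in ['d']:                               # pretend 'd' lost a tie-break
    for neighbor in sorted(g[node]):
        g.remove_edge(node, neighbor)
groups = sorted(sorted(comp) for comp in nx.connected_components(g))
# groups == [['a', 'b'], ['c'], ['d']]
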
# check for matrix type
if matrix_type == 'distances':
evaluate = lambda x: x < threshold
elif matrix_type == 'similarities':
evaluate = lambda x: x > threshold
elif matrix_type == 'weights':
evaluate = lambda x: False
else:
raise ValueError(matrix_type)
# get the edges and the adjacency from the thresholds
edges = set()
adjacency = dict([(t, set()) for t in taxa])
weights = {}
for i, j in util.combinations2(range(len(taxa))):
taxA, taxB = taxa[i], taxa[j]
if evaluate(matrix[i][j]):
edges.add((taxA, taxB))
adjacency[taxA].add(taxB)
adjacency[taxB].add(taxA)
elif matrix_type == 'weights':
if matrix[i][j] < threshold:
edges.add((taxA, taxB))
adjacency[taxA].add(taxB)
adjacency[taxB].add(taxA)
edges.add((taxB, taxA))
weights[taxA, taxB] = -np.log2((1 - matrix[i][j]) ** 2)
weights[taxB, taxA] = -np.log2((1 - matrix[i][j]) ** 2)
weights = weights or None
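
# Toy walk-through (invented taxa and scores) of the edge construction above
# for the plain 'distances' case: every pair whose score passes the evaluation
# becomes an undirected edge and appears in both adjacency sets.
from itertools import combinations

toy_taxa = ['German', 'Dutch', 'Hawaiian']
toy_dist = [[0.0, 0.3, 0.9],
            [0.3, 0.0, 0.8],
            [0.9, 0.8, 0.0]]
cutoff = 0.5
toy_edges, toy_adjacency = set(), {t: set() for t in toy_taxa}
for i, j in combinations(range(len(toy_taxa)), 2):
    if toy_dist[i][j] < cutoff:
        toy_edges.add((toy_taxa[i], toy_taxa[j]))
        toy_adjacency[toy_taxa[i]].add(toy_taxa[j])
        toy_adjacency[toy_taxa[j]].add(toy_taxa[i])
# toy_edges == {('German', 'Dutch')}
# toy_adjacency == {'German': {'Dutch'}, 'Dutch': {'German'}, 'Hawaiian': set()}
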
if edges:
def _get_wad(matrix, threshold, use_log=False):
"""
Get weighted average degree.
"""
def log_f(x):
return -np.log(1 - x) if use_log else x
degreeDict = defaultdict(list)
for i, j in util.combinations2(range(len(matrix))):
score = matrix[i][j]
if score < threshold:
deg = log_f(score)
degreeDict[i].append(deg)
degreeDict[j].append(deg)
deg_sum = 0
for weights in degreeDict.values():
deg = sum(weights)
deg_sum += deg
if degreeDict:
return deg_sum / len(degreeDict)
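
# Quick usage sketch for the helper above with a made-up distance matrix:
# only the 0.2 and 0.4 cells fall below the threshold, so three of the four
# nodes contribute to the average.
dists = [[0.0, 0.2, 0.9, 0.9],
         [0.2, 0.0, 0.9, 0.4],
         [0.9, 0.9, 0.0, 0.9],
         [0.9, 0.4, 0.9, 0.0]]
print(_get_wad(dists, 0.5))   # 0.2 + (0.2 + 0.4) + 0.4 over 3 nodes, i.e. ~0.4
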
sample : callable
            Callable that returns an iterator over pairs sampled from the list
            of pairs passed as its sole argument.
edit_dist_normalized : bool
Whether edit_dist should be normalized.
Returns
-------
        generator
            Lists of distances for the sampled word pairs, one list per pair
            of taxa.
"""
function = self._align_method(
method, distance=True,
return_distance=True, pprint=False, mode=mode, scale=scale,
factor=factor, gop=gop, normalized=edit_dist_normalized)
for taxA, taxB in util.combinations2(self.cols):
distances = []
for pA, pB in sample(self.pairs[taxA, taxB]):
try:
d = function(pA, pB)
except ZeroDivisionError:
self.log.error("Zero-Warning")
d = 1.0
distances.append(d)
yield distances
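
# Sketch (my own helper, not part of the library) of the kind of callable the
# `sample` parameter documented above expects: given the full list of word
# pairs for one language pair, it returns an iterator over a random subset.
# The generator above would then be driven with sample=make_sampler(100).
import random

def make_sampler(k, seed=42):
    rnd = random.Random(seed)
    def sample(pairs):
        pairs = list(pairs)
        return iter(rnd.sample(pairs, min(k, len(pairs))))
    return sample
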