Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment of a method body (the enclosing `def` and the code after the last
# line are outside this view). It tallies motifs of length `motif_length`
# found in self._data using CategoryCounter.
data = self._data
# True when the raw data is a str; the non-str path below relies on .reshape
# NOTE(review): presumably the non-str case is a numpy array of alphabet
# indices -- confirm against the class definition.
not_array = isinstance(data, str)
if motif_length == 1:
counts = CategoryCounter(data)
else:
if len(data) % motif_length != 0:
warnings.warn(
"%s length not divisible by %s, truncating"
% (self.name, motif_length)
)
# largest multiple of motif_length that does not exceed len(data)
limit = (len(data) // motif_length) * motif_length
data = data[:limit]
if not_array:
# str data: count consecutive, non-overlapping slices of motif_length
counts = CategoryCounter(
data[i : i + motif_length] for i in range(0, limit, motif_length)
)
else:
# non-str data: one row per motif, each row converted to a hashable tuple
counts = CategoryCounter(
tuple(v) for v in data.reshape(limit // motif_length, motif_length)
)
if not not_array:
# Re-key the counts from indices to character motifs. Iterate over a
# snapshot of the keys because entries are popped and re-added in the loop.
for key in list(counts):
indices = [key] if motif_length == 1 else key
motif = self.alphabet.to_chars(indices).astype(str)
motif = "".join(motif)
counts[motif] = counts.pop(key)
exclude = []
if not include_ambiguity or not allow_gap:
is_degen = self.moltype.is_degenerate
def test_add(self):
    """in-place addition accepts a single element (or a series of them)"""
    counter = number.CategoryCounter("AAAACCCGGGGT")
    # four "A" in the seed string, plus the one added here
    counter += "A"
    observed = counter["A"]
    self.assertEqual(observed, 5)
def make_weights(counts, n):
    """Build the replacement-state weights for every observed character.

    For each character in ``counts``, the frequencies of all *other*
    characters are normalized and then divided by 2*n. The result is a list
    of ``(character, weights)`` pairs, where ``weights`` is a CategoryCounter
    keyed by the alternate characters.
    """
    state_freqs = list(counts.to_freqs().items())
    weights = []
    for state, _ in state_freqs:
        # frequencies of every state except the one being replaced
        others = CategoryFreqs(
            {alt: freq for alt, freq in state_freqs if alt != state}
        ).to_normalized()
        scaled = CategoryCounter(
            {alt: weight / (2 * n) for alt, weight in others.items()}
        )
        weights.append((state, scaled))
    return weights
# Fragment of a function body (the enclosing `def` is outside this view).
# The exclude processing below is skipped when
# exclude_handler == ignore_excludes, so I explicitly
# check, and bypass this block if possible.
if exclude_handler != ignore_excludes:
for col in (col1, col2):
states = set(col)
for exclude in excludes:
if exclude in states:
try:
# NOTE(review): the handler result is bound to the loop variable `col` only;
# col1/col2, which are used below, are not reassigned here -- confirm intent.
col = exclude_handler(col, excludes)
break
except TypeError:
# a TypeError raised by the handler ends the calculation with null_value
return null_value
# Calculate entropy of pos1 & pos2, if they weren't passed in.
# NOTE(review): `not h1` / `not h2` is also true for a supplied entropy of
# 0.0, in which case the value is recomputed -- confirm this is acceptable.
if not h1:
h1 = CategoryCounter(col1).entropy
if not h2:
h2 = CategoryCounter(col2).entropy
# Calculate the joint entropy of pos1 & pos2
joint_h = joint_entropy(col1, col2)
# Calculate MI using the specified method -- return null_value when
# the specified MI cannot be calculated
# (e.g., mi_calculator=nmi and joint_h=0.0)
try:
result = mi_calculator(h1, h2, joint_h)
# any result at or below ROUND_ERROR (negatives included) is reported as 0.0
if result <= ROUND_ERROR:
result = 0.0
except ZeroDivisionError:
result = null_value
return result
def calc_pair_scale(seqs, obs1, obs2, weights1, weights2):
"""Return entropies and weights for comparable alignment.
A comparable alignment is one in which, for each paired state ij, all
alternate observable paired symbols are created. For instance, let the
symbols {A,C} be observed at position i and {A,C} at position j. If we
observe the paired types {AC, AA}. A comparable alignment would involve
replacing an AC pair with a CC pair."""
# NOTE(review): the body of this function is truncated in this view; the
# comments below cover only the visible statements.
# scale is calculated as the product of mi from col1 with alternate
# characters. This means the number of states is changed by swapping
# between the original and selected alternate, calculating the new mi
pair_freqs = CategoryCounter(seqs)
# convert to dicts so the alternates for a state can be looked up by key
weights1 = dict(weights1)
weights2 = dict(weights2)
scales = []
# iterate over a snapshot of the keys because pair_freqs is modified in the loop
for a, b in list(pair_freqs.keys()):
weights = weights1[a]
pr = a + b
# presumably removes one occurrence of the observed pair and of state a
# (in-place subtraction on the counters) -- confirm against CategoryCounter
pair_freqs -= pr
obs1 -= a
# make comparable alignments by mods to col 1
for c, w in list(weights.items()):
new_pr = c + b
# presumably adds one occurrence of the alternate pair and of state c -- confirm
pair_freqs += new_pr
obs1 += c
# Fragment of a function body (the enclosing `def` is outside this view).
# Exclude processing is bypassed entirely when the handler is ignore_excludes.
if exclude_handler != ignore_excludes:
for col in (col1, col2):
states = set(col)
for exclude in excludes:
if exclude in states:
try:
# NOTE(review): the handler result is bound to the loop variable `col` only;
# col1/col2, which are used below, are not reassigned here -- confirm intent.
col = exclude_handler(col, excludes)
break
except TypeError:
# a TypeError raised by the handler ends the calculation with null_value
return null_value
# Calculate entropy of pos1 & pos2, if they weren't passed in.
# NOTE(review): `not h1` / `not h2` is also true for a supplied entropy of
# 0.0, in which case the value is recomputed -- confirm this is acceptable.
if not h1:
h1 = CategoryCounter(col1).entropy
if not h2:
h2 = CategoryCounter(col2).entropy
# Calculate the joint entropy of pos1 & pos2
joint_h = joint_entropy(col1, col2)
# Calculate MI using the specified method -- return null_value when
# the specified MI cannot be calculated
# (e.g., mi_calculator=nmi and joint_h=0.0)
try:
result = mi_calculator(h1, h2, joint_h)
# any result at or below ROUND_ERROR (negatives included) is reported as 0.0
if result <= ROUND_ERROR:
result = 0.0
except ZeroDivisionError:
result = null_value
return result
def joint_entropy(pos1, pos2):
    """Calculate the joint entropy of a pair of positions"""
    # combine the two positions, then take the entropy of the combined counts
    paired = join_positions(pos1, pos2)
    pair_counts = CategoryCounter(paired)
    return pair_counts.entropy