def __init__(self, k, cpus=1):
    """Initialization.

    Parameters
    ----------
    k : int
        K-mer size to use for calculating genomic signatures.
    cpus : int
        Number of cpus to use.
    """

    self.logger = logging.getLogger('timestamp')

    self.k = k
    self.cpus = cpus

    self.logger.info('Calculating unique kmers of size k = %d.' % self.k)

    self.signatures = GenomicSignature(self.k)
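# The constructor above only wires up a GenomicSignature calculator for k-mers of size
# k. As a rough, self-contained illustration of what a k-mer signature involves (count
# canonical k-mers and normalise them to frequencies), here is a minimal sketch; it is
# not the biolib implementation, and all names in it are illustrative.
from collections import Counter


def kmer_signature_sketch(seq, k):
    """Return canonical k-mer frequencies of a DNA sequence as a dict."""
    complement = str.maketrans('ACGT', 'TGCA')
    counts = Counter()
    seq = seq.upper()
    for i in range(len(seq) - k + 1):
        kmer = seq[i:i + k]
        if set(kmer) - set('ACGT'):
            continue  # skip k-mers containing ambiguous bases
        rev_comp = kmer.translate(complement)[::-1]
        counts[min(kmer, rev_comp)] += 1  # canonical form: lexicographically smaller strand
    total = sum(counts.values())
    return {mer: n / total for mer, n in counts.items()} if total else {}


# example: tetranucleotide (k = 4) frequencies of a short sequence
# kmer_signature_sketch('ACGTACGTGGCCAATT', 4)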
K : int
    K-mer size to use for calculating genomic signatures.
no_coverage : boolean
    Flag indicating that coverage information should not be used during clustering.
no_pca : boolean
    Flag indicating that PCA of genomic signatures should not be calculated.
iterations : int
    Number of iterations to perform during clustering.
genome_file : str
    Sequences being clustered.
output_dir : str
    Directory to write results.
"""
# get mean coverage and genomic signature for each scaffold in genome
self.logger.info('Determining mean coverage and genomic signatures.')
signatures = GenomicSignature(K)

genome_stats = []
signature_matrix = []
seqs = seq_io.read(genome_file)
for seq_id, seq in seqs.items():
    stats = scaffold_stats.stats[seq_id]

    if not no_coverage:
        # use the mean coverage of the scaffold as a clustering feature
        genome_stats.append(np_mean(stats.coverage))
    else:
        genome_stats.append(())

    if K == 0:
        # k = 0: genomic signatures are not used as a clustering feature
        pass
    elif K == 4:
        # tetranucleotide signatures are precomputed in scaffold_stats
        signature_matrix.append(stats.signature)
    else:
        # signatures for other k-mer sizes must be calculated from the sequence
        # itself via the GenomicSignature object created above
        ...
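# The clustering itself happens after this fragment and is not shown. As a sketch of
# how the collected features (k-mer signature matrix plus optional mean coverage) could
# be clustered into k bins over a number of iterations, scipy's k-means is one option;
# this is an assumption for illustration only, not the code's own clustering routine.
import numpy as np
from scipy.cluster.vq import kmeans2


def cluster_scaffolds_sketch(signature_matrix, mean_coverages, k, iterations=100):
    """Cluster scaffolds into k groups from their signatures and optional coverage."""
    features = np.asarray(signature_matrix, dtype=float)
    if mean_coverages:
        # append mean coverage as an additional feature column
        cov = np.asarray(mean_coverages, dtype=float).reshape(-1, 1)
        features = np.hstack([features, cov])
    _centroids, labels = kmeans2(features, k, iter=iterations, minit='++')
    return labels  # cluster index assigned to each scaffold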
criteria1 : str
    First criterion used for splitting the genome.
criteria2 : str
    Second criterion used for splitting the genome.
genome_file : str
    Sequences being clustered.
output_dir : str
    Directory to write results.
"""
seqs = seq_io.read(genome_file)

# calculate PCA of genomic signatures if either split criterion requires it
if 'pc' in criteria1 or 'pc' in criteria2:
    self.logger.info('Performing PCA.')

    signatures = GenomicSignature(K)
    signature_matrix = []
    for seq_id, seq in seqs.items():
        stats = scaffold_stats.stats[seq_id]
        signature_matrix.append(stats.signature)

    # store the first three principal components for each scaffold
    pc, _variance = self.pca(signature_matrix)
    for i, seq_id in enumerate(seqs):
        scaffold_stats.stats[seq_id].pc1 = pc[i][0]
        scaffold_stats.stats[seq_id].pc2 = pc[i][1]
        scaffold_stats.stats[seq_id].pc3 = pc[i][2]

# split bin
genome_id = remove_extension(genome_file)
fout1 = open(os.path.join(output_dir, genome_id + '_c1.fna'), 'w')
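# self.pca() is not shown in this fragment. A minimal sketch of a PCA over the signature
# matrix using plain numpy (centre the columns, take an SVD, keep the leading axes); the
# function name and the (components, variance) return convention are assumptions made to
# mirror how pc and _variance are used above.
import numpy as np


def pca_sketch(signature_matrix, n_components=3):
    """Return (per-sample principal components, explained variance ratios)."""
    X = np.asarray(signature_matrix, dtype=float)
    X = X - X.mean(axis=0)                         # centre each k-mer frequency column
    U, S, _Vt = np.linalg.svd(X, full_matrices=False)
    pcs = U[:, :n_components] * S[:n_components]   # project samples onto leading axes
    variance = (S ** 2) / (S ** 2).sum()           # proportion of variance per axis
    return pcs, variance[:n_components]


# pcs[i][0], pcs[i][1] and pcs[i][2] would correspond to pc1, pc2 and pc3 above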
genome_scaffold_stats : d[scaffold_id] -> namedtuple of scaffold stats
    Statistics for scaffolds in genome.
highlight_scaffold_ids : d[scaffold_id] -> color
    Scaffolds in genome to highlight.
link_scaffold_ids : list of scaffold pairs
    Pairs of scaffolds to link together.
mean_signature : float
    Mean tetranucleotide signature of genome.
td_dist : d[length][percentile] -> critical value
    TD distribution.
percentiles_to_plot : iterable
    Percentile values to mark on plot.
"""
# histogram plot of tetranucleotide distances (delta-TD)
genomic_signature = GenomicSignature(0)

delta_tds = []
for stats in genome_scaffold_stats.values():
    # distance between the scaffold signature and the genome's mean signature
    delta_tds.append(genomic_signature.manhattan(stats.signature, mean_signature))

if axes_hist:
    axes_hist.hist(delta_tds, bins=20, color=(0.5, 0.5, 0.5))
    axes_hist.set_xlabel('tetranucleotide distance')
    axes_hist.set_ylabel('# scaffolds (out of %d)' % len(delta_tds))
    self.prettify(axes_hist)

# scatterplot of tetranucleotide distance vs. scaffold length
xlabel = 'tetranucleotide distance'
ylabel = 'Scaffold length (kbp)'
pts = self.data_pts(genome_scaffold_stats, mean_signature)
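# GenomicSignature.manhattan() reduces two signatures to a single tetranucleotide
# distance (delta-TD). Judging by its name, the underlying measure is the Manhattan (L1)
# distance between the two frequency vectors; a self-contained sketch, assuming the
# signatures are equal-length sequences of frequencies:
def manhattan_sketch(signature_a, signature_b):
    """Manhattan (L1) distance between two k-mer frequency vectors."""
    return sum(abs(a - b) for a, b in zip(signature_a, signature_b))


# example: manhattan_sketch([0.25, 0.25, 0.25, 0.25], [0.40, 0.20, 0.20, 0.20])  # -> 0.3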
genome_size = defaultdict(int)
scaffold_length = defaultdict(list)
gc = defaultdict(list)
coverage = defaultdict(list)
signature = defaultdict(list)
for _scaffold_id, stats in scaffold_stats.stats.items():
    if stats.genome_id == scaffold_stats.unbinned:
        continue

    genome_size[stats.genome_id] += stats.length
    scaffold_length[stats.genome_id].append(stats.length)
    gc[stats.genome_id].append(stats.gc)
    coverage[stats.genome_id].append(stats.coverage)
    signature[stats.genome_id].append(stats.signature)

# record statistics for each genome
genomic_signature = GenomicSignature(0)

self.genome_stats = {}
for genome_id in genome_size:
    # calculate mean and median statistics weighted by scaffold length
    weights = np_array(scaffold_length[genome_id])

    len_array = np_array(scaffold_length[genome_id])
    mean_len = ws.numpy_weighted_mean(len_array, weights)
    median_len = ws.numpy_weighted_median(len_array, weights)

    gc_array = np_array(gc[genome_id])
    mean_gc = ws.numpy_weighted_mean(gc_array, weights)
    median_gc = ws.numpy_weighted_median(gc_array, weights)

    cov_array = np_array(coverage[genome_id]).T
    mean_cov = ws.numpy_weighted_mean(cov_array, weights)
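# ws.numpy_weighted_mean() and ws.numpy_weighted_median() appear to come from the
# weightedstats package. The statistics themselves are simple: each scaffold's value is
# weighted by its length, so long scaffolds dominate the genome-level summary. A sketch
# of both (the package's exact tie-handling for the median may differ):
import numpy as np


def weighted_mean_sketch(values, weights):
    values, weights = np.asarray(values, float), np.asarray(weights, float)
    return float((values * weights).sum() / weights.sum())


def weighted_median_sketch(values, weights):
    values, weights = np.asarray(values, float), np.asarray(weights, float)
    order = np.argsort(values)
    cum_weights = np.cumsum(weights[order])
    # first value whose cumulative weight reaches half of the total weight
    idx = int(np.searchsorted(cum_weights, 0.5 * weights.sum()))
    return float(values[order][idx])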
"""
# read reference distributions from file
self.logger.info('Reading reference distributions.')
self.gc_dist = self._read_distribution('gc_dist')
self.td_dist = self._read_distribution('td_dist')
# identify compatible scaffolds in each genome
fout = open(output_file, 'w')
fout.write('Scaffold id\tGenome id\tScaffold length (bp)\tCompatible distributions')
fout.write('\tScaffold GC\tMedian genome GC\tLower GC bound (%s%%)\tUpper GC bound (%s%%)' % (gc_per, gc_per))
fout.write('\tScaffold TD\tMedian genome TD\tUpper TD bound (%s%%)' % td_per)
fout.write('\tScaffold coverage\tMedian genome coverage\tCoverage correlation\tCoverage error')
fout.write('\t# genes\t% genes with homology\n')
genomic_signature = GenomicSignature(0)
self.logger.info('Identifying scaffolds compatible with bins.')
processed_scaffolds = 0
for scaffold_id, ss in scaffold_stats.stats.items():
processed_scaffolds += 1
if not self.logger.is_silent:
sys.stdout.write(' Processed {:,} of {:,} ({:.1f}%) scaffolds.\r'.format(
processed_scaffolds,
len(scaffold_stats.stats),
processed_scaffolds * 100.0 / len(scaffold_stats.stats)))
sys.stdout.flush()
if scaffold_id not in scaffolds_of_interest:
continue
for genome_id, gs in genome_stats.items():
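# The body of the per-genome loop is not shown above. Conceptually, a scaffold is
# reported as compatible with a bin when its GC falls inside the genome's percentile
# window, its tetranucleotide distance stays below the TD bound, and its coverage
# profile agrees with the genome's; the columns written to the output file reflect
# these checks. A compressed sketch of such a test (names and the coverage threshold
# are illustrative, not the code's exact logic):
def is_compatible_sketch(scaffold_gc, gc_lower_bound, gc_upper_bound,
                         delta_td, td_bound,
                         cov_corr, min_cov_corr=0.9):
    """Return True if a scaffold looks compatible with a genome bin."""
    gc_ok = gc_lower_bound <= scaffold_gc <= gc_upper_bound
    td_ok = delta_td <= td_bound
    cov_ok = cov_corr >= min_cov_corr
    return gc_ok and td_ok and cov_ok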
def data_pts(self, genome_scaffold_stats, mean_signature):
    """Get data points to plot.

    Parameters
    ----------
    genome_scaffold_stats : d[scaffold_id] -> namedtuple of scaffold stats
        Statistics for scaffolds in genome.
    mean_signature : float
        Mean tetranucleotide signature of genome.

    Returns
    -------
    dict : d[scaffold_id] -> (x, y)
        Tetranucleotide distance and scaffold length (in kbp) for each scaffold.
    """

    genomic_signature = GenomicSignature(0)

    pts = {}
    for scaffold_id, stats in genome_scaffold_stats.items():
        pts[scaffold_id] = (genomic_signature.manhattan(stats.signature, mean_signature),
                            stats.length / 1000.0)

    return pts
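# A short usage sketch for data_pts(): the returned dict maps each scaffold to an
# (x, y) pair of tetranucleotide distance and length in kbp, ready for a scatter plot.
# The matplotlib calls below are illustrative and are not the class's own plotting code.
import matplotlib.pyplot as plt


def plot_data_pts_sketch(pts):
    x = [xy[0] for xy in pts.values()]
    y = [xy[1] for xy in pts.values()]
    plt.scatter(x, y, s=10, c='0.5')
    plt.xlabel('tetranucleotide distance')
    plt.ylabel('Scaffold length (kbp)')
    plt.show()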
def outlier_info(self,
                 genome_id,
                 scaffold_ids,
                 scaffold_stats,
                 genome_stats,
                 gc_per,
                 td_per,
                 cov_corr,
                 cov_perc):
    """Get outlier information for scaffolds within a genome."""

    genomic_signature = GenomicSignature(0)

    # make sure distributions have been loaded
    self.read_distributions()

    # find keys into GC and TD distributions
    # gc -> [mean GC][scaffold length][percentile]
    # td -> [scaffold length][percentile]
    gs = genome_stats[genome_id]
    closest_gc = find_nearest(list(self.gc_dist.keys()), gs.median_gc / 100.0)
    sample_seq_len = list(self.gc_dist[closest_gc].keys())[0]
    d = self.gc_dist[closest_gc][sample_seq_len]
    gc_lower_bound_key = find_nearest(list(d.keys()), (100 - gc_per) / 2.0)
    gc_upper_bound_key = find_nearest(list(d.keys()), (100 + gc_per) / 2.0)

    td_bound_key = find_nearest(list(self.td_dist[list(self.td_dist.keys())[0]].keys()), td_per)
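# find_nearest() is used above to map a genome's median GC and the requested percentiles
# onto the discrete keys of the pre-computed distributions. A sketch of how such a helper
# presumably behaves (return the candidate closest to the target value):
def find_nearest_sketch(candidates, value):
    """Return the element of candidates closest to value."""
    return min(candidates, key=lambda c: abs(c - value))


# example: find_nearest_sketch([0.2, 0.4, 0.6, 0.8], 0.53)  # -> 0.6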