import hail as hl

# Partition a locus-keyed table into blocks of n rows each and turn the blocks
# into intervals that cover the genome through the end of the last contig.
end = hl.Locus(reference_genome.contigs[-1],
               reference_genome.lengths[reference_genome.contigs[-1]])

ht = ht.select()
ht = ht.annotate(x=hl.scan.count())
ht = ht.annotate(y=ht.x + 1)
# Keep only the last locus of every block of n rows.
ht = ht.filter(ht.x // n != ht.y // n)
ht = ht.select()
# Each interval starts one position past the previous block's end locus
# (or at global position 0 for the first block) and ends at the current locus.
ht = ht.annotate(start=hl.or_else(
    hl.scan._prev_nonnull(hl.locus_from_global_position(ht.locus.global_position() + 1,
                                                        reference_genome=reference_genome)),
    hl.locus_from_global_position(0, reference_genome=reference_genome)))
ht = ht.key_by()
ht = ht.select(interval=hl.interval(start=ht.start, end=ht.locus, includes_end=True))

intervals = ht.aggregate(hl.agg.collect(ht.interval))

# Append a final interval running from one past the last collected end position
# to the end of the reference genome.
last_st = hl.eval(
    hl.locus_from_global_position(hl.literal(intervals[-1].end).global_position() + 1,
                                  reference_genome=reference_genome))
interval = hl.Interval(start=last_st, end=end, includes_end=True)
intervals.append(interval)
return intervals
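
# Usage sketch (illustrative only; `locus_keyed_ht` is any table keyed by locus, not the
# transformed `ht` above): intervals produced this way can be used to subset a table,
# for example with hl.filter_intervals.
first_chunk = hl.filter_intervals(locus_keyed_ht, hl.literal(intervals[:1]))
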
from typing import Dict


def _ac_an_parent_child_count(
    proband_gt: hl.expr.CallExpression,
    father_gt: hl.expr.CallExpression,
    mother_gt: hl.expr.CallExpression,
) -> Dict[str, hl.expr.Int64Expression]:
    """
    Helper method to get AC and AN for parents and children.
    """
    # Allele count: total number of alt alleles carried by the parents.
    ac_parent_expr = hl.agg.sum(
        father_gt.n_alt_alleles() + mother_gt.n_alt_alleles()
    )
    # Allele number: two alleles per parent with a defined genotype.
    an_parent_expr = hl.agg.sum(
        (hl.is_defined(father_gt) + hl.is_defined(mother_gt)) * 2
    )
    ac_child_expr = hl.agg.sum(proband_gt.n_alt_alleles())
    an_child_expr = hl.agg.sum(hl.is_defined(proband_gt) * 2)

    return {
        "ac_parents": ac_parent_expr,
        "an_parents": an_parent_expr,
        "ac_children": ac_child_expr,
        "an_children": an_child_expr,
    }
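
# Usage sketch (assumption: `trio_mt` is a trio MatrixTable from hl.trio_matrix(),
# which exposes proband_entry, father_entry and mother_entry genotypes; the name is
# illustrative and not part of the snippet above).
trio_mt = trio_mt.annotate_rows(
    **_ac_an_parent_child_count(
        trio_mt.proband_entry.GT,
        trio_mt.father_entry.GT,
        trio_mt.mother_entry.GT,
    )
)
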
def _summary_aggs(self):
    # Build summary aggregators for a dict-like expression: min/max/mean length,
    # plus nested summaries of each element's key and value.
    length = hl.len(self)
    return hl.tuple((
        hl.agg.min(length),
        hl.agg.max(length),
        hl.agg.mean(length),
        hl.agg.explode(
            lambda elt: hl.tuple((elt[0]._all_summary_aggs(), elt[1]._all_summary_aggs())),
            hl.array(self))))
# Collect the set of all keys appearing in the table's 'attribute' dict field.
attributes = ht.aggregate(hl.agg.explode(lambda x: hl.agg.collect_as_set(x), ht['attribute'].keys()))
# Assign each sample a single sibling-pair index (the smallest index it appears under).
sib_ht = sib_ht.group_by("sibs").aggregate(
    sib_idx=(hl.agg.take(sib_ht.sib_idx, 1, ordering=sib_ht.sib_idx)[0])
)
# Regroup by pair index and keep only indices with exactly two assigned samples.
sib_ht = sib_ht.group_by(sib_ht.sib_idx).aggregate(sibs=hl.agg.collect(sib_ht.sibs))
sib_ht = sib_ht.filter(hl.len(sib_ht.sibs) == 2).persist()
logger.info(
    f"Generating sibling variant sharing counts using {sib_ht.count()} pairs."
)
# Index the sibling table by sample ID so it can be joined onto the MatrixTable columns.
sib_ht = sib_ht.explode("sibs").key_by("sibs")[mt.s]
# Create sibling sharing counters
sib_stats = hl.struct(
    **{
        f"n_sib_shared_variants_{name}": hl.sum(
            hl.agg.filter(
                expr,
                hl.agg.group_by(
                    sib_ht.sib_idx,
                    hl.or_missing(
                        hl.agg.sum(hl.is_defined(mt.GT)) == 2,
                        hl.agg.min(get_alt_count(mt.locus, mt.GT, is_female)),
                    ),
                ),
            ).values()
        )
        for name, expr in strata.items()
    }
)

# Project genotypes onto precomputed PCA loadings. The opening annotate_rows call
# below is inferred from the row fields used afterwards (the snippet is cut off
# mid-expression at this point); `loadings_ht`, `loading_location`, `af_location`
# and `n_variants` (the number of rows in `loadings_ht`) are assumed to be defined.
mt = mt.annotate_rows(
    pca_loadings=loadings_ht[mt.row_key][loading_location],
    pca_af=loadings_ht[mt.row_key][af_location],
)
mt = mt.filter_rows(
    hl.is_defined(mt.pca_loadings)
    & hl.is_defined(mt.pca_af)
    & (mt.pca_af > 0)
    & (mt.pca_af < 1)
)
# Normalize each genotype as (n_alt - 2p) / sqrt(n_variants * 2p(1 - p)).
gt_norm = (mt.GT.n_alt_alleles() - 2 * mt.pca_af) / hl.sqrt(
    n_variants * 2 * mt.pca_af * (1 - mt.pca_af)
)
mt = mt.annotate_cols(scores=hl.agg.array_sum(mt.pca_loadings * gt_norm))
return mt.cols().select("scores")
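
# Usage sketch (assumption, not part of the snippet above): loadings are typically
# produced on a reference dataset with hl.hwe_normalized_pca(compute_loadings=True)
# and then annotated with the reference allele frequencies before projecting new
# samples with the snippet above. `ref_mt` is a hypothetical reference MatrixTable.
eigenvalues, scores_ht, loadings_ht = hl.hwe_normalized_pca(
    ref_mt.GT, k=10, compute_loadings=True
)
ref_mt = ref_mt.annotate_rows(pca_af=hl.agg.mean(ref_mt.GT.n_alt_alleles()) / 2)
loadings_ht = loadings_ht.annotate(pca_af=ref_mt.rows()[loadings_ht.key].pca_af)
loading_location = "loadings"
af_location = "pca_af"
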
# Run 75 linear regressions in a single pass: for each i in hl.range(75), regress
# (ht.i0 + i) on the covariates ht.i1..ht.i4.
ht.aggregate(hl.agg.array_agg(lambda i: hl.agg.linreg(ht.i0 + i, [ht.i1, ht.i2, ht.i3, ht.i4]),
                              hl.range(75)))
Uses the normal_ploidy_cutoff parameter to determine the ploidy cutoffs for XX and XY karyotypes.
Uses the aneuploidy_cutoff parameter to determine the cutoffs for sex aneuploidies.

Note that f-stat is used only to split the samples into roughly 'XX' and 'XY' categories and is not used in the final karyotype annotation.

:param ht: Table with f_stat and sex chromosome ploidies
:param f_stat_cutoff: f-stat to roughly divide 'XX' from 'XY' samples. Assumes XX samples are below cutoff and XY are above cutoff.
:param normal_ploidy_cutoff: Number of standard deviations to use when determining sex chromosome ploidy cutoffs for XX, XY karyotypes.
:param aneuploidy_cutoff: Number of standard deviations to use when determining sex chromosome ploidy cutoffs for aneuploidies.
:return: Tuple of ploidy cutoff tuples: ((x_ploidy_cutoffs), (y_ploidy_cutoffs))
"""
# Group sex chromosome ploidy table by f_stat cutoff and get mean/stdev for chrX/Y ploidies
sex_stats = ht.aggregate(
    hl.agg.group_by(
        hl.cond(ht.f_stat < f_stat_cutoff, "xx", "xy"),
        hl.struct(x=hl.agg.stats(ht.chrX_ploidy), y=hl.agg.stats(ht.chrY_ploidy)),
    )
)
logger.info(f"XX stats: {sex_stats['xx']}")
logger.info(f"XY stats: {sex_stats['xy']}")
cutoffs = (
    # chrX ploidy cutoffs: (upper bound for a single X, (lower, upper) for a double X, X-aneuploidy bound).
    (
        sex_stats["xy"].x.mean + (normal_ploidy_cutoff * sex_stats["xy"].x.stdev),
        (
            sex_stats["xx"].x.mean - (normal_ploidy_cutoff * sex_stats["xx"].x.stdev),
            sex_stats["xx"].x.mean + (normal_ploidy_cutoff * sex_stats["xx"].x.stdev),
        ),
        sex_stats["xx"].x.mean + (aneuploidy_cutoff * sex_stats["xx"].x.stdev),
    ),
    # chrY ploidy cutoffs, mirroring the chrX logic above (reconstructed to match the documented
    # ((x_ploidy_cutoffs), (y_ploidy_cutoffs)) return value; the snippet is cut off here).
    (
        sex_stats["xx"].y.mean + (normal_ploidy_cutoff * sex_stats["xx"].y.stdev),
        (
            sex_stats["xy"].y.mean - (normal_ploidy_cutoff * sex_stats["xy"].y.stdev),
            sex_stats["xy"].y.mean + (normal_ploidy_cutoff * sex_stats["xy"].y.stdev),
        ),
        sex_stats["xy"].y.mean + (aneuploidy_cutoff * sex_stats["xy"].y.stdev),
    ),
)
return cutoffs
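
# Usage sketch (hypothetical names; the enclosing def is not visible in the snippet above).
x_ploidy_cutoffs, y_ploidy_cutoffs = get_ploidy_cutoffs(ploidy_ht, f_stat_cutoff=0.5)
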
# Compute per-interval, per-sample call rates. The opening annotate_rows call below is
# inferred (the snippet is cut off just before it); `intervals_ht` is assumed to carry
# an `_interval_key` struct field holding its key.
callrate_mt = callrate_mt.annotate_rows(
    _interval_key=intervals_ht.index(
        callrate_mt.locus, all_matches=match
    )._interval_key
)
if match:
    callrate_mt = callrate_mt.explode_rows("_interval_key")
# Drop loci that do not fall within any interval.
callrate_mt = callrate_mt.filter_rows(
    hl.is_defined(callrate_mt._interval_key.interval)
)
# Keep only genotype defined-ness; the actual calls are not needed for call rate.
callrate_mt = callrate_mt.select_entries(
    GT=hl.or_missing(hl.is_defined(callrate_mt.GT), hl.struct())
)
callrate_mt = callrate_mt.group_rows_by(**callrate_mt._interval_key).aggregate(
    callrate=hl.agg.fraction(hl.is_defined(callrate_mt.GT))
)
intervals_ht = intervals_ht.drop("_interval_key")
callrate_mt = callrate_mt.annotate_rows(
    interval_info=hl.struct(**intervals_ht[callrate_mt.row_key])
)
return callrate_mt
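
# Usage sketch (assumption, not part of the snippet above): the resulting MatrixTable is
# intervals-by-samples, so a per-sample mean call rate can be summarized directly.
callrate_mt = callrate_mt.annotate_cols(mean_callrate=hl.agg.mean(callrate_mt.callrate))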