def functional():
    return count_by(itemgetter('hour'),
                    map(json.loads,
                        filter(None,
                            mapcat(lambda output: output.strip().split('\n'),
                                map(lambda date: logs[date.strftime('%Y/%m/%d')],
                                    map(lambda days_ago: today - timedelta(days=days_ago),
                                        range(1, days_of_logs + 1)))))))
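# A toy, self-contained run of the same count_by/mapcat pattern as above;
# the fake payloads below stand in for the real `logs` lookups, and the
# imports mirror what the snippet assumes (json, operator.itemgetter,
# toolz.count_by, toolz.mapcat):
import json
from operator import itemgetter
from toolz import count_by, mapcat

fake_outputs = ['{"hour": 1}\n{"hour": 2}\n', '{"hour": 1}\n']
hour_counts = count_by(itemgetter('hour'),
                       map(json.loads,
                           filter(None,
                                  mapcat(lambda out: out.strip().split('\n'),
                                         fake_outputs))))
# hour_counts == {1: 2, 2: 1}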
tuple: :class:`chanjo.BaseInterval`, coverage (float), and
completeness (float)
"""
# setup: connect to BAM-file
bam = BamFile(bam_path)

# the pipeline
return pipe(
    bed_stream,
    filter(complement(comment_sniffer)),         # filter out comments
    map(text_type.rstrip),                       # strip invisible chars.
    map(prefix(contig_prefix)),                  # prefix to contig
    map(split(sep='\t')),                        # split lines
    map(do(validate_bed_format)),                # check correct format
    map(lambda row: bed_to_interval(*row)),      # convert to objects
    map(extend_interval(extension=extension)),   # extend intervals
    group_intervals(bp_threshold=bp_threshold),  # group by threshold
    map(process_interval_group(bam)),            # read coverage
    concat,                                      # flatten list of lists
    map(calculate_metrics(threshold=cutoff))     # calculate cov./compl.
)
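# The pipeline above relies on curried callables (presumably from
# toolz.curried): each map/filter step receives its sequence later, when
# pipe feeds it through. A minimal, self-contained sketch of that pattern
# with made-up lines and no chanjo objects:
from toolz.curried import pipe, map, filter

lines = ['# a comment', 'chr1\t10\t20', 'chr1\t30\t40']
rows = pipe(lines,
            filter(lambda line: not line.startswith('#')),  # drop comments
            map(str.rstrip),                                 # strip trailing whitespace
            map(lambda line: line.split('\t')),              # split lines into fields
            list)
# rows == [['chr1', '10', '20'], ['chr1', '30', '40']]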
# set up which columns to fetch to make BED file
# column 5 is just a silly default for the "score" field in BED
i = Interval  # alias
columns = (i.contig, i.start - 1, i.end, i.id, i.strand)

# BED files are tab-delimited
delimiter = '\t'

# 1. fetch interval tuples from the database (producer)
# 2. stringify each item in each subsequence (interval tuple)
# 3. join lines on tab-character
# 4. prepend the header
bed_lines = pipe(
    fetch_records(chanjo_db, columns),
    map(map(str)),                        # convert fields to strings
    map(juxt(compose(list, take(4)),      # keep first 4 fields
             lambda _: [str(bed_score)],  # insert BED score
             compose(list, last))),       # keep last field
    map(concat),                          # flatten each item
    map(delimiter.join)                   # join on \t
)

for bed_line in bed_lines:
    yield bed_line
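# A minimal sketch of what the juxt/compose step above does to one record,
# using toolz.curried directly; the record and bed_score values are made up:
from toolz.curried import compose, concat, juxt, last, map, pipe, take

bed_score = 0
records = [('chr1', 9, 20, 'interval-1', '+')]

bed_lines = pipe(records,
                 map(map(str)),                        # lazily stringify each field
                 map(juxt(compose(list, take(4)),      # first 4 fields
                          lambda _: [str(bed_score)],  # injected score column
                          compose(list, last))),       # trailing strand field
                 map(concat),                          # flatten the three pieces
                 map('\t'.join),
                 list)
# bed_lines == ['chr1\t9\t20\tinterval-1\t0\t+']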
def _get_lc_folds(date_range: Union[pd.DatetimeIndex, pd.PeriodIndex],
                  date_fold_filter_fn: Callable[[DateType], pd.DataFrame],
                  test_time: pd.Series,
                  time_column: str,
                  min_samples: int) -> List[Tuple[pd.Series, pd.Series]]:
    return pipe(date_range,
                map(date_fold_filter_fn),         # iteratively filter the dates
                map(lambda df: df[time_column]),  # keep only time column
                filter(lambda s: len(s.index) > min_samples),
                lambda train: zip(train, repeat(test_time)),
                list)
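# A rough, self-contained sketch of the fold pairing above with toy data;
# the lambda stands in for the real date_fold_filter_fn:
from itertools import repeat
import pandas as pd
from toolz.curried import pipe, map, filter

df = pd.DataFrame({'time': pd.date_range('2021-01-01', periods=6, freq='D')})
periods = pd.period_range('2021-01', periods=2, freq='M')
test_time = df['time'].tail(2)

folds = pipe(periods,
             map(lambda p: df[df['time'] < p.to_timestamp()]),  # stand-in filter fn
             map(lambda fold_df: fold_df['time']),              # keep only time column
             filter(lambda s: len(s.index) > 0),                # drop empty folds
             lambda train: zip(train, repeat(test_time)),
             list)
# each element of folds is a (train_times, test_times) pair of Series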
def get_all_launch_server_data(
        tenant_id,
        group_id,
        now,
        get_scaling_group_servers=get_scaling_group_servers,
        get_clb_contents=get_clb_contents,
        get_rcv3_contents=get_rcv3_contents):
    """
    Gather all launch_server data relevant for convergence w.r.t. the given
    time, in parallel where possible.

    Returns an Effect of {'servers': [NovaServer], 'lb_nodes': [LBNode]}.
    """
    eff = parallel(
        [get_scaling_group_servers(tenant_id, group_id, now)
         .on(map(NovaServer.from_server_details_json)).on(list),
         get_clb_contents(),
         get_rcv3_contents()]
    ).on(lambda results: {  # results == (servers, clb_nodes_and_clbs, rcv3_nodes)
        'servers': results[0],
        'lb_nodes': results[1][0] + results[2],
        'lbs': results[1][1]})
    return eff
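# A hedged sketch of the parallel(...).on(...) shape above, using the effect
# library's Constant intent as a stand-in for the real Nova/CLB/RCv3 effects
# (the fake payloads are made up; performing `eff` still needs a dispatcher):
from effect import Constant, Effect, parallel
from toolz.curried import map

fake_servers = Effect(Constant([{'id': 'a'}, {'id': 'b'}]))
fake_clb = Effect(Constant(([], {})))    # (clb_nodes, clbs)
fake_rcv3 = Effect(Constant([]))         # rcv3_nodes

eff = parallel([
    fake_servers.on(map(lambda s: s['id'])).on(list),  # mimic the NovaServer step
    fake_clb,
    fake_rcv3,
]).on(lambda results: {'servers': results[0],
                       'lb_nodes': results[1][0] + results[2],
                       'lbs': results[1][1]})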
prev_params = {}
try:
    prev_params = json.loads(prev_params_json.decode("utf-8"))
except Exception as e:
    yield err(None, "terminator-params-err", params_json=prev_params_json)

entries, prev_params = yield read_entries(
    ServiceType.CLOUD_FEEDS_CAP, url, prev_params, Direction.PREVIOUS,
    log_msg_type="terminator-events-response")
yield Effect(
    zk.UpdateNode(zk_prev_path, json.dumps(prev_params).encode("utf-8")))

entries = pipe(entries,
               map(extract_info),
               groupby(lambda e: e.tenant_id),
               itervalues,
               map(first))
yield parallel(map(process_entry, entries))
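# A small sketch of the "one entry per tenant" step above, with plain toolz
# and made-up entries (dict.values plays the role of itervalues):
from toolz.curried import pipe, groupby, map, first

fake_entries = [{'tenant_id': 't1', 'status': 'SUSPENDED'},
                {'tenant_id': 't1', 'status': 'ACTIVE'},
                {'tenant_id': 't2', 'status': 'ACTIVE'}]

per_tenant = pipe(fake_entries,
                  groupby(lambda e: e['tenant_id']),  # {'t1': [...], 't2': [...]}
                  dict.values,
                  map(first),                         # keep one entry per tenant
                  list)
# per_tenant == [{'tenant_id': 't1', 'status': 'SUSPENDED'},
#                {'tenant_id': 't2', 'status': 'ACTIVE'}]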
def discover_sqlcontext(ctx):
    try:
        table_names = list(map(str, ctx.tableNames()))
    except AttributeError:
        java_names = ctx._ssql_ctx.catalog().tables().keySet()
        table_names = list(scala_set_to_set(ctx, java_names))

    table_names.sort()

    dshapes = zip(table_names, map(discover, map(ctx.table, table_names)))
    return datashape.DataShape(datashape.Record(dshapes))
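# A tiny sketch of the datashape this builds, with made-up table names and
# dshapes instead of a live SQLContext (datashape only, no Spark needed):
import datashape

fake_dshapes = [('users', datashape.dshape('var * {id: int64, name: string}')),
                ('orders', datashape.dshape('var * {id: int64, total: float64}'))]
result = datashape.DataShape(datashape.Record(fake_dshapes))
# result describes a record of tables, each with its own tabular datashape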
def _get_sc_folds(date_range: Union[pd.DatetimeIndex, pd.PeriodIndex],
                  date_fold_filter_fn: Callable[[DateType], pd.DataFrame],
                  time_column: str,
                  min_samples: int) -> List[Tuple[pd.Series, pd.Series]]:
    return pipe(date_range,
                map(date_fold_filter_fn),         # iteratively filter the dates
                map(lambda df: df[time_column]),  # keep only time column
                filter(lambda s: len(s.index) > min_samples),
                list)
def overlap_internal(x, axes):
    """Share boundaries between neighboring blocks.

    Parameters
    ----------
    x: da.Array
        A dask array
    axes: dict
        The size of the shared boundary per axis. The axes input informs how
        many cells to overlap between neighboring blocks: {0: 2, 2: 5} means
        share two cells along axis 0 and five cells along axis 2.
    """
    dims = list(map(len, x.chunks))
    expand_key2 = partial(expand_key, dims=dims, axes=axes)

    # Make keys for each of the surrounding sub-arrays
    interior_keys = pipe(
        x.__dask_keys__(), flatten, map(expand_key2), map(flatten), concat, list
    )

    name = "overlap-" + tokenize(x, axes)
    getitem_name = "getitem-" + tokenize(x, axes)
    interior_slices = {}
    overlap_blocks = {}
    for k in interior_keys:
        frac_slice = fractional_slice((x.name,) + k, axes)
        if (x.name,) + k != frac_slice:
            interior_slices[(getitem_name,) + k] = frac_slice
        else:
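# A hedged usage sketch of the public wrapper around this helper,
# dask.array.overlap.overlap, on a 1-d array (sizes are illustrative):
import dask.array as da
from dask.array.overlap import overlap

x = da.arange(16, chunks=4)
# depth={0: 2}: share two cells with each neighbour along axis 0
g = overlap(x, depth={0: 2}, boundary={0: 'periodic'})
# every 4-element block grows to 8 elements (2 from each neighbouring block)
result = g.compute()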