Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@curry
def get_uncle_reward(block_reward: int, block_number: int, uncle: BaseBlock) -> int:
    """Return the mining reward owed for including *uncle* in *block_number*.

    The payout shrinks linearly with how far back the uncle sits: an uncle
    one block deep earns 7/8 of the full block reward, two deep earns 6/8,
    and so on. Depth is validated against ``MAX_UNCLE_DEPTH`` first.
    """
    depth = block_number - uncle.block_number
    validate_lte(depth, MAX_UNCLE_DEPTH)
    # Integer arithmetic: scale the reward down by depth/8 without floats.
    return block_reward * (8 - depth) // 8
@curry
def compute_difficulty(
bomb_delay: int,
parent_header: BlockHeader,
timestamp: int) -> int:
"""
https://github.com/ethereum/EIPs/issues/100
"""
parent_timestamp = parent_header.timestamp
validate_gt(timestamp, parent_timestamp, title="Header.timestamp")
parent_difficulty = parent_header.difficulty
offset = parent_difficulty // DIFFICULTY_ADJUSTMENT_DENOMINATOR
has_uncles = parent_header.uncles_hash != EMPTY_UNCLE_HASH
adj_factor = max(
(
Returns
-------
illum : np.ndarray, float, shape (M, N)
The estimated illumination over the image field.
See Also
--------
`correct_image_illumination`, `correct_multiimage_illumination`.
"""
# this function follows the "PyToolz" streaming data model to
# obtain the illumination estimate.
# first, define the functions for each individual step:
in_range = ('image' if input_bitdepth is None
else (0, 2**input_bitdepth - 1))
rescale = tz.curry(exposure.rescale_intensity)
normalize = (tz.partial(stretchlim, bottom=stretch_quantile)
if stretch_quantile > 0
else skimage.img_as_float)
# produce a stream of properly-scaled images
ims = (tz.pipe(fn, io.imread, rescale(in_range=in_range), normalize)
for fn in fns)
# take the mean of that stream
mean_image = mean(ims)
# return the median filter of that mean
radius = radius or min(mean_image.shape) // 4
illum = ndi.percentile_filter(mean_image, percentile=(quantile * 100),
footprint=morphology.disk(radius))
return illum
def get_branch_indices(node_index: int, depth: int) -> Iterable[int]:
    """Yield *node_index* followed by each successive ancestor index.

    Exactly ``depth`` values are produced; each ancestor is obtained by
    halving (floor division) the previous index, walking toward the root
    of the binary tree.
    """
    index = node_index
    for _ in range(depth):
        yield index
        index //= 2
def get_branch_indices(node_index: int, depth: int) -> Sequence[int]:
    """Return a tuple of *node_index* and its ancestors, ``depth`` entries long.

    Each successive entry is the floor-halved previous index, i.e. the
    parent in the implicit binary tree.
    """
    indices = []
    index = node_index
    for _ in range(depth):
        indices.append(index)
        index //= 2
    return tuple(indices)
See Also
--------
`correct_image_illumination`, `correct_multiimage_illumination`.
"""
# this function follows the "PyToolz" streaming data model to
# obtain the illumination estimate.
# first, define the functions for each individual step:
in_range = ('image' if input_bitdepth is None
else (0, 2**input_bitdepth - 1))
rescale = tz.curry(exposure.rescale_intensity)
normalize = (tz.partial(stretchlim, bottom=stretch_quantile)
if stretch_quantile > 0
else skimage.img_as_float)
# produce a stream of properly-scaled images
ims = (tz.pipe(fn, io.imread, rescale(in_range=in_range), normalize)
for fn in fns)
# take the mean of that stream
mean_image = mean(ims)
# return the median filter of that mean
radius = radius or min(mean_image.shape) // 4
illum = ndi.percentile_filter(mean_image, percentile=(quantile * 100),
footprint=morphology.disk(radius))
return illum
def curry_namespace(ns):
    """Build a copy of namespace mapping *ns* with curryable callables curried.

    Entries whose name contains a double underscore (dunder/private names)
    are dropped; every other value is wrapped in ``curry`` when
    ``should_curry`` says so, and passed through unchanged otherwise.
    """
    curried = {}
    for name, func in ns.items():
        if "__" in name:
            continue
        curried[name] = curry(func) if should_curry(func) else func
    return curried
def generate_collations():
    """Endless generator of ``Collation`` objects, one per period.

    The consumer may ``.send()`` a dict of field overrides; those values
    replace the defaults for the *next* collation produced. The chunk root
    is only computed from the default body when the caller has not supplied
    a ``chunk_root`` override of their own.
    """
    overrides = {}
    for period in itertools.count():
        body = zpad_right(b"body%d" % period, COLLATION_SIZE)
        params = {
            "shard_id": 0,
            "period": period,
            "body": body,
            "proposer_address": zpad_right(b"proposer%d" % period, 20),
        }
        # Skip the (potentially costly) chunk-root calculation when the
        # caller is going to override it anyway.
        if "chunk_root" not in overrides:
            params["chunk_root"] = calc_chunk_root(body)
        params.update(overrides)
        header = CollationHeader(
            shard_id=params["shard_id"],
            chunk_root=params["chunk_root"],
            period=params["period"],
            proposer_address=params["proposer_address"],
        )
        # Hand the collation out; whatever the consumer sends back becomes
        # the override dict for the next iteration (None -> no overrides).
        overrides = (yield Collation(header, params["body"])) or {}
def test_call_checks_nonce(vm):
    """A transaction succeeds once, then fails on replay or a skipped nonce."""
    # First application of the signed transaction must go through.
    computation, _ = vm.apply_transaction(SIGNED_DEFAULT_TRANSACTION)
    assert computation.is_success

    # Replaying the identical transaction reuses the nonce and must fail.
    computation, _ = vm.apply_transaction(SIGNED_DEFAULT_TRANSACTION)
    assert computation.is_error

    # A transaction that jumps ahead (nonce 2 when 1 is expected) must also fail.
    skipped_nonce_tx = UnsignedUserAccountTransaction(
        **merge(DEFAULT_TX_PARAMS, {"nonce": 2})
    ).as_signed_transaction(PRIVATE_KEY)
    computation, _ = vm.apply_transaction(skipped_nonce_tx)
    assert computation.is_error
def test_from_to_iterable(nums):
    """The aio-based pipeline matches the plain-Python reference computation."""
    # Pipeline under test: iterable -> chunks of 10 -> per-chunk sums.
    pipeline = aio.from_iterable(nums)
    pipeline = cz.partition_all(10, pipeline)
    pipeline = aio.map(sum, pipeline)
    nums_pl = list(pipeline)

    # Reference: the same chunk-and-sum done with ordinary iteration.
    nums_py = [sum(chunk) for chunk in cz.partition_all(10, nums)]

    assert nums_py == nums_pl