Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_branch_indices(node_index: int, depth: int) -> Iterable[int]:
    """
    Yield the branch indices for a node: the chain produced by repeatedly
    floor-halving ``node_index`` towards the root, limited to ``depth`` entries.
    """
    def parent_of(index: int) -> int:
        # The parent position is the child position floor-divided by two.
        return index // 2

    ancestor_chain = iterate(parent_of, node_index)
    yield from take(depth, ancestor_chain)
def get_branch_indices(node_index: int, depth: int) -> Sequence[int]:
    """Return, as a tuple, the chain of indices obtained by repeatedly
    floor-halving ``node_index`` towards the root, limited to ``depth`` entries.
    """
    def parent_of(index: int) -> int:
        # The parent position is the child position floor-divided by two.
        return index // 2

    ancestor_chain = iterate(parent_of, node_index)
    return tuple(take(depth, ancestor_chain))
def compute_up(t, seq, **kwargs):
    """Take the first ``t.n`` elements of ``seq``.

    When ``t.n`` is below 100 the result is materialised into a tuple;
    otherwise the object returned by ``take`` is handed back as-is.
    Extra keyword arguments are accepted and ignored.
    """
    limit = t.n
    materialise = limit < 100
    head = take(limit, seq)
    return tuple(head) if materialise else head
def calc_merkle_tree(items: Sequence[Hashable]) -> MerkleTree:
    """Calculate the Merkle tree corresponding to a list of items.

    Every item is hashed with ``keccak`` to form the leaf layer, and
    ``_hash_layer`` is then applied repeatedly to derive the layers above it.
    The layers are returned root first.

    :param items: the items to build the tree from; their count must be a
        non-zero power of two.
    :returns: the tree as a tuple of layers, root layer at index 0 and the
        leaf layer last.
    :raises ValidationError: if ``items`` is empty or its length is not a
        power of two.
    :raises Exception: if the top layer does not contain exactly one node
        (internal invariant).
    """
    n_items = len(items)
    if n_items == 0:
        raise ValidationError("No items given")
    # A power of two has exactly one bit set, so clearing the lowest set bit
    # must leave zero. Integer arithmetic keeps the check exact for any length,
    # unlike a floating point log2.
    if n_items & (n_items - 1):
        raise ValidationError("Item number is not a power of two")
    # 2**k leaves need k + 1 layers, which is exactly the bit length of 2**k.
    n_layers = n_items.bit_length()

    leaves = tuple(keccak(item) for item in items)
    # Layers are generated leaves-first, then reversed so the root comes first.
    tree = cast(MerkleTree, tuple(take(n_layers, iterate(_hash_layer, leaves)))[::-1])
    if len(tree[0]) != 1:
        raise Exception("Invariant: There must only be one root")
    return tree
--------
>>> image1 = np.ones((2, 2), dtype=int) * 1
>>> image2 = np.ones((2, 2), dtype=int) * 2
>>> joined = stack_channels((None, image1, image2))
>>> joined.shape
(2, 2, 3)
>>> joined[0, 0]
array([0, 1, 2])
>>> joined = stack_channels((image1, image2), order=[None, 0, 1])
>>> joined.shape
(2, 2, 3)
>>> joined[0, 0]
array([0, 1, 2])
"""
# ensure we support iterators
images = list(tz.take(len(order), images))
# ensure we grab an image and not `None`
def is_array(obj): return isinstance(obj, np.ndarray)
image_prototype = next(filter(is_array, images))
# A `None` in `order` implies no image at that position
ordered_ims = [images[i] if i is not None else None for i in order]
ordered_ims = [np.zeros_like(image_prototype) if image is None else image
for image in ordered_ims]
# stack images with np.dstack, but if only a single channel is passed,
# don't add an extra dimension
stack_image = np.squeeze(np.dstack(ordered_ims))
while ordered_ims:
del ordered_ims[-1]
return stack_image
def calc_merkle_tree_from_leaves(leaves: Sequence[Hash32]) -> MerkleTree:
    """Build a Merkle tree from an already-hashed leaf layer.

    ``_hash_layer`` is applied repeatedly, starting from ``leaves``, to derive
    the layers above the leaf layer. The layers are returned root first,
    wrapped in ``MerkleTree``.

    :param leaves: the leaf layer; its length must be a non-zero power of two.
    :returns: the tree as a tuple of layers, root layer at index 0 and the
        leaf layer last.
    :raises ValueError: if ``leaves`` is empty or its length is not a power
        of two.
    :raises Exception: if the top layer does not contain exactly one node
        (internal invariant).
    """
    n_leaves = len(leaves)
    if n_leaves == 0:
        raise ValueError("No leaves given")
    # A power of two has exactly one bit set, so clearing the lowest set bit
    # must leave zero. Integer arithmetic keeps the check exact for any length,
    # unlike a floating point log2.
    if n_leaves & (n_leaves - 1):
        raise ValueError("Number of leaves is not a power of two")
    # 2**k leaves need k + 1 layers, which is exactly the bit length of 2**k.
    n_layers = n_leaves.bit_length()

    # Layers are generated leaves-first, then reversed so the root comes first.
    reversed_tree = tuple(take(n_layers, iterate(_hash_layer, leaves)))
    tree = MerkleTree(tuple(reversed(reversed_tree)))

    if len(tree[0]) != 1:
        raise Exception("Invariant: There must only be one root")
    return tree
:param vec: vectorizer
:type vec: DictVectorizer/FeatureHasher
:param ids: list of IDs of the samples to use
:type ids: list
:param batch_size:
:type batch_size: int
:returns: sparse matrix
:rtype: sp.sparse.csr.csr_matrix
"""
X = []
samples = self._generate_samples(ids, 'x')
while True:
X_list = list(take(batch_size, samples))
if not X_list: break
X_part = vec.transform(X_list)
del X_list
X.append(X_part)
del X_part
return sp.sparse.csr_matrix(np.vstack([x.todense() for x in X]))