Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def scatter_(self, dim, index, src):
    """Writes all values from the tensor `src` into `self` at the indices
    specified in the `index` tensor. For each value in `src`, its output index
    is specified by its index in `src` for `dimension != dim` and by the
    corresponding value in `index` for `dimension = dim`.
    """
    # Plain torch tensors are promoted to secret shares before scattering.
    wrapped = ArithmeticSharedTensor(src) if torch.is_tensor(src) else src
    assert isinstance(
        wrapped, ArithmeticSharedTensor
    ), "Unrecognized scatter src type: %s" % type(src)
    # Scatter operates directly on the underlying share tensor.
    self.share.scatter_(dim, index, wrapped.share)
    return self
def PRZS(*size):
    """
    Generate a Pseudo-random Sharing of Zero (using arithmetic shares)

    This function does so by generating `n` numbers across `n` parties with
    each number being held by exactly 2 parties. One of these parties adds
    this number while the other subtracts this number.
    """
    result = ArithmeticSharedTensor(src=SENTINEL)
    # g0 is shared with the previous party, g1 with the next; the pairwise
    # differences telescope to zero when all parties' shares are summed.
    cur = generate_random_ring_element(*size, generator=comm.get().g0)
    nxt = generate_random_ring_element(*size, generator=comm.get().g1)
    result.share = cur - nxt
    return result
1. Obtain uniformly random sharings [a],[b] and [c] = [a * b]
2. Additively hide [x] and [y] with appropriately sized [a] and [b]
3. Open ([epsilon] = [x] - [a]) and ([delta] = [y] - [b])
4. Return [z] = [c] + (epsilon * [b]) + ([a] * delta) + (epsilon * delta)
"""
# NOTE(review): the `def` line and docstring opening for this function are
# missing from this chunk — presumably a Beaver-triple protocol taking
# (op, x, y, *args, **kwargs); confirm against the full file.
assert op in ["mul", "matmul", "conv2d", "conv_transpose2d"]
provider = crypten.mpc.get_default_provider()
# Random triple (a, b, c) with c = a `op` b, sized to match x and y.
a, b, c = provider.generate_additive_triple(x.size(), y.size(), op, *args, **kwargs)
# Stack to vectorize reveal if possible
if x.size() == y.size():
    from .arithmetic import ArithmeticSharedTensor

    # One combined reveal (one communication round) instead of two.
    eps_del = ArithmeticSharedTensor.stack([x - a, y - b]).reveal()
    epsilon = eps_del[0]
    delta = eps_del[1]
else:
    # Shapes differ (e.g. matmul/conv), so each mask is revealed separately.
    epsilon = (x - a).reveal()
    delta = (y - b).reveal()
# z = c + (a * delta) + (epsilon * b) + epsilon * delta
# TODO: Implement crypten.mul / crypten.matmul / crypten.conv{_transpose}2d
# epsilon and delta are public, so the first and last terms use plain torch
# ops on the raw share; the middle term dispatches on the shared tensor `a`.
c._tensor += getattr(torch, op)(epsilon, b._tensor, *args, **kwargs)
c += getattr(a, op)(delta, *args, **kwargs)
c += getattr(torch, op)(epsilon, delta, *args, **kwargs)
return c
def shallow_copy(self):
    """Create a shallow copy"""
    # The new tensor aliases this tensor's encoder and share storage;
    # no underlying data is duplicated.
    clone = ArithmeticSharedTensor(src=SENTINEL)
    clone.share = self.share
    clone.encoder = self.encoder
    return clone
def shallow_copy(self):
    """Create a shallow copy"""
    # NOTE(review): this is an exact duplicate of an identical shallow_copy
    # definition earlier in this chunk — likely an artifact of how the file
    # was assembled; confirm and deduplicate in the full source.
    result = ArithmeticSharedTensor(src=SENTINEL)
    result.encoder = self.encoder
    result.share = self.share
    return result
def _add_property_function(function_name):
    """Install `function_name` on ArithmeticSharedTensor as a method that
    delegates to the method of the same name on the underlying `share`
    tensor, forwarding all positional and keyword arguments."""

    def property_func(self, *args, **kwargs):
        # Look up the torch method on the raw share and call it directly.
        delegate = getattr(self.share, function_name)
        return delegate(*args, **kwargs)

    setattr(ArithmeticSharedTensor, function_name, property_func)
def from_shares(share, precision=None, src=0):
    """Generate an ArithmeticSharedTensor from a share from each party"""
    # NOTE(review): `src` is accepted for interface symmetry but unused
    # here — confirm against sibling constructors.
    result = ArithmeticSharedTensor(src=SENTINEL)
    result.encoder = FixedPointEncoder(precision_bits=precision)
    result.share = share
    return result
def stack(tensors, *args, **kwargs):
    """Perform tensor stacking.

    Args:
        tensors: sequence of ArithmeticSharedTensor and/or plain torch
            tensors; plain tensors are wrapped as ArithmeticSharedTensor.
        *args, **kwargs: forwarded to ``torch.stack`` (e.g. ``dim``).

    Returns:
        ArithmeticSharedTensor whose share is the stack of all input shares.

    Raises:
        AssertionError: if an element is neither a torch tensor nor an
            ArithmeticSharedTensor.
    """
    # Build a new list instead of assigning into `tensors`: the original
    # implementation mutated the caller's list in place and failed outright
    # on immutable sequences such as tuples.
    shared = []
    for tensor in tensors:
        wrapped = ArithmeticSharedTensor(tensor) if torch.is_tensor(tensor) else tensor
        assert isinstance(
            wrapped, ArithmeticSharedTensor
        ), "Can't stack %s with ArithmeticSharedTensor" % type(tensor)
        shared.append(wrapped)
    result = shared[0].shallow_copy()
    result.share = torch.stack(
        [tensor.share for tensor in shared], *args, **kwargs
    )
    return result