Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_class(self):
    """Check that ``LaplaceBoundedDomain`` is a subclass of ``DPMechanism``."""
    from diffprivlib.mechanisms import DPMechanism

    derives_from_base = issubclass(LaplaceBoundedDomain, DPMechanism)
    self.assertTrue(derives_from_base)
def setup_method(self, method):
    """Prepare a fresh ``LaplaceBoundedDomain`` on ``self.mech`` before each test.

    For tests whose function name ends in ``"prob"``, ``global_seed`` is called
    with a fixed value first (presumably so their random draws are
    reproducible -- confirm against ``global_seed``).
    """
    test_name = method.__name__
    if test_name.endswith("prob"):
        global_seed(314159)

    self.mech = LaplaceBoundedDomain()
# NOTE(review): fragment -- the enclosing `def` line is not visible here and the indentation has been lost.
# `self`, `mu`, `var` and `n_samples` are presumably supplied by that missing header; confirm against the full source.
# The number of features is taken from the first axis of `var`.
features = var.shape[0]
# Each mechanism below gets half of self.epsilon, divided evenly across the features.
# NOTE(review): the halving presumably accounts for two releases (mean and variance) per feature -- confirm.
local_epsilon = self.epsilon / 2
local_epsilon /= features
# One bounds pair is required per feature, since the loop indexes self.bounds[feature].
if len(self.bounds) != features:
raise ValueError("Bounds must be specified for each feature dimension")
# Extra np.array() a temporary fix for PyLint bug: https://github.com/PyCQA/pylint/issues/2747
new_mu = np.array(np.zeros_like(mu))
new_var = np.array(np.zeros_like(var))
for feature in range(features):
# Width of this feature's bounds (upper minus lower); both sensitivities below are derived from it.
local_diameter = self.bounds[feature][1] - self.bounds[feature][0]
# Mean mechanism: Laplace with sensitivity local_diameter / n_samples.
mech_mu = Laplace().set_sensitivity(local_diameter / n_samples).set_epsilon(local_epsilon)
# Variance mechanism: sensitivity (n_samples - 1) * local_diameter ** 2 / n_samples ** 2,
# with the output domain bounded to [0, inf).
mech_var = LaplaceBoundedDomain().set_sensitivity((n_samples - 1) * local_diameter ** 2 / n_samples ** 2)\
.set_epsilon(local_epsilon).set_bounds(0, float("inf"))
new_mu[feature] = mech_mu.randomise(mu[feature])
new_var[feature] = mech_var.randomise(var[feature])
# The randomised means and variances are returned as a pair.
return new_mu, new_var
def _update_centers(self, X, centers, labels, dims, total_iters):
"""Updates the centers of the KMeans algorithm for the current iteration, while satisfying differential
privacy.
Differential privacy is satisfied by adding (integer-valued, using :class:`.GeometricFolded`) random noise to
the count of nearest neighbours to the previous cluster centers, and adding (real-valued, using
:class:`.LaplaceBoundedDomain`) random noise to the sum of values per dimension.
"""
# NOTE(review): indentation has been lost in this copy, and the visible body stops inside the per-dimension
# loop -- `centers` is never written or returned in the lines shown. Confirm against the full source.
# Two budgets come back from _split_epsilon: epsilon_0 is given to the count mechanism and epsilon_i to the
# per-dimension sum mechanism.
epsilon_0, epsilon_i = self._split_epsilon(dims, total_iters)
# The count mechanism has sensitivity 1 and its output is bounded below by 0.5.
geometric_mech = GeometricFolded().set_sensitivity(1).set_bounds(0.5, float("inf")).set_epsilon(epsilon_0)
# A single sum mechanism is created once and reconfigured for every (cluster, dimension) pair below.
laplace_mech = LaplaceBoundedDomain().set_epsilon(epsilon_i)
for cluster in range(self.n_clusters):
# Clusters that do not appear in `labels` are skipped.
if cluster not in labels:
continue
# Number of entries of `labels` equal to this cluster (presumably `labels` is a NumPy array -- confirm).
cluster_count = sum(labels == cluster)
noisy_count = geometric_mech.randomise(cluster_count)
# Sum over axis 0 of the rows of X selected by the mask `labels == cluster`.
cluster_sum = np.sum(X[labels == cluster], axis=0)
# Extra np.array() a temporary fix for PyLint bug: https://github.com/PyCQA/pylint/issues/2747
noisy_sum = np.array(np.zeros_like(cluster_sum))
for i in range(dims):
# Sensitivity is the width of dimension i's bounds; the sum's output domain is those bounds scaled by
# the noisy count.
laplace_mech.set_sensitivity(self.bounds[i][1] - self.bounds[i][0]) \
.set_bounds(noisy_count * self.bounds[i][0], noisy_count * self.bounds[i][1])
noisy_sum[i] = laplace_mech.randomise(cluster_sum[i])
# NOTE(review): fragment -- the enclosing `def` and the `if` paired with the `else:` below are not visible here,
# and the indentation has been lost. `actual_var`, `range`, `epsilon` and `num_datapoints` come from the missing part.
# First branch: `range` is multiplied into an array of ones shaped like actual_var.
ranges = np.ones_like(actual_var) * range
else:
# Otherwise `range` is converted to an array as given.
ranges = np.array(range)
# NOTE(review): the check requires every range to be strictly positive (> 0), while the message says
# "non-negative" -- confirm which is intended.
if not (ranges > 0).all():
raise ValueError("Ranges must be specified for each value returned by np.var(), and must be non-negative")
if ranges.shape != actual_var.shape:
raise ValueError("Shape of range must be same as shape of np.var()")
# Array result: every element is randomised separately, using the range at the same index.
if isinstance(actual_var, np.ndarray):
# Extra np.array() a temporary fix for PyLint bug: https://github.com/PyCQA/pylint/issues/2747
dp_var = np.array(np.zeros_like(actual_var))
iterator = np.nditer(actual_var, flags=['multi_index'])
while not iterator.finished:
# Per-element mechanism: output bounded to [0, inf), sensitivity (range / n) ** 2 * (n - 1).
dp_mech = LaplaceBoundedDomain().set_epsilon(epsilon).set_bounds(0, float("inf"))\
.set_sensitivity((ranges[iterator.multi_index] / num_datapoints) ** 2 * (num_datapoints - 1))
dp_var[iterator.multi_index] = dp_mech.randomise(float(iterator[0]))
iterator.iternext()
return dp_var
# Non-array result: `range` is rebound to the first element of the flattened `ranges`
# (the name shadows the builtin `range` from here on).
range = np.ravel(ranges)[0]
# NOTE(review): the sensitivity here is range ** 2 / n, which differs from the array branch's
# (range / n) ** 2 * (n - 1) -- confirm this is intended.
dp_mech = LaplaceBoundedDomain().set_epsilon(epsilon).set_bounds(0, float("inf")).\
set_sensitivity(range ** 2 / num_datapoints)
return dp_mech.randomise(actual_var)