# construct class prototypes: mean encoding per class if an encoder is available,
# otherwise k-d trees fitted on the training data
if self.enc_model:
    enc_data = self.enc.predict(train_data)
    self.class_proto = {}  # type: dict
    self.class_enc = {}  # type: dict
    for i in range(self.classes):
        idx = np.where(preds == i)[0]
        self.class_proto[i] = np.expand_dims(np.mean(enc_data[idx], axis=0), axis=0)
        self.class_enc[i] = enc_data[idx]
elif self.use_kdtree:
    logger.warning('No encoder specified. Using k-d trees to represent class prototypes.')
    if trustscore_kwargs is not None:
        ts = TrustScore(**trustscore_kwargs)
    else:
        ts = TrustScore()
    if self.is_cat:  # map categorical to numerical data
        train_data = ord_to_num(train_data_ord, self.d_abs)
    ts.fit(train_data, preds, classes=self.classes)
    self.kdtrees = ts.kdtrees
    self.X_by_class = ts.X_kdtree
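
For intuition, here is a minimal, self-contained sketch of what the prototype construction above computes; the toy enc_data, preds and class count stand in for the encoder output and model predictions and are purely illustrative:

import numpy as np

# toy stand-ins: six "encoded" training instances in a 2-d latent space and their predicted classes
enc_data = np.array([[0., 0.], [0., 2.], [2., 2.], [4., 4.], [4., 6.], [6., 6.]])
preds = np.array([0, 0, 0, 1, 1, 1])
classes = 2

class_proto = {}
for i in range(classes):
    idx = np.where(preds == i)[0]
    # prototype = mean encoding of the class, kept 2-d so it broadcasts against batches of encodings
    class_proto[i] = np.expand_dims(np.mean(enc_data[idx], axis=0), axis=0)

print(class_proto[0])  # ~[[0.67 1.33]], the centre of class 0 in latent space
print(class_proto[1])  # ~[[4.67 5.33]], the centre of class 1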
    # pad each per-feature value array to a common length
    v_pad = np.pad(v, (0, n_pad), 'constant')
    self.d_abs_ragged.append(v_pad)
self.d_abs_ragged = np.array(self.d_abs_ragged)  # stack padded rows into a regular array
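
As a rough illustration of the padding step above, with made-up variable names for the surrounding loop (the real code iterates over the per-feature value arrays collected earlier):

import numpy as np

values = [np.array([0.1, 0.5]), np.array([0.2, 0.4, 0.9])]  # ragged: features with different numbers of values
max_len = max(len(v) for v in values)

d_abs_ragged = []
for v in values:
    n_pad = max_len - len(v)
    v_pad = np.pad(v, (0, n_pad), 'constant')  # right-pad with zeros up to the common length
    d_abs_ragged.append(v_pad)
d_abs_ragged = np.array(d_abs_ragged)  # regular 2x3 array instead of a ragged list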
# find instances where the gradient is 0
idx_nograd = np.where(f(preds) - g(preds) <= - self.kappa)[0]
if len(idx_nograd) == X.shape[0]:
    return np.zeros(X.shape)
dl_df = f(preds_pert_pos) - f(preds_pert_neg)  # N*P
dl_dg = g(preds_pert_pos) - g(preds_pert_neg)  # N*P
dl_dp = dl_df - dl_dg  # N*P
dl_dp = np.reshape(dl_dp, (X.shape[0], -1)) / (2 * self.eps[0])  # NxP

# dP/dx -> PxF
X_pert_pos, X_pert_neg = perturb(X, self.eps[1], proba=False)  # (N*F)x(shape of X[0])
X_pert = np.concatenate([X_pert_pos, X_pert_neg], axis=0)
if self.is_cat:
    X_pert = num_to_ord(X_pert, self.d_abs)
    if self.ohe:
        X_pert = ord_to_ohe(X_pert, cat_vars_ord)[0]
preds_concat = self.predict(X_pert)
n_pert = X_pert_pos.shape[0]
dp_dx = preds_concat[:n_pert] - preds_concat[n_pert:]  # (N*F)*P
dp_dx = np.reshape(np.reshape(dp_dx, (X.shape[0], -1)),
                   (X.shape[0], preds.shape[1], -1), order='F') / (2 * self.eps[1])  # NxPxF

# dL/dx -> Bx(shape of X[0])
grads = np.einsum('ij,ijk->ik', dl_dp, dp_dx)  # NxF
# set instances where gradient is 0 to 0
if len(idx_nograd) > 0:
    grads[idx_nograd] = np.zeros(grads.shape[1:])
grads = np.mean(grads, axis=0)  # B*F
grads = np.reshape(grads, (self.batch_size,) + grads_shape)  # B*(shape of X[0])
return grads
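
The chain rule applied above contracts the loss gradient dL/dp (NxP) with the numerical Jacobian dp/dx (NxPxF) into dL/dx (NxF). A tiny standalone check of that einsum contraction, with random arrays in place of the finite-difference estimates:

import numpy as np

N, P, F = 2, 3, 4                    # instances, prediction classes, features
dl_dp = np.random.rand(N, P)         # stand-in for the central-difference estimate of dL/dp
dp_dx = np.random.rand(N, P, F)      # stand-in for the central-difference estimate of dp/dx

# chain rule per instance: dL/dx[n, f] = sum_p dL/dp[n, p] * dp/dx[n, p, f]
grads = np.einsum('ij,ijk->ik', dl_dp, dp_dx)  # -> NxF

# the same contraction written explicitly, to confirm what the einsum computes
manual = np.stack([dl_dp[n] @ dp_dx[n] for n in range(N)])
assert np.allclose(grads, manual)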
    Instance to encode and calculate distance metrics for
adv_class
    Predicted class on the perturbed instance
orig_class
    Predicted class on the original instance
eps
    Small number to avoid dividing by 0

Returns
-------
Ratio between the distance to the prototype of the predicted class for the original instance and
the prototype of the predicted class for the perturbed instance.
"""
if self.enc_model:
    if self.is_cat:
        X = num_to_ord(X, self.d_abs)
        if self.ohe:
            X = ord_to_ohe(X, self.cat_vars_ord)[0]  # ord_to_ohe returns a tuple; keep the OHE array
    X_enc = self.enc.predict(X)
    adv_proto = self.class_proto[adv_class]
    orig_proto = self.class_proto[orig_class]
    dist_adv = np.linalg.norm(X_enc - adv_proto)
    dist_orig = np.linalg.norm(X_enc - orig_proto)
elif self.use_kdtree:
    dist_adv = self.kdtrees[adv_class].query(X, k=1)[0]
    dist_orig = self.kdtrees[orig_class].query(X, k=1)[0]
else:
    logger.warning('Need either an encoder or the k-d trees enabled to compute distance scores.')
return dist_orig / (dist_adv + eps)
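
A rough usage sketch of the distance score; the fitted explainer cf, the perturbed instance X_adv and the class indices are hypothetical:

# hypothetical: cf is a fitted explainer with an encoder or k-d trees available,
# X_adv is a (1, ...) shaped perturbed instance now predicted as class 2,
# while the original instance was predicted as class 0
score = cf.score(X_adv, adv_class=2, orig_class=0)
# score > 1 means X_adv sits closer to the prototype of its new class than to the
# prototype of the original class, i.e. the counterfactual looks plausible for class 2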
    Instance around which gradient is evaluated
Y
    One-hot representation of instance labels
grads_shape
    Shape of gradients.
cat_vars_ord
    Dict with as keys the categorical columns and as values
    the number of categories per categorical variable.

Returns
-------
Array with gradients.
"""
# map back to categories to make predictions
if self.is_cat:
    X_pred = num_to_ord(X, self.d_abs)
    if self.ohe:
        X_pred = ord_to_ohe(X_pred, cat_vars_ord)[0]
else:
    X_pred = X

# N = gradient batch size; F = nb of features; P = nb of prediction classes; B = instance batch size
# dL/dP -> BxP
preds = self.predict(X_pred)  # NxP
preds_pert_pos, preds_pert_neg = perturb(preds, self.eps[0], proba=True)  # (N*P)xP

def f(preds_pert):
    return np.sum(Y * preds_pert, axis=1)

def g(preds_pert):
    return np.max((1 - Y) * preds_pert, axis=1)
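
To make the role of f and g concrete, a small worked example with made-up probabilities; f picks out the probability of the one-hot encoded label and g the highest probability among the remaining classes:

import numpy as np

Y = np.array([[0., 1., 0.]])                  # one-hot label: class 1
preds_pert = np.array([[0.2, 0.5, 0.3]])      # predicted probabilities for one perturbed instance

f_val = np.sum(Y * preds_pert, axis=1)        # [0.5] -> probability of the labelled class
g_val = np.max((1 - Y) * preds_pert, axis=1)  # [0.3] -> best probability among the other classes
# the attack term is a hinge of these two quantities; once f - g <= -kappa its gradient
# vanishes, which is exactly the idx_nograd check in the earlier numerical-gradient snippet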
grads = grads_graph + grads_num_s
self.sess.run(self.apply_grads, feed_dict={self.grad_ph: grads})

# update adv and adv_s with perturbed instances
self.sess.run([self.adv_updater, self.adv_updater_s, self.delta, self.delta_s])

# compute overall and attack loss, L1+L2 loss, prediction probabilities
# on perturbed instances and new adv
# L1+L2 and prediction probabilities used to see if adv is better than the current best adv under FISTA
if self.model:
    loss_tot, loss_attack, loss_l1_l2, pred_proba, adv = \
        self.sess.run([self.loss_total, self.loss_attack, self.l1_l2, self.pred_proba, self.adv])
else:
    X_der = self.adv.eval(session=self.sess)  # get updated perturbed instances
    if self.is_cat:  # map back to categories to make predictions
        X_der = num_to_ord(X_der, self.d_abs)
        if self.ohe:
            X_der = ord_to_ohe(X_der, self.cat_vars_ord)[0]
    pred_proba = self.predict(X_der)
    # compute attack, total and L1+L2 losses as well as new perturbed instance
    loss_attack = self.loss_fn(pred_proba, Y)
    feed_dict = {self.loss_attack: loss_attack}
    loss_tot, loss_l1_l2, adv = self.sess.run([self.loss_total, self.l1_l2, self.adv],
                                              feed_dict=feed_dict)

if i % log_every == 0 or i % print_every == 0:
    loss_l2, loss_l1, loss_ae, loss_proto = \
        self.sess.run([self.loss_l2, self.loss_l1, self.loss_ae, self.loss_proto])
    target_proba = np.sum(pred_proba * Y)
    nontarget_proba_max = np.max((1 - Y) * pred_proba)
    loss_opt = loss_l1_l2 + loss_attack + loss_ae + loss_proto
    print('Gradient numerical attack min/max: {:.3f}/{:.3f}'.format(grads_num.min(),
                                                                    grads_num.max()))
    print('Gradient numerical mean/abs mean: {:.3f}/{:.3f}'.format(np.mean(grads_num),
                                                                   np.mean(np.abs(grads_num))))
    sys.stdout.flush()

# update best perturbation (distance) and class probabilities
# if beta * L1 + L2 < current best and predicted label is different from the initial label:
# update best current step or global perturbations
for batch_idx, (dist, proba, adv_idx) in enumerate(zip(loss_l1_l2, pred_proba, adv)):
    Y_class = np.argmax(Y[batch_idx])
    adv_class = np.argmax(proba)
    adv_idx = np.expand_dims(adv_idx, axis=0)
    if self.is_cat:  # map back to categories
        adv_idx = num_to_ord(adv_idx, self.d_abs)
        if self.ohe:  # map back from ordinal to OHE
            adv_idx = ord_to_ohe(adv_idx, self.cat_vars_ord)[0]

    # calculate trust score
    if threshold > 0.:
        score = self.score(adv_idx, np.argmax(pred_proba), Y_class)
        above_threshold = score > threshold
    else:
        above_threshold = True

    # current step
    if (dist < current_best_dist[batch_idx] and compare(proba, Y_class) and above_threshold
            and adv_class in target_class):
        current_best_dist[batch_idx] = dist
        current_best_proba[batch_idx] = adv_class

    # global
    if (dist < overall_best_dist[batch_idx] and compare(proba, Y_class) and above_threshold
            and adv_class in target_class):
        overall_best_dist[batch_idx] = dist
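
The selection criterion described in the comments above ranks candidate perturbations by their elastic-net distance, beta * L1 + L2, among instances whose predicted class already changed. A minimal sketch of that distance, with made-up perturbations and beta:

import numpy as np

def elastic_net_dist(delta, beta):
    # beta * L1 + L2 of each perturbation in a batch; this is the quantity compared
    # against current_best_dist / overall_best_dist above
    delta = delta.reshape(delta.shape[0], -1)
    return beta * np.abs(delta).sum(axis=1) + (delta ** 2).sum(axis=1)

delta = np.array([[0.1, 0.0, -0.2],    # small, sparse perturbation
                  [0.5, 0.5, 0.5]])    # larger, dense perturbation
print(elastic_net_dist(delta, beta=0.1))  # [0.08 0.9] -> the first candidate would be preferred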