Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def normal_init(W, ops):
    """Fill an all-zero weight array in place with zero-mean normal samples.

    If ``W`` already contains any non-zero value (its sum of squares is not
    zero) it is left untouched, so existing weights are never overwritten.
    Otherwise every slice ``W[:, i]`` is overwritten with samples drawn with
    standard deviation ``sqrt(1 / W.shape[-1])``.

    W: Array to initialise. Each ``W[:, i]`` must accept a block of shape
        ``(W.shape[0], W.shape[-1])`` -- presumably W is 3-dimensional;
        confirm against callers.
    ops: Not used by this function; kept for interface compatibility.
    RETURNS: None. ``W`` is modified in place.
    """
    if (W ** 2).sum() != 0:
        return
    xp = get_array_module(W)
    scale = xp.sqrt(1.0 / W.shape[-1])
    shape = (W.shape[0], W.shape[-1])
    for i in range(W.shape[1]):
        # Sample directly in the target shape. This yields the same values as
        # drawing prod(shape) numbers and reshaping, without having to reduce
        # a plain tuple through the array module (which array libraries that
        # only accept their own ndarray type reject).
        xp.copyto(W[:, i], xp.random.normal(loc=0, scale=scale, size=shape))
def from_padded(cls, padded: Array, lengths: List[int]) -> "RaggedArray":
    """Create an instance from a padded batch and its per-item lengths.

    When the longest requested length exceeds ``padded.shape[1]`` the work is
    delegated to ``cls.from_truncated``. Otherwise the first two axes of
    ``padded`` are collapsed into one axis of rows, the rows selected by
    ``lengths2mask(lengths)`` are copied into a contiguous array, and that
    array is passed to ``cls`` together with ``lengths``.
    """
    longest = max(lengths, default=0)
    if longest > padded.shape[1]:
        return cls.from_truncated(padded, lengths)
    n_items = sum(lengths)
    keep = lengths2mask(lengths)
    # The mask has to select exactly one row per counted item.
    assert sum(keep) == n_items
    trailing_dims = padded.shape[2:]
    flat_rows = padded.reshape((-1,) + trailing_dims)
    unpadded = get_array_module(flat_rows).ascontiguousarray(flat_rows[keep])
    assert unpadded.shape[0] == n_items
    return cls(unpadded, lengths)
def flatten(self, X, dtype=None, pad=0):
    """Concatenate a sequence of arrays along the first axis.

    Arrays whose ``size`` is 0 are dropped before concatenation. If ``pad``
    is at least 1, a block of ``pad`` zero rows is inserted before every
    remaining array and one more after the last one.

    X: Sequence of arrays, or None.
    dtype: Optional dtype the result is converted to. For the empty results
        described below, ``'f'`` is used when no dtype is given.
    pad: Number of zero rows used as padding (0 disables padding).
    RETURNS: The concatenated array. ``self.allocate((0,), ...)`` when ``X``
        is None or has no items; an empty array with the shape of ``X[0]``
        when every array in ``X`` is empty.
    """
    if X is None or len(X) == 0:
        return self.allocate((0,), dtype=dtype or 'f')
    first = X[0]
    X = [x for x in X if x.size != 0]
    if not X:
        # Every input was empty. Concatenating an empty list raises, and the
        # padding branch would reference a loop variable that was never
        # bound, so hand back an empty array shaped like the first input.
        return self.allocate(first.shape, dtype=dtype or 'f')
    xp = get_array_module(first)
    if int(pad) >= 1:
        padded = []
        for x in X:
            padded.append(
                xp.zeros((pad,) + x.shape[1:], dtype=x.dtype))
            padded.append(x)
        # Closing pad, shaped like the last array.
        padded.append(
            xp.zeros((pad,) + x.shape[1:], dtype=x.dtype))
        X = padded
    result = xp.concatenate(X)
    if dtype is not None:
        result = xp.asarray(result, dtype=dtype)
    return result
def get_class_tokens(docs, drop=0.0):
"""Output a List[array], where the array is the class vector
for each sentence in the document. To backprop, we increment the values
in the Doc's d_last_hidden_state array.
"""
# NOTE(review): `drop` is not referenced anywhere in the visible part of this
# function.
# The array module is taken from the first doc's hidden state, so `docs` must
# contain at least one doc.
xp = get_array_module(docs[0]._.get(ATTRS.last_hidden_state))
# outputs: per doc, the hidden-state rows at the class-token positions.
# doc_class_tokens: per doc, the index array of those positions.
outputs = []
doc_class_tokens = []
for doc in docs:
class_tokens = []
# Collect the positions of the word pieces accepted by is_class_token.
for i, wp in enumerate(doc._.get(ATTRS.word_pieces_)):
if is_class_token(wp):
class_tokens.append(i)
doc_class_tokens.append(xp.array(class_tokens, dtype="i"))
wp_tensor = doc._.get(ATTRS.last_hidden_state)
# Index the hidden state with the positions just stored for this doc.
outputs.append(wp_tensor[doc_class_tokens[-1]])
# Backward-pass callback; it closes over `docs` and `doc_class_tokens`.
# NOTE(review): `sgd` is not used in the visible lines.
def backprop_class_tokens(d_outputs, sgd=None):
for doc, class_tokens, dY in zip(docs, doc_class_tokens, d_outputs):
# If the doc's gradient array is still empty, build a zero float array with
# the same shape as its last_hidden_state.
if doc._.get(ATTRS.d_last_hidden_state).size == 0:
xp = get_array_module(doc._.get(ATTRS.last_hidden_state))
grads = xp.zeros(doc._.get(ATTRS.last_hidden_state).shape, dtype="f")
# NOTE(review): the excerpt ends here. The code that stores `grads`, adds `dY`
# at `class_tokens`, and returns the outputs and callback is not visible.
def get_cossim_loss(self, yh, y, t):
    """Compute a cosine-similarity loss and its gradient with respect to yh.

    The loss is the sum over rows of ``|cos(yh, y) - t|``. The returned
    gradient is the derivative of the cosine similarity with respect to
    ``yh``, multiplied row by row by the integer part of the first column of
    ``t``, and negated.

    yh: 2-d array of predicted vectors, one per row.
    y: 2-d array of reference vectors with the same shape as ``yh``.
    t: 2-d array-like of targets; only column 0 scales the gradient.
        Presumably one row per row of ``yh`` holding 1 or -1 -- confirm
        against callers.
    RETURNS: Tuple ``(loss, gradient)``.
    """
    # Add a small constant to avoid 0 vectors
    yh = yh + 1e-8
    y = y + 1e-8
    xp = get_array_module(yh)
    # Accept nested lists as well as arrays, and keep everything in the
    # array module selected from `yh`.
    t = xp.asarray(t)
    # https://math.stackexchange.com/questions/1923613/partial-derivative-of-cosine-similarity
    norm_yh = xp.linalg.norm(yh, axis=1, keepdims=True)
    norm_y = xp.linalg.norm(y, axis=1, keepdims=True)
    mul_norms = norm_yh * norm_y
    cos = (yh * y).sum(axis=1, keepdims=True) / mul_norms
    d_yh = (y / mul_norms) - (cos * (yh / norm_yh ** 2))
    loss = xp.abs(cos - t).sum()
    # Scale every gradient row by the first target value truncated toward
    # zero (what int() does), in one vectorised step instead of a Python
    # loop that rebuilt the result as a host NumPy array.
    row_scale = xp.trunc(t[:, :1]).astype(d_yh.dtype)
    return loss, -(row_scale * d_yh)
def _trans(X, *order):
    """Return a contiguous copy of X with its axes permuted to `order`."""
    permuted = X.transpose(order)
    array_lib = get_array_module(X)
    return array_lib.ascontiguousarray(permuted)
def _split_seqs(QKV, lengths, nH, nD):
    """Split a stacked QKV array into per-sequence query, key and value arrays.

    Rows of ``QKV`` are consumed in order, ``length`` rows per sequence. Each
    chunk is viewed as ``(length, 3, nH*nD)``; slots 0, 1 and 2 of the middle
    axis are copied contiguously and reshaped to ``(-1, nH, nD)``.

    RETURNS: Three lists (queries, keys, values), one array per sequence.
    """
    assert sum(lengths) == QKV.shape[0], (sum(lengths), QKV.shape[0])
    xp = get_array_module(QKV)
    per_head = (-1, nH, nD)
    Qs, Ks, Vs = [], [], []
    start = 0
    for length in lengths:
        stop = start + length
        chunk = QKV[start:stop].reshape((length, 3, nH * nD))
        # Slot 0 holds queries, slot 1 keys, slot 2 values.
        q_part, k_part, v_part = (
            xp.ascontiguousarray(chunk[:, slot]) for slot in range(3)
        )
        Qs.append(q_part.reshape(per_head))
        Ks.append(k_part.reshape(per_head))
        Vs.append(v_part.reshape(per_head))
        start = stop
    return Qs, Ks, Vs
def softplus(self, X, threshold=20., out=None):
    """Compute softplus, ``log(1 + exp(X))``, elementwise.

    Entries where ``X >= threshold`` are returned as ``X`` itself.

    X: Input array.
    threshold: Inputs at or above this value bypass the log/exp computation.
    out: Optional array that receives the result via ``out[:] = ...``.
    RETURNS: ``out`` if it was given, otherwise a newly computed array.
    """
    xp = get_array_module(X)
    # Clamp before exponentiating: entries at or above the threshold are
    # overwritten with X below, so exponentiating their raw values would only
    # risk overflow to inf (and the warnings that come with it).
    log1p_exp = xp.log1p(xp.exp(xp.minimum(X, threshold)))
    indices = X >= threshold
    log1p_exp[indices] = X[indices]
    if out is None:
        return log1p_exp
    else:
        out[:] = log1p_exp
        return out