Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Regression check for https://github.com/lyst/lightfm/issues/117:
# calling .tocsr on a COO matrix with duplicate entries changes its
# data arrays in-place, leading to out-of-bounds array accesses in
# the WARP code.
rows, cols = (1000, 100)
mat = sp.random(rows, cols)
mat.data[:] = 1

# Duplicate entries in the COO matrix: re-append (up to) the first 1000
# stored entries so the same (row, col) pairs occur more than once.
mat.data = np.concatenate((mat.data, mat.data[:1000]))
mat.row = np.concatenate((mat.row, mat.row[:1000]))
mat.col = np.concatenate((mat.col, mat.col[:1000]))

# Fit once per loss. The loop variable has to be passed to the model;
# hard-coding "warp" here would leave "bpr" and "warp-kos" untested.
for loss in ("warp", "bpr", "warp-kos"):
    model = LightFM(loss=loss)
    model.fit(mat)
def test_movielens_accuracy_fit():
    """Fit with ``fit`` for 10 epochs and check ROC AUC floors on both splits.

    The model uses constructor defaults apart from ``random_state=SEED``.
    """
    model = LightFM(random_state=SEED)
    model.fit(train, epochs=10)

    # Score every stored (row, col) pair of each split before asserting,
    # then compare the scores against the split's stored values.
    scored_splits = (
        (train, model.predict(train.row, train.col), 0.84),
        (test, model.predict(test.row, test.col), 0.76),
    )
    for interactions, scores, min_auc in scored_splits:
        assert roc_auc_score(interactions.data, scores) > min_auc
def test_param_sanity():
    """Check that out-of-range constructor arguments are rejected.

    Each entry pairs the exception type expected from ``LightFM(...)``
    with the single keyword argument that should trigger it.
    """
    rejected_arguments = (
        (AssertionError, {"no_components": -1}),
        (AssertionError, {"user_alpha": -1.0}),
        (AssertionError, {"item_alpha": -1.0}),
        (ValueError, {"max_sampled": -1.0}),
    )
    for expected_error, bad_kwargs in rejected_arguments:
        with pytest.raises(expected_error):
            LightFM(**bad_kwargs)
def test_intersections_check():
    """Check that evaluation metrics reject overlapping train/test data.

    The same matrix is passed both as the evaluated interactions and as
    ``train_interactions``, so every interaction is shared; with
    ``check_intersections=True`` each metric is expected to raise
    ``ValueError``.
    """
    no_users, no_items = (10, 100)
    train, test = _generate_data(no_users, no_items)

    model = LightFM(loss="bpr")
    model.fit_partial(train)

    # check error is raised when train and test have interactions in common
    metrics = (
        evaluation.auc_score,
        evaluation.recall_at_k,
        evaluation.precision_at_k,
    )
    for metric in metrics:
        with pytest.raises(ValueError):
            metric(model, train, train_interactions=train, check_intersections=True)
def test_warp_precision_high_interaction_values():
    """Train WARP on interaction values multiplied by 5 and check metric floors."""
    # Work on a copy so the shared module-level training matrix is untouched.
    scaled_train = train.copy()
    scaled_train.data = scaled_train.data * 5

    model = LightFM(learning_rate=0.05, loss="warp", random_state=SEED)
    model.fit_partial(scaled_train, epochs=10)

    metrics = _get_metrics(model, scaled_train, test)
    (train_precision, test_precision, full_train_auc, full_test_auc) = metrics

    for observed, floor in (
        (train_precision, 0.45),
        (test_precision, 0.07),
        (full_train_auc, 0.93),
        (full_test_auc, 0.9),
    ):
        assert observed > floor
def test_bpr_precision_multithreaded():
    """Train BPR with ``num_threads=4`` and check precision and AUC floors."""
    bpr_model = LightFM(learning_rate=0.05, loss="bpr", random_state=SEED)
    bpr_model.fit_partial(train, epochs=10, num_threads=4)

    metrics = _get_metrics(bpr_model, train, test)
    (train_precision, test_precision, full_train_auc, full_test_auc) = metrics

    # Precision floors first, then the AUC floors.
    assert train_precision > 0.45
    assert test_precision > 0.07

    assert full_train_auc > 0.91
    assert full_test_auc > 0.87
def test_hogwild_accuracy():
    """Fit and predict with two threads and check the ROC AUC floors."""
    # Should get comparable accuracy with 2 threads
    threads = 2
    model = LightFM(random_state=SEED)
    model.fit_partial(train, epochs=10, num_threads=threads)

    # Compute both prediction vectors before asserting on either split.
    scored_splits = (
        (train.data, model.predict(train.row, train.col, num_threads=threads), 0.84),
        (test.data, model.predict(test.row, test.col, num_threads=threads), 0.76),
    )
    for truth, scores, min_auc in scored_splits:
        assert roc_auc_score(truth, scores) > min_auc
def test_precision_at_k():
    """Compare ``evaluation.precision_at_k`` with the ``_precision_at_k`` helper.

    For each ``k`` the mean of the library result must match the helper's
    value, the result must have one entry per row of ``test`` that has at
    least one stored interaction, and ``preserve_rows=True`` must yield one
    entry per row.
    """
    no_users, no_items = (10, 100)
    train, test = _generate_data(no_users, no_items)

    model = LightFM(loss="bpr")
    # We want a high precision to catch the k=1 case
    model.fit_partial(test)

    for k in (10, 5, 1):
        # Without omitting train interactions
        per_row_precision = evaluation.precision_at_k(model, test, k=k)
        reference_mean = _precision_at_k(model, test, k)
        assert np.allclose(per_row_precision.mean(), reference_mean)

        rows_with_interactions = (test.getnnz(axis=1) > 0).sum()
        assert len(per_row_precision) == rows_with_interactions

        all_rows_precision = evaluation.precision_at_k(
            model, train, preserve_rows=True
        )
        assert len(all_rows_precision) == test.shape[0]
def test_movielens_accuracy_sample_weights():
    """Check accuracy when sample weights are scaled down and the rate up.

    Scaling weights down and learning rate up by the same amount should
    result in roughly the same accuracy. Every training interaction gets
    the weight ``scale`` and the model's learning rate is multiplied by
    ``1 / scale`` before fitting; the full-train AUC is then compared with
    a per-loss floor.
    """
    scale = 0.5
    weights = train.copy()
    weights.data = np.ones(train.getnnz(), dtype=np.float32) * scale

    for (loss, exp_score) in (("logistic", 0.74), ("bpr", 0.84), ("warp", 0.89)):
        model = LightFM(loss=loss, random_state=SEED)
        # In-place update: a bare `model.learning_rate * 1.0 / scale`
        # computes the value and throws it away, leaving the rate unchanged.
        model.learning_rate *= 1.0 / scale
        model.fit_partial(train, sample_weight=weights, epochs=10)

        # Only the full-train AUC is checked; the other metrics are unused.
        (_, _, full_train_auc, _) = _get_metrics(model, train, test)
        assert full_train_auc > exp_score
def test_overfitting():
    """Overfit deliberately: train AUC must be very high, test AUC must stay low."""
    # Let's massively overfit: 50 components trained for 30 epochs.
    overfit_model = LightFM(no_components=50, random_state=SEED)
    overfit_model.fit_partial(train, epochs=30)

    train_scores = overfit_model.predict(train.row, train.col)
    test_scores = overfit_model.predict(test.row, test.col)

    overfit_train = roc_auc_score(train.data, train_scores)
    overfit_test = roc_auc_score(test.data, test_scores)

    # Near-perfect fit on the training data, clearly worse on held-out data.
    assert overfit_train > 0.99
    assert overfit_test < 0.75