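# NOTE: imports reconstructed for these snippets (an assumption, not part of
# the original source). The helpers get_reviewer, get_model, get_query_model,
# Logger, open_state and check_log, and the path constants (data_fp,
# embedding_fp, log_dir, state_dir, h5_log_file, h5_state_file,
# json_state_file) are defined elsewhere in the ASReview test suite these
# snippets come from.
import os
from shutil import copyfile

import numpy as np

from asreview import ASReviewData

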
def test_query(query_strategy, n_features=50, n_sample=100,
               n_instances_list=[0, 1, 5, 50], n_train_idx=[0, 1, 5, 50]):
    classifier = get_model("rf")
    query_model = get_query_model(query_strategy)

    # Build a random two-class dataset and shuffle it.
    X = np.random.rand(n_sample, n_features)
    y = np.concatenate((np.zeros(n_sample//2), np.ones(n_sample//2)), axis=0)
    print(X.shape, y.shape)
    order = np.random.permutation(n_sample)
    print(order.shape)
    X = X[order]
    y = y[order]

    sources = query_strategy.split('_')
    classifier.fit(X, y)
    assert isinstance(query_model.param, dict)
    assert query_model.name == query_strategy
    for n_instances in n_instances_list:
        ...  # loop body truncated in the source snippet


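# A minimal sketch of how test_query is presumably driven by pytest; the
# strategy names below are illustrative assumptions based on common ASReview
# query strategies (the split('_') above suggests compound names such as
# "max_random"), and the wrapper name is hypothetical.
import pytest


@pytest.mark.parametrize("query_strategy",
                         ["max", "random", "uncertainty", "max_random"])
def test_query_all_strategies(query_strategy):
    test_query(query_strategy)

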
# Log-file variant of check_model.
def check_model(monkeypatch=None, use_granular=False, log_file=h5_log_file,
                continue_from_log=False, mode="oracle", **kwargs):
    if not continue_from_log:
        try:
            if log_file is not None:
                os.unlink(log_file)
        except OSError:
            pass

    if monkeypatch is not None:
        monkeypatch.setattr('builtins.input', lambda _: "0")

    # start the review process.
    reviewer = get_reviewer(data_fp, mode=mode, embedding_fp=embedding_fp,
                            prior_included=[1, 3], prior_excluded=[2, 4],
                            log_file=log_file,
                            **kwargs)
    if use_granular:
        with Logger.from_file(log_file) as logger:
            # Two loops of training and classification.
            reviewer.train()
            reviewer.log_probabilities(logger)
            query_idx = reviewer.query(1)
            inclusions = reviewer._get_labels(query_idx)
            reviewer.classify(query_idx, inclusions, logger)

            reviewer.train()
            reviewer.log_probabilities(logger)
            query_idx = reviewer.query(1)
            inclusions = reviewer._get_labels(query_idx)
            reviewer.classify(query_idx, inclusions, logger)
    else:
        with Logger.from_file(log_file) as logger:
            if log_file is None:
                logger.set_labels(reviewer.y)
            init_idx, init_labels = reviewer._prior_knowledge()
            reviewer.query_i = 0
            # np.int was removed in NumPy 1.24; use the builtin int instead.
            reviewer.train_idx = np.array([], dtype=int)
            reviewer.classify(init_idx, init_labels, logger, method="initial")
            reviewer._do_review(logger)
            if log_file is None:
                print(logger._log_dict)
                check_log(logger)

    if log_file is not None:
        with Logger.from_file(log_file, read_only=True) as logger:
            check_log(logger)


def test_state_continue_h5():
    inter_file = os.path.join(state_dir, "test_1_inst.h5")
    if not os.path.isfile(inter_file):
        reviewer = get_reviewer(
            data_fp, mode="simulate", model="nb", embedding_fp=embedding_fp,
            prior_idx=[1, 2, 3, 4], state_file=inter_file,
            n_instances=1, n_queries=1)
        reviewer.review()

    copyfile(inter_file, h5_state_file)
    check_model(mode="simulate", model="nb", state_file=h5_state_file,
                continue_from_state=True, n_instances=1, n_queries=2)


def test_log_continue_h5(monkeypatch):
    inter_file = os.path.join(log_dir, "test_1_inst.h5")
    if not os.path.isfile(inter_file):
        reviewer = get_reviewer(
            data_fp, mode="simulate", model="nb", embedding_fp=embedding_fp,
            prior_included=[1, 3], prior_excluded=[2, 4], log_file=inter_file,
            n_instances=1, n_queries=1)
        reviewer.review()

    copyfile(inter_file, h5_log_file)
    check_model(monkeypatch, model="nb", log_file=h5_log_file,
                continue_from_log=True, n_instances=1, n_queries=2)


def test_state_continue_json():
    inter_file = os.path.join(state_dir, "test_1_inst.json")
    if not os.path.isfile(inter_file):
        reviewer = get_reviewer(
            data_fp, mode="simulate", model="nb", embedding_fp=embedding_fp,
            prior_idx=[1, 2, 3, 4], state_file=inter_file,
            n_instances=1, n_queries=1)
        reviewer.review()

    copyfile(inter_file, json_state_file)
    check_model(mode="simulate", model="nb", state_file=json_state_file,
                continue_from_state=True, n_instances=1, n_queries=2)


def test_no_seed():
    n_test_max = 100
    as_data = ASReviewData.from_file(data_fp)
    n_priored = np.zeros(len(as_data), dtype=int)

    for _ in range(n_test_max):
        reviewer = get_reviewer(
            data_fp, mode="simulate", model="nb", state_file=None,
            init_seed=None, n_prior_excluded=1, n_prior_included=1)
        assert len(reviewer.start_idx) == 2
        n_priored[reviewer.start_idx] += 1
        if np.all(n_priored > 0):
            return
    raise ValueError(f"Error getting all priors in {n_test_max} iterations.")


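# Hypothetical companion check, not part of the source: with a fixed
# init_seed, prior selection should be reproducible across runs. The helper
# name and the seed value are illustrative assumptions; get_reviewer,
# init_seed and start_idx are taken from the snippets above.
def check_seeded_priors(seed=42):
    runs = [
        get_reviewer(data_fp, mode="simulate", model="nb", state_file=None,
                     init_seed=seed, n_prior_excluded=1,
                     n_prior_included=1).start_idx
        for _ in range(2)
    ]
    assert np.array_equal(runs[0], runs[1])

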
# State-file variant of check_model.
def check_model(monkeypatch=None, use_granular=False, state_file=h5_state_file,
                continue_from_state=False, mode="oracle", data_fp=data_fp,
                **kwargs):
    if not continue_from_state:
        try:
            if state_file is not None:
                os.unlink(state_file)
        except OSError:
            pass

    if monkeypatch is not None:
        monkeypatch.setattr('builtins.input', lambda _: "0")

    # start the review process.
    reviewer = get_reviewer(data_fp, mode=mode, embedding_fp=embedding_fp,
                            prior_idx=[1, 2, 3, 4],
                            state_file=state_file,
                            **kwargs)
    if use_granular:
        with open_state(state_file) as state:
            # Two loops of training and classification.
            reviewer.train()
            reviewer.log_probabilities(state)
            query_idx = reviewer.query(1)
            inclusions = reviewer._get_labels(query_idx)
            reviewer.classify(query_idx, inclusions, state)

            reviewer.train()
            reviewer.log_probabilities(state)
            query_idx = reviewer.query(1)
            inclusions = reviewer._get_labels(query_idx)
            reviewer.classify(query_idx, inclusions, state)  # mirrors the first loop