# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def move_label_from_pool_to_labeled(project_id, paper_i, label):
    """Move a record from the pool to the labeled set.

    Arguments
    ---------
    project_id: str
        The id of the current project.
    paper_i: int or str
        Identifier of the record to move (coerced with ``int``).
    label: int or str
        Label assigned to the record (coerced with ``int``).
    """
    # Use logging for consistency with get_instance (was: print).
    logging.info(f"Move {paper_i} from pool to labeled")

    # Load the current pool of unlabeled records.
    pool_idx = read_pool(project_id)

    # Remove the record from the pool. Both int() on a malformed id and
    # list.remove() on a missing id raise ValueError; IndexError (caught
    # by the original) is never raised by either call.
    try:
        pool_idx.remove(int(paper_i))
    except ValueError:
        logging.warning(f"Failed to remove {paper_i} from the pool.")
        return

    write_pool(project_id, pool_idx)

    # Append the record to the reviewed papers (label history).
    labeled = read_label_history(project_id)
    labeled.append([int(paper_i), int(label)])
    write_label_history(project_id, labeled)
def get_instance(project_id):
    """Get a new instance to review.

    Arguments
    ---------
    project_id: str
        The id of the current project.
    """
    lock_path = get_lock_path(project_id)

    # Read the pool under the project-wide lock so a concurrent writer
    # cannot modify it while we inspect the head.
    with SQLiteLock(lock_path, blocking=True, lock_name="active"):
        pool = read_pool(project_id)

        # Guard clause: an empty pool means there is nothing left to review.
        if not pool:
            logging.info(f"No more records for project {project_id}")
            return None

        logging.info(f"Requesting {pool[0]} from project {project_id}")
        return pool[0]
# NOTE(review): fragment — the enclosing function's `def` line (and the names
# `diff_history`, `reviewer`, `state_file`, `label_method`, `lock_file`,
# `as_data`) lie outside this chunk. Indentation reconstructed; confirm
# against the full file before relying on it.

# Early exit of the enclosing routine — presumably taken when there is no
# new labeling history to retrain on (TODO confirm in the full file).
return

# Split the recorded (record index, label) pairs into parallel arrays.
query_idx = np.array([x[0] for x in diff_history], dtype=int)
inclusions = np.array([x[1] for x in diff_history], dtype=int)

# Classify the new labels, train and store the results.
with open_state(state_file) as state:
    reviewer.classify(query_idx, inclusions, state, method=label_method)
    reviewer.train()
    reviewer.log_probabilities(state)
    # Rank all remaining pool records with the freshly trained model.
    new_query_idx = reviewer.query(reviewer.n_pool()).tolist()
    reviewer.log_current_query(state)
    proba = state.pred_proba.tolist()

# Rewrite the pool under the project lock so concurrent labeling actions
# cannot interleave with this write.
with SQLiteLock(lock_file, blocking=True, lock_name="active") as lock:
    current_pool = read_pool(project_id)
    # Membership mask over all records: nonzero = still in the pool.
    in_current_pool = np.zeros(len(as_data))
    in_current_pool[current_pool] = 1
    # Keep only newly ranked records that are still unlabeled.
    new_pool = [x for x in new_query_idx
                if in_current_pool[x]]
    write_pool(project_id, new_pool)
    write_proba(project_id, proba)
def move_label_from_labeled_to_pool(project_id, paper_i, label):
    """Move a record back from the labeled set to the pool.

    Arguments
    ---------
    project_id: str
        The id of the current project.
    paper_i: int or str
        Identifier of the record to move (coerced with ``int``).
    label: int or str
        Unused; kept for signature compatibility with
        move_label_from_pool_to_labeled.
    """
    # Use logging for consistency with get_instance (was: print).
    logging.info(f"Move {paper_i} from labeled to pool")

    # Load the pool and the reviewed papers.
    pool_list = read_pool(project_id)
    labeled_list = read_label_history(project_id)

    # Coerce once so the comparison below works when paper_i arrives as a
    # string (the original compared the raw value against int(item_id),
    # which silently never matched for str input).
    paper_i = int(paper_i)

    labeled_list_new = []
    for item_id, item_label in labeled_list:
        item_id = int(item_id)
        item_label = int(item_label)
        if paper_i == item_id:
            pool_list.append(item_id)
        else:
            labeled_list_new.append([item_id, item_label])

    # BUG FIX: persist the updated pool and label history — the original
    # built both lists but never wrote them back, so the move was lost.
    write_pool(project_id, pool_list)
    write_label_history(project_id, labeled_list_new)
def api_random_prior_papers(project_id):  # noqa: F401
    """Get a selection of random papers to find exclusions.

    This set of papers is extracted from the pool, but without
    the already labeled items.
    """
    lock_fp = get_lock_path(project_id)
    # Read the pool under the project-wide lock.
    with SQLiteLock(lock_fp, blocking=True, lock_name="active"):
        pool = read_pool(project_id)

    # with open(get_labeled_path(project_id, 0), "r") as f_label:
    #     prior_labeled = json.load(f_label)

    # excluded the already labeled items from our random selection.
    # prior_labeled_index = [int(label) for label in prior_labeled.keys()]
    # pool = [i for i in pool if i not in prior_labeled_index]

    # sample from the pool (this is already done atm of initializing
    # the pool. But doing it again because a double shuffle is always
    # best)
    try:
        # Draw 5 distinct record indices; np.random.choice raises when the
        # pool holds fewer than 5 items (replace=False).
        pool_random = np.random.choice(pool, 5, replace=False)
    except Exception:
        # NOTE(review): the broad `except Exception` discards the original
        # error; consider `except ValueError as err: raise ... from err`
        # once the full function is in view.
        raise ValueError("Not enough random indices to sample from.")
    # NOTE(review): the function continues past this chunk — `pool_random`
    # is consumed by code outside the visible range.