def test_sparse_vector_fresh():
    space_type = 'cosinesimil_sparse_fast'
    space_param = []
    method_name = 'small_world_rand'
    index_name = method_name + '_sparse.index'
    if os.path.isfile(index_name):
        os.remove(index_name)
    index = nmslib.init(
        space_type,
        space_param,
        method_name,
        nmslib.DataType.SPARSE_VECTOR,
        nmslib.DistType.FLOAT)
    for id, data in enumerate(read_sparse_data('sample_sparse_dataset.txt')):
        nmslib.addDataPoint(index, id, data)
    print('We have added %d data points' % nmslib.getDataPointQty(index))
    for i in range(0, min(MAX_PRINT_QTY, nmslib.getDataPointQty(index))):
        print(nmslib.getDataPoint(index, i))
    print("Let's invoke the index-build process")
    index_param = ['NN=17', 'efConstruction=50', 'indexThreadQty=4']
    query_time_param = ['efSearch=50']
    nmslib.createIndex(index, index_param)
    # fragment from a k-NN query: 'res' holds the results of a preceding
    # k-NN query call (not shown here); 'q0' is a query vector and
    # 'dataset' the dense data matrix
    print(res[:5])
    for i in res[0]:
        print(int(i), distance.cosine(q0, dataset[int(i), :]))
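The snippets above use the legacy module-level nmslib bindings. For comparison, here is a minimal sketch of the same sparse-index workflow with the nmslib 2.x object API; the random data and parameter values are illustrative assumptions, not taken from the snippets above.

import nmslib
import scipy.sparse

# build a sparse cosine HNSW index from a CSR matrix (nmslib 2.x API)
data = scipy.sparse.random(1000, 100, density=0.1, format='csr')
index = nmslib.init(method='hnsw', space='cosinesimil_sparse',
                    data_type=nmslib.DataType.SPARSE_VECTOR)
index.addDataPointBatch(data)
index.createIndex({'M': 16, 'efConstruction': 100, 'indexThreadQty': 4})
index.setQueryTimeParams({'efSearch': 50})

# query with a one-row CSR matrix; each result is an (ids, distances) pair
ids, distances = index.knnQueryBatch(data[:1], k=3)[0]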
#space_type = 'cosinesimil_sparse'
space_type = 'cosinesimil_sparse_fast'
space_param = []
method_name = 'small_world_rand'
index_name = method_name + '_sparse.index'
if os.path.isfile(index_name):
    os.remove(index_name)
index = nmslib.init(space_type,
                    space_param,
                    method_name,
                    nmslib.DataType.SPARSE_VECTOR,
                    nmslib.DistType.FLOAT)
if batch:
    with TimeIt('batch add'):
        positions = nmslib.addDataPointBatch(index, np.arange(len(dataset), dtype=np.int32), data_matrix)
    print('positions', positions)
else:
    d = []
    q = []
    with TimeIt('preparing'):
        # convert each dense row to the legacy sparse format: [[index, value], ...]
        for data in dataset:
            d.append([[i, v] for i, v in enumerate(data) if v > 0])
        for data in queryset:
            q.append([[i, v] for i, v in enumerate(data) if v > 0])
    with TimeIt('adding points'):
        for id, data in enumerate(d):
            nmslib.addDataPoint(index, id, data)
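TimeIt is used as a context manager above but never defined in these snippets. A minimal sketch of a compatible helper, assuming it only needs to print the elapsed wall-clock time under the given label:

import time
from contextlib import contextmanager

@contextmanager
def TimeIt(label):
    # print elapsed wall-clock time for the enclosed block
    start = time.perf_counter()
    try:
        yield
    finally:
        print('%s took %.3f s' % (label, time.perf_counter() - start))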
def setUp(self):
    space_type = 'cosinesimil_sparse'
    space_param = []
    method_name = 'small_world_rand'
    index_name = method_name + '.index'
    if os.path.isfile(index_name):
        os.remove(index_name)
    self.index = nmslib.init(
        space_type,
        space_param,
        method_name,
        nmslib.DataType.SPARSE_VECTOR,
        nmslib.DistType.FLOAT)
k = 3
for idx, data in enumerate(read_data_as_string('sample_queryset.txt')):
    print(idx, nmslib.knnQuery(index, k, data))
nmslib.saveIndex(index, index_name)
print("The index %s is saved" % index_name)
nmslib.freeIndex(index)
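A saved index can later be reloaded instead of rebuilt. A sketch using the nmslib 2.x object API (the method name 'hnsw' and the save_data/load_data flags are assumptions about the setup; the legacy bindings above use nmslib.saveIndex/loadIndex instead):

# reload the index file written above; load_data=True requires that the
# index was written with saveIndex(..., save_data=True)
index = nmslib.init(method='hnsw', space='cosinesimil_sparse',
                    data_type=nmslib.DataType.SPARSE_VECTOR)
index.loadIndex(index_name, load_data=True)
index.setQueryTimeParams({'efSearch': 50})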
if __name__ == '__main__':
    print('DENSE_VECTOR', nmslib.DataType.DENSE_VECTOR)
    print('SPARSE_VECTOR', nmslib.DataType.SPARSE_VECTOR)
    print('OBJECT_AS_STRING', nmslib.DataType.OBJECT_AS_STRING)
    print('DistType.INT', nmslib.DistType.INT)
    print('DistType.FLOAT', nmslib.DistType.FLOAT)
    test_vector_load()
    test_vector_fresh()
    test_vector_fresh(False)
    test_vector_loaded()
    gen_sparse_data()
    test_sparse_vector_fresh()
    test_string_fresh()
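read_sparse_data is referenced throughout but not shown. A hypothetical reader, assuming a whitespace-separated text file of dense values and the same [[index, value], ...] row format that the conversion loop above produces:

def read_sparse_data(filename):
    # hypothetical helper: one dense vector per line, whitespace-separated;
    # keep only the non-zero entries as [index, value] pairs
    with open(filename) as f:
        for line in f:
            values = [float(v) for v in line.split()]
            yield [[i, v] for i, v in enumerate(values) if v > 0]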
"""
Load an approximate nearest neighbours index from disk.
Parameters
----------
tfidf_vectors_path : str, required.
The path to the tfidf vectors of the items in the index.
ann_index_path : str, required.
The path to the ann index.
ef_search: int, optional (default = 200)
Controls speed performance at query time. Max value is 2000,
but reducing to around ~100 will increase query speed by an order
of magnitude for a small performance hit.
"""
uml_concept_alias_tfidfs = scipy.sparse.load_npz(cached_path(tfidf_vectors_path)).astype(numpy.float32)
ann_index = nmslib.init(method='hnsw', space='cosinesimil_sparse', data_type=nmslib.DataType.SPARSE_VECTOR)
# re-add the raw vectors before loading: the serialized sparse index stores
# the graph structure, not the data points themselves
ann_index.addDataPointBatch(uml_concept_alias_tfidfs)
ann_index.loadIndex(cached_path(ann_index_path))
query_time_params = {'efSearch': ef_search}
ann_index.setQueryTimeParams(query_time_params)
return ann_index
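A hypothetical call site for the function documented above; the function name is inferred from the docstring and the paths are illustrative, since the snippet does not show the signature line.

# hypothetical names and paths, for illustration only
ann_index = load_approximate_nearest_neighbours_index(
    tfidf_vectors_path='tfidf_vectors_sparse.npz',
    ann_index_path='nmslib_index.bin',
    ef_search=200,
)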
uml_concept_aliases_path = f'{model_path}/concept_aliases.json'
start_time = datetime.datetime.now()
print(f'Loading list of concept ids from {uml_concept_aliases_path}')
uml_concept_aliases = json.load(open(uml_concept_aliases_path))
print(f'Loading tfidf vectorizer from {tfidf_vectorizer_path}')
tfidf_vectorizer = load(tfidf_vectorizer_path)
if isinstance(tfidf_vectorizer, TfidfVectorizer):
    print(f'Tfidf vocab size: {len(tfidf_vectorizer.vocabulary_)}')
print(f'Loading tfidf vectors from {tfidf_vectors_path}')
uml_concept_alias_tfidfs = scipy.sparse.load_npz(tfidf_vectors_path).astype(np.float32)
print(f'Loading ann index from {ann_index_path}')
ann_index = nmslib.init(method='hnsw', space='cosinesimil_sparse', data_type=nmslib.DataType.SPARSE_VECTOR)
ann_index.addDataPointBatch(uml_concept_alias_tfidfs)
ann_index.loadIndex(ann_index_path)
query_time_params = {'efSearch': efS}
ann_index.setQueryTimeParams(query_time_params)
end_time = datetime.datetime.now()
total_time = (end_time - start_time)
print(f'Loading concept ids, vectorizer, tfidf vectors and ann index took {total_time.total_seconds()} seconds')
return uml_concept_aliases, tfidf_vectorizer, ann_index
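A sketch of how the three returned objects work together at query time; the query string and k are illustrative assumptions:

# hypothetical query against the objects returned above
query_tfidf = tfidf_vectorizer.transform(['myocardial infarction'])
ids, distances = ann_index.knnQueryBatch(query_tfidf, k=5)[0]
for idx, dist in zip(ids, distances):
    print(uml_concept_aliases[idx], dist)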
if metric == "euclidean":
if is_sparse:
metric = "l2_sparse"
else:
metric = "l2"
elif metric == "cosine":
if is_sparse:
metric = "cosinesimil_sparse_fast"
else:
metric = "cosinesimil"
else:
raise ValueError(
"HNSW only supports cosine and euclidean distance")
if is_sparse:
data_type = nmslib.DataType.SPARSE_VECTOR
else:
data_type = nmslib.DataType.DENSE_VECTOR
if index_params is None:
index_params = {
"efConstruction": 100,
"M": 5,
"delaunay_type": 2,
"post": 0,
"indexThreadQty": self._nprocs
}
if query_params is None:
query_params = {
"efSearch": 100
}
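A minimal sketch of how the resolved metric, data type and parameter dicts are typically consumed by the nmslib 2.x API; the variable `data` (a numpy array, or a CSR matrix when data_type is SPARSE_VECTOR) is an assumption:

# build and tune the index from the values resolved above
index = nmslib.init(method='hnsw', space=metric, data_type=data_type)
index.addDataPointBatch(data)
index.createIndex(index_params, print_progress=False)
index.setQueryTimeParams(query_params)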
def _check_data(self, X):
    if self.data_type == nmslib.DataType.SPARSE_VECTOR and not sparse.issparse(X):
        # convert to CSR matrix
        X = sparse.csr_matrix(scprep.utils.to_array_or_spmatrix(X))
    elif self.data_type == nmslib.DataType.DENSE_VECTOR and sparse.issparse(X):
        # convert to dense matrix
        X = scprep.utils.toarray(X)
    else:
        # convert to numpy or scipy matrix
        X = scprep.utils.to_array_or_spmatrix(X)
    if self.data_type is None:
        # set data_type from data
        if sparse.issparse(X):
            self.data_type = nmslib.DataType.SPARSE_VECTOR
        else:
            self.data_type = nmslib.DataType.DENSE_VECTOR
    if self.data_type == nmslib.DataType.SPARSE_VECTOR:
        # make sure sparse matrix is CSR format
        X = sparse.csr_matrix(X)
        # check space is compatible with sparse data
        if self.space in self._DENSE_TYPES:
            self.space = self._to_sparse_type(self.space)
    else:
        # check space is compatible with dense data
        if self.space in self._SPARSE_TYPES:
            self.space = self._to_dense_type(self.space)
    return X
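_DENSE_TYPES, _SPARSE_TYPES and the _to_sparse_type/_to_dense_type converters are not shown in this snippet. One plausible shape for them, assuming a one-to-one mapping between dense and sparse space names (these contents are assumptions, not the class's actual tables):

_DENSE_TYPES = ['l2', 'cosinesimil']
_SPARSE_TYPES = ['l2_sparse', 'cosinesimil_sparse_fast']

def _to_sparse_type(self, space):
    # map a dense space name to its sparse counterpart
    return dict(zip(self._DENSE_TYPES, self._SPARSE_TYPES))[space]

def _to_dense_type(self, space):
    # map a sparse space name back to its dense counterpart
    return dict(zip(self._SPARSE_TYPES, self._DENSE_TYPES))[space]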