Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testHnswRecallL2(dataMatrix, queryMatrix, k, M=30, efC=200, efS=1000, numThreads=4):
    """Build an HNSW index in the 'l2' space and time a batch k-NN query.

    Args:
        dataMatrix: data points handed to ``index.addDataPointBatch``.
        queryMatrix: query points; ``queryMatrix.shape[0]`` is used as the
            number of queries.
        k: number of neighbours requested per query.
        M: value of the ``M`` index-time parameter.
        efC: value of the ``efConstruction`` index-time parameter.
        efS: value of the ``efSearch`` query-time parameter.
        numThreads: thread count used for index construction
            (``indexThreadQty``) and for the batch query (``num_threads``).

    Returns:
        The result of ``index.knnQueryBatch`` for the whole query batch.
    """
    queryQty = queryMatrix.shape[0]
    indexTimeParams = {'M': M, 'indexThreadQty': numThreads, 'efConstruction': efC, 'post' : 0}

    # Indexing
    print('Index-time parameters', indexTimeParams)
    spaceName = 'l2'
    index = nmslib.init(method='hnsw', space=spaceName, data_type=nmslib.DataType.DENSE_VECTOR)
    index.addDataPointBatch(dataMatrix)

    start = time.time()
    index.createIndex(indexTimeParams)
    end = time.time()
    print('Indexing time = %f' % (end-start))

    # efS was previously accepted but never forwarded to the index, so the
    # search always ran with the library's default setting.
    index.setQueryTimeParams({'efSearch': efS})

    # Querying
    start = time.time()
    nmslibFound = index.knnQueryBatch(queryMatrix, k=k, num_threads=numThreads)
    end = time.time()

    elapsed = end - start
    # An empty query batch would otherwise raise ZeroDivisionError here.
    perQuery = float(elapsed) / queryQty if queryQty else float('nan')
    print('kNN time total=%f (sec), per query=%f (sec), per query adjusted for thread number=%f (sec)' %
          (elapsed, perQuery, numThreads * perQuery))

    return nmslibFound
def setUp(self):
    """Remove any stale index file and create a fresh index on ``self.index``.

    The index uses the 'small_world_rand' method over the 'cosinesimil'
    space with dense float vectors and no extra space parameters.
    """
    method = 'small_world_rand'
    stale_index_file = method + '.index'

    # Start from a clean slate: drop an index file left by a previous run.
    if os.path.isfile(stale_index_file):
        os.remove(stale_index_file)

    init_args = (
        'cosinesimil',
        [],
        method,
        nmslib.DataType.DENSE_VECTOR,
        nmslib.DistType.FLOAT,
    )
    self.index = nmslib.init(*init_args)
def test_vector_fresh(fast=True):
    """Fill a fresh 'small_world_rand' index, query it, save it and free it.

    Args:
        fast: when true, the dataset is read with ``read_data_fast`` and
            added in one ``addDataPointBatch`` call; otherwise points are
            read with ``read_data`` and added one at a time, and the process
            exits with status 1 if a returned position differs from the id.

    The index is saved to ``'small_world_rand.index'``; an existing file of
    that name is removed first. The index handle is always released, even if
    loading, querying or saving fails.
    """
    space_type = 'cosinesimil'
    space_param = []
    method_name = 'small_world_rand'
    index_name = method_name + '.index'
    if os.path.isfile(index_name):
        os.remove(index_name)
    index = nmslib.init(
        space_type,
        space_param,
        method_name,
        nmslib.DataType.DENSE_VECTOR,
        nmslib.DistType.FLOAT)

    # try/finally guarantees freeIndex runs on every exit path, including
    # the sys.exit(1) below and any exception raised by the library.
    try:
        start = time.time()
        if fast:
            data = read_data_fast('sample_dataset.txt')
            print('data.shape', data.shape)
            nmslib.addDataPointBatch(index, np.arange(len(data), dtype=np.int32), data)
        else:
            for point_id, point in enumerate(read_data('sample_dataset.txt')):
                pos = nmslib.addDataPoint(index, point_id, point)
                if point_id != pos:
                    print('id %s != pos %s' % (point_id, pos))
                    sys.exit(1)
        end = time.time()
        print('added data in %s secs' % (end - start))

        # NOTE(review): no createIndex call is visible before querying --
        # confirm whether one is expected here.
        print("Results for the freshly created index:")
        k = 3
        for idx, query in enumerate(read_data_as_string('sample_queryset.txt')):
            print(idx, nmslib.knnQuery(index, k, query))

        nmslib.saveIndex(index, index_name)
        print("The index %s is saved" % index_name)
    finally:
        nmslib.freeIndex(index)
if __name__ == '__main__':
    # Print the enum members exposed by the bindings.
    for member in ('DENSE_VECTOR', 'SPARSE_VECTOR', 'OBJECT_AS_STRING'):
        print(member, getattr(nmslib.DataType, member))
    for member in ('INT', 'FLOAT'):
        print('DistType.' + member, getattr(nmslib.DistType, member))

    # Run the scenarios in their original order.
    test_vector_load()
    test_vector_fresh()
    test_vector_fresh(False)
    test_vector_loaded()
    gen_sparse_data()
    test_sparse_vector_fresh()
# Load the pickled vectors and the CSV dataset from the paths carried by args.
vectors = unpickle(args.vectors_path)
dataset = read_csv(args.dataset_path)
# iterator = SQLiteDataIterator(data_url=args.database_url)
# Split the dataset rows into parallel question / answer lists.
questions = [item['question'] for item in dataset]
answers = [item['answer'] for item in dataset]
# The indexed data is the second element of the first vectors entry; the
# queries are the first elements of every entry, stacked and squeezed.
# NOTE(review): the exact layout of `vectors` is not visible here -- confirm.
data_matrix = vectors[0][1]
query_matrix = numpy.array([item[0] for item in vectors]).squeeze()
# Index-time settings for the HNSW index built over the 'l1' space.
M = 15
efC = 100
num_threads = 4
space_name = 'l1'
index_time_params = {'M': M, 'indexThreadQty': num_threads, 'efConstruction': efC}
# Create the index, add all data points in one batch, then build it.
index = nmslib.init(method='hnsw', space=space_name, data_type=nmslib.DataType.DENSE_VECTOR)
index.addDataPointBatch(data_matrix)
index.createIndex(index_time_params)
# Query-time setting applied to the built index.
efS = 100
query_time_params = {'efSearch': efS}
index.setQueryTimeParams(query_time_params)
# db_size = len(iterator.doc_ids)
# Counters and timer initialised here; the code that uses them is not
# visible in this excerpt.
sentences_size = len(vectors[0][1])
correct_answers = 0
start_time = time.time()
# NOTE(review): the except/finally clause matching this try is not visible in
# this excerpt.
try:
mapping = {}
# Split the dataset rows into parallel question / answer lists.
questions = [item['question'] for item in dataset]
answers = [item['answer'] for item in dataset]
# The indexed data is the second element of the first vectors entry; the
# queries are the first elements of every entry, stacked and squeezed.
data_matrix = vectors[0][1]
query_matrix = numpy.array([item[0] for item in vectors]).squeeze()
# Index-time settings for the HNSW index built over the 'l2' space.
M = 15
efC = 500
num_threads = 4
index_time_params = {'M': M, 'indexThreadQty': num_threads, 'efConstruction': efC, 'post': 0,
'skip_optimized_index': 1 # using non-optimized index!
}
space_name = 'l2'
# Create the index, add all data points in one batch, then build it.
index = nmslib.init(method='hnsw', space=space_name, data_type=nmslib.DataType.DENSE_VECTOR)
index.addDataPointBatch(data_matrix)
index.createIndex(index_time_params)
# M = 15
# efC = 100
# num_threads = 4
# space_name = 'l2'
# index_time_params = {'M': M, 'indexThreadQty': num_threads, 'efConstruction': efC}
# index = nmslib.init(method='hnsw', space=space_name, data_type=nmslib.DataType.DENSE_VECTOR)
# index.addDataPointBatch(data_matrix)
# index.createIndex(index_time_params)
# Query-time setting applied to the built index.
efS = 100
query_time_params = {'efSearch': efS}
index.setQueryTimeParams(query_time_params)
def fit(self, X):
    """Populate the index with the rows of ``X`` and make it ready to query.

    Every element of ``X`` is added under its enumeration position after
    conversion with ``tolist()``. If a file named ``self._index_name``
    exists the index is loaded from it; otherwise the index is created with
    ``self._index_param`` and, when ``self._save_index`` is set, saved to
    that file. Query-time parameters from ``self._query_param`` are applied
    last.
    """
    import nmslib

    self._index = nmslib.init(
        self._nmslib_metric,
        [],
        self._method_name,
        nmslib.DataType.DENSE_VECTOR,
        nmslib.DistType.FLOAT,
    )

    for position, vector in enumerate(X):
        nmslib.addDataPoint(self._index, position, vector.tolist())

    # NOTE(review): SOURCE indentation was lost; saving is assumed to belong
    # to the create branch only -- confirm.
    index_file_missing = not os.path.exists(self._index_name)
    if index_file_missing:
        logging.debug("Create Index")
        nmslib.createIndex(self._index, self._index_param)
        if self._save_index:
            nmslib.saveIndex(self._index, self._index_name)
    else:
        logging.debug("Loading index from file")
        nmslib.loadIndex(self._index, self._index_name)

    nmslib.setQueryTimeParams(self._index, self._query_param)
# NOTE(review): this excerpt starts inside a metric-selection chain whose
# opening branches are not visible; presumably the lines below are the sparse
# and dense cases of the first supported metric -- confirm.
metric = "l2_sparse"
else:
metric = "l2"
elif metric == "cosine":
# Cosine similarity has separate space names for sparse and dense input.
if is_sparse:
metric = "cosinesimil_sparse_fast"
else:
metric = "cosinesimil"
else:
# Any metric not handled by the branches above is rejected.
raise ValueError(
"HNSW only supports cosine and euclidean distance")
# Pick the nmslib data type that matches the sparsity flag.
if is_sparse:
data_type = nmslib.DataType.SPARSE_VECTOR
else:
data_type = nmslib.DataType.DENSE_VECTOR
# Default index-time parameters, used only when the caller supplied none.
if index_params is None:
index_params = {
"efConstruction": 100,
"M": 5,
"delaunay_type": 2,
"post": 0,
"indexThreadQty": self._nprocs
}
# Default query-time parameters, used only when the caller supplied none.
if query_params is None:
query_params = {
"efSearch": 100
}
# create index
def _check_data(self, X):
    """Coerce ``X`` to the container matching ``self.data_type`` and return it.

    When ``self.data_type`` is unset it is chosen from the converted input
    (sparse input selects SPARSE_VECTOR, anything else DENSE_VECTOR).
    ``self.space`` is switched to its sparse or dense counterpart when it
    does not match the resulting data type.
    """
    vector_kinds = nmslib.DataType

    if self.data_type == vector_kinds.SPARSE_VECTOR and not sparse.issparse(X):
        # Sparse index, dense input: go through the generic converter, then CSR.
        X = sparse.csr_matrix(scprep.utils.to_array_or_spmatrix(X))
    elif self.data_type == vector_kinds.DENSE_VECTOR and sparse.issparse(X):
        # Dense index, sparse input: densify.
        X = scprep.utils.toarray(X)
    else:
        # Already matching, or data type not decided yet: normalise the container.
        X = scprep.utils.to_array_or_spmatrix(X)
        if self.data_type is None:
            # Infer the data type from what the input turned out to be.
            self.data_type = (
                vector_kinds.SPARSE_VECTOR
                if sparse.issparse(X)
                else vector_kinds.DENSE_VECTOR
            )

    if self.data_type == vector_kinds.SPARSE_VECTOR:
        # Sparse data is always handed on in CSR format.
        X = sparse.csr_matrix(X)
        # A dense space name is swapped for its sparse counterpart.
        if self.space in self._DENSE_TYPES:
            self.space = self._to_sparse_type(self.space)
    elif self.space in self._SPARSE_TYPES:
        # A sparse space name is swapped for its dense counterpart.
        self.space = self._to_dense_type(self.space)

    return X
def _check_data(self, X):
if self.data_type == nmslib.DataType.SPARSE_VECTOR and not sparse.issparse(X):
# convert to CSR matrix
X = sparse.csr_matrix(scprep.utils.to_array_or_spmatrix(X))
elif self.data_type == nmslib.DataType.DENSE_VECTOR and sparse.issparse(X):
# convert to dense matrix
X = scprep.utils.toarray(X)
else:
# convert to numpy or scipy matrix
X = scprep.utils.to_array_or_spmatrix(X)
if self.data_type is None:
# set data_type from data
if sparse.issparse(X):
self.data_type = nmslib.DataType.SPARSE_VECTOR
else:
self.data_type = nmslib.DataType.DENSE_VECTOR
if self.data_type == nmslib.DataType.SPARSE_VECTOR:
# make sure sparse matrix is CSR format
X = sparse.csr_matrix(X)
# check space is compatible with sparse data
if self.space in self._DENSE_TYPES: