Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Create an empty string index and store it on ``self.index``.

    The index is initialised with the ``'leven'`` space, the
    ``'small_world_rand'`` method, string objects and integer distances.
    Before that, a file named ``small_world_rand.index`` in the current
    working directory is removed if it exists.
    """
    space_type = 'leven'
    space_param = []
    method_name = 'small_world_rand'
    index_name = method_name + '.index'
    # Start from a clean slate: drop any index file left under this name.
    if os.path.isfile(index_name):
        os.remove(index_name)
    self.index = nmslib.init(
        space_type,
        space_param,
        method_name,
        nmslib.DataType.OBJECT_AS_STRING,
        nmslib.DistType.INT)
def test_string_loaded():
    """Fill a string index with sample words and print the first entries.

    Builds a ``'leven'`` / ``'small_world_rand'`` index over ``DATA_STRS``,
    reports how many points were added and prints up to ``MAX_PRINT_QTY``
    of them.
    """
    DATA_STRS = ["xyz", "beagcfa", "cea", "cb",
                 "d", "c", "bdaf", "ddcd",
                 "egbfa", "a", "fba", "bcccfe",
                 "ab", "bfgbfdc", "bcbbgf", "bfbb"
                 ]
    QUERY_STRS = ["abc", "def", "ghik"]
    space_type = 'leven'
    space_param = []
    method_name = 'small_world_rand'
    index_name = method_name + '.index'
    index = nmslib.init(
        space_type,
        space_param,
        method_name,
        nmslib.DataType.OBJECT_AS_STRING,
        nmslib.DistType.INT)
    # Each string is registered under its position in DATA_STRS.
    for data_id, data in enumerate(DATA_STRS):
        nmslib.addDataPoint(index, data_id, data)
    print('Let\'s print a few data entries')
    print('We have added %d data points' % nmslib.getDataPointQty(index))
    # Print at most MAX_PRINT_QTY entries, fewer if the index is smaller.
    for i in range(0, min(MAX_PRINT_QTY, nmslib.getDataPointQty(index))):
        print(nmslib.getDataPoint(index, i))
    print('Let\'s invoke the index-build process')
    # NOTE(review): the visible excerpt stops here; the step announced by the
    # message above (and any use of QUERY_STRS / index_name) is not shown.
# NOTE(review): excerpt from the middle of a larger routine. The enclosing
# definition, the original indentation, and the origin of `f`, `seq`,
# `space_type`, `space_param` and `method_name` are not visible here.
# Create an index using the DENSE_VECTOR data type and FLOAT distance type.
index = nmslib.init(
space_type,
space_param,
method_name,
nmslib.DataType.DENSE_VECTOR,
nmslib.DistType.FLOAT)
# Load `f` in pieces via read_data_fast_batch(f, 10000) inside a TimeIt block
# (TimeIt presumably reports the elapsed time under the given label - confirm).
with TimeIt('fast_batch add data point'):
offset = 0
for data in read_data_fast_batch(f, 10000):
# Ids for this piece are offset + 0..len(data)-1, so numbering continues
# from where the previous piece ended.
nmslib.addDataPointBatch(index, np.arange(len(data), dtype=np.int32) + offset, data)
offset += data.shape[0]
print('offset', offset)
nmslib.freeIndex(index)
# When `seq` is truthy, read `f` again and add the points one at a time into
# a newly created index, using the enumeration position as the id.
if seq:
index = nmslib.init(
space_type,
space_param,
method_name,
nmslib.DataType.DENSE_VECTOR,
nmslib.DistType.FLOAT)
with TimeIt('seq add data point'):
for id, data in enumerate(read_data(f)):
nmslib.addDataPoint(index, id, data)
nmslib.freeIndex(index)
def test_vector_fresh(fast=True):
    """Load ``sample_dataset.txt`` into a new dense-vector index.

    Args:
        fast: When true, read the whole file with ``read_data_fast`` and add
            it in one ``addDataPointBatch`` call. Otherwise add the rows
            yielded by ``read_data`` one at a time and report any row whose
            returned position differs from its id.
    """
    space_type = 'cosinesimil'
    space_param = []
    method_name = 'small_world_rand'
    index_name = method_name + '.index'
    # Remove an existing index file of the same name before starting.
    if os.path.isfile(index_name):
        os.remove(index_name)
    index = nmslib.init(
        space_type,
        space_param,
        method_name,
        nmslib.DataType.DENSE_VECTOR,
        nmslib.DistType.FLOAT)
    start = time.time()
    if fast:
        data = read_data_fast('sample_dataset.txt')
        print('data.shape', data.shape)
        # Ids are simply 0..len(data)-1.
        positions = nmslib.addDataPointBatch(index, np.arange(len(data), dtype=np.int32), data)
    else:
        for data_id, data in enumerate(read_data('sample_dataset.txt')):
            pos = nmslib.addDataPoint(index, data_id, data)
            if data_id != pos:
                print('id %s != pos %s' % (data_id, pos))
                # NOTE(review): the visible excerpt stops here; whatever
                # follows the mismatch report and uses `start` is not shown.
def testHnswRecallL2(dataMatrix, queryMatrix, k, M=30, efC=200, efS=1000, numThreads=4):
    """Build an HNSW index over ``dataMatrix`` in the ``'l2'`` space and time a batch k-NN query.

    Args:
        dataMatrix: Data points passed to ``addDataPointBatch``.
        queryMatrix: Queries passed to ``knnQueryBatch``; ``shape[0]`` is
            used as the number of queries.
        k: Number of neighbours requested per query.
        M: Value for the ``'M'`` index-time parameter.
        efC: Value for the ``'efConstruction'`` index-time parameter.
        efS: Not used in the visible part of this function.
        numThreads: Used both as ``'indexThreadQty'`` and as the number of
            query threads.
    """
    queryQty = queryMatrix.shape[0]
    indexTimeParams = {'M': M, 'indexThreadQty': numThreads, 'efConstruction': efC, 'post': 0}
    # Indexing
    print('Index-time parameters', indexTimeParams)
    spaceName = 'l2'
    index = nmslib.init(method='hnsw', space=spaceName, data_type=nmslib.DataType.DENSE_VECTOR)
    index.addDataPointBatch(dataMatrix)
    # Only createIndex is timed; adding the data above is not included.
    start = time.time()
    index.createIndex(indexTimeParams)
    end = time.time()
    print('Indexing time = %f' % (end - start))
    # Querying
    start = time.time()
    nmslibFound = index.knnQueryBatch(queryMatrix, k=k, num_threads=numThreads)
    end = time.time()
    print('kNN time total=%f (sec), per query=%f (sec), per query adjusted for thread number=%f (sec)' %
          (end - start, float(end - start) / queryQty, numThreads * float(end - start) / queryQty))
    # NOTE(review): the visible excerpt stops here; the use of `nmslibFound`
    # is not shown.
# NOTE(review): excerpt from the middle of a larger routine. The enclosing
# definition, the original indentation, and the origin of `fast`,
# `fast_batch`, `space_type` and `space_param` are not visible here.
method_name = 'small_world_rand'
index_name = method_name + '.index'
# Remove an existing index file of the same name, if there is one.
if os.path.isfile(index_name):
os.remove(index_name)
f = '/tmp/foo.txt'
# Generate the input file only when it is missing: a 100000 x 1000 array of
# random values written as tab-delimited text.
if not os.path.isfile(f):
print('creating %s' % f)
np.savetxt(f, np.random.rand(100000,1000), delimiter="\t")
print('done')
# When `fast` is truthy: read the whole file with read_data_fast and add it in
# a single addDataPointBatch call with ids 0..len(data)-1.
if fast:
index = nmslib.init(
space_type,
space_param,
method_name,
nmslib.DataType.DENSE_VECTOR,
nmslib.DistType.FLOAT)
with TimeIt('fast add data point'):
data = read_data_fast(f)
nmslib.addDataPointBatch(index, np.arange(len(data), dtype=np.int32), data)
nmslib.freeIndex(index)
# When `fast_batch` is truthy: same load, but in pieces from
# read_data_fast_batch(f, 10000).
if fast_batch:
index = nmslib.init(
space_type,
space_param,
method_name,
nmslib.DataType.DENSE_VECTOR,
nmslib.DistType.FLOAT)
with TimeIt('fast_batch add data point'):
offset = 0
# NOTE(review): the excerpt is cut off after the loop header below; the loop
# body is not visible.
for data in read_data_fast_batch(f, 10000):
# NOTE(review): excerpt from the middle of a larger routine. The enclosing
# definition, the original indentation, and the origin of `index_name`,
# `fast`, `space_type`, `space_param` and `method_name` are not visible here.
os.remove(index_name)
index = nmslib.init(
space_type,
space_param,
method_name,
nmslib.DataType.DENSE_VECTOR,
nmslib.DistType.FLOAT)
# Time how long it takes to add the contents of sample_dataset.txt.
start = time.time()
if fast:
data = read_data_fast('sample_dataset.txt')
print('data.shape', data.shape)
positions = nmslib.addDataPointBatch(index, np.arange(len(data), dtype=np.int32), data)
else:
for id, data in enumerate(read_data('sample_dataset.txt')):
pos = nmslib.addDataPoint(index, id, data)
# A position returned by addDataPoint that differs from the id is reported
# and followed by sys.exit(1).
if id != pos:
print('id %s != pos %s' % (id, pos))
sys.exit(1)
end = time.time()
print('added data in %s secs' % (end - start))
print('Let\'s print a few data entries')
print('We have added %d data points' % nmslib.getDataPointQty(index))
# Print the distance for the point pairs (0,0), (1,1), (0,1) and (1,0).
print("Distance between points (0,0) " + str(nmslib.getDistance(index, 0, 0)));
print("Distance between points (1,1) " + str(nmslib.getDistance(index, 1, 1)));
print("Distance between points (0,1) " + str(nmslib.getDistance(index, 0, 1)));
print("Distance between points (1,0) " + str(nmslib.getDistance(index, 1, 0)));
# Print at most MAX_PRINT_QTY entries, fewer if the index is smaller.
for i in range(0,min(MAX_PRINT_QTY,nmslib.getDataPointQty(index))):
print(nmslib.getDataPoint(index, i))
]
# NOTE(review): excerpt from the middle of a larger routine. The bracket above
# closes a literal whose start is not visible (presumably DATA_STRS, which is
# used below - confirm). The enclosing definition and the original indentation
# are not visible either.
QUERY_STRS = ["abc", "def", "ghik"]
space_type = 'leven'
space_param = []
method_name = 'small_world_rand'
index_name = method_name + '.index'
index = nmslib.init(
space_type,
space_param,
method_name,
nmslib.DataType.OBJECT_AS_STRING,
nmslib.DistType.INT)
# Each string is registered under its position in DATA_STRS.
for id, data in enumerate(DATA_STRS):
nmslib.addDataPoint(index, id, data)
print('Let\'s print a few data entries')
print('We have added %d data points' % nmslib.getDataPointQty(index))
# Print at most MAX_PRINT_QTY entries, fewer if the index is smaller.
for i in range(0,min(MAX_PRINT_QTY,nmslib.getDataPointQty(index))):
print(nmslib.getDataPoint(index,i))
print('Let\'s invoke the index-build process')
index_param = ['NN=17', 'efConstruction=50', 'indexThreadQty=4']
query_time_param = ['efSearch=50']
# Despite the message printed above, the visible code does not build an index:
# it loads one from the file named by index_name. `index_param` and
# `query_time_param` are not used within the visible lines.
nmslib.loadIndex(index, index_name)
print("The index %s is loaded" % index_name)
# NOTE(review): excerpt from the middle of a larger routine. The enclosing
# definition, the original indentation, and the origin of `batch`,
# `DATA_STRS`, `space_type` and `space_param` are not visible here.
method_name = 'small_world_rand'
index_name = method_name + '.index'
index = nmslib.init(
space_type,
space_param,
method_name,
nmslib.DataType.OBJECT_AS_STRING,
nmslib.DistType.INT)
# Two ways of adding DATA_STRS: one addDataPointBatch call with ids
# 0..len(DATA_STRS)-1 when `batch` is truthy, otherwise one addDataPoint call
# per string using its position as the id.
if batch:
print('DATA_STRS', DATA_STRS)
positions = nmslib.addDataPointBatch(index, np.arange(len(DATA_STRS), dtype=np.int32), DATA_STRS)
else:
for id, data in enumerate(DATA_STRS):
nmslib.addDataPoint(index, id, data)
print('Let\'s print a few data entries')
print('We have added %d data points' % nmslib.getDataPointQty(index))
# Print the distance for the point pairs (0,0), (1,1), (0,1) and (1,0).
print("Distance between points (0,0) " + str(nmslib.getDistance(index, 0, 0)));
print("Distance between points (1,1) " + str(nmslib.getDistance(index, 1, 1)));
print("Distance between points (0,1) " + str(nmslib.getDistance(index, 0, 1)));
print("Distance between points (1,0) " + str(nmslib.getDistance(index, 1, 0)));
# Print at most MAX_PRINT_QTY entries, fewer if the index is smaller.
for i in range(0,min(MAX_PRINT_QTY,nmslib.getDataPointQty(index))):
print(nmslib.getDataPoint(index,i))
print('Let\'s invoke the index-build process')
# NOTE(review): the excerpt ends after these two assignments; the calls that
# consume them are not visible.
index_param = ['NN=17', 'efConstruction=50', 'indexThreadQty=4']
query_time_param = ['efSearch=50']
def test_sparse_vector_fresh():
    """Build a new sparse-vector index from ``sample_sparse_dataset.txt``.

    Removes an existing ``small_world_rand_sparse.index`` file, adds every
    record yielded by ``read_sparse_data`` to a
    ``'cosinesimil_sparse_fast'`` index, prints up to ``MAX_PRINT_QTY``
    entries, creates the index and sets the query-time parameters.
    """
    space_type = 'cosinesimil_sparse_fast'
    space_param = []
    method_name = 'small_world_rand'
    index_name = method_name + '_sparse.index'
    # Remove an existing index file of the same name before starting.
    if os.path.isfile(index_name):
        os.remove(index_name)
    index = nmslib.init(
        space_type,
        space_param,
        method_name,
        nmslib.DataType.SPARSE_VECTOR,
        nmslib.DistType.FLOAT)
    # Each record is registered under its position in the input sequence.
    for data_id, data in enumerate(read_sparse_data('sample_sparse_dataset.txt')):
        nmslib.addDataPoint(index, data_id, data)
    print('We have added %d data points' % nmslib.getDataPointQty(index))
    # Print at most MAX_PRINT_QTY entries, fewer if the index is smaller.
    for i in range(0, min(MAX_PRINT_QTY, nmslib.getDataPointQty(index))):
        print(nmslib.getDataPoint(index, i))
    print('Let\'s invoke the index-build process')
    index_param = ['NN=17', 'efConstruction=50', 'indexThreadQty=4']
    query_time_param = ['efSearch=50']
    nmslib.createIndex(index, index_param)
    print('The index is created')
    nmslib.setQueryTimeParams(index, query_time_param)
    # NOTE(review): the visible excerpt stops here; any querying or saving
    # that uses `index_name` is not shown.