Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@replicated
def put_document(self, index_name, doc_id, fields):
return self.__put_document(index_name, doc_id, fields)
@replicated
def open_index(self, index_name, index_config=None):
return self.__open_index(index_name, index_config=index_config)
@replicated
def addValue(self, value, cn):
self.__counter += value
return self.__counter, cn
@replicated
def delete_index(self, index_name):
return self.__delete_index(index_name)
@replicated
def incCounter(self):
self.__counter += 1
return self.__counter
@replicated
def pop(self, key):
self.__data.pop(key, None)
@replicated
def pop(self, key):
self.__data.pop(key, None)
@replicated
def put_document(self, index_name, doc_id, fields):
start_time = time.time()
try:
count = self.__put_document(index_name, doc_id, fields)
except Exception as ex:
count = -1
self.__logger.error('failed to put {0} in {1}: {2}'.format(doc_id, index_name, ex))
finally:
self.__record_core_metrics(start_time, inspect.getframeinfo(inspect.currentframe())[2])
return count
@replicated
def _delete(self, key, recursive=False, **kwargs):
if recursive:
for k in list(self.__data.keys()):
if k.startswith(key):
self.__pop(k)
elif not self.__check_requirements(self.__data.get(key, {}), **kwargs):
return False
else:
self.__pop(key)
return True
@replicated
def open_index(self, index_name, schema=None):
start_time = time.time()
try:
index = self.__open_index(index_name, schema=schema)
finally:
self.__record_core_metrics(start_time, inspect.getframeinfo(inspect.currentframe())[2])
return index