Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def delete_by_index(self, index_name, query, options=None):
"""
@param index_name: name of an index to perform a query on
:type str
@param query: query that will be performed
:type IndexQuery
@param options: various operation options e.g. AllowStale or MaxOpsPerSec
:type BulkOperationOptions
@return: json
:rtype: dict
"""
path = Utils.build_path(index_name, query, options)
response = self._requests_handler.http_request_handler(path, "DELETE")
if response.status_code != 200 and response.status_code != 202:
try:
raise exceptions.ErrorResponseException(response.json()["Error"][:100])
except ValueError:
raise response.raise_for_status()
return response.json()
if index_query.sort_fields:
for field in index_query.sort_fields:
path += "&sort={0}".format(field)
if index_query.fetch:
for item in index_query.fetch:
path += "&fetch={0}".format(item)
if metadata_only:
path += "&metadata-only=true"
if index_entries_only:
path += "&debug=entries"
if includes and len(includes) > 0:
path += "".join("&include=" + item for item in includes)
response = self._requests_handler.http_request_handler(path, "GET",
force_read_from_master=force_read_from_master).json()
if "Error" in response:
raise exceptions.ErrorResponseException(response["Error"][:100])
return response
def set_response(self, response):
if response is None:
raise ValueError("response is invalid.")
if response.status_code == 201:
response = response.json()
return {"prefix": response["Prefix"], "server_tag": response["ServerTag"], "low": response["Low"],
"high": response["High"],
"last_size": response["LastSize"],
"last_range_at": response["LastRangeAt"]}
if response.status_code == 500:
raise exceptions.DatabaseDoesNotExistException(response.json()["Error"])
if response.status_code == 409:
raise exceptions.FetchConcurrencyException(response.json()["Error"])
raise exceptions.ErrorResponseException("Something is wrong with the request")
def set_response(self, response):
try:
response = response.json()
if "Error" in response:
raise exceptions.ErrorResponseException(response["Error"])
return response
except ValueError:
raise exceptions.ErrorResponseException(
"Failed to put document in the database please check the connection to the server")
def set_response(self, response):
if response is None:
raise exceptions.ErrorResponseException("Invalid Response")
try:
response = response.json()
if "Error" in response:
raise exceptions.ErrorResponseException(response["Error"])
return {"operation_id": response["OperationId"]}
except ValueError:
raise response.raise_for_status()
def delete(self, key, etag=None):
if key is None:
raise ValueError("None Key is not valid")
if not isinstance(key, str):
raise ValueError("key must be {0}".format(type("")))
headers = {}
if etag is not None:
headers["If-None-Match"] = etag
key = Utils.quote_key(key)
path = "docs/{0}".format(key)
response = self._requests_handler.http_request_handler(path, "DELETE", headers=headers)
if response.status_code != 204:
raise exceptions.ErrorResponseException(response.json()["Error"])
def set_response(self, response):
if response is None:
raise exceptions.ErrorResponseException("Invalid response")
if response.status_code != 200:
try:
json_response = response.json()
if "Error" in json_response:
raise Exception(json_response["Error"])
return response
except ValueError:
raise response.raise_for_status()
return response
def set_response(self, response):
if response is None:
return None
if response.status_code != 201:
response = response.json()
if "Error" in response:
raise exceptions.ErrorResponseException(response["Error"])
return response.raw.data
def delete_database(self, db_name, hard_delete=False):
db_name = db_name.replace("Rave/Databases/", "")
path = "databases/{0}".format(Utils.quote_key(db_name))
if hard_delete:
path += "?hard-delete=true"
response = self.requests_handler.http_request_handler(path, "DELETE", admin=True)
if response.content != '' and response.content != b'':
raise exceptions.ErrorResponseException(response.content)
return response
def set_response(self, response):
if response is None:
return None
try:
response = response.json()
if "Error" in response:
raise exceptions.ErrorResponseException(response["Error"])
except ValueError:
raise exceptions.ErrorResponseException(
"Failed to load document from the database please check the connection to the server")
return response