Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def where_between(self, field_name, start, end, exact=False):
    """
    Append a "between" token for field_name to self._where_tokens.

    @param field_name: field to filter on; passed through Query.escape_if_needed
    @param start: lower bound; a timedelta is converted with Utils.timedelta_tick,
        and None is registered as the literal "*"
    @param end: upper bound; a timedelta is converted with Utils.timedelta_tick,
        and None is registered as the literal "NULL"
    @param exact: forwarded unchanged to the token
    @return: self
    """
    escaped_name = Query.escape_if_needed(field_name)
    self._add_operator_if_needed()
    self.negate_if_needed(escaped_name)

    lower = Utils.timedelta_tick(start) if isinstance(start, timedelta) else start
    upper = Utils.timedelta_tick(end) if isinstance(end, timedelta) else end
    if lower is None:
        lower = "*"
    if upper is None:
        upper = "NULL"

    # The tuple is evaluated left to right, so the lower bound is registered
    # as a query parameter before the upper bound.
    bounds = (self.add_query_parameter(lower), self.add_query_parameter(upper))

    between_token = _Token(field_name=escaped_name, token="between", value=bounds, exact=exact)
    between_token.write = self.rql_where_write(between_token)
    self._where_tokens.append(between_token)
    return self
def where_less_than_or_equal(self, field_name, value):
    """
    Append a "less_than_or_equal" token for field_name to self._where_tokens.

    @param field_name: field to filter on; passed through Query.escape_if_needed
    @param value: comparison operand; a timedelta is converted with
        Utils.timedelta_tick before being registered as a query parameter
    @return: self
    """
    escaped_name = Query.escape_if_needed(field_name)
    self._add_operator_if_needed()
    self.negate_if_needed(escaped_name)

    operand = Utils.timedelta_tick(value) if isinstance(value, timedelta) else value
    parameter_name = self.add_query_parameter(operand)

    comparison = _Token(field_name=escaped_name, token="less_than_or_equal", value=parameter_name)
    comparison.write = self.rql_where_write(comparison)
    self._where_tokens.append(comparison)
    return self
def where_between(self, field_name, start, end, exact=False):
    """
    Append a "between" token for field_name to self._where_tokens.

    @param field_name: field to filter on; passed through Query.escape_if_needed
    @param start: lower bound; a timedelta is converted with Utils.timedelta_tick,
        and None is registered as the literal "*"
    @param end: upper bound; a timedelta is converted with Utils.timedelta_tick,
        and None is registered as the literal "NULL"
    @param exact: forwarded unchanged to the token
    @return: self
    """
    escaped_name = Query.escape_if_needed(field_name)
    self._add_operator_if_needed()
    self.negate_if_needed(escaped_name)

    range_start = start
    if isinstance(range_start, timedelta):
        range_start = Utils.timedelta_tick(range_start)
    range_end = end
    if isinstance(range_end, timedelta):
        range_end = Utils.timedelta_tick(range_end)

    # Lower bound is registered first, then the upper bound.
    start_parameter = self.add_query_parameter("*" if range_start is None else range_start)
    end_parameter = self.add_query_parameter("NULL" if range_end is None else range_end)

    range_token = _Token(
        field_name=escaped_name,
        token="between",
        value=(start_parameter, end_parameter),
        exact=exact,
    )
    range_token.write = self.rql_where_write(range_token)
    self._where_tokens.append(range_token)
    return self
def get_query_hash(self):
    """
    Compute an xxh64 digest over the query text and its options.

    Fed into the hash, in order: query, wait_for_non_stale_results,
    wait_for_non_stale_results_timeout (if set, as ticks), cutoff_etag (if set),
    start, page_size (8 bytes, big-endian), query_parameters (if set, as JSON),
    facet_setup_doc (if set) and facets (if set, as JSON of facet.to_json()).

    Digests differ from those produced by the previous zero-fill encoding.

    @return: the digest as an integer (xxh64 intdigest)
    """

    def as_bytes(value):
        # On Python 3, bytes(<int>) returns a zero-filled buffer of that length
        # rather than the number's text: a tick count would allocate a huge
        # buffer and distinct field combinations could hash identically.
        # bytes(<str>) raises TypeError. Encode the textual form instead.
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)
        return str(value).encode('utf-8')

    query_hash = xxhash.xxh64()
    query_hash.update(self.query)
    query_hash.update(as_bytes(self.wait_for_non_stale_results))
    if self.wait_for_non_stale_results_timeout:
        query_hash.update(as_bytes(Utils.timedelta_tick(self.wait_for_non_stale_results_timeout)))
    if self.cutoff_etag:
        query_hash.update(as_bytes(self.cutoff_etag))
    query_hash.update(as_bytes(self.start))
    query_hash.update(self.page_size.to_bytes(8, byteorder='big'))
    if self.query_parameters:
        query_hash.update(json.dumps(self.query_parameters).encode('utf-8'))
    if self.facet_setup_doc:
        query_hash.update(as_bytes(self.facet_setup_doc))
    if self.facets:
        json_facets = [facet.to_json() for facet in self.facets]
        query_hash.update(json.dumps(json_facets).encode('utf-8'))
    return query_hash.intdigest()
def where_greater_than_or_equal(self, field_name, value):
    """
    Append a "greater_than_or_equal" token for field_name to self._where_tokens.

    @param field_name: field to filter on; passed through Query.escape_if_needed
    @param value: comparison operand; a timedelta is converted with
        Utils.timedelta_tick before being registered as a query parameter
    @return: self
    """
    escaped_name = Query.escape_if_needed(field_name)
    self._add_operator_if_needed()
    self.negate_if_needed(escaped_name)

    operand = Utils.timedelta_tick(value) if isinstance(value, timedelta) else value
    parameter_name = self.add_query_parameter(operand)

    comparison = _Token(field_name=escaped_name, token="greater_than_or_equal", value=parameter_name)
    comparison.write = self.rql_where_write(comparison)
    self._where_tokens.append(comparison)
    return self
def get_query_hash(self):
    """
    Compute an xxh64 digest over the query text and its options.

    Fed into the hash, in order: query, wait_for_non_stale_results,
    wait_for_non_stale_results_timeout (if set, as ticks), show_timings,
    cutoff_etag (if set), start, page_size (8 bytes, big-endian) and
    query_parameters (if set, as JSON).

    Digests differ from those produced by the previous zero-fill encoding.

    @return: the digest as an integer (xxh64 intdigest)
    """

    def as_bytes(value):
        # On Python 3, bytes(<int>) returns a zero-filled buffer of that length
        # rather than the number's text: a tick count would allocate a huge
        # buffer and distinct field combinations could hash identically.
        # Encode the textual form instead.
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)
        return str(value).encode('utf-8')

    query_hash = xxhash.xxh64()
    query_hash.update(self.query)
    query_hash.update(as_bytes(self.wait_for_non_stale_results))
    if self.wait_for_non_stale_results_timeout:
        query_hash.update(as_bytes(Utils.timedelta_tick(self.wait_for_non_stale_results_timeout)))
    query_hash.update(as_bytes(self.show_timings))
    if self.cutoff_etag:
        query_hash.update(as_bytes(self.cutoff_etag))
    query_hash.update(as_bytes(self.start))
    query_hash.update(self.page_size.to_bytes(8, byteorder='big'))
    if self.query_parameters:
        query_hash.update(json.dumps(self.query_parameters).encode('utf-8'))
    return query_hash.intdigest()
def where_greater_than(self, field_name, value):
    """
    Append a "greater_than" token for field_name to self._where_tokens.

    @param field_name: field to filter on; passed through Query.escape_if_needed
    @param value: comparison operand; a timedelta is converted with
        Utils.timedelta_tick before being registered as a query parameter
    @return: self
    """
    escaped_name = Query.escape_if_needed(field_name)
    self._add_operator_if_needed()
    self.negate_if_needed(escaped_name)

    operand = Utils.timedelta_tick(value) if isinstance(value, timedelta) else value
    parameter_name = self.add_query_parameter(operand)

    comparison = _Token(field_name=escaped_name, token="greater_than", value=parameter_name)
    comparison.write = self.rql_where_write(comparison)
    self._where_tokens.append(comparison)
    return self
def where_less_than(self, field_name, value):
    """
    Append a "less_than" token for field_name to self._where_tokens.

    @param field_name: field to filter on; passed through Query.escape_if_needed
    @param value: comparison operand; a timedelta is converted with
        Utils.timedelta_tick before being registered as a query parameter
    @return: self
    """
    escaped_name = Query.escape_if_needed(field_name)
    self._add_operator_if_needed()
    self.negate_if_needed(escaped_name)

    operand = Utils.timedelta_tick(value) if isinstance(value, timedelta) else value
    parameter_name = self.add_query_parameter(operand)

    comparison = _Token(field_name=escaped_name, token="less_than", value=parameter_name)
    comparison.write = self.rql_where_write(comparison)
    self._where_tokens.append(comparison)
    return self