Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_request(self, server_node):
self.url = (f"{server_node.url}/databases/{server_node.database}"
f"/timeseries?docId={Utils.quote_key(self._document_id)}"
f"{f'&start={self._start}' if self._start > 0 else ''}"
f"{f'&pageSize={self._page_size}' if self._page_size < sys.maxsize else ''}")
for range_ in self._ranges:
self.url += (f"&name={Utils.quote_key(range_.name)}"
f"&from={Utils.datetime_to_string(range_.from_date)}"
def _json_default(o):
    """Convert a value into something the JSON encoder can serialize.

    The rules are tried in this order:

    * ``None`` is returned unchanged.
    * ``datetime`` is formatted with ``Utils.datetime_to_string``.
    * ``timedelta`` is formatted with ``Utils.timedelta_to_str``.
    * An object with a non-empty ``__dict__`` is replaced by that ``__dict__``.
    * ``set`` and ``frozenset`` become a ``list`` of their elements.
    * ``int`` and ``float`` become their ``str()`` representation.

    :param o: the value to convert.
    :return: a replacement value according to the rules above.
    :raises TypeError: if none of the rules applies to ``o``.
    """
    if o is None:
        return None
    if isinstance(o, datetime):
        return Utils.datetime_to_string(o)
    if isinstance(o, timedelta):
        return Utils.timedelta_to_str(o)

    # Truthiness check on purpose kept from the original: an object whose
    # __dict__ is missing *or empty* falls through to the checks below.
    attributes = getattr(o, "__dict__", None)
    if attributes:
        return attributes

    # frozenset is handled the same way as set; both become plain lists.
    if isinstance(o, (set, frozenset)):
        return list(o)
    if isinstance(o, (int, float)):
        return str(o)

    raise TypeError(repr(o) + " is not JSON serializable (Try add a json default method to store convention)")
def to_json(self):
    """Return this entry as a dict with "Timestamp" and "Values" keys.

    "Timestamp" is ``self.timestamp`` formatted by
    ``Utils.datetime_to_string``; "Values" is ``self.values`` as-is.
    A "Tag" key is added only when ``self.tag`` is truthy.
    """
    payload = {
        "Timestamp": Utils.datetime_to_string(self.timestamp),
        "Values": self.values,
    }
    # Entries without a (truthy) tag are emitted without the "Tag" key.
    if not self.tag:
        return payload
    payload["Tag"] = self.tag
    return payload
def to_json(self):
    """Return a dict with "From" and "To" keys.

    Both values are produced by ``Utils.datetime_to_string`` from
    ``self.from_date`` and ``self.to_date`` respectively.
    """
    start_text = Utils.datetime_to_string(self.from_date)
    end_text = Utils.datetime_to_string(self.to_date)
    return {"From": start_text, "To": end_text}
def to_json(self):
    """Return this object's fields as a dict keyed by PascalCase names.

    The two timestamp attributes are formatted with
    ``Utils.datetime_to_string``; every other attribute is copied as-is.
    The "Disabled" key carries the value of ``self.closed``.
    """
    # Built key by key so attributes are read in the same order as the keys.
    state = {}
    state["Query"] = self.query
    state["ChangeVectorForNextBatchStartingPoint"] = self.change_vector_for_next_batch_starting_point
    state["SubscriptionId"] = self.subscription_id
    state["SubscriptionName"] = self.subscription_name
    state["LastTimeServerMadeProgressWithDocuments"] = Utils.datetime_to_string(
        self.last_time_server_made_progress_with_documents
    )
    state["LastClientConnectionTime"] = Utils.datetime_to_string(self.last_client_connection_time)
    state["Disabled"] = self.closed
    return state
def to_json(self):
    """Return this object's fields as a dict keyed by PascalCase names.

    Timestamps go through ``Utils.datetime_to_string``; the remaining
    attributes are passed through unchanged. ``self.closed`` is emitted
    under the "Disabled" key.
    """
    # Attributes are read in the same order in which the keys are emitted.
    query = self.query
    next_batch_change_vector = self.change_vector_for_next_batch_starting_point
    subscription_id = self.subscription_id
    subscription_name = self.subscription_name
    last_progress = Utils.datetime_to_string(self.last_time_server_made_progress_with_documents)
    last_connection = Utils.datetime_to_string(self.last_client_connection_time)
    disabled = self.closed

    return {
        "Query": query,
        "ChangeVectorForNextBatchStartingPoint": next_batch_change_vector,
        "SubscriptionId": subscription_id,
        "SubscriptionName": subscription_name,
        "LastTimeServerMadeProgressWithDocuments": last_progress,
        "LastClientConnectionTime": last_connection,
        "Disabled": disabled,
    }