Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _decode_numbered_json_thing(self, thing):
thing = thing.decode().strip()
for ping in re.findall('\d+:\d+"primus::ping::\d+"', thing):
logger.debug("Received ping: {}".format(ping))
self.post_data(ping.replace("::ping::", "::pong::"))
results = []
for blob in re.findall("\d+:\d+(\{.*?\})(?=\d|$)", thing):
results.append(json.loads(blob))
if thing and not results and "::ping::" not in thing:
logger.debug("Could not parse monitoring response: {}".format(thing))
return results
def poll(self, retries=10):
logger.debug("Starting new long-poll request")
try:
response = self.client.session.get(
"{}?sessionId={}&EIO=3&transport=polling&sid={}".format(
self.root_url, self.session_id, self.sid
)
)
response.raise_for_status()
except HTTPError as e:
try:
message = "{} / {}".format(response.content, e)
except:
message = "{}".format(e)
logger.warn(
"Problem with submitting polling request: {} (will retry {} more times)".format(
message, retries
)
if ids is True:
ids = list(self._values.get(table, {}).keys())
if isinstance(ids, str):
ids = [ids]
# if we're in a transaction, add the requested IDs to a queue to refresh when the transaction completes
if self._client.in_transaction():
self._records_to_refresh[table] = list(
set(self._records_to_refresh.get(table, []) + ids)
)
continue
requestlist += [{"table": table, "id": extract_id(id)} for id in ids]
if requestlist:
logger.debug(
"Calling 'getRecordValues' endpoint for requests: {}".format(
requestlist
)
)
results = self._client.post(
"getRecordValues", {"requests": requestlist}
).json()["results"]
for request, result in zip(requestlist, results):
self._update_record(
request["table"],
request["id"],
value=result.get("value"),
role=result.get("role"),
)
def post(self, endpoint, data):
    """
    All API requests on Notion.so are done as POSTs (except the websocket communications).

    :param endpoint: path joined onto ``API_BASE_URL`` to build the request URL
    :param data: payload sent as the JSON body of the request
    :return: the response object returned by ``self.session.post``
    :raises HTTPError: on a 400 response, carrying the "message" field of the
        response body when one is available; other error statuses are left to
        ``response.raise_for_status()``
    """
    url = urljoin(API_BASE_URL, endpoint)
    response = self.session.post(url, json=data)

    if response.status_code == 400:
        logger.error(
            "Got 400 error attempting to POST to {}, with data: {}".format(
                endpoint, json.dumps(data, indent=2)
            )
        )
        message = "There was an error (400) submitting the request."
        try:
            payload = response.json()
        except ValueError:
            # the 400 body is not JSON; keep the generic message so callers
            # still get an HTTPError instead of a decoding error
            payload = None
        if isinstance(payload, dict):
            message = payload.get("message", message)
        raise HTTPError(message)

    response.raise_for_status()
    return response
response.raise_for_status()
except HTTPError as e:
try:
message = "{} / {}".format(response.content, e)
except:
message = "{}".format(e)
logger.warn(
"Problem with submitting polling request: {} (will retry {} more times)".format(
message, retries
)
)
time.sleep(0.1)
if retries <= 0:
raise
if retries <= 5:
logger.error(
"Persistent error submitting polling request: {} (will retry {} more times)".format(
message, retries
)
)
# if we're close to giving up, also try reinitializing the session
self.initialize()
self.poll(retries=retries - 1)
self._refresh_updated_records(
self._decode_numbered_json_thing(response.content)
)
logger.debug("Firing callback {} with kwargs: {}".format(self.callback, kwargs))
# trim down the parameters we'll be passing, to include only those the callback will accept
params = signature(self.callback).parameters
if not any(["**" in str(param) for param in params.values()]):
# there's no "**kwargs" in the callback signature, so remove any unaccepted params
for arg in list(kwargs.keys()):
if arg not in params:
del kwargs[arg]
# perform the callback, gracefully handling any exceptions
try:
# trigger the callback within its own thread, so it won't block others if it's long-running
threading.Thread(target=self.callback, kwargs=kwargs, daemon=True).start()
except Exception as e:
logger.error(
"Error while processing callback for {}: {}".format(
repr(self.record), repr(e)
)
def __call__(self, difference, old_val, new_val):
kwargs = {}
kwargs.update(self.extra_kwargs)
kwargs["record"] = self.record
kwargs["callback_id"] = self.callback_id
kwargs["difference"] = difference
kwargs["changes"] = self.record._convert_diff_to_changelist(
difference, old_val, new_val
)
logger.debug("Firing callback {} with kwargs: {}".format(self.callback, kwargs))
# trim down the parameters we'll be passing, to include only those the callback will accept
params = signature(self.callback).parameters
if not any(["**" in str(param) for param in params.values()]):
# there's no "**kwargs" in the callback signature, so remove any unaccepted params
for arg in list(kwargs.keys()):
if arg not in params:
del kwargs[arg]
# perform the callback, gracefully handling any exceptions
try:
# trigger the callback within its own thread, so it won't block others if it's long-running
threading.Thread(target=self.callback, kwargs=kwargs, daemon=True).start()
except Exception as e:
logger.error(
"Error while processing callback for {}: {}".format(
def poll(self, retries=10):
logger.debug("Starting new long-poll request")
try:
response = self.client.session.get(
"{}?sessionId={}&EIO=3&transport=polling&sid={}".format(
self.root_url, self.session_id, self.sid
)
)
response.raise_for_status()
except HTTPError as e:
try:
message = "{} / {}".format(response.content, e)
except:
message = "{}".format(e)
logger.warn(
"Problem with submitting polling request: {} (will retry {} more times)".format(
message, retries
)
)
time.sleep(0.1)
if retries <= 0:
raise
if retries <= 5:
logger.error(
"Persistent error submitting polling request: {} (will retry {} more times)".format(
message, retries
)
)
# if we're close to giving up, also try reinitializing the session
self.initialize()
self.poll(retries=retries - 1)