Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _process_body(self, data, gzipped):
if gzipped:
return force_utf8(self._gunzip_body(data))
return data
def __str__(self):
params = {}
request_keys = ("method", "host", "url", "data", "headers", "timeout", "body")
result_attrs = ("status", "reason", "msg", "body", "headers")
params["error"] = self.err
params["error_message"] = "Hubspot Error"
if self.result.body:
try:
result_body = json.loads(force_utf8(self.result.body))
except ValueError:
result_body = {}
params["error_message"] = result_body.get("message", "Hubspot Error")
for key in request_keys:
params[key] = self.request.get(key)
for attr in result_attrs:
params["result_{}".format(attr)] = getattr(self.result, attr, "")
params = self._dict_vals_to_str(params)
return self.as_str_template.format(**params)
def _dict_vals_to_str(self, data):
unicode_data = {}
for key, val in data.items():
if not val:
unicode_data[key] = ""
if isinstance(val, bytes):
unicode_data[key] = force_utf8(val)
elif isinstance(val, str):
unicode_data[key] = val
else:
unicode_data[key] = str(type(val))
return unicode_data
def _digest_result(self, data):
if data:
if isinstance(data, bytes):
data = utils.force_utf8(data)
if isinstance(data, str):
try:
data = json.loads(data)
except ValueError:
pass
return data