Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _render_request(self, request):
"""
Receives an HTTP/POST|PUT request, and then calls the Publisher/Caller
processor.
"""
# NOTE(review): this excerpt has lost its indentation and some lines are
# missing, so the function is shown incomplete here.
# read HTTP/POST|PUT body
body = request.content.read()
# Keep only the first value of each query argument, keyed by native string.
args = {native_string(x): y[0] for x, y in request.args.items()}
headers = request.requestHeaders
# check content type + charset encoding
#
content_type_header = headers.getRawHeaders(b"content-type", [])
# Only the first Content-Type header is examined; it is split on b";" and
# each element is stripped and lower-cased (elements stay bytes).
if len(content_type_header) > 0:
content_type_elements = [
x.strip().lower()
for x in content_type_header[0].split(b";")
]
else:
content_type_elements = []
if self.decode_as_json:
# if the client sent a content type, it MUST be one of _ALLOWED_CONTENT_TYPES
# NOTE(review): the condition and the opening of the call that the next
# three lines belong to are missing from this excerpt; only its trailing
# keyword arguments survive.
given=content_type_elements[0],
log_category="AR452"
)
# Collect "key=value" parameters of the Content-Type header (e.g. charset).
encoding_parts = {}
if len(content_type_elements) > 1:
try:
for item in content_type_elements:
if b"=" not in item:
# Don't bother looking at things "like application/json"
continue
# Parsing things like:
# charset=utf-8
_ = native_string(item).split("=")
# NOTE(review): validation relies on assert, which is skipped when Python
# runs with -O; malformed or duplicate parameters would then not be
# rejected here.
assert len(_) == 2
# We don't want duplicates
key = _[0].strip().lower()
assert key not in encoding_parts
encoding_parts[key] = _[1].strip().lower()
# Bare except: any failure above (including the asserts) becomes a 400.
except:
return self._deny_request(request, 400, log_category="AR450")
# Charset defaults to utf-8 when not given; anything else is rejected.
charset_encoding = encoding_parts.get("charset", "utf-8")
if charset_encoding not in ["utf-8", 'utf8']:
return self._deny_request(
request, 400,
log_category="AR450")
# NOTE(review): body and args are not used in the visible lines; the rest of
# the function is presumably cut off here.
def _process(self, request, event):
# Builds a message from the request headers and the given event for the
# configured topic. NOTE(review): the excerpt has lost its indentation, and
# the lines that would use `message` and `publish_options` are not visible.
# The topic we're going to send to
topic = self._options["topic"]
message = {}
# Header names and values are converted to native strings.
message["headers"] = {
native_string(x): [native_string(z) for z in y]
for x, y in request.requestHeaders.getAllRawHeaders()}
message["body"] = event
publish_options = PublishOptions(acknowledge=True)
# Success callback: completes the request with status 202 and the configured
# "success_response" text ("OK" if unset), UTF-8 encoded. `result` is unused.
def _succ(result):
response_text = self._options.get("success_response", "OK").encode('utf8')
return self._complete_request(
request, 202, response_text,
reason="Successfully sent webhook from {ip} to {topic}",
topic=topic,
ip=request.getClientIP(),
log_category="AR201",
)
# Error callback. NOTE(review): its body is not visible in this excerpt.
def _err(result):
#
# NOTE(review): from here the excerpt reads as query-parameter validation for
# signed requests rather than an errback body; it appears to be spliced in
# from another part of the file, with indentation lost and lines missing.
# key: required only when a secret is configured; otherwise key_str is simply
# left unassigned when the parameter is absent.
if 'key' in args:
key_str = args["key"]
else:
if self._secret:
return self._deny_request(
request, 400,
reason="'key' field missing",
log_category="AR461")
# timestamp
#
# Parsed with a fixed UTC format; the absolute difference to the current UTC
# time is checked against self._timestamp_delta_limit when that limit is set.
if 'timestamp' in args:
timestamp_str = args["timestamp"]
try:
ts = datetime.datetime.strptime(native_string(timestamp_str), "%Y-%m-%dT%H:%M:%S.%fZ")
delta = abs((ts - datetime.datetime.utcnow()).total_seconds())
if self._timestamp_delta_limit and delta > self._timestamp_delta_limit:
return self._deny_request(
request, 400,
log_category="AR464")
except ValueError:
return self._deny_request(
request, 400,
reason="invalid timestamp '{0}' (must be UTC/ISO-8601, e.g. '2011-10-14T16:59:51.123Z')".format(native_string(timestamp_str)),
log_category="AR462")
else:
if self._secret:
return self._deny_request(
request, 400, reason="signed request required, but mandatory 'timestamp' field missing",
log_category="AR461")
# NOTE(review): the `if` that the following `else` pairs with (presumably
# "if 'signature' in args:") is missing here, as are the assignments of
# seq_str and nonce_str that the HMAC below reads.
signature_str = args["signature"]
else:
if self._secret:
return self._deny_request(
request, 400,
reason="'signature' field missing",
log_category="AR461")
# do more checks if signed requests are required
#
if self._secret:
if key_str != self._key:
return self._deny_request(
request, 401,
reason="unknown key '{0}' in signed request".format(native_string(key_str)),
log_category="AR460")
# Compute signature: HMAC[SHA256]_{secret} (key | timestamp | seq | nonce | body) => signature
# The fields are fed to the HMAC in exactly that order, and the digest is
# URL-safe base64 encoded before comparison.
hm = hmac.new(self._secret, None, hashlib.sha256)
hm.update(key_str)
hm.update(timestamp_str)
hm.update(seq_str)
hm.update(nonce_str)
hm.update(body)
signature_recomputed = base64.urlsafe_b64encode(hm.digest())
# NOTE(review): `!=` is not a constant-time comparison; consider
# hmac.compare_digest for comparing the signatures.
if signature_str != signature_recomputed:
return self._deny_request(request, 401,
log_category="AR459")
else:
# NOTE(review): this logging call is cut off in the excerpt.
self.log.debug("REST request signature valid.",
# NOTE(review): the next four lines are the tail of a denial whose opening
# (presumably the "seq" check) is missing from this excerpt.
return self._deny_request(
request, 400,
reason="'seq' field missing",
log_category="AR461")
# nonce
#
# Must parse as an integer; the parsed value itself is not used yet (see the
# FIXME). The bare except turns any failure into a 400.
if 'nonce' in args:
nonce_str = args["nonce"]
try:
# FIXME: check nonce
nonce = int(nonce_str) # noqa
except:
return self._deny_request(
request, 400,
reason="invalid nonce '{0}' (must be an integer)".format(native_string(nonce_str)),
log_category="AR462")
else:
if self._secret:
return self._deny_request(
request, 400,
reason="'nonce' field missing",
log_category="AR461")
# signature
#
# Required only when a secret is configured.
if 'signature' in args:
signature_str = args["signature"]
else:
if self._secret:
return self._deny_request(
request, 400,
# NOTE(review): the call above is cut off in this excerpt.
# timestamp
#
# Parsed with a fixed UTC format; the absolute difference to the current UTC
# time is checked against self._timestamp_delta_limit when that limit is set.
if 'timestamp' in args:
timestamp_str = args["timestamp"]
try:
ts = datetime.datetime.strptime(native_string(timestamp_str), "%Y-%m-%dT%H:%M:%S.%fZ")
delta = abs((ts - datetime.datetime.utcnow()).total_seconds())
if self._timestamp_delta_limit and delta > self._timestamp_delta_limit:
return self._deny_request(
request, 400,
log_category="AR464")
except ValueError:
return self._deny_request(
request, 400,
reason="invalid timestamp '{0}' (must be UTC/ISO-8601, e.g. '2011-10-14T16:59:51.123Z')".format(native_string(timestamp_str)),
log_category="AR462")
else:
# A missing timestamp is only an error when a secret is configured.
if self._secret:
return self._deny_request(
request, 400, reason="signed request required, but mandatory 'timestamp' field missing",
log_category="AR461")
# seq
#
# Must parse as an integer; the parsed value itself is not used yet (see the
# FIXME).
if 'seq' in args:
seq_str = args["seq"]
try:
# FIXME: check sequence
seq = int(seq_str) # noqa
except:
return self._deny_request(
# NOTE(review): the call above is cut off in this excerpt.
# NOTE(review): the next four lines are the tail of the timestamp check; the
# `if 'timestamp' in args:` part is missing from this excerpt.
if self._secret:
return self._deny_request(
request, 400, reason="signed request required, but mandatory 'timestamp' field missing",
log_category="AR461")
# seq
#
# Must parse as an integer; the parsed value itself is not used yet (see the
# FIXME). The bare except turns any failure into a 400.
if 'seq' in args:
seq_str = args["seq"]
try:
# FIXME: check sequence
seq = int(seq_str) # noqa
except:
return self._deny_request(
request, 400,
reason="invalid sequence number '{0}' (must be an integer)".format(native_string(seq_str)),
log_category="AR462")
else:
# A missing seq is only an error when a secret is configured.
if self._secret:
return self._deny_request(
request, 400,
reason="'seq' field missing",
log_category="AR461")
# nonce
#
if 'nonce' in args:
nonce_str = args["nonce"]
try:
# FIXME: check nonce
nonce = int(nonce_str) # noqa
# NOTE(review): the handler below is cut off at the end of this excerpt.
except: