Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def inner(*args, **kwargs):
try:
return await action(*args, **kwargs)
except h11.ProtocolError as ex:
log.log(str(ex))
traceback.print_exc(file=sys.stderr)
sys.exit("Fn <-> FDK connectivity issue: ".format(str(ex)))
def start(handle_code: customer_code.Function,
uds: str,
loop: asyncio.AbstractEventLoop=None):
"""
Unix domain socket HTTP server entry point
:param handle_code: customer's code
:type handle_code: fdk.customer_code.Function
:param uds: path to a Unix domain socket
:type uds: str
:param loop: event loop
:type loop: asyncio.AbstractEventLoop
:return: None
"""
log.log("in http_stream.start")
socket_path = os.path.normpath(str(uds).lstrip("unix:"))
socket_dir, socket_file = os.path.split(socket_path)
if socket_file == "":
sys.exit("malformed FN_LISTENER env var "
"value: {0}".format(socket_path))
phony_socket_path = os.path.join(
socket_dir, "phony" + socket_file)
log.log("deleting socket files if they exist")
try:
os.remove(socket_path)
os.remove(phony_socket_path)
except OSError:
pass
def serialize_response_data(data, content_type):
log.log("in serialize_response_data")
if data:
if "application/json" in content_type:
return bytes(ujson.dumps(data), "utf8")
if "text/plain" in content_type:
return bytes(str(data), "utf8")
if "application/xml" in content_type:
# returns a bytearray
if isinstance(data, str):
return bytes(data, "utf8")
return ElementTree.tostring(data, encoding='utf8', method='xml')
if "application/octet-stream" in content_type:
return data
return
rtr = router.Router()
rtr.add("/call", frozenset({"POST"}),
event_handler.event_handle(handle_code))
srv = app.AsyncHTTPServer(name="fdk", router=rtr)
start_serving, server_forever = srv.run(sock=sock, loop=loop)
try:
log.log("CHMOD 666 {0}".format(phony_socket_path))
os.chmod(phony_socket_path, 0o666)
log.log("phony socket permissions: {0}"
.format(oct(os.stat(phony_socket_path).st_mode)))
log.log("calling '.start_serving()'")
start_serving()
log.log("sym-linking {0} to {1}".format(
socket_path, phony_socket_path))
os.symlink(os.path.basename(phony_socket_path), socket_path)
log.log("socket permissions: {0}"
.format(oct(os.stat(socket_path).st_mode)))
log.log("starting infinite loop")
except (Exception, BaseException) as ex:
log.log(str(ex))
raise ex
server_forever()
async def send_eof_and_close(response_writer, close_writer=True):
if response_writer.can_write_eof():
try:
log.log("sending EOF")
response_writer.write_eof()
await response_writer.drain()
finally:
if close_writer:
await close(response_writer)
:type fn_format: str
"""
self.__app_id = app_id
self.__fn_id = fn_id
self.__call_id = call_id
self.__config = config if config else {}
self.__headers = headers if headers else {}
self.__http_headers = {}
self.__deadline = deadline
self.__content_type = content_type
self._request_url = request_url
self._method = method
self.__response_headers = {}
self.__fn_format = fn_format
log.log("request headers. gateway: {0} {1}"
.format(self.__is_gateway(), headers))
if self.__is_gateway():
self.__headers = hs.decap_headers(headers, True)
self.__http_headers = hs.decap_headers(headers, False)
:param data: request data
:type data: dict
:return: resulting object of distributed function
:rtype: object
"""
log.log("income data type: {0}".format(type(data)))
log.log("data len: {}".format(len(data)))
payload = ujson.loads(data)
(self_in_bytes,
action_in_bytes, action_args, action_kwargs) = (
payload['self'],
payload['action'],
payload['args'],
payload['kwargs'])
log.log("Got {} bytes of class instance".format(len(self_in_bytes)))
log.log("Got {} bytes of function".format(len(action_in_bytes)))
log.log("string class instance unpickling")
self = dill.loads(bytes(self_in_bytes))
log.log("class instance unpickled, type: {}".format(type(self)))
log.log("string class instance function unpickling")
action = dill.loads(bytes(action_in_bytes))
log.log("class instance function unpickled, type: {}".format(type(action)))
action_args.insert(0, self)
dependencies = action_kwargs.get("dependencies", {})
log.log("cached external methods found: {0}".format(len(dependencies) > 0))
async def pure_handler(request):
log.log("in pure_handler")
data = None
if request.has_body:
log.log("has body: {}".format(request.has_body))
log.log("request comes with data")
data = await request.content.read()
response = await runner.handle_request(
handle_func, constants.HTTPSTREAM,
request=request, data=data)
log.log("request execution completed")
headers = response.context().GetResponseHeaders()
response_content_type = headers.get(
constants.CONTENT_TYPE, "application/json"
)
headers.set(constants.CONTENT_TYPE, response_content_type)
kwargs = {
"headers": headers.http_raw()
}
sdata = serialize_response_data(
response.body(), response_content_type)
if response.status() >= 500:
kwargs.update(reason=sdata, status=500)
else:
How is it different from other Python FDK functions?
- This function works with serialized Python callable objects via wire.
Each function supplied with set of external dependencies that are
represented as serialized functions, no matter if they are module-level,
class-level Python objects
:param ctx: request context
:type ctx: fdk.context.RequestContext
:param data: request data
:type data: dict
:return: resulting object of distributed function
:rtype: object
"""
log.log("income data type: {0}".format(type(data)))
log.log("data len: {}".format(len(data)))
payload = ujson.loads(data)
(self_in_bytes,
action_in_bytes, action_args, action_kwargs) = (
payload['self'],
payload['action'],
payload['args'],
payload['kwargs'])
log.log("Got {} bytes of class instance".format(len(self_in_bytes)))
log.log("Got {} bytes of function".format(len(action_in_bytes)))
log.log("string class instance unpickling")
self = dill.loads(bytes(self_in_bytes))
log.log("class instance unpickled, type: {}".format(type(self)))
def SetResponseHeaders(self, headers, status_code):
log.log("setting headers. gateway: {0}".format(self.__is_gateway()))
if self.__is_gateway():
headers = hs.encap_headers(headers, status=status_code)
for k, v in headers.items():
self.__response_headers[k.lower()] = v