Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def parse(self, url:bytes):
parsed = httptools.parse_url(url)
return (parsed.schema, parsed.host, parsed.port, parsed.path,
parsed.query, parsed.fragment, parsed.userinfo)
def __init__(self, url_bytes, headers, version, method):
# TODO: Content-Encoding detection
url_parsed = parse_url(url_bytes)
self.url = url_parsed.path.decode('utf-8')
self.headers = headers
self.version = version
self.method = method
self.query_string = url_parsed.query.decode('utf-8') if url_parsed.query else None
# Init but do not inhale
self.body = None
self.parsed_json = None
self.parsed_form = None
self.parsed_files = None
self.parsed_args = None
def on_url(self, url: bytes):
self.request.method = self.parser.get_method().decode().upper()
self.request.url = url
parsed = parse_url(url)
self.request.path = unquote(parsed.path.decode())
self.request.query_string = (parsed.query or b'').decode()
self.app.lookup(self.request)
def on_url(self, url):
method = self.parser.get_method()
parsed_url = httptools.parse_url(url)
raw_path = parsed_url.path
path = raw_path.decode("ascii")
if "%" in path:
path = urllib.parse.unquote(path)
self.url = url
self.expect_100_continue = False
self.headers = []
self.scope = {
"type": "http",
"http_version": "1.1",
"server": self.server,
"client": self.client,
"scheme": self.scheme,
"method": method.decode("ascii"),
"root_path": self.root_path,
"path": path,
async def handle(self, request, response):
url = httptools.parse_url(request.url)
url_path = url.path.strip(b'/')
if url_path == b'explore':
response.write(explore.EXPLORE_HTML, content_type='text/html')
return
try:
operation_name = None
variables = None
if request.method == 'POST':
if request.content_type and 'json' in request.content_type:
body = json.loads(request.body)
query = body['query']
operation_name = body.get('operationName')
if operation_name:
assert isinstance(operation_name, str)
def __init__(self, url_bytes: bytes, headers: dict,
version: str, method: str, transport) -> None:
self._parsed_url = parse_url(url_bytes)
self.app = None
self.headers = headers
self.version = version
self.method = method
self.transport = transport
# Init but do not inhale
self.body = []
self._parsed_json = _empty
self._parsed_jsonrpc = _empty
self.uri_template = None
self.stream = None
self.is_batch_jrpc = False
self.is_single_jrpc = False
def on_url(self, url):
self.ctx = Context()
self.ctx.write = self.transport.write
url = httptools.parse_url(url)
self.ctx.req.path = url.path.decode()
self.ctx.req.method = self.parser.get_method().decode()
def on_url(self, url: bytes):
url = parse_url(url)
self.msg.path = url.path.decode()
query = url.query
self.msg.params = dict(parse_qsl(query.decode())) if query else {}
def handle(self, request, response):
parsed_url = httptools.parse_url(self._current_url)
payload_size = parsed_url.path.decode('ascii')[1:]
if not payload_size:
payload_size = 1024
else:
payload_size = int(payload_size)
resp = _RESP_CACHE.get(payload_size)
if resp is None:
resp = b'X' * payload_size
_RESP_CACHE[payload_size] = resp
response.write(resp)
if not self._current_parser.should_keep_alive():
self._transport.close()
self._current_parser = None
self._current_request = None
def on_url(self, url: bytes):
self.request.url = url
parsed = parse_url(url)
self.request.path = unquote(parsed.path.decode())
self.request.query_string = (parsed.query or b'').decode()