Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def socket_handler(listener):
sock = listener.accept()[0]
e = Encoder()
# We get two messages for the connection open and then a HEADERS
# frame.
receive_preamble(sock)
sock.recv(65535)
# Wait for request
req_event.wait(5)
# Now, send the headers for the response.
f = build_headers_frame(
[(':status', '200'), ('content-length', '14')],
e
)
f.stream_id = 1
sock.send(f.serialize())
def refresh_encoder(self):
self.encoder = Encoder()
def __init__(self):
self.encoder = Encoder()
def __init__(self):
self.encoder = Encoder()
def build_headers_frame(headers, encoder=None):
f = HeadersFrame(1)
e = encoder
if e is None:
e = Encoder()
e.huffman_coder = HuffmanEncoder(REQUEST_CODES, REQUEST_CODES_LENGTH)
f.data = e.encode(headers)
f.flags.add('END_HEADERS')
return f
length=0,
flags=FLAG_NO_FLAGS,
stream_id=0x0):
valid_flags = 0
for flag in self.VALID_FLAGS:
valid_flags |= flag
if flags | valid_flags != valid_flags:
raise ValueError('invalid flags detected.')
if state is None:
class State(object):
pass
state = State()
state.http2_settings = HTTP2_DEFAULT_SETTINGS.copy()
state.encoder = Encoder()
state.decoder = Decoder()
self.state = state
self.length = length
self.type = self.TYPE
self.flags = flags
self.stream_id = stream_id
length=0,
flags=FLAG_NO_FLAGS,
stream_id=0x0):
valid_flags = 0
for flag in self.VALID_FLAGS:
valid_flags |= flag
if flags | valid_flags != valid_flags:
raise ValueError('invalid flags detected.')
if state is None:
class State(object):
pass
state = State()
state.http2_settings = HTTP2_DEFAULT_SETTINGS.copy()
state.encoder = Encoder()
state.decoder = Decoder()
self.state = state
self.length = length
self.type = self.TYPE
self.flags = flags
self.stream_id = stream_id
def __init__(
self,
tcp_handler=None,
rfile=None,
wfile=None,
is_server=False,
dump_frames=False,
encoder=None,
decoder=None,
unhandled_frame_cb=None,
):
self.tcp_handler = tcp_handler or TCPHandler(rfile, wfile)
self.is_server = is_server
self.dump_frames = dump_frames
self.encoder = encoder or Encoder()
self.decoder = decoder or Decoder()
self.unhandled_frame_cb = unhandled_frame_cb
self.http2_settings = self.HTTP2_DEFAULT_SETTINGS.copy()
self.current_stream_id = None
self.connection_preface_performed = False