Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_string(tmpdir):
args = [sys.executable, exitcodeapp.__file__, "foobar"]
p = Popen(args, env={"PYTHONPATH": ":".join(sys.path)}, stderr=PIPE)
status = p.wait()
assert status == 1
assert p.stderr.read() == b("foobar\n")
def test_parsemsg():
s = b(":foo!bar@localhost NICK foobar")
source, command, args = parsemsg(s)
assert source == (u("foo"), u("bar"), u("localhost"))
assert command == "NICK"
assert args == [u("foobar")]
s = b("")
source, command, args = parsemsg(s)
assert source == (None, None, None)
assert command is None
assert args == []
def _parse_chunk_size(self, data):
idx = data.find(b("\r\n"))
if idx < 0:
return None, None
line, rest_chunk = data[:idx], data[idx + 2:]
chunk_size = line.split(b(";"), 1)[0].strip()
try:
chunk_size = int(chunk_size, 16)
except ValueError:
raise InvalidChunkSize(chunk_size)
if chunk_size == 0:
self._parse_trailers(rest_chunk)
return 0, None
return chunk_size, rest_chunk
sec_key = headers.get("Sec-WebSocket-Key", "").encode("utf-8")
subprotocols = headers.elements("Sec-WebSocket-Protocol")
connection_tokens = [s.strip() for s in
headers.get("Connection", "").lower().split(",")]
try:
if ("Host" not in headers or headers.get("Upgrade", "").lower() != "websocket" or
"upgrade" not in connection_tokens or sec_key is None or len(base64.b64decode(sec_key)) != 16):
return httperror(request, response, code=400)
if headers.get("Sec-WebSocket-Version", "") != "13":
response.headers["Sec-WebSocket-Version"] = "13"
return httperror(request, response, code=400)
# Generate accept header information
msg = sec_key + b("258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
hasher = hashlib.sha1()
hasher.update(msg)
accept = base64.b64encode(hasher.digest())
# Successful completion
response.status = 101
response.close = False
try:
del response.headers["Content-Type"]
except KeyError:
pass
response.headers["Upgrade"] = "WebSocket"
response.headers["Connection"] = "Upgrade"
response.headers["Sec-WebSocket-Accept"] = accept.decode()
if subprotocols:
response.headers["Sec-WebSocket-Protocol"] = self.select_subprotocol(subprotocols)
def canonical(self):
'''Canonicalize this url. This includes reordering parameters and args
to have a consistent ordering'''
self._query = b('&').join(
sorted([q for q in self._query.split(b('&'))])
)
self._params = b(';').join(
sorted([q for q in self._params.split(b(';'))])
)
return self
def abspath(self):
'''Clear out any '..' and excessive slashes from the path'''
# Remove double forward-slashes from the path
path = re.sub(b('\/{2,}'), b('/'), self._path)
# With that done, go through and remove all the relative references
unsplit = []
directory = False
for part in path.split(b('/')):
# If we encounter the parent directory, and there's
# a segment to pop off, then we should pop it off.
if part == b('..') and (not unsplit or unsplit.pop() is not None):
directory = True
elif part != b('.'):
directory = False
unsplit.append(part)
else:
directory = True
# With all these pieces, assemble!
if directory:
def abspath(self):
'''Clear out any '..' and excessive slashes from the path'''
# Remove double forward-slashes from the path
path = re.sub(b('\/{2,}'), b('/'), self._path)
# With that done, go through and remove all the relative references
unsplit = []
directory = False
for part in path.split(b('/')):
# If we encounter the parent directory, and there's
# a segment to pop off, then we should pop it off.
if part == b('..') and (not unsplit or unsplit.pop() is not None):
directory = True
elif part != b('.'):
directory = False
unsplit.append(part)
else:
directory = True
# With all these pieces, assemble!
if directory:
# If the path ends with a period, then it refers to a directory,
# not a file path
unsplit.append(b('/'))
self._path = b('/').join(unsplit)
body_part = b("").join(self._buf)
self._clen_rest -= len(body_part)
# maybe decompress
if self.__decompress_obj is not None:
body_part = self.__decompress_obj.decompress(body_part)
self._partial_body = True
self._body.append(body_part)
self._buf = []
if self._clen_rest <= 0:
self.__on_message_complete = True
return
else:
data = b("").join(self._buf)
try:
size, rest = self._parse_chunk_size(data)
except InvalidChunkSize as e:
self.errno = INVALID_CHUNK
self.errstr = "invalid chunk size [%s]" % str(e)
return -1
if size == 0:
return size
if size is None or len(rest) < size:
return None
body_part, rest = rest[:size], rest[size:]
if len(rest) < 2:
"""
import traceback
from ..six import b
from .components import BaseComponent
from .events import Event, exception
from .handlers import handler
from .values import Value
try:
from cPickle import dumps, loads
except ImportError:
from pickle import dumps, loads # NOQA
_sentinel = b('~~~')
class ipc(Event):
"""ipc Event
Send an event to a child/parent process
"""
def __init__(self, event, channel=None):
"""
:param event: Event to execute remotely.
:type event: :class:`circuits.core.events.Event`
:param channel: IPC Channel (channel to use on child/parent).
:type channel: str
"""
def __bytes__(self):
return b(self.__str__())