Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def unquote_to_bytes(string):
"""unquote_to_bytes('abc%20def') -> b'abc def'."""
# Note: strings are encoded as UTF-8. This is only an issue if it contains
# unescaped non-ASCII characters, which URIs should not.
if not string:
# Is it a string-like object?
string.split
return bytes(b'')
if isinstance(string, str):
string = string.encode('utf-8')
### For Python-Future:
# It is already a byte-string object, but force it to be newbytes here on
# Py2:
string = bytes(string)
###
bits = string.split(b'%')
if len(bits) == 1:
return string
res = [bits[0]]
append = res.append
for item in bits[1:]:
try:
append(_hextobyte[item[:2]])
append(item[2:])
def unquote_to_bytes(string):
    """Replace %xx escapes in *string* and return the result as bytes.

    unquote_to_bytes('abc%20def') -> b'abc def'.

    Text input is encoded as UTF-8 first; that only matters when it holds
    unescaped non-ASCII characters, which a URI should not contain.  A '%'
    whose next two bytes are not a key of ``_hextobyte`` is copied to the
    output unchanged, together with everything that follows it.
    """
    if not string:
        # Touch .split so that falsy objects without that attribute
        # (None, 0, ...) raise AttributeError instead of being accepted.
        string.split
        return bytes(b'')
    if isinstance(string, str):
        string = string.encode('utf-8')
    # Python-Future: the value is a byte string by now, but on Py2 it still
    # has to be coerced to the backported bytes type.
    string = bytes(string)
    pieces = string.split(b'%')
    if len(pieces) == 1:
        # No '%' anywhere, so there is nothing to decode.
        return string
    decoded = [pieces[0]]
    for chunk in pieces[1:]:
        try:
            byte = _hextobyte[chunk[:2]]
        except KeyError:
            # Not a known escape: emit the '%' and the chunk verbatim.
            decoded.append(b'%')
            decoded.append(chunk)
        else:
            decoded.append(byte)
            decoded.append(chunk[2:])
    return bytes(b'').join(decoded)
sockaddr = ctypes.cast(ifaddr.ifa_addr, ctypes.POINTER(Sockaddr))
iffamily = sockaddr.contents.sa_family
if iffamily == AF_INET:
sockaddrin = ctypes.cast(ifaddr.ifa_addr, ctypes.POINTER(Sockaddrin))
address = rdf_client_network.NetworkAddress()
address.address_type = rdf_client_network.NetworkAddress.Family.INET
address.packed_bytes = struct.pack("=L", sockaddrin.contents.sin_addr)
iface.addresses.append(address)
elif iffamily == AF_INET6:
sockaddrin = ctypes.cast(ifaddr.ifa_addr, ctypes.POINTER(Sockaddrin6))
address = rdf_client_network.NetworkAddress()
address.address_type = rdf_client_network.NetworkAddress.Family.INET6
address.packed_bytes = bytes(list(sockaddrin.contents.sin6_addr))
iface.addresses.append(address)
elif iffamily == AF_LINK:
sockaddrdl = ctypes.cast(ifaddr.ifa_addr, ctypes.POINTER(Sockaddrdl))
nlen = sockaddrdl.contents.sdl_nlen
alen = sockaddrdl.contents.sdl_alen
iface.mac_address = bytes(sockaddrdl.contents.sdl_data[nlen:nlen + alen])
else:
raise ValueError("Unexpected socket address family: %s" % iffamily)
return itervalues(ifaces)
def _tunnel(self):
self._set_hostport(self._tunnel_host, self._tunnel_port)
connect_str = "CONNECT %s:%d HTTP/1.0\r\n" % (self.host, self.port)
connect_bytes = connect_str.encode("ascii")
self.send(connect_bytes)
for header, value in self._tunnel_headers.items():
header_str = "%s: %s\r\n" % (header, value)
header_bytes = header_str.encode("latin-1")
self.send(header_bytes)
self.send(bytes(b'\r\n'))
response = self.response_class(self.sock, method=self._method)
(version, code, message) = response._read_status()
if code != 200:
self.close()
raise socket.error("Tunnel connection failed: %d %s" % (code,
message.strip()))
while True:
line = response.fp.readline(_MAXLINE + 1)
if len(line) > _MAXLINE:
raise LineTooLong("header line")
if not line:
# for sites which EOF without sending a trailer
break
if line in (b'\r\n', b'\n', b''):
def _send_output(self, message_body=None):
    """Flush the buffered request lines and empty the buffer.

    Two empty entries are added to the buffer before it is joined with
    \\r\\n, so the transmitted data ends with a blank line.

    message_body, if given, is transmitted after the buffered lines.
    """
    blank = bytes(b"")
    self._buffer.extend((blank, blank))
    payload = bytes(b"\r\n").join(self._buffer)
    del self._buffer[:]
    # Sending the headers and a bytes body in one send() call avoids the
    # performance problems caused by the interaction between delayed ack
    # and the Nagle algorithm.
    body_is_bytes = isinstance(message_body, bytes)
    if body_is_bytes:
        payload += message_body
    self.send(payload)
    if message_body is not None and not body_is_bytes:
        # The body is not a byte string (i.e. it is a file), so it needs
        # its own send() call and we must run the risk of Nagle.
        self.send(message_body)
def read(self, amt=None):
if self.fp is None:
return bytes(b"")
if self._method == "HEAD":
self._close_conn()
return bytes(b"")
if amt is not None:
# Amount is given, so call base class version
# (which is implemented in terms of self.readinto)
return bytes(super(HTTPResponse, self).read(amt))
else:
# Amount is not given (unbounded read) so we must check self.length
# and self.chunked
if self.chunked:
return self._readall_chunked()
if self.length is None:
s = self.fp.read()
else:
try:
s = self._safe_read(self.length)
except IncompleteRead:
self._close_conn()
raise
self.length = 0
value = nv[1].replace('+', ' ')
value = unquote(value, encoding=encoding, errors=errors)
value = _coerce_result(value)
r.append((name, value))
return r
def unquote_plus(string, encoding='utf-8', errors='replace'):
    """Decode like unquote(), after first turning every '+' into a space.

    This is the form required for unquoting HTML form values.

    unquote_plus('%7e/abc+def') -> '~/abc def'
    """
    # The substitution happens before unquote() runs, so only literal '+'
    # characters in the input become spaces.
    return unquote(string.replace('+', ' '), encoding, errors)
# Byte values of the ASCII letters, the digits and '_', '.', '-'.
_ALWAYS_SAFE = frozenset(
    bytes(
        b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        b'abcdefghijklmnopqrstuvwxyz'
        b'0123456789'
        b'_.-'
    )
)
# The same byte values packed into a single bytes object.
_ALWAYS_SAFE_BYTES = bytes(_ALWAYS_SAFE)
# Starts out empty; presumably a cache filled in by the quoting functions
# elsewhere in the module -- confirm against its users.
_safe_quoters = {}
class Quoter(collections.defaultdict):
"""A mapping from bytes (in range(0,256)) to strings.
String values are percent-encoded byte values, unless the key < 128, and
in the "safe" set (either the specified safe set, or default set).
"""
# Keeps a cache internally, using defaultdict, for efficiency (lookups
# of cached keys don't call Python code at all).
def __init__(self, safe):
"""safe: bytes object."""
"""Send a request header line to the server.
For example: h.putheader('Accept', 'text/html')
"""
if self.__state != _CS_REQ_STARTED:
raise CannotSendHeader()
if hasattr(header, 'encode'):
header = header.encode('ascii')
values = list(values)
for i, one_value in enumerate(values):
if hasattr(one_value, 'encode'):
values[i] = one_value.encode('latin-1')
elif isinstance(one_value, int):
values[i] = str(one_value).encode('ascii')
value = bytes(b'\r\n\t').join(values)
header = header + bytes(b': ') + value
self._output(header)
def __init__(self, safe):
    """Store the effective safe set on the instance.

    safe: bytes object whose byte values are added to the module-level
    ``_ALWAYS_SAFE`` set; the combined set is kept as ``self.safe``.
    """
    extra_safe = frozenset(bytes(safe))
    self.safe = _ALWAYS_SAFE | extra_safe