Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, method='GET', headers_type=Headers):
    """Initialise the per-response state attributes.

    :param method: request method name; stored upper-cased in
        ``self.method``.
    :param headers_type: callable invoked with no arguments to build
        the object kept in ``self._headers_index``.
    """
    super(HTTPResponse, self).__init__()
    self.method = method.upper()

    # The three boolean flags all start out False.
    self.headers_complete = self.message_begun = self.message_complete = False

    # Header bookkeeping: container, state marker, the field/value pair
    # currently being assembled, and a position counter starting at 1.
    self._headers_index = headers_type()
    self._header_state = HEADER_STATE_INIT
    self._current_header_field = self._current_header_value = None
    self._header_position = 1

    # A fresh, empty mutable byte buffer for the body; no status
    # message is known yet.
    self._body_buffer = bytearray()
    self.status_message = None
# Constructor (excerpt): records the target host/port, decides whether the
# connection goes through a proxy, and normalises the optional arguments.
# NOTE(review): this excerpt stops after the ssl_options default; the
# parameters not used below (block_size, the timeouts, disable_ipv6,
# concurrency, ssl_context_factory, insecure, version, headers_type) are
# presumably consumed by the part of the body that is not visible here.
def __init__(self, host, port=None, headers=None,
block_size=BLOCK_SIZE,
connection_timeout=ConnectionPool.DEFAULT_CONNECTION_TIMEOUT,
network_timeout=ConnectionPool.DEFAULT_NETWORK_TIMEOUT,
disable_ipv6=False,
concurrency=1,
ssl=False, ssl_options=None, ssl_context_factory=None,
insecure=False,
proxy_host=None, proxy_port=None, version=HTTP_11,
headers_type=Headers):
# None is the "no headers given" marker; a fresh dict is created per call
# instead of sharing a mutable default argument.
if headers is None:
headers = {}
self.host = host
self.port = port
# By default the connection endpoint is the target host/port itself.
connection_host = self.host
connection_port = self.port
if proxy_host is not None:
# A proxy host is only usable together with an explicit proxy port.
# NOTE(review): this check is an assert, so it is skipped when Python
# runs with -O.
assert proxy_port is not None, \
'you have to provide proxy_port if you set proxy_host'
self.use_proxy = True
# With a proxy, the connection endpoint becomes the proxy address while
# self.host / self.port keep the values passed by the caller.
connection_host = proxy_host
connection_port = proxy_port
else:
self.use_proxy = False
# SSL requested without explicit options: start from an empty options dict.
if ssl and ssl_options is None:
ssl_options = {}
try:
return len(body)
except TypeError:
try:
return os.fstat(body.fileno()).st_size
except (AttributeError, OSError):
return None
class HTTPClient(object):
# HTTP version strings; HTTP_11 is the default for the constructor's
# `version` parameter.
HTTP_11 = 'HTTP/1.1'
HTTP_10 = 'HTTP/1.0'
# Default for the constructor's `block_size` parameter (4096).
BLOCK_SIZE = 1024 * 4 # 4KB
# Class-level header set: a User-Agent value built from the package's
# __version__ string.
DEFAULT_HEADERS = Headers({
'User-Agent': 'python/gevent-http-client-' + __version__
})
@classmethod
def from_url(cls, url, **kw):
    """Create an instance for the host and port named by ``url``.

    :param url: a ``URL`` instance, or any value accepted by the ``URL``
        constructor.
    :param kw: extra keyword arguments passed through to the class
        constructor. ``ssl_options`` is discarded when the URL scheme
        is not ``PROTO_HTTPS``.
    :return: ``cls(host, port=..., ssl=..., **kw)`` built from the URL.
    """
    parsed = url if isinstance(url, URL) else URL(url)
    use_ssl = parsed.scheme == PROTO_HTTPS
    if not use_ssl:
        # Non-SSL connection: drop any SSL options the caller supplied.
        kw.pop('ssl_options', None)
    return cls(parsed.host, port=parsed.port, ssl=use_ssl, **kw)
def __init__(self, host, port=None, headers=None,
block_size=BLOCK_SIZE,
connection_timeout=ConnectionPool.DEFAULT_CONNECTION_TIMEOUT,
network_timeout=ConnectionPool.DEFAULT_NETWORK_TIMEOUT,
import six
if six.PY3:
httplib = __import__('http.client').client
else:
httplib = __import__('httplib')
from geventhttpclient import response
from geventhttpclient import header
import gevent.socket
class HTTPLibHeaders(header.Headers):
    """``header.Headers`` subclass whose item lookup flattens sequences.

    A value stored as a list or tuple is returned as one string with
    its elements joined by ``", "``; any other value is returned as is.
    """

    def __getitem__(self, key):
        """Return the value for ``key``, joining list/tuple values."""
        found = super(HTTPLibHeaders, self).__getitem__(key)
        if not isinstance(found, (list, tuple)):
            return found
        return ", ".join(found)
class HTTPResponse(response.HTTPSocketResponse):
def __init__(self, sock, method='GET', strict=0, debuglevel=0,
buffering=False, **kw):
if method is None:
method = 'GET'
else: