Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
wrapped_agent = RedirectAgent(wrapped_agent)
wrapped_agent = ContentDecoderAgent(wrapped_agent,
[('gzip', GzipDecoder)])
auth = kwargs.get('auth')
if auth:
wrapped_agent = add_auth(wrapped_agent, auth)
d = wrapped_agent.request(
method, url, headers=headers,
bodyProducer=bodyProducer)
timeout = kwargs.get('timeout')
if timeout:
delayedCall = default_reactor(kwargs.get('reactor')).callLater(
timeout, d.cancel)
def gotResult(result):
if delayedCall.active():
delayedCall.cancel()
return result
d.addBoth(gotResult)
if not kwargs.get('unbuffered', False):
d.addCallback(_BufferedResponse)
return d.addCallback(_Response)
def _client(*args, **kwargs):
    """Return an ``HTTPClient`` backed by a pool-less ``UNIXCapableAgent``.

    Positional arguments are accepted but ignored. Of the keyword
    arguments, only ``reactor`` is read; it is handed to
    ``default_reactor`` to resolve the reactor the agent is built on.
    """
    # XXX The agent is deliberately created without a connection pool.
    # Building one through default_pool (from the 'pool' and 'persistent'
    # kwargs) triggered a weird bug where deferreds never fire on requests
    # made after chunked requests.
    return HTTPClient(
        UNIXCapableAgent(default_reactor(kwargs.get('reactor')), pool=None)
    )
def _client(*args, **kwargs):
    """Return an ``HTTPClient`` for the caller's agent, or for a fresh one.

    Positional arguments are accepted but ignored. Keyword arguments read:

    * ``agent`` -- when present and not ``None``, it is wrapped as-is and
      the remaining keywords are not consulted.
    * ``reactor`` -- passed to ``default_reactor`` to resolve the reactor.
    * ``pool`` / ``persistent`` -- passed to ``default_pool`` together with
      the resolved reactor to obtain the connection pool for a new ``Agent``.
    """
    supplied_agent = kwargs.get('agent')
    if supplied_agent is not None:
        # Caller brought their own agent; nothing else to construct.
        return HTTPClient(supplied_agent)

    reactor = default_reactor(kwargs.get('reactor'))
    connection_pool = default_pool(
        reactor, kwargs.get('pool'), kwargs.get('persistent'))
    return HTTPClient(Agent(reactor, pool=connection_pool))
wrapped_agent = RedirectAgent(wrapped_agent)
wrapped_agent = ContentDecoderAgent(wrapped_agent,
[(b'gzip', GzipDecoder)])
auth = kwargs.get('auth')
if auth:
wrapped_agent = add_auth(wrapped_agent, auth)
d = wrapped_agent.request(
method, url, headers=headers,
bodyProducer=bodyProducer)
timeout = kwargs.get('timeout')
if timeout:
delayedCall = default_reactor(kwargs.get('reactor')).callLater(
timeout, d.cancel)
def gotResult(result):
if delayedCall.active():
delayedCall.cancel()
return result
d.addBoth(gotResult)
if not kwargs.get('unbuffered', False):
d.addCallback(_BufferedResponse)
return d.addCallback(_Response, cookies)
def _client(*args, **kwargs):
    """Return a ``UNIXAwareHttpClient`` wrapping a ``UNIXAwareHttpAgent``.

    Positional arguments are accepted but ignored. Keyword arguments:

    * ``reactor`` -- passed to ``default_reactor`` to resolve the reactor.
    * ``pool`` -- passed to ``default_pool`` as the candidate pool.
    * everything except ``pool`` is forwarded verbatim to
      ``UNIXAwareHttpAgent``.

    The pool is always requested with ``persistent=False``; a caller-supplied
    ``persistent`` keyword does not influence the ``default_pool`` call.
    """
    reactor = default_reactor(kwargs.get('reactor'))
    # Persistence is forced off here instead of honouring
    # kwargs.get('persistent').
    pool = default_pool(reactor, kwargs.get('pool'), persistent=False)

    # 'pool' is already passed explicitly below. Forwarding it a second time
    # through **kwargs would raise "TypeError: got multiple values for
    # keyword argument 'pool'" whenever the caller supplies one.
    # NOTE(review): 'reactor' is still forwarded as a keyword alongside the
    # positional reactor; whether that collides depends on
    # UNIXAwareHttpAgent's signature, which is not visible here -- confirm.
    agent_kwargs = {
        key: value for key, value in kwargs.items() if key != 'pool'
    }
    agent = UNIXAwareHttpAgent(reactor, pool=pool, **agent_kwargs)
    return UNIXAwareHttpClient(agent)