Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_proper_exception_is_raised_when_cert_validation_fails(self):
    """Check that an SSL handshake failure is reported as a cert validation error.

    ``httplib2.Http.request`` is stubbed to raise ``SSLHandshakeError``; the
    client's ``_cs_request`` is then expected to raise
    ``exceptions.SslCertificateValidationError``.
    """
    client = HTTPClient(token=AUTH_TOKEN, endpoint_url=END_URL)

    # Record phase: the stubbed transport call and the error it produces.
    self.mox.StubOutWithMock(httplib2.Http, 'request')
    expected_call = httplib2.Http.request(
        URL, METHOD, headers=mox.IgnoreArg())
    expected_call.AndRaise(httplib2.SSLHandshakeError)

    # Replay phase: exercise the client and check the raised exception type.
    self.mox.ReplayAll()
    self.assertRaises(exceptions.SslCertificateValidationError,
                      client._cs_request, URL, METHOD)
    self.mox.VerifyAll()
def crequest(self, uri, trace=None, **kws):
    """Request ``uri`` with cookies from ``self.cookiejar`` applied and updated.

    Cookie headers from the jar are added to the outgoing headers, the
    request is issued with ``redirections=0``, and cookies found on the
    response are extracted back into the jar.

    Args:
        uri: Target URI, passed through to ``Http.request``.
        trace: Optional object; when truthy, its ``request`` method is
            called with a string describing the outgoing headers.
        **kws: Extra keyword arguments forwarded to ``Http.request``.
            A ``headers`` entry, if present, is popped and used as the
            base headers.

    Returns:
        A ``(response, body)`` tuple. When ``RedirectLimit`` is raised, the
        response and content carried by the exception are returned instead.
    """
    headers = kws.pop('headers', {})
    req = DummyRequest(uri, headers)
    self.cookiejar.add_cookie_header(req)
    # Use the headers as they stand on the request after the jar has
    # added its cookie header.
    headers = req.headers
    if trace:
        trace.request("HEADERS: %s" % headers)
    try:
        r, body = Http.request(self, uri, headers=headers,
                               redirections=0, **kws)
    except RedirectLimit as err:
        # Redirects are disabled above, so a redirect response arrives as
        # this exception; hand its response and content back to the caller.
        r = err.response
        body = err.content
    resp = DummyResponse(r)
    self.cookiejar.extract_cookies(resp, req)
    return r, body
parameters = None
req = Request.from_consumer_and_token(self.consumer, token=self.token,
http_method=method, http_url=uri, parameters=parameters)
req.sign_request(self.method, self.consumer, self.token)
if method == "POST":
body = req.to_postdata()
headers['Content-Type'] = 'application/x-www-form-urlencoded'
elif method == "GET":
uri = req.to_url()
else:
headers.update(req.to_header())
return httplib2.Http.request(self, uri, method=method, body=body,
headers=headers, redirections=redirections,
connection_type=connection_type)
def request(self, uri, method="GET", body=None, headers=None,
            redirections=DEFAULT_MAX_REDIRECTS, connection_type=None,
            force_exception_to_status_code=False):
    """Inject ``Connection: close`` into the HTTP request headers.

    Args:
        uri: Target URI.
        method: HTTP method name.
        body: Optional request body.
        headers: Optional mapping of request headers. It is copied, so the
            caller's mapping is never modified. A ``Connection`` entry that
            is already present is left as is.
        redirections: Passed through to ``Http.request``.
        connection_type: Passed through to ``Http.request``.
        force_exception_to_status_code: Accepted for signature
            compatibility; it is not forwarded to ``Http.request``.

    Returns:
        Whatever ``Http.request`` returns.
    """
    # Work on a private copy: a shared default dict (or the caller's own
    # dict) must not pick up the injected header as a side effect.
    headers = dict(headers) if headers is not None else {}
    if 'Connection' not in headers:
        headers['Connection'] = 'close'
    return Http.request(self, uri, method, body, headers, redirections,
                        connection_type)
token=self.token, http_method=method, http_url=uri,
parameters=parameters, body=body, is_form_encoded=is_form_encoded)
req.sign_request(self.method, self.consumer, self.token)
scheme, netloc, path, params, query, fragment = urlparse(uri)
realm = urlunparse((scheme, netloc, '', None, None, None))
if is_form_encoded:
body = req.to_postdata()
elif method == "GET":
uri = req.to_url()
else:
headers.update(req.to_header(realm=realm))
return httplib2.Http.request(self, uri, method=method, body=body,
headers=headers, redirections=redirections,
connection_type=connection_type)
def unload_ipython_extension(shell):
    """Undo the patches installed when the extension was loaded.

    Restores the saved originals of ``InteractiveShell.run_cell_magic``,
    ``InteractiveShell.run_line_magic``, ``requests.Session.__init__`` and
    ``httplib2.Http.request``, then removes the ``project_id`` and
    ``set_project_id`` entries from the IPython user namespace on a
    best-effort basis.

    Args:
        shell: The IPython shell instance; not used by this function.
    """
    _shell.InteractiveShell.run_cell_magic = _orig_run_cell_magic
    _shell.InteractiveShell.run_line_magic = _orig_run_line_magic
    _requests.Session.__init__ = _orig_init
    _httplib2.Http.request = _orig_request

    # Remove each name independently so that a failure on one (for example
    # because it is already gone) does not leave the other behind.
    for name in ('project_id', 'set_project_id'):
        try:
            del _IPython.get_ipython().user_ns[name]
        except Exception:
            pass  # We mock IPython for tests so we need this.
    # TODO(gram): unregister imports/magics/etc.
def rawcall(self, url, body=None, method="GET"):
    """Send an OAuth-signed JSON request and split the outcome by status.

    The request is signed with ``self.sigmethod`` and sent through
    ``httplib2.Http.request`` directly, using the signed request's own
    header.

    Args:
        url: Target URL.
        body: Object serialised with ``json.dumps`` and sent as the request
            body.
            NOTE(review): the default ``None`` is serialised too, so a
            literal ``null`` body is sent — confirm this is intended.
        method: HTTP method name.

    Returns:
        A ``(result, error)`` tuple. For a 2xx status ``result`` holds the
        data and ``error`` is ``None``; otherwise ``result`` is ``None`` and
        ``error`` holds the data. The data is the decoded JSON body, or the
        response object when the body is empty.
    """
    client = oauth.Client(self.consumer, self.oauth_token)
    oauth_request = oauth.Request.from_consumer_and_token(
        self.consumer, token=self.oauth_token,
        http_url=url, http_method=method)
    oauth_request.sign_request(self.sigmethod, self.consumer,
                               self.oauth_token)

    headers = oauth_request.to_header()
    headers['x-li-format'] = 'json'
    body = json.dumps(body)
    headers['Content-type'] = 'application/json'
    headers['Content-Length'] = str(len(body))

    resp, content = httplib2.Http.request(
        client, url, method=method, headers=headers, body=body)

    # Fall back to the response object only when there is no body at all;
    # a body that decodes to a falsy value ({}, [], 0, false, null) is
    # still the data.
    data = json.loads(content) if content else resp

    status = int(resp['status'])
    if 200 <= status < 300:
        return data, None
    return None, data
force_login = request.POST.get('force_login') and True
# Create the consumer and client, make the request
client = oauth.Client(self.consumer)
params = {'oauth_callback': url(controller='account',
action="verify",
provider=self.provider,
qualified=True)}
if self.scope:
params[ 'scope' ] = self.scope
# We go through some shennanigans here to specify a callback url
oauth_request = oauth.Request.from_consumer_and_token(self.consumer,
http_url=self.request_token_url, parameters=params)
oauth_request.sign_request(self.sigmethod, self.consumer, None)
resp, content = httplib2.Http.request(client, self.request_token_url, method='GET',
headers=oauth_request.to_header())
if resp['status'] != '200':
raise AccessException("Error status: %r", resp['status'])
request_token = oauth.Token.from_string(content)
session['token'] = content
session.save()
# force_login is twitter specific
if self.provider == 'twitter.com' and asbool(request.POST.get('force_login')):
http_url=self.authorization_url+'?force_login=true'
else:
http_url=self.authorization_url
def request_new_token(self, uri, callback=None, params=None):
    """POST a signed, form-encoded request for a new token to ``uri``.

    Args:
        uri: Endpoint to post to.
        callback: Optional callback URL; when not ``None`` it is sent as
            the ``oauth_callback`` parameter.
        params: Optional mapping of extra request parameters. It is copied,
            so the caller's mapping is never modified.

    Returns:
        Whatever ``httplib2.Http.request`` returns.
    """
    # Copy so that ``oauth_callback`` never leaks into a shared default or
    # into the caller's dict and resurfaces on a later call.
    params = dict(params) if params is not None else {}
    if callback is not None:
        params['oauth_callback'] = callback

    req = oauth2.Request.from_consumer_and_token(
        self.consumer, token=self.token,
        http_method='POST', http_url=uri, parameters=params,
        is_form_encoded=True)
    req.sign_request(self.method, self.consumer, self.token)

    body = req.to_postdata()
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Content-Length': str(len(body)),
    }
    return httplib2.Http.request(self, uri, method='POST',
                                 body=body, headers=headers)