Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_instance_type(self):
    """A freshly built JWTAuthPlugin must be an instance of AuthPlugin."""
    plugin_under_test = JWTAuthPlugin()
    expected_base = AuthPlugin
    self.assertIsInstance(plugin_under_test, expected_base)
def test_auth_plugin_parse_auth_false(httpbin):
class Plugin(AuthPlugin):
auth_type = 'test-parse-false'
auth_parse = False
def get_auth(self, username=None, password=None):
assert username is None
assert password is None
assert self.raw_auth == BASIC_AUTH_HEADER_VALUE
return basic_auth(self.raw_auth)
plugin_manager.register(Plugin)
try:
r = http(
httpbin + BASIC_AUTH_URL,
'--auth-type',
Plugin.auth_type,
'--auth',
class HTTPieEdgeGridAuth(EdgeGridAuth):
    """EdgeGridAuth subclass that rewrites the request URL before returning.

    The instance remembers a hostname; every other constructor argument is
    handed to ``EdgeGridAuth`` unchanged.
    """

    def __init__(self, hostname, *args, **kwargs):
        """Store *hostname* and forward the remaining arguments to the parent."""
        self.__hostname = hostname
        super(HTTPieEdgeGridAuth, self).__init__(*args, **kwargs)

    def __call__(self, r):
        """Run the parent hook, rewrite ``r.url``, then run the parent hook again.

        The URL rewrite swaps ``http:`` for ``https:`` and substitutes the
        stored hostname for the literal ``localhost/``. The value returned is
        whatever the second parent call returns.
        """
        parent_hook = super(HTTPieEdgeGridAuth, self).__call__

        r = parent_hook(r)

        # NOTE(review): the replacement consumes the trailing slash of
        # "localhost/", so the stored hostname presumably ends with "/" --
        # confirm against the caller that builds this object.
        rewritten_url = r.url.replace("http:", "https:")
        rewritten_url = rewritten_url.replace("localhost/", self.__hostname)
        r.url = rewritten_url

        # The parent hook is invoked a second time on the rewritten request.
        return parent_hook(r)
class EdgeGridPlugin(AuthPlugin):
    """HTTPie auth plugin registered under the ``edgegrid`` auth type."""

    name = 'EdgeGrid auth'
    auth_type = 'edgegrid'
    description = ''

    def get_auth(self, username, password):
        """Verify that ``~/.edgerc`` has a section named after *username*.

        Args:
            username: Name of the ``.edgerc`` section to look up.
            password: Not used by the code shown here.

        Exits the process with status 1, after writing an explanation to
        stderr, when the section is missing.

        NOTE(review): the code visible here stops after the missing-section
        check and does not build or return an auth object -- confirm against
        the full file.
        """
        rc_path = os.path.expanduser("~/.edgerc")
        rc = EdgeRc(rc_path)

        if not rc.has_section(username):
            # Both placeholders are filled with the section name; previously
            # the second "%s" was emitted literally because only the first
            # line of the message was interpolated.
            err_msg = (
                "\nERROR: No section named '%s' was found in your .edgerc file\n"
                "ERROR: Please generate credentials for the script functionality\n"
                "ERROR: and run 'python gen_edgerc.py %s' to generate the credential file\n"
            ) % (username, username)
            sys.stderr.write(err_msg)
            sys.exit(1)
__license__ = 'BSD'
class JWTAuth(object):
    """Callable that stamps a ``<prefix> <token>`` Authorization header."""

    def __init__(self, token, auth_prefix):
        """Remember the token and the scheme prefix to put in front of it."""
        self.auth_prefix = auth_prefix
        self.token = token

    def __call__(self, request):
        """Set ``request.headers['Authorization']`` and return *request*."""
        header_value = '{prefix} {token}'.format(
            prefix=self.auth_prefix,
            token=self.token,
        )
        request.headers['Authorization'] = header_value
        return request
class JWTAuthPlugin(AuthPlugin):
    """HTTPie plugin that exposes ``JWTAuth`` under the ``jwt`` auth type."""

    name = 'JWT auth'
    auth_type = 'jwt'
    description = 'Set the right format for JWT auth request'
    auth_require = False
    prompt_password = False

    def get_auth(self, username=None, password=None):
        """Build a ``JWTAuth`` from the given token or the environment.

        The token is *username* when it is not ``None``; otherwise the
        ``JWT_AUTH_TOKEN`` environment variable is used. The header prefix
        comes from ``JWT_AUTH_PREFIX`` and defaults to ``Bearer``.
        *password* is not consulted.

        Raises:
            Exception: when neither *username* nor ``JWT_AUTH_TOKEN`` is set.
        """
        environ = os.environ
        auth_prefix = environ.get('JWT_AUTH_PREFIX', 'Bearer')

        # An explicit value wins; the environment is only a fallback.
        token = username if username is not None else environ.get('JWT_AUTH_TOKEN')
        if token is None:
            raise Exception('--auth or JWT_AUTH_TOKEN required error')

        return JWTAuth(token, auth_prefix)
"""
OAuth plugin for HTTPie.
"""
from httpie.plugins import AuthPlugin
__version__ = '1.0.2'
__author__ = 'Jakub Roztocil'
__licence__ = 'BSD'
class OAuth1Plugin(AuthPlugin):
    """HTTPie auth plugin registered under the ``oauth1`` auth type."""

    name = 'OAuth 1.0a 2-legged'
    auth_type = 'oauth1'
    description = ''

    def get_auth(self, username, password):
        """Return an ``OAuth1`` object keyed by *username* and *password*.

        *username* is passed as ``client_key`` and *password* as
        ``client_secret``.
        """
        # The import happens at call time rather than at module import time.
        from requests_oauthlib import OAuth1 as oauth1_cls

        credentials = {'client_key': username, 'client_secret': password}
        return oauth1_cls(**credentials)
KEY = 'AWS_ACCESS_KEY_ID'
SECRET = 'AWS_SECRET_ACCESS_KEY'
class BytesHeadersFriendlyS3Auth(S3Auth):
    """S3Auth variant that decodes bytes header values before delegating."""

    def __call__(self, r):
        """Decode every bytes-valued header as UTF-8, then call the parent."""
        headers = r.headers
        for header_name, header_value in headers.items():
            if not isinstance(header_value, bytes):
                continue
            # HTTPie passes bytes but S3Auth expects text, so unless we
            # decode it here, the signature will be incorrect:
            # https://github.com/tax/python-requests-aws/blob/46f2e90ea48e18d8f32c6473fecdf0da4ef04847/awsauth.py#L104
            headers[header_name] = header_value.decode('utf8')
        return super(BytesHeadersFriendlyS3Auth, self).__call__(r)
class AWSAuthPlugin(AuthPlugin):
name = 'AWS auth'
auth_type = 'aws'
description = ''
auth_require = False
prompt_password = True
def get_auth(self, username=None, password=None):
# There's a difference between None and '': only use the
# env vars when --auth, -a not specified at all, otherwise
# the behaviour would be confusing to the user.
access_key = os.environ.get(KEY) if username is None else username
secret = os.environ.get(SECRET) if password is None else password
if not access_key or not secret:
missing = []
if not access_key:
missing.append(KEY)
def get_auth_plugins(self) -> List[Type[AuthPlugin]]:
    """Return the result of ``self.filter`` applied to ``AuthPlugin``."""
    auth_plugin_classes = self.filter(AuthPlugin)
    return auth_plugin_classes
httpdate = now.strftime('%a, %d %b %Y %H:%M:%S GMT')
r.headers['Date'] = httpdate
url = urlparse.urlparse(r.url)
path = url.path
if url.query:
path = path + '?' + url.query
string_to_sign = '%s,%s,%s,%s,%s' % (method, content_type, content_md5, path, httpdate)
digest = hmac.new(self.secret_key, string_to_sign, hashlib.sha1).digest()
signature = base64.encodestring(digest).rstrip()
r.headers['Authorization'] = 'APIAuth %s:%s' % (self.access_id, signature)
return r
class ApiAuthPlugin(AuthPlugin):
    """HTTPie auth plugin registered under the ``api-auth`` auth type."""

    name = 'ApiAuth auth'
    auth_type = 'api-auth'
    description = 'Sign requests using the ApiAuth authentication method'

    def get_auth(self, access_id, secret_key):
        """Return an ``ApiAuth`` built from *access_id* and *secret_key*."""
        signer = ApiAuth(access_id, secret_key)
        return signer
if m:
return {"region": m.group(1),
"service": "es"}
p = re.compile("([^\.]+)\.([^\.]+)\.amazonaws.com$")
m = p.search(domain)
if m:
return {"region": m.group(2),
"service": m.group(1)}
raise ValueError("Could not determine AWS region or service from domain name.")
class AWSv4AuthPlugin(AuthPlugin):
name = 'AWS auth-v4'
auth_type = 'aws4'
description = 'Sign requests using the AWS Signature Version 4 Signing Process'
auth_require = False
auth_parse = False
prompt_password = False
def get_auth(self, username=None, password=None):
# To remain consistent with AWS tools, the boto3 credential store is used
# to retrieve the user's API keys. This means environment variables
# take precedence over ~/.aws/credentials and IAM roles. Credentials
# can be provided on the CLI using the -a flag
# eg, -a :
# An attempt is made to try and determine the region, endpoint and
# service from a given request URL. If you are using a custom domain
"""
OAuth plugin for HTTPie.
"""
from httpie.plugins import AuthPlugin
from requests_oauthlib import OAuth1
__version__ = '1.0.2'
__author__ = 'Jakub Roztocil'
__licence__ = 'BSD'
class OAuth1Plugin(AuthPlugin):
    """HTTPie auth plugin registered under the ``oauth1`` auth type."""

    name = 'OAuth 1.0a 2-legged'
    auth_type = 'oauth1'
    description = ''

    def get_auth(self, username, password):
        """Return an ``OAuth1`` object keyed by *username* and *password*.

        *username* is passed as ``client_key`` and *password* as
        ``client_secret``.
        """
        credentials = {'client_key': username, 'client_secret': password}
        return OAuth1(**credentials)