Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# We need to fix the output of get_bewit. It returns a url-safe base64
# encoded string, which contains a list of tokens separated by '\'.
# The first one is the clientId, the second is an int, the third is
# url-safe base64 encoded MAC, the fourth is the ext param.
# The problem is that the nested url-safe base64 encoded MAC must be
# base64 (i.e. not url safe) or server-side will complain.
# id + '\\' + exp + '\\' + mac + '\\' + options.ext;
resource = mohawk.base.Resource(
credentials={
'id': clientId,
'key': accessToken,
'algorithm': 'sha256',
},
method='GET',
ext=utils.toStr(self.makeHawkExt()),
url=requestUrl,
timestamp=expiration,
nonce='',
# content='',
# content_type='',
)
bewit = mohawk.bewit.get_bewit(resource)
return bewit.rstrip('=')
# We need to fix the output of get_bewit. It returns a url-safe base64
# encoded string, which contains a list of tokens separated by '\'.
# The first one is the clientId, the second is an int, the third is
# url-safe base64 encoded MAC, the fourth is the ext param.
# The problem is that the nested url-safe base64 encoded MAC must be
# base64 (i.e. not url safe) or server-side will complain.
# id + '\\' + exp + '\\' + mac + '\\' + options.ext;
resource = mohawk.base.Resource(
credentials={
'id': clientId,
'key': accessToken,
'algorithm': 'sha256',
},
method='GET',
ext=utils.toStr(self.makeHawkExt()),
url=requestUrl,
timestamp=expiration,
nonce='',
# content='',
# content_type='',
)
bewit = mohawk.bewit.get_bewit(resource)
return bewit.rstrip('=')
if 'expiration' in kwargs:
expiration = kwargs['expiration']
del kwargs['expiration']
else:
expiration = self.options['signedUrlExpiration']
expiration = int(time.time() + expiration) # Mainly so that we throw if it's not a number
requestUrl = self.buildUrl(methodName, *args, **kwargs)
if not self._hasCredentials():
raise exceptions.TaskclusterAuthFailure('Invalid Hawk Credentials')
clientId = utils.toStr(self.options['credentials']['clientId'])
accessToken = utils.toStr(self.options['credentials']['accessToken'])
def genBewit():
# We need to fix the output of get_bewit. It returns a url-safe base64
# encoded string, which contains a list of tokens separated by '\'.
# The first one is the clientId, the second is an int, the third is
# url-safe base64 encoded MAC, the fourth is the ext param.
# The problem is that the nested url-safe base64 encoded MAC must be
# base64 (i.e. not url safe) or server-side will complain.
# id + '\\' + exp + '\\' + mac + '\\' + options.ext;
resource = mohawk.base.Resource(
credentials={
'id': clientId,
'key': accessToken,
'algorithm': 'sha256',
},
if 'expiration' in kwargs:
expiration = kwargs['expiration']
del kwargs['expiration']
else:
expiration = self.options['signedUrlExpiration']
expiration = int(time.time() + expiration) # Mainly so that we throw if it's not a number
requestUrl = self.buildUrl(methodName, *args, **kwargs)
if not self._hasCredentials():
raise exceptions.TaskclusterAuthFailure('Invalid Hawk Credentials')
clientId = utils.toStr(self.options['credentials']['clientId'])
accessToken = utils.toStr(self.options['credentials']['accessToken'])
def genBewit():
# We need to fix the output of get_bewit. It returns a url-safe base64
# encoded string, which contains a list of tokens separated by '\'.
# The first one is the clientId, the second is an int, the third is
# url-safe base64 encoded MAC, the fourth is the ext param.
# The problem is that the nested url-safe base64 encoded MAC must be
# base64 (i.e. not url safe) or server-side will complain.
# id + '\\' + exp + '\\' + mac + '\\' + options.ext;
resource = mohawk.base.Resource(
credentials={
'id': clientId,
'key': accessToken,
'algorithm': 'sha256',
},
if expiry - start > datetime.timedelta(days=31):
raise exceptions.TaskclusterFailure('Only 31 days allowed')
# We multiply times by 1000 because the auth service is JS and as a result
# uses milliseconds instead of seconds
cert = dict(
version=1,
scopes=scopes,
start=calendar.timegm(start.utctimetuple()) * 1000,
expiry=calendar.timegm(expiry.utctimetuple()) * 1000,
seed=utils.slugId().encode('ascii') + utils.slugId().encode('ascii'),
)
# if this is a named temporary credential, include the issuer in the certificate
if name:
cert['issuer'] = utils.toStr(clientId)
sig = ['version:' + utils.toStr(cert['version'])]
if name:
sig.extend([
'clientId:' + utils.toStr(name),
'issuer:' + utils.toStr(clientId),
])
sig.extend([
'seed:' + utils.toStr(cert['seed']),
'start:' + utils.toStr(cert['start']),
'expiry:' + utils.toStr(cert['expiry']),
'scopes:'
] + scopes)
sigStr = '\n'.join(sig).encode()
if isinstance(accessToken, six.text_type):
cert = dict(
version=1,
scopes=scopes,
start=calendar.timegm(start.utctimetuple()) * 1000,
expiry=calendar.timegm(expiry.utctimetuple()) * 1000,
seed=utils.slugId() + utils.slugId(),
)
# if this is a named temporary credential, include the issuer in the certificate
if name:
cert['issuer'] = utils.toStr(clientId)
sig = ['version:' + utils.toStr(cert['version'])]
if name:
sig.extend([
'clientId:' + utils.toStr(name),
'issuer:' + utils.toStr(clientId),
])
sig.extend([
'seed:' + utils.toStr(cert['seed']),
'start:' + utils.toStr(cert['start']),
'expiry:' + utils.toStr(cert['expiry']),
'scopes:'
] + scopes)
sigStr = '\n'.join(sig).encode()
if isinstance(accessToken, six.text_type):
accessToken = accessToken.encode()
sig = hmac.new(accessToken, sigStr, hashlib.sha256).digest()
cert['signature'] = utils.encodeStringForB64Header(sig)
# We multiply times by 1000 because the auth service is JS and as a result
# uses milliseconds instead of seconds
cert = dict(
version=1,
scopes=scopes,
start=calendar.timegm(start.utctimetuple()) * 1000,
expiry=calendar.timegm(expiry.utctimetuple()) * 1000,
seed=utils.slugId() + utils.slugId(),
)
# if this is a named temporary credential, include the issuer in the certificate
if name:
cert['issuer'] = utils.toStr(clientId)
sig = ['version:' + utils.toStr(cert['version'])]
if name:
sig.extend([
'clientId:' + utils.toStr(name),
'issuer:' + utils.toStr(clientId),
])
sig.extend([
'seed:' + utils.toStr(cert['seed']),
'start:' + utils.toStr(cert['start']),
'expiry:' + utils.toStr(cert['expiry']),
'scopes:'
] + scopes)
sigStr = '\n'.join(sig).encode()
if isinstance(accessToken, six.text_type):
accessToken = accessToken.encode()
sig = hmac.new(accessToken, sigStr, hashlib.sha256).digest()
# We multiply times by 1000 because the auth service is JS and as a result
# uses milliseconds instead of seconds
cert = dict(
version=1,
scopes=scopes,
start=calendar.timegm(start.utctimetuple()) * 1000,
expiry=calendar.timegm(expiry.utctimetuple()) * 1000,
seed=utils.slugId().encode('ascii') + utils.slugId().encode('ascii'),
)
# if this is a named temporary credential, include the issuer in the certificate
if name:
cert['issuer'] = utils.toStr(clientId)
sig = ['version:' + utils.toStr(cert['version'])]
if name:
sig.extend([
'clientId:' + utils.toStr(name),
'issuer:' + utils.toStr(clientId),
])
sig.extend([
'seed:' + utils.toStr(cert['seed']),
'start:' + utils.toStr(cert['start']),
'expiry:' + utils.toStr(cert['expiry']),
'scopes:'
] + scopes)
sigStr = '\n'.join(sig).encode()
if isinstance(accessToken, six.text_type):
accessToken = accessToken.encode()
sig = hmac.new(accessToken, sigStr, hashlib.sha256).digest()