Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_adal_token_cache(self):
    """Return the ADAL token cache, building it on the first call.

    When ``self._adal_token_cache_attr`` is already set it is returned
    unchanged.  Otherwise the entries are read from ``self._token_file``
    via ``_load_tokens_from_file``, handed to
    ``self._load_service_principal_creds``, and every entry that is not
    present in ``self._service_principal_creds`` is serialized to JSON
    and wrapped in an ``adal.TokenCache``.  The result is stored on the
    instance so later calls reuse it.
    """
    if self._adal_token_cache_attr is not None:
        return self._adal_token_cache_attr

    # Function-scope import, kept exactly where the original had it.
    import adal

    stored_entries = _load_tokens_from_file(self._token_file)
    # NOTE(review): presumably this call populates
    # self._service_principal_creds from the entries -- confirm.
    self._load_service_principal_creds(stored_entries)

    # Keep only the entries that are not service principal credentials.
    user_tokens = [
        entry
        for entry in stored_entries
        if entry not in self._service_principal_creds
    ]
    serialized = json.dumps(user_tokens)
    self._adal_token_cache_attr = adal.TokenCache(serialized)
    return self._adal_token_cache_attr
def read_token_cache(token_cache_file):
    """Load an ADAL token cache from a file on disk.

    Args:
        token_cache_file: Path of the cache file, or ``None`` to skip
            loading altogether.

    Returns:
        An ``adal.TokenCache`` built from the file contents, or ``None``
        when no path was given, the file could not be opened or read, or
        its contents could not be loaded as a cache.  Failures are logged
        and never raised to the caller.
    """
    if token_cache_file is None:
        return None

    token_cache = None
    try:
        logger.debug("reading token cache from %s", token_cache_file)
        # O_CREAT with mode 0o600 creates a missing cache file readable
        # and writable by the owner only.
        token_cache_fd = os.open(token_cache_file, os.O_CREAT, 0o600)
        # The file object takes ownership of the descriptor and the
        # ``with`` statement closes it.  The handler below must NOT close
        # it again: if os.open() failed there is no descriptor at all,
        # and otherwise it has already been closed on leaving the block.
        with os.fdopen(token_cache_fd, 'r') as token_cache_fh:
            token_cache = adal.TokenCache(state=token_cache_fh.read())
    except IOError as err:
        logger.error(
            "could not open token cache file %s: %s. continuing without cache",
            token_cache_file, err)
    except ValueError as err:
        logger.error("could not load cache from disk: %s", err)
    return token_cache
def get_token(token_cache_file, authority_url, client_id, resource_url, user_id):
try:
with open(token_cache_file, "r+") as token_cache_fh:
token_cache = adal.TokenCache(state=token_cache_fh.read())
except IOError:
print("no token cache found")
with open(token_cache_file, "w+") as _:
token_cache = adal.TokenCache(state="")
context = adal.AuthenticationContext(authority_url, cache=token_cache)
try:
token = context.acquire_token(
resource_url,
user_id,
client_id
)
except adal.adal_error.AdalError:
token = None
if token is None:
print("No cached credentials")
code = context.acquire_user_code(resource_url, client_id)
print(code["message"])
try:
token = context.acquire_token_with_device_code(
def get_token(token_cache_file, authority_url, client_id, resource_url, user_id):
try:
with open(token_cache_file, "r+") as token_cache_fh:
token_cache = adal.TokenCache(state=token_cache_fh.read())
except IOError:
print("no token cache found")
with open(token_cache_file, "w+") as _:
token_cache = adal.TokenCache(state="")
context = adal.AuthenticationContext(authority_url, cache=token_cache)
try:
token = context.acquire_token(
resource_url,
user_id,
client_id
)
except adal.adal_error.AdalError:
token = None
if token is None:
print("No cached credentials")