Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def save_config(self, details):
    """Serialize ``details`` as JSON into ``config.TOKEN_FILE_PATH``.

    The parent directory of the token file is created first, including any
    missing intermediate directories.

    Args:
        details: JSON-serializable object to store.

    Raises:
        OSError: If the directory cannot be created or the file cannot be
            opened for writing.
        TypeError, ValueError: Propagated from ``json.dumps`` when
            ``details`` cannot be serialized.
    """
    # Serialize before opening the file so a serialization failure cannot
    # leave a truncated token file behind.
    payload = json.dumps(details)

    parent_dir = os.path.dirname(config.TOKEN_FILE_PATH)
    # A bare file name has no directory component; os.makedirs("") would raise.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(config.TOKEN_FILE_PATH, "w") as file:
        file.write(payload)
def cli(verbose):
    # Point the REST endpoint at the local one, then pass the verbosity
    # setting on to configure_logger.
    # NOTE(review): kept as a comment rather than a docstring because any
    # decorators are outside this view — confirm before converting.
    local_endpoint = config.HUB_LOCAL_REST_ENDPOINT
    config.HUB_REST_ENDPOINT = local_endpoint
    configure_logger(verbose)
def get_credentials(self):
    """Fetch credentials from the REST endpoint, persist them and return them.

    When ``self.auth_header`` is ``None``, an access token is first obtained
    from ``AuthClient`` using the username ``"public"`` and an empty password,
    and stored on the instance as a ``Bearer`` header.

    Returns:
        dict: The ``_id``, ``region``, ``session_token``, ``access_key``,
        ``secret_key``, ``endpoint``, ``expiration`` and ``bucket`` fields
        taken from the response JSON. The same dict is passed to
        ``self.save_config`` before being returned.
    """
    if self.auth_header is None:
        public_token = AuthClient().get_access_token(username="public", password="")
        self.auth_header = f"Bearer {public_token}"

    response = self.request(
        "GET",
        config.GET_CREDENTIALS_SUFFIX,
        endpoint=config.HUB_REST_ENDPOINT,
    )
    payload = response.json()

    # Only these fields of the response are kept, in this order.
    wanted_fields = (
        "_id",
        "region",
        "session_token",
        "access_key",
        "secret_key",
        "endpoint",
        "expiration",
        "bucket",
    )
    details = {field: payload[field] for field in wanted_fields}

    self.save_config(details)
    return details
def is_authenticated(cls):
    """Report whether the token file holds more than 10 bytes.

    Returns:
        bool: ``True`` when ``config.TOKEN_FILE_PATH`` exists and is larger
        than 10 bytes; ``False`` when it is smaller, missing, or its size
        cannot be read.
    """
    try:
        token_size = os.path.getsize(config.TOKEN_FILE_PATH)
    except OSError:
        # A missing or unreadable token file means "not authenticated",
        # not an error the caller has to handle.
        return False
    # NOTE(review): the 10-byte threshold presumably filters out empty or
    # placeholder files — confirm.
    return token_size > 10
def get_token(cls):
    """Return the contents of the token file, or ``None`` if it does not exist.

    Returns:
        str | None: Everything read from ``config.TOKEN_FILE_PATH``, unmodified,
        or ``None`` when ``os.path.exists`` reports the path as absent.

    Raises:
        OSError: If the path exists but cannot be opened or read.
    """
    logger.debug("Getting token...")
    if not os.path.exists(config.TOKEN_FILE_PATH):
        return None
    with open(config.TOKEN_FILE_PATH, "r") as f:
        token = f.read()
    # The token is a credential, so only its location is logged, never its value.
    logger.debug("Got the key from {}.".format(config.TOKEN_FILE_PATH))
    return token
def get_dataset_path(self, tag):
    """Look up a dataset by tag through the REST endpoint.

    Args:
        tag: Value sent as the ``tag`` query parameter.

    Returns:
        The decoded JSON body of the response, or ``None`` when the request
        (or decoding) raises ``NotFoundException``.
    """
    try:
        response = self.request(
            "GET",
            config.GET_DATASET_PATH_SUFFIX,
            params={"tag": tag},
            endpoint=config.HUB_REST_ENDPOINT,
        )
        return response.json()
    except NotFoundException:
        return None
def request(
self,
method,
relative_url,
endpoint=None,
params={},
data={},
files={},
json={},
timeout=config.DEFAULT_TIMEOUT,
headers={},
):
if not endpoint:
endpoint = config.HUB_REST_ENDPOINT
request_url = urljoin(endpoint, relative_url)
headers["hub-cli-version"] = get_cli_version()
if (
"Authorization" not in headers
or headers["Authorization"] != self.auth_header
):
headers["Authorization"] = self.auth_header
try:
logger.debug("Sending: Headers {}, Json: {}".format(headers, json))
response = requests.request(
method,
request_url,
params=params,
data=data,
def save_config(self, details):
    """Write ``details`` as JSON to ``config.STORE_CONFIG_PATH``.

    The file's parent directory is created beforehand when it is missing.

    Args:
        details: JSON-serializable object to store.
    """
    target = config.STORE_CONFIG_PATH
    os.makedirs(Path(target).parent, exist_ok=True)
    with open(target, "w") as handle:
        # Serialized only after the file is open, as in the original ordering.
        handle.write(json.dumps(details))
def get_config(self, public=False):
    """Load stored credentials from the token file, or fall back to ``gen()``.

    Args:
        public: Accepted for interface compatibility; not used by this method.

    Returns:
        The JSON-decoded contents of ``config.TOKEN_FILE_PATH`` when that
        file exists, otherwise whatever ``gen()`` returns.

    Raises:
        NotAuthorized: If reading or parsing the token file, or the ``gen()``
            fallback, fails for any reason. The underlying error is attached
            as ``__cause__``.
    """
    # The former AWS credential lookup was disabled (`if False and ...`) and,
    # like the statements after it, could never run because both branches
    # below return; that dead code has been removed.
    try:
        if os.path.exists(config.TOKEN_FILE_PATH):
            with open(config.TOKEN_FILE_PATH, "r") as file:
                return json.load(file)
        # NOTE(review): gen is defined outside this view; presumably it
        # produces fallback credentials — confirm.
        return gen()
    except Exception as err:
        # Deliberately broad: callers get NotAuthorized for every failure
        # mode, with the real cause chained for debugging.
        raise NotAuthorized(
            "No Hub or AWS credentials found. Please provide credentials by running '> hub configure'"
        ) from err
def is_authenticated(cls):
    """Report whether the token file holds more than 10 bytes.

    Returns:
        bool: ``True`` when ``config.TOKEN_FILE_PATH`` exists and is larger
        than 10 bytes; ``False`` when it is smaller, missing, or its size
        cannot be read.
    """
    try:
        token_size = os.path.getsize(config.TOKEN_FILE_PATH)
    except OSError:
        # A missing or unreadable token file means "not authenticated",
        # not an error the caller has to handle.
        return False
    # NOTE(review): the 10-byte threshold presumably filters out empty or
    # placeholder files — confirm.
    return token_size > 10