Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
url = urlunparse(url_parsed)
# timeout is (connect_timeout, read_timeout)
res = sesh.put(url, data=file_opened,
timeout=(get_lesser(60, current_time_left), current_time_left))
res.raise_for_status()
current_files_count += 1
print_progress(current_files_count, total_files_count,
os.path.normpath(os.path.join(rel_path, single_file)),
show_progress, get_timeout_left(target_timeout))
current_time_left = get_timeout_left(target_timeout)
if current_time_left == 0:
raise SFCTLInternalException('Upload has timed out. Consider passing a longer '
'timeout duration.')
url_path = (
os.path.normpath(os.path.join('ImageStore', basename,
rel_path, '_.dir'))
).replace('\\', '/')
url_parsed = list(urlparse(endpoint))
url_parsed[2] = url_path
url_parsed[4] = urlencode({'api-version': '6.1',
'timeout': current_time_left})
url = urlunparse(url_parsed)
res = sesh.put(url,
timeout=(get_lesser(60, current_time_left), current_time_left))
res.raise_for_status()
current_files_count += 1
def sfctl_cluster_version_matches(cluster_version, sfctl_version):
    """
    Check if the sfctl version and the cluster version are compatible with each other.

    :param cluster_version: str representing the cluster runtime version of the connected cluster
    :param sfctl_version: str representing this sfctl distribution version
    :return: True if they are a match. False otherwise.
    :raises SFCTLInternalException: if sfctl_version is not one of the releases this
        function knows about.
    """
    if sfctl_version in ['8.0.0']:
        supported_runtime = '6.5'
        # Match on a version-component boundary. A bare startswith('6.5') would also
        # accept unrelated runtimes such as '6.50.x' or '6.55.x'.
        return (cluster_version == supported_runtime
                or cluster_version.startswith(supported_runtime + '.'))

    # If we forget to update this code before a new release, the tests which call this method
    # will fail.
    raise SFCTLInternalException(
        'Invalid sfctl version {0} provided for check against cluster version {1}.'.format(
            sfctl_version,
            cluster_version))