Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_parse_size(self):
    """Test :func:`humanfriendly.parse_size()`."""
    # Decimal (SI, 1000-based) interpretation is the default; a binary
    # unit suffix like 'YiB' selects 1024-based multiples on its own.
    decimal_cases = [
        (0, '0B'),
        (42, '42'),
        (42, '42B'),
        (1000, '1k'),
        (1000, '1 KB'),
        (1000, '1 kilobyte'),
        (1000 ** 2 * 69, '69 MB'),
        (1000 ** 3, '1 GB'),
        (1000 ** 4, '1 TB'),
        (1000 ** 5, '1 PB'),
        (1000 ** 6, '1 EB'),
        (1000 ** 7, '1 ZB'),
        (1000 ** 8, '1 YB'),
        (1000 ** 3 * 1.5, '1.5 GB'),
        (1024 ** 8 * 1.5, '1.5 YiB'),
    ]
    for expected, expression in decimal_cases:
        self.assertEqual(expected, humanfriendly.parse_size(expression))
    # binary=True forces 1024-based multiples even for SI-looking units.
    self.assertEqual(1024, humanfriendly.parse_size('1k', binary=True))
    self.assertEqual(1024, humanfriendly.parse_size('1 kilobyte', binary=True))
    # Malformed size expressions must raise InvalidSize.
    for bogus in ('1q', 'a'):
        self.assertRaises(humanfriendly.InvalidSize, humanfriendly.parse_size, bogus)
def test_parse_size(self):
    """Test :func:`humanfriendly.parse_size()`."""
    # Bind the function once; every case below exercises the same callable.
    parse = humanfriendly.parse_size
    self.assertEqual(0, parse('0B'))
    self.assertEqual(42, parse('42'))
    self.assertEqual(42, parse('42B'))
    self.assertEqual(1000, parse('1k'))
    self.assertEqual(1024, parse('1k', binary=True))
    self.assertEqual(1000, parse('1 KB'))
    self.assertEqual(1000, parse('1 kilobyte'))
    self.assertEqual(1024, parse('1 kilobyte', binary=True))
    self.assertEqual(1000 ** 2 * 69, parse('69 MB'))
    self.assertEqual(1000 ** 3, parse('1 GB'))
    self.assertEqual(1000 ** 4, parse('1 TB'))
    self.assertEqual(1000 ** 5, parse('1 PB'))
    self.assertEqual(1000 ** 6, parse('1 EB'))
    self.assertEqual(1000 ** 7, parse('1 ZB'))
    self.assertEqual(1000 ** 8, parse('1 YB'))
    self.assertEqual(1000 ** 3 * 1.5, parse('1.5 GB'))
    self.assertEqual(1024 ** 8 * 1.5, parse('1.5 YiB'))
    # Unknown unit suffixes and non-numeric input are rejected.
    self.assertRaises(humanfriendly.InvalidSize, parse, '1q')
    self.assertRaises(humanfriendly.InvalidSize, parse, 'a')
# NOTE(review): fragment — the enclosing function and the loop that binds
# `node`, `idx`, `idx2` and `node_stats` start outside this view.  It walks
# the flat info["SystemStatus"] table (Docker Swarm standalone `docker info`
# output, presumably), where each node occupies a fixed run of consecutive
# rows addressed by idx + node + idx2; each row is a (label, value) pair and
# [1] picks the value.
idx2 += 1 # Containers
node_stats.container_count = int(info["SystemStatus"][idx + node + idx2][1].split(' ')[0])
idx2 += 1 # CPUs
# Reserved/total pairs are rendered as "<reserved> / <total>".
node_stats.cores_reserved = int(info["SystemStatus"][idx + node + idx2][1].split(' / ')[0])
node_stats.cores_total = int(info["SystemStatus"][idx + node + idx2][1].split(' / ')[1])
idx2 += 1 # Memory
node_stats.memory_reserved = info["SystemStatus"][idx + node + idx2][1].split(' / ')[0]
node_stats.memory_total = info["SystemStatus"][idx + node + idx2][1].split(' / ')[1]
idx2 += 1 # Labels
node_stats.labels = info["SystemStatus"][idx + node + idx2][1].split(', ')
idx2 += 1 # Last update
node_stats.last_update = info["SystemStatus"][idx + node + idx2][1]
idx2 += 1 # Docker version
node_stats.server_version = info["SystemStatus"][idx + node + idx2][1]
# Memory values arrive as human-readable strings (e.g. "1.5 GiB"); convert
# them to byte counts once both halves have been captured.
node_stats.memory_reserved = humanfriendly.parse_size(node_stats.memory_reserved)
node_stats.memory_total = humanfriendly.parse_size(node_stats.memory_total)
pl_status.nodes.append(node_stats)
# Advance the outer cursor past this node's rows before the next iteration.
idx += idx2
pl_status.timestamp = time.time()
return pl_status
# NOTE(review): fragment — the enclosing function (binding `proc` and
# `app_name`) starts outside this view.  Builds a ContainerSpec from a
# process definition, defaulting optional attributes and deep-copying
# mutable fields so the spec does not alias `proc`'s state.
c = ContainerSpec()
c.Image = proc.image
c.Env = copy.deepcopy(proc.env)
# Timezone is hard-wired for every container.
c.set_env("TZ", 'Asia/Shanghai')
# Optional attributes fall back to empty values when absent on `proc`.
c.User = '' if not hasattr(proc, 'user') else proc.user
c.WorkingDir = '' if not hasattr(proc, 'working_dir') else proc.working_dir
c.DnsSearch = [] if not hasattr(
    proc, 'dns_search') else copy.deepcopy(proc.dns_search)
c.Volumes = copy.deepcopy(proc.volumes)
c.SystemVolumes = copy.deepcopy(
    proc.system_volumes) + get_system_volumes_from_etcd(app_name)
c.CloudVolumes = render_cloud_volumes(proc.cloud_volumes)
c.Command = proc.cmd
c.Entrypoint = proc.entrypoint
c.CpuLimit = proc.cpu
# proc.memory is a human-readable size string; store the byte count.
c.MemoryLimit = humanfriendly.parse_size(proc.memory)
# NOTE(review): dict.keys()[0] raises TypeError on Python 3 (keys() is a
# view, not a list) — confirm whether this code targets Python 2.
c.Expose = 0 if not proc.port else proc.port.keys()[0]
c.LogConfig = None
return c
def info(self) -> ClusterStats: # pylint: disable=too-many-locals
    """Retrieve Kubernetes cluster statistics."""
    pl_status = ClusterStats()
    # Enumerate every node in the cluster via the pykube API.
    node_list = pykube.Node.objects(self.api).filter(namespace=pykube.all).iterator()
    node_dict = {}
    # Get basic information from nodes
    for node in node_list:
        nss = NodeStats(node.name)
        nss.cores_total = float(node.obj['status']['allocatable']['cpu'])
        # Allocatable memory is a human-readable string; store bytes.
        nss.memory_total = humanfriendly.parse_size(node.obj['status']['allocatable']['memory'])
        nss.labels = node.obj['metadata']['labels']
        nss.status = 'online'
        # Keyed by resolved node IP so pods can be matched via status.hostIP.
        # NOTE(review): assumes node names resolve via DNS — confirm.
        node_dict[str(socket.gethostbyname(node.name))] = nss
    # Get information from all running pods, then accumulate to nodes
    pod_list = pykube.Pod.objects(self.api).filter(namespace=pykube.all).iterator()
    for pod in pod_list:
        try:
            # Pods not yet scheduled have no hostIP; skip them.
            host_ip = pod.obj['status']['hostIP']
        except KeyError:
            continue
        nss = node_dict[host_ip]
        nss.container_count += 1
        # Only the first container's resource requests are inspected here.
        spec_cont = pod.obj['spec']['containers'][0]
        if 'resources' in spec_cont:
            if 'requests' in spec_cont['resources']:  # NOTE(review): truncated — the branch body is outside this view
def _get_human_friendly_used_space(info):
    """Return the byte count parsed from info['usedSpace'].

    Thousands separators (commas) are stripped before parsing.
    """
    raw_value = info['usedSpace']
    return parse_size(raw_value.replace(',', ''))
def human_readable_size(size, binary=True):
    """Format *size* as human-readable text via humanfriendly.

    *size* may be a number of bytes, or a string that
    :func:`humanfriendly.parse_size` can understand; an unparseable
    string yields the literal ``"NaN"``.  Negative sizes are rendered
    with a leading ``"- "``.
    """
    if not isinstance(size, (int, float)):
        try:
            size = humanfriendly.parse_size(size)
        except Exception:
            # Best-effort: any parse failure degrades to "NaN".
            return "NaN"
    prefix = "- " if size < 0 else ""
    return prefix + humanfriendly.format_size(abs(size), binary=binary)
# NOTE(review): fragment — the argparse setup (and the add_argument call this
# first line continues) starts outside this view.
                    help='Path where Communications should be stored')
parser.add_argument('--max-file-size', type=str, default='1GiB',
                    help="Maximum size of (non-ZIP) files that can be read into memory "
                         "(e.g. '2G', '300MB')")
args = parser.parse_args()
logging.basicConfig(format='%(asctime)-15s %(levelname)s: %(message)s',
                    level=args.loglevel.upper())
# Pick a communication container backend based on what fetch_source is:
# a directory, a ZIP archive, or a single file read fully into memory.
comm_container = {}
if os.path.isdir(args.fetch_source):
    comm_container = DirectoryBackedCommunicationContainer(args.fetch_source)
elif zipfile.is_zipfile(args.fetch_source):
    comm_container = ZipFileBackedCommunicationContainer(args.fetch_source)
else:
    # binary=True: treat '1GiB'/'1G' style suffixes as 1024-based.
    max_file_size = humanfriendly.parse_size(args.max_file_size, binary=True)
    comm_container = MemoryBackedCommunicationContainer(args.fetch_source,
                                                        max_file_size=max_file_size)
# Wire the container into fetch/store handlers and serve forever.
fetch_handler = CommunicationContainerFetchHandler(comm_container)
store_handler = DirectoryBackedStoreHandler(args.store_path)
logging.info('Fetch endpoint: http://{}:{}/fetch_http_endpoint/'.format(args.host, args.port))
logging.info('Store endpoint: http://{}:{}/store_http_endpoint/'.format(args.host, args.port))
ahs = AccessHTTPServer(args.host, args.port, args.static_path, fetch_handler, store_handler)
ahs.serve()
# NOTE(review): fragment — the first line continues an expression (presumably
# a pandas Series.apply) whose opening call is outside this view.  The lambda
# strips surrounding parentheses and non-breaking spaces (\xa0) before
# parsing each cell into a byte count.
    lambda x: humanfriendly.parse_size(x.strip('(').strip(')').replace('\xa0', ' ')))
# Index labels of the row(s) holding the smallest parsed size — TODO confirm
# `size` is a pandas Series here.
idx = size[size == size.min()].index
def print_parsed_size(value):
    """Parse a human readable data size and print the number of bytes."""
    num_bytes = parse_size(value)
    output(num_bytes)