Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def view_all_runs(status, date_from, date_to, pipeline, parent_id, find, top):
runs_table = prettytable.PrettyTable()
runs_table.field_names = ["RunID", "Parent RunID", "Pipeline", "Version", "Status", "Started"]
runs_table.align = "r"
try:
statuses = []
if status is not None:
if status.upper() != 'ANY':
for status_value in status.split(','):
statuses.append(status_value.upper())
else:
statuses.append('RUNNING')
pipeline_id = None
pipeline_version_name = None
if pipeline is not None:
pipeline_name_parts = pipeline.split('@')
pipeline_model = Pipeline.get(pipeline_name_parts[0])
pipeline_id = pipeline_model.identifier
keys = []
for key in node_model.allocatable.keys():
if key not in keys:
keys.append(key)
for key in node_model.capacity.keys():
if key not in keys:
keys.append(key)
for key in keys:
ac_table.add_row([key, node_model.allocatable.get(key, ''), node_model.capacity.get(key, '')])
click.echo(ac_table)
click.echo()
if len(node_model.pods) > 0:
echo_title("Jobs:", line=False)
if len(node_model.pods) > 0:
pods_table = prettytable.PrettyTable()
pods_table.field_names = ["Name", "Namespace", "Status"]
pods_table.align = "l"
for pod in node_model.pods:
pods_table.add_row([pod.name, pod.namespace, state_utilities.color_state(pod.phase)])
click.echo(pods_table)
else:
click.echo('No jobs are available')
click.echo()
except ConfigNotFoundError as config_not_found_error:
click.echo(str(config_not_found_error), err=True)
except requests.exceptions.RequestException as http_error:
click.echo('Http error: {}'.format(str(http_error)), err=True)
except RuntimeError as runtime_error:
click.echo('Error: {}'.format(str(runtime_error)), err=True)
except ValueError as value_error:
click.echo('Error: {}'.format(str(value_error)), err=True)
if key.lower() == 'node-role.kubernetes.io/master':
table.add_row([key, click.style(value, fg='blue')])
elif key.lower() == 'kubeadm.alpha.kubernetes.io/role' and value.lower() == 'master':
table.add_row([key, click.style(value, fg='blue')])
elif key.lower() == 'cloud-pipeline/role' and value.lower() == 'edge':
table.add_row([key, click.style(value, fg='blue')])
elif key.lower() == 'runid':
table.add_row([key, click.style(value, fg='green')])
else:
table.add_row([key, value])
echo_title('Labels:')
click.echo(table)
click.echo()
if node_model.allocatable is not None or node_model.capacity is not None:
ac_table = prettytable.PrettyTable()
ac_table.field_names = ["", "Allocatable", "Capacity"]
ac_table.align = "l"
keys = []
for key in node_model.allocatable.keys():
if key not in keys:
keys.append(key)
for key in node_model.capacity.keys():
if key not in keys:
keys.append(key)
for key in keys:
ac_table.add_row([key, node_model.allocatable.get(key, ''), node_model.capacity.get(key, '')])
click.echo(ac_table)
click.echo()
if len(node_model.pods) > 0:
echo_title("Jobs:", line=False)
def generate_table(self, rows):
"""
Generates from a list of rows a PrettyTable object.
"""
table = PrettyTable(**self.kwargs)
for row in self.rows:
if len(row[0]) < self.max_row_width:
appends = self.max_row_width - len(row[0])
for i in range(1, appends):
row[0].append("-")
if row[1] is True:
self.make_fields_unique(row[0])
table.field_names = row[0]
else:
table.add_row(row[0])
return table
elif self.display_format=='flat': # Display First record on same row
full_headers = ['E#', 'R#', 'Experiment']+header_names
rows = []
for i, (exp_id, record_ids) in enumerate(exp_record_dict.items()):
if len(record_ids)==0:
rows.append([str(i), '', exp_id, ''] + ['-']*(len(headers)-1))
else:
for j, record_id in enumerate(record_ids):
rows.append([str(i) if j==0 else '', j, exp_id if j==0 else '']+row_func(record_id, headers, raise_display_errors=self.raise_display_errors, truncate_to=self.truncate_result_to, ignore_valid_keys=self.ignore_valid_keys))
assert len(rows[0])==len(full_headers)
rows, full_headers = remove_notes_if_no_notes(rows, full_headers)
if self.table_package == 'pretty_table':
from prettytable.prettytable import PrettyTable
table = str(PrettyTable(rows, field_names=full_headers, align='l', max_table_width=self.max_width))
elif self.table_package == 'tabulate':
table = tabulate(rows, headers=full_headers)
else:
raise NotImplementedError(self.table_package)
else:
raise NotImplementedError(self.display_format)
return table
def __getitem__(self, index):
new = PrettyTable()
new.field_names = self.field_names
for attr in self._options:
setattr(new, "_" + attr, getattr(self, "_" + attr))
setattr(new, "_align", getattr(self, "_align"))
if isinstance(index, slice):
for row in self._rows[index]:
new.add_row(row)
elif isinstance(index, int):
new.add_row(self._rows[index])
else:
raise Exception("Index %s is invalid, must be an integer or slice" % str(index))
return new