Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_simple_printable_row_string(row, columns):
    """Render selected columns of ``row`` as a compact ``name=value`` string.

    Columns that ``row`` does not have, or whose attribute value is exactly
    an ``ovs.db.data.Atom``, are left out.  Dict values are shown as a
    sorted list of ``(key, value)`` items.  The joined text is then
    normalised by a fixed sequence of regex substitutions: empty
    double-quote pairs, commas and single quotes (with an optional ``u``
    prefix) are removed, ``UUID(...)`` wrappers are unwrapped, ``True`` /
    ``False`` are lower-cased, and a ``ba=`` value that does not start with
    ``[`` and is followed by a space is wrapped in brackets.
    """
    pieces = []
    for name in columns:
        if not hasattr(row, name):
            continue
        if type(getattr(row, name)) is ovs.db.data.Atom:
            continue
        cell = getattr(row, name)
        if isinstance(cell, dict):
            # Sort so the output does not depend on dict ordering.
            cell = sorted(cell.items())
        pieces.append("%s=%s " % (name, cell))

    text = "".join(pieces).strip()

    # Order matters: each substitution runs on the output of the previous.
    substitutions = (
        ('""|,|u?\'', ""),
        (r'UUID\(([^)]+)\)', r'\1'),
        ('False', 'false'),
        ('True', 'true'),
        (r'(ba)=([^[][^ ]*) ', r'\1=[\2] '),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text
def atom_from_string(base, value_string, symtab=None):
    """Parse ``value_string`` into an ``ovs.db.data.Atom`` of ``base.type``.

    :param base: base type descriptor; only its ``type`` attribute is read
        in the visible part of this function.
    :param value_string: textual form of the value.
    :param symtab: symbol table; must not be None when a UUID value starts
        with ``@``.

    Boolean values accept ``true/yes/on/1`` and ``false/no/off/0``.

    NOTE(review): only the portion of the function up to the symbol-table
    assertion is visible here; the remainder of the UUID branch and the
    final return are not shown and have not been reconstructed.
    """
    type_ = base.type
    atom = None
    if type_ == ovs.db.types.IntegerType:
        atom = ovs.db.data.Atom(type_, int(value_string))
    elif type_ == ovs.db.types.RealType:
        # TODO:XXX negation
        atom = ovs.db.data.Atom(
            type_, ovs.db.parser.float_to_int(float(value_string)))
    elif type_ == ovs.db.types.BooleanType:
        if value_string in ("true", "yes", "on", "1"):
            atom = ovs.db.data.Atom(type_, True)
        # Membership test, not equality: a string never equals a tuple, so
        # the previous `==` made every false-like spelling unparseable.
        elif value_string in ("false", "no", "off", "0"):
            atom = ovs.db.data.Atom(type_, False)
    elif type_ == ovs.db.types.StringType:
        # TODO:XXXX escape: if value_string[0] == '"':
        atom = ovs.db.data.Atom(type_, value_string)
    elif type_ == ovs.db.types.UuidType:
        if value_string[0] == "@":
            assert symtab is not None
# Fragment of a table-update handler; the enclosing definition is not
# visible here.  Walks every (UUID string -> row update) member of
# table_update, validates it and applies it via self.__process_update.
# NOTE(review): iteritems() is the Python 2 dict API; a plain dict has no
# such method on Python 3 -- confirm the intended interpreter.
for uuid_string, row_update in table_update.iteritems():
# Member names must be well-formed UUID strings.
# NOTE(review): the error messages below start with a bare space and read
# as if a leading word was lost -- confirm against the upstream text.
if not ovs.ovsuuid.is_valid_string(uuid_string):
raise error.Error(' for table "%s" '
'contains bad UUID "%s" as member '
'name' % (table_name, uuid_string),
table_update)
uuid = ovs.ovsuuid.from_string(uuid_string)
# Each row update must be exactly a dict (subclasses are rejected too).
if type(row_update) != dict:
raise error.Error(' for table "%s" '
'contains for %s that '
'is not an object'
% (table_name, uuid_string))
# Pull the optional "old" and "new" dict members out of the row update.
parser = ovs.db.parser.Parser(row_update, "row-update")
old = parser.get_optional("old", [dict])
new = parser.get_optional("new", [dict])
# presumably finish() rejects unexpected members -- verify in Parser.
parser.finish()
# A row update with neither member (or with both empty) is rejected.
if not old and not new:
raise error.Error(' missing "old" and '
'"new" members', row_update)
# Bump the change sequence number only when __process_update reports a
# truthy result.
if self.__process_update(table, uuid, old, new):
self.change_seqno += 1
def __create_row(self, table, uuid):
    """Create a row filled with default values and register it in ``table``.

    :param table: table object; its ``columns`` mapping supplies the column
        definitions and its ``rows`` mapping receives the new row.
    :param uuid: key under which the new row is stored in ``table.rows``.
    :return: the newly created ``Row``.
    """
    # .values() instead of the Python 2-only .itervalues(), so this works
    # on both Python 2 and Python 3.
    data = {
        column.name: ovs.db.data.Datum.default(column.type)
        for column in table.columns.values()
    }
    row = table.rows[uuid] = Row(self, table, uuid, data)
    return row
# Fragment of a column-setting routine; the enclosing definition is not
# visible here.  Stores value_json into `column` of ovsrec_row, either as a
# single map entry (when `key` is given) or as the whole column value.
column_schema = ovsrec_row._table.columns[column]
if key is not None:
# Wrap the value as a one-entry map in JSON form: ['map', [[key, value]]].
value_json = ['map', [[key, value_json]]]
# A void value type means the column is not a map, so a key is invalid.
if column_schema.type.value.type == ovs.db.types.VoidType:
vsctl_fatal('cannot specify key to set for non-map column %s' %
column)
datum = ovs.db.data.Datum.from_json(column_schema.type, value_json,
self.symtab)
# Merge the new entry into the column's current value (an empty dict when
# the attribute is missing) and write the result back.
values = getattr(ovsrec_row, column, {})
values.update(datum.to_python(ovs.db.idl._uuid_to_row))
setattr(ovsrec_row, column, values)
else:
# No key: replace the whole column with the parsed value.
datum = ovs.db.data.Datum.from_json(column_schema.type, value_json,
self.symtab)
setattr(ovsrec_row, column,
datum.to_python(ovs.db.idl._uuid_to_row))
def atom_from_string(base, value_string, symtab=None):
    """Parse ``value_string`` into an ``ovs.db.data.Atom`` of ``base.type``.

    :param base: base type descriptor; only its ``type`` attribute is read
        in the visible part of this function.
    :param value_string: textual form of the value.
    :param symtab: symbol table; must not be None when a UUID value starts
        with ``@``.

    Boolean values accept ``true/yes/on/1`` and ``false/no/off/0``.

    NOTE(review): only the portion of the function up to the symbol-table
    assertion is visible here; the remainder of the UUID branch and the
    final return are not shown and have not been reconstructed.
    """
    type_ = base.type
    atom = None
    if type_ == ovs.db.types.IntegerType:
        atom = ovs.db.data.Atom(type_, int(value_string))
    elif type_ == ovs.db.types.RealType:
        # TODO:XXX negation
        atom = ovs.db.data.Atom(
            type_, ovs.db.parser.float_to_int(float(value_string)))
    elif type_ == ovs.db.types.BooleanType:
        if value_string in ("true", "yes", "on", "1"):
            atom = ovs.db.data.Atom(type_, True)
        # Membership test, not equality: a string never equals a tuple, so
        # the previous `==` made every false-like spelling unparseable.
        elif value_string in ("false", "no", "off", "0"):
            atom = ovs.db.data.Atom(type_, False)
    elif type_ == ovs.db.types.StringType:
        # TODO:XXXX escape: if value_string[0] == '"':
        atom = ovs.db.data.Atom(type_, value_string)
    elif type_ == ovs.db.types.UuidType:
        if value_string[0] == "@":
            assert symtab is not None
def __column_name(self, column):
    """Return the default value of ``column``'s key type.

    When the key type is the UUID type, the default is returned in the form
    produced by ``ovs.ovsuuid.to_json``; otherwise it is returned as is.
    """
    key_type = column.type.key.type
    if not (key_type == ovs.db.types.UuidType):
        return key_type.default
    return ovs.ovsuuid.to_json(key_type.default)
# Tail fragment of a row-lookup routine; the enclosing definition (and any
# loop that binds `name` / `ovsrec_row`) is not visible here.
# A non-list name equal to record_id selects ovsrec_row as the referrer;
# a second match is a fatal ambiguity.
if type(name) != list and name == record_id:
if (referrer):
vsctl_fatal('multiple rows in %s match "%s"' %
(table_name, record_id))
referrer = ovsrec_row
# Nothing matched.
if not referrer:
return None
final = None
# When a uuid_column is configured, the referrer's column is read and
# checked; a single-element list yields its only element.
if vsctl_row_id.uuid_column:
referrer.verify(vsctl_row_id.uuid_column)
uuid = getattr(referrer, vsctl_row_id.uuid_column)
uuid_ = referrer._data[vsctl_row_id.uuid_column]
# Sanity checks: the raw datum is keyed by UUIDs with no value type, and
# the attribute form is a list.
assert uuid_.type.key.type == ovs.db.types.UuidType
assert uuid_.type.value is None
assert type(uuid) == list
if len(uuid) == 1:
final = uuid[0]
# NOTE(review): indentation is lost here; this else presumably pairs with
# the uuid_column check above (no uuid_column -> the referrer itself is the
# result) -- confirm against the original file.
else:
final = referrer
return final
def from_json(json, symtab=None):
    """Convert a JSON ``uuid`` or ``named-uuid`` value into a ``uuid.UUID``.

    :param json: JSON value to unwrap with ``ovs.db.parser.unwrap_json``,
        first as ``"uuid"`` and, failing that, as ``"named-uuid"``.
    :param symtab: optional mapping from symbol name to UUID.  When given,
        a ``named-uuid`` value is looked up in it; unknown names are
        assigned a fresh random UUID, which is stored in ``symtab``.
    :return: the parsed or symbol-table UUID.
    :raises error.Error: if ``json`` is not a valid ``uuid`` value and
        either no symbol table was supplied or it is not a ``named-uuid``
        value.  The error raised is always the one from the ``uuid`` parse.
    """
    try:
        s = ovs.db.parser.unwrap_json(json, "uuid", six.string_types, "string")
        if not uuidRE.match(s):
            raise error.Error("\"%s\" is not a valid UUID" % s, json)
        return uuid.UUID(s)
    except error.Error as e:
        # Only a missing symbol table disables named-uuid handling.  An
        # empty table is still a table: a truthiness test here would reject
        # the very first symbol ever registered in it.
        if symtab is None:
            raise e
        try:
            name = ovs.db.parser.unwrap_json(json, "named-uuid",
                                             six.string_types, "string")
        except error.Error:
            # Report the original "uuid" failure, not the fallback's.
            raise e

        if name not in symtab:
            symtab[name] = uuid.uuid4()
        return symtab[name]