Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def delete(self, ci_id):
ci = db.session.query(CI).filter(CI.ci_id == ci_id).first()
if ci is not None:
attrs = db.session.query(CITypeAttribute.attr_id).filter(
CITypeAttribute.type_id == ci.type_id).all()
attr_names = []
for attr in attrs:
attr_names.append(CIAttributeCache.get(attr.attr_id).attr_name)
attr_names = set(attr_names)
for attr_name in attr_names:
Table = TableMap(attr_name=attr_name).table
db.session.query(Table).filter(Table.ci_id == ci_id).delete()
db.session.query(CIRelation).filter(
CIRelation.first_ci_id == ci_id).delete()
db.session.query(CIRelation).filter(
CIRelation.second_ci_id == ci_id).delete()
# db.session.query(CIAttributeHistory).filter(
# CIAttributeHistory.ci_id == ci_id).delete()
db.session.flush()
db.session.delete(ci)
try:
db.session.commit()
except Exception as e:
db.session.rollback()
current_app.logger.error("delete CI error, {0}".format(str(e)))
return abort(400, "delete CI error, {0}".format(str(e)))
# todo: write history
def delete(self, attr_id):
attr, name = db.session.query(CIAttribute).filter_by(
attr_id=attr_id).first(), None
if attr:
if attr.is_choice:
choice_table = type_map["choice"].get(attr.value_type)
db.session.query(choice_table).filter(
choice_table.attr_id == attr_id).delete()
db.session.flush()
name = attr.attr_name
CIAttributeCache.clean(attr)
db.session.delete(attr)
try:
db.session.commit()
except Exception as e:
db.session.rollback()
current_app.logger.error("delete attribute error, {0}".format(
str(e)))
return abort(500, str(e))
else:
return abort(404, "attribute you want to delete is not existed")
return name
def add_heartbeat(self, ci_type, unique):
ci_type = CITypeCache.get(ci_type)
if not ci_type:
return 'error'
uniq_key = CIAttributeCache.get(ci_type.uniq_id)
Table = TableMap(attr_name=uniq_key.attr_name).table
ci_id = db.session.query(Table.ci_id).filter(
Table.attr_id == uniq_key.attr_id).filter(
Table.value == unique).first()
if ci_id is None:
return 'error'
ci = db.session.query(CI).filter(CI.ci_id == ci_id.ci_id).first()
if ci is None:
return 'error'
ci.heartbeat = datetime.datetime.now()
db.session.add(ci)
db.session.commit()
return "ok"
def _get_choice_value(self, attr_id, value_type):
_table = type_map.get("choice").get(value_type)
choice_values = db.session.query(_table.value).filter(
_table.attr_id == attr_id).all()
return [choice_value.value for choice_value in choice_values]
def delete(self, ci_id):
ci = db.session.query(CI).filter(CI.ci_id == ci_id).first()
if ci is not None:
attrs = db.session.query(CITypeAttribute.attr_id).filter(
CITypeAttribute.type_id == ci.type_id).all()
attr_names = []
for attr in attrs:
attr_names.append(CIAttributeCache.get(attr.attr_id).attr_name)
attr_names = set(attr_names)
for attr_name in attr_names:
Table = TableMap(attr_name=attr_name).table
db.session.query(Table).filter(Table.ci_id == ci_id).delete()
db.session.query(CIRelation).filter(
CIRelation.first_ci_id == ci_id).delete()
db.session.query(CIRelation).filter(
CIRelation.second_ci_id == ci_id).delete()
# db.session.query(CIAttributeHistory).filter(
# CIAttributeHistory.ci_id == ci_id).delete()
ret_key="name",
uniq_key=None,
use_master=False):
res = dict()
for field in fields:
attr = CIAttributeCache.get(field)
if not attr:
current_app.logger.warn('attribute %s not found' % field)
return res
table = TableMap(attr_name=attr.attr_name).table
if use_master:
rs = db.session().using_bind("master").query(
table.value).filter_by(ci_id=ci_id).filter_by(
attr_id=attr.attr_id)
else:
rs = db.session.query(table.value).filter_by(
ci_id=ci_id).filter_by(attr_id=attr.attr_id)
field_name = getattr(attr, "attr_{0}".format(ret_key))
try:
if attr.is_multivalue:
if attr.value_type == 'datetime':
res[field_name] = [datetime.datetime.strftime(
x.value, '%Y-%m-%d %H:%M:%S') for x in rs.all()]
else:
res[field_name] = [x.value for x in rs.all()]
else:
x = rs.first()
if x:
if attr.value_type == 'datetime':
res[field_name] = datetime.datetime.strftime(
rs.first().value, '%Y-%m-%d %H:%M:%S')
else:
def get_children(self, ci_id, ret_key='name', relation_type="contain"):
second_cis = db.session.query(CIRelation.second_ci_id).filter(
CIRelation.first_ci_id == ci_id).filter(or_(
CIRelation.relation_type == relation_type,
CIRelation.relation_type == "deploy"))
second_ci_ids = (second_ci.second_ci_id for second_ci in second_cis)
ci_types = {}
for ci_id in second_ci_ids:
type_id = db.session.query(CI.type_id).filter(
CI.ci_id == ci_id).first().type_id
if type_id not in ci_types:
ci_types[type_id] = [ci_id]
else:
ci_types[type_id].append(ci_id)
res = {}
for type_id in ci_types:
ci_type = CITypeCache.get(type_id)
children = get_cis_by_ids(map(str, ci_types.get(type_id)),
def get_citypes(self, type_name=None):
ci_types = db.session.query(CIType).all() if type_name is None else \
db.session.query(CIType).filter(
CIType.type_name.ilike("%{0}%".format(type_name))).all()
res = list()
for ci_type in ci_types:
type_dict = row2dict(ci_type)
type_dict["uniq_key"] = CIAttributeCache.get(
type_dict["uniq_id"]).attr_name
res.append(type_dict)
return res
def get_attributes(self, name=None):
"""
return attribute by name,
if name is None, then return all attributes
"""
attrs = db.session.query(CIAttribute).filter(
CIAttribute.attr_name.ilike("%{0}%".format(name))).all() \
if name is not None else db.session.query(CIAttribute).all()
res = list()
for attr in attrs:
attr_dict = row2dict(attr)
if attr.is_choice:
attr_dict["choice_value"] = self._get_choice_value(
attr.attr_id, attr.value_type)
res.append(attr_dict)
return res
def get(cls, key):
if key is None:
return
ct = cache.get("CIType::ID::%s" % key) or \
cache.get("CIType::Name::%s" % key)
if ct is None:
ct = db.session.query(CIType).filter(
CIType.type_name == key).first() or \
db.session.query(CIType).filter(CIType.type_id == key).first()
if ct is not None:
CITypeCache.set(ct)
return ct