Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from datetime import datetime
from bson import ObjectId
from mongoengine import Document, EmbeddedDocument, fields
class Vote(EmbeddedDocument):
ip_address = fields.StringField(unique=True)
name = fields.StringField()
date = fields.DateTimeField(default=datetime.now)
class Comment(EmbeddedDocument):
id = fields.ObjectIdField(unique=True, default=ObjectId)
text = fields.StringField()
date = fields.DateTimeField(default=datetime.now)
email = fields.EmailField()
upvotes = fields.ListField(fields.EmbeddedDocumentField(Vote))
class Article(Document):
title = fields.StringField(unique=True)
text = fields.StringField()
comments = fields.ListField(fields.EmbeddedDocumentField(Comment))
top_comment = fields.EmbeddedDocumentField(Comment)
tags = fields.ListField(fields.StringField())
class SlowOp(Document):
meta = {
"collection": "noc.slowops",
"strict": False,
"auto_create_index": False,
"indexes": [{"fields": ["expire"], "expireAfterSeconds": 0}],
}
STATUS_RUNNING = "R"
STATUS_COMPLETE = "C"
STATUS_FAILED = "F"
SLOW_TIMEOUT = 5.0
SLOW_TTL = datetime.timedelta(days=1)
ts = DateTimeField()
expire = DateTimeField()
app_id = StringField()
user = StringField()
status = StringField(choices=[STATUS_RUNNING, STATUS_COMPLETE, STATUS_FAILED])
duration = FloatField()
pickled_result = StringField()
pool = concurrent.futures.ThreadPoolExecutor(100)
TimeoutError = concurrent.futures.TimeoutError
@classmethod
def submit(cls, fn, app_id=None, user=None, *args, **kwargs):
def on_complete(f):
logger.debug("Completion slow operation %s", so.id)
if f.exception():
so.status = cls.STATUS_FAILED
from noc.sa.models.managedobject import ManagedObject
from noc.lib import nosql
from noc.lib.dateutils import total_seconds
class ArchivedEvent(document.Document):
"""
"""
meta = {
"collection": "noc.events.archive",
"allow_inheritance": True,
"indexes": ["timestamp", "alarms"]
}
status = "S"
timestamp = fields.DateTimeField(required=True)
managed_object = nosql.ForeignKeyField(ManagedObject, required=True)
event_class = nosql.PlainReferenceField(EventClass, required=True)
start_timestamp = fields.DateTimeField(required=True)
repeats = fields.IntField(required=True)
raw_vars = nosql.RawDictField()
resolved_vars = nosql.RawDictField()
vars = fields.DictField()
log = fields.ListField(nosql.EmbeddedDocumentField(EventLog))
alarms = fields.ListField(nosql.ObjectIdField())
def __unicode__(self):
return u"%s" % self.id
@property
def duration(self):
"""
import mongoengine.signals
import dateutil.parser
## NOC modules
from noc.lib.serialize import json_decode
from noc.support.cp import CPClient
logger = logging.getLogger(__name__)
class Crashinfo(Document):
meta = {
"collection": "noc.crashinfo",
"indexes": [("status", "timestamp")]
}
uuid = UUIDField(unique=True, primary_key=True)
timestamp = DateTimeField(required=True)
status = StringField(
choices=[
("N", "New"),
("r", "Reporting"),
("R", "Reported"),
("X", "Rejected"),
("f", "Fix ready"),
("F", "Fixed")
],
default="N"
)
process = StringField()
branch = StringField()
tip = StringField()
comment = StringField()
priority = StringField(
import datetime
## Third-party modules
from mongoengine.document import Document
from mongoengine.fields import IntField, DateTimeField
class Outage(Document):
meta = {
"collection": "noc.fm.outages",
"allow_inheritance": False,
"indexes": ["object", ("object", "-start")]
}
object = IntField()
start = DateTimeField()
stop = DateTimeField() # None for active outages
def __unicode__(self):
return u"%d" % self.object
@property
def is_active(self):
return self.stop is None
@classmethod
def register_outage(cls, object, status, ts=None):
"""
Change current outage status
:param cls:
:param object: Managed Object
:param status: True - if object is down, False - otherwise
:param ts: Effective event timestamp. None for current time
"""
meta = {
"collection": "noc.links",
"strict": False,
"auto_create_index": False,
"indexes": ["subinterfaces", "linked_objects"],
}
subinterfaces = PlainReferenceListField("inv.SubInterface")
# List of linked objects
linked_objects = ListField(IntField())
# Name of discovery method or "manual"
discovery_method = StringField()
# Timestamp of first discovery
first_discovered = DateTimeField(default=datetime.datetime.now)
# Timestamp of last confirmation
last_seen = DateTimeField()
# L3 path cost
l3_cost = IntField(default=1)
def __str__(self):
return "(%s)" % ", ".join([smart_text(i) for i in self.subinterfaces])
def clean(self):
self.linked_objects = sorted(set(i.managed_object.id for i in self.subinterfaces))
super(L3Link, self).clean()
from noc.core.mongo.fields import ForeignKeyField
from noc.sa.models.managedobject import ManagedObject
@six.python_2_unicode_compatible
class Discovery(Document):
meta = {
"collection": "noc.schedules.inv.discovery",
"strict": False,
"auto_create_index": False,
}
job_class = StringField(db_field="jcls")
schedule = DictField()
ts = DateTimeField(db_field="ts")
last = DateTimeField()
last_success = DateTimeField(db_field="st")
last_duration = FloatField(db_field="ldur")
last_status = StringField(db_field="ls")
status = StringField(db_field="s")
managed_object = ForeignKeyField(ManagedObject, db_field="key")
data = DictField()
traceback = DictField()
runs = IntField()
faults = IntField(db_field="f")
log = ListField()
def __str__(self):
return "%s: %s" % (self.managed_object, self.job_class)
if callable(attr):
return "%s" % attr()
return attr
def pre_save(self, model_instance, add):
value = six.u(self.create_slug(model_instance, add))
setattr(model_instance, self.attname, value)
return value
def get_internal_type(self):
return "SlugField"
class CreationDateTimeField(DateTimeField):
"""
CreationDateTimeField
By default, sets editable=False, blank=True, default=datetime.now
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault('default', datetime.datetime.now)
DateTimeField.__init__(self, *args, **kwargs)
def get_internal_type(self):
return "DateTimeField"
class ModificationDateTimeField(CreationDateTimeField):
"""
enable_container = BooleanField()
enable_link = BooleanField()
enable_managedobject = BooleanField()
enable_managedobjectprofile = BooleanField()
enable_networksegment = BooleanField()
enable_networksegmentprofile = BooleanField()
enable_service = BooleanField()
enable_serviceprofile = BooleanField()
enable_subscriber = BooleanField()
enable_resourcegroup = BooleanField()
enable_ttsystem = BooleanField()
# Usage statistics
last_extract = DateTimeField()
last_successful_extract = DateTimeField()
extract_error = StringField()
last_load = DateTimeField()
last_successful_load = DateTimeField()
load_error = StringField()
_id_cache = cachetools.TTLCache(maxsize=100, ttl=60)
def __str__(self):
return self.name
@classmethod
@cachetools.cachedmethod(operator.attrgetter("_id_cache"), lock=lambda _: id_lock)
def get_by_id(cls, id):
return RemoteSystem.objects.filter(id=id).first()
@classmethod
@cachetools.cachedmethod(operator.attrgetter("_id_cache"), lock=lambda _: id_lock)
def get_by_name(cls, name):
file_name = StringField(unique=True)
# Subscription is active
is_active = BooleanField(default=True)
# email subject
subject = StringField()
# Run report as user
run_as = ForeignKeyField(User)
# Send result to notification group
# If empty, only file will be written
notification_group = ForeignKeyField(NotificationGroup)
# Predefined report id
# :
report = StringField()
#
last_status = BooleanField(default=False)
last_run = DateTimeField()
PREFIX = "var/reports"
JCLS = "noc.main.models.reportsubscription.ReportJob"
class RequestStub(object):
def __init__(self, user):
self.user = user
@classmethod
def send_reports(cls):
"""
Calculate and send all reports for today
:return:
"""
subscriptions = list(ReportSubscription.objects.filter(is_active=True))
for s in subscriptions: