Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_nondefault_date_created_field(self):
# redefine to get entirely new class
class SimpleDoc(Document):
a = StringField()
b = IntField()
sett = SETTINGS.copy()
sett['DATE_CREATED'] = 'created_at'
app = Eve(settings=sett)
app.debug = True
ext = EveMongoengine(app)
ext.add_model(SimpleDoc)
app = app.test_client()
self._test_default_values(app, SimpleDoc, created_name='created_at')
"""Test schema."""
email = db.EmailField(primary_key=True)
class Seller(EmbeddedDocument):
"""Test schema."""
name = db.StringField()
address = db.EmbeddedDocumentField(Address)
class ArticleMetaData(EmbeddedDocument):
"""Test schema."""
price = db.IntField()
seller = db.EmbeddedDocumentField(Seller)
class Article(Document):
"""Test schema."""
user = db.ReferenceField(User)
addition = db.EmbeddedDocumentField(ArticleMetaData)
title = db.StringField()
date = db.DateTimeField()
body = db.BinaryField()
uuid = db.UUIDField()
class ExtraInformation(db.EmbeddedDocument):
"""Extra information."""
meta = {
'indexes': ['md5']
}
job_id = mongoengine.StringField(required=True,
max_length=MD5_LENGTH,
min_length=MD5_LENGTH,
help_text="ID of parent Job.")
target = mongoengine.StringField(required=True,
help_text="Target URL or filesystem path")
md5 = mongoengine.StringField(max_length=MD5_LENGTH,
min_length=MD5_LENGTH,
help_text="MD5 hexdigest of target.")
title = mongoengine.StringField(help_text="Processed page title.")
size = mongoengine.IntField(min_value=0,
help_text="Size of page in bytes.")
images = mongoengine.IntField(min_value=0,
help_text="Number of images on the page.")
timestamp = mongoengine.DateTimeField(help_text="End time of task.")
status = mongoengine.StringField(choices=STATUSES,
default=QUEUED,
help_text="Job status.")
@classmethod
def create(cls, job_id, target):
"""Create a new task for the passed in target.
:param job_id: The string ID of the parent job instance
:param target: The target URL or filesystem path
:returns: ``Task`` instance
"""
import uuid
import mongoengine
from nosql.engine import Engine
from nosql.service_record import ServiceRecord
class Car(mongoengine.Document):
model = mongoengine.StringField(required=True)
make = mongoengine.StringField(required=True)
year = mongoengine.IntField(required=True)
mileage = mongoengine.IntField(default=0)
vi_number = mongoengine.StringField(default=lambda: str(uuid.uuid4()).replace("-", ''))
engine = mongoengine.EmbeddedDocumentField(Engine, required=True)
service_history = mongoengine.EmbeddedDocumentListField(ServiceRecord)
# no need to reference owners here, that is entirely contained in owner class
meta = {
'db_alias': 'core',
'collection': 'cars',
'indexes': [
'mileage',
'year',
'service_history.price',
'service_history.customer_rating',
# from django.db import models
from mongoengine import Document
from mongoengine import DictField
from mongoengine import IntField
from mongoengine import DateTimeField
# Percentage of individual and collective agents
class PercentIndividualAndCollectiveAgent(Document):
_totalIndividualAgent = IntField(required=True)
_totalCollectiveAgent = IntField(required=True)
_totalAgents = IntField(required=True)
_createDate = DateTimeField(required=True)
@property
def total_individual_agent(self):
return self._totalIndividualAgent
@total_individual_agent.setter
def total_individual_agent(self, number):
self._totalIndividualAgent = number
@property
def total_collective_agent(self):
return self._totalCollectiveAgent
@total_collective_agent.setter
import mongoengine as me
class CourseOffering(me.Document):
# id = me.ObjectIdField(primary_key=True)
course_id = me.StringField(required=True, unique_with=['term_id', 'section_id'])
term_id = me.StringField(required=True)
# TODO(mack): maybe should be list with LEC, TUT, LAB options?
# eg. LEC001 or just 001?
section_id = me.IntField(required=True)
# eg. mc2045, distance_ed
location_id = me.StringField()
def __unicode__(self):
return "%s unsubscribed from %s on %s" % (self.user_id, self.email_type, self.date)
@classmethod
def user(cls, user_id):
unsubs = cls.objects(user_id=user_id)
return unsubs
@classmethod
def unsubscribe(cls, user_id, email_type):
cls.objects.create()
class MSentEmail(mongo.Document):
sending_user_id = mongo.IntField()
receiver_user_id = mongo.IntField()
email_type = mongo.StringField()
date_sent = mongo.DateTimeField(default=datetime.datetime.now)
meta = {
'collection': 'sent_emails',
'allow_inheritance': False,
'indexes': ['sending_user_id', 'receiver_user_id', 'email_type'],
}
def __unicode__(self):
return "%s sent %s email to %s" % (self.sending_user_id, self.email_type, self.receiver_user_id)
@classmethod
def record(cls, email_type, receiver_user_id, sending_user_id=None):
cls.objects.create(email_type=email_type,
end_timestamp = db_field_types.ComplexDateTimeField()
meta = {'indexes': [{'fields': ['action_execution']}]}
class TaskExecutionDB(stormbase.StormFoundationDB, stormbase.ChangeRevisionFieldMixin):
RESOURCE_TYPE = types.ResourceType.EXECUTION
workflow_execution = me.StringField(required=True)
task_name = me.StringField(required=True)
task_id = me.StringField(required=True)
task_route = me.IntField(required=True, min_value=0)
task_spec = stormbase.EscapedDictField()
delay = me.IntField(min_value=0)
itemized = me.BooleanField(default=False)
items_count = me.IntField(min_value=0)
items_concurrency = me.IntField(min_value=1)
context = stormbase.EscapedDictField()
status = me.StringField(required=True)
result = stormbase.EscapedDictField()
start_timestamp = db_field_types.ComplexDateTimeField(default=date_utils.get_datetime_utc_now)
end_timestamp = db_field_types.ComplexDateTimeField()
meta = {
'indexes': [
{'fields': ['workflow_execution']},
{'fields': ['task_id']},
{'fields': ['task_id', 'task_route']},
{'fields': ['workflow_execution', 'task_id']},
{'fields': ['workflow_execution', 'task_id', 'task_route']},
]
}
import datetime
import mongoengine
class BaseModel(mongoengine.Document):
meta = {
'allow_inheritance': True,
'indexes': [
'user_id'
]
}
user_id = mongoengine.IntField()
name = mongoengine.StringField(required=True)
model_path = mongoengine.StringField(required=True)
data = mongoengine.BinaryField()
created_at = mongoengine.DateTimeField()
updated_at = mongoengine.DateTimeField()
def save(self, *args, **kwargs):
if not self.created_at:
self.created_at = datetime.datetime.now()
self.updated_at = datetime.datetime.now()
return super(BaseModel, self).save(*args, **kwargs)
year_totals = {}
years = datetime.datetime.now().year - 2009
for y in reversed(range(years)):
now = datetime.datetime.now()
start_date = datetime.datetime(now.year, 1, 1) - dateutil.relativedelta.relativedelta(years=y)
end_date = now - dateutil.relativedelta.relativedelta(years=y)
if end_date > now: end_date = now
year_totals[now.year - y] = _counter(start_date, end_date)
total = cls.objects.all().aggregate(sum=Sum('payment_amount'))
print "\nTotal: $%s" % total['sum']
class MGiftCode(mongo.Document):
gifting_user_id = mongo.IntField()
receiving_user_id = mongo.IntField()
gift_code = mongo.StringField(max_length=12)
duration_days = mongo.IntField()
payment_amount = mongo.IntField()
created_date = mongo.DateTimeField(default=datetime.datetime.now)
meta = {
'collection': 'gift_codes',
'allow_inheritance': False,
'indexes': ['gifting_user_id', 'receiving_user_id', 'created_date'],
}
def __unicode__(self):
return "%s gifted %s on %s: %s (redeemed %s times)" % (self.gifting_user_id, self.receiving_user_id, self.created_date, self.gift_code, self.redeemed)
@property
def redeemed(self):