Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Configure two named depots; both store their files under ./lfs.
DepotManager.configure('default', {'depot.storage_path': './lfs'})
DepotManager.configure('another', {'depot.storage_path': './lfs'})
# Make the 'another' depot also reachable under the name 'another_alias'.
DepotManager.alias('another_alias', 'another')
# make_middleware is invoked with None as its only argument.
DepotManager.make_middleware(None)
def teardown():
    """Delete the ``./lfs`` directory tree, ignoring any errors while removing it."""
    storage_dir = './lfs'
    shutil.rmtree(storage_dir, ignore_errors=True)
class Document(MappedClass):
    """Mapped document declaring one upload field per upload configuration."""

    class __mongometa__:
        session = DBSession
        name = 'depot_test_document'

    _id = FieldProperty(s.ObjectId)
    name = FieldProperty(str)

    # Upload field with no extra options.
    content = UploadedFileProperty()
    # Upload field using UploadedImageWithThumb as its upload type.
    photo = UploadedFileProperty(upload_type=UploadedImageWithThumb)
    # Upload field with a single WithThumbnailFilter((12, 12), 'PNG') filter.
    second_photo = UploadedFileProperty(
        filters=(WithThumbnailFilter((12, 12), 'PNG'),))
    # Upload field whose upload_storage is 'another_alias'.
    targeted_content = UploadedFileProperty(upload_storage='another_alias')
class TestMingAttachments(object):
    """Fixture holding a named temporary file pre-filled with known bytes."""

    def __init__(self):
        payload = b'this is the file content'
        handle = tempfile.NamedTemporaryFile()
        handle.write(payload)
        # Flush so the written bytes are not left sitting in the buffer.
        handle.flush()
        self.file_content = payload
        self.fake_file = handle

    def setup(self):
        """Run ``clear_database()``."""
        clear_database()
from ming import collection, Field
from ming import schema as S
from ming.odm import RelationProperty, ForeignIdProperty, Mapper
from ming.utils import LazyProperty
from .m_base import ModelBase
from .m_session import doc_session, orm_session
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Collection 'chef.client' bound to doc_session. 'name' is declared unique and
# the two boolean flags ('is_validator', 'admin') fall back to False when missing.
client = collection(
'chef.client', doc_session,
Field('_id', S.ObjectId()),
Field('name', str, unique=True),
Field('account_id', S.ObjectId()),
Field('raw_public_key', S.Binary()),
Field('is_validator', bool, if_missing=False),
Field('admin', bool, if_missing=False))
class Client(ModelBase):
    """Model exposing its ``name`` as ``__name__`` and a delete-permission check."""

    @property
    def __name__(self):
        """Expose ``self.name`` under the ``__name__`` attribute."""
        return self.name

    def allow_access(self, client, permission):
        """Decide whether ``client`` may perform ``permission`` on this object.

        Only ``'delete'`` is handled: it is granted when ``client`` is an
        admin, is not this very object, and this object is not a validator.
        Any other permission yields ``None``.
        """
        if permission != 'delete':
            return None
        other_admin = client.admin and client is not self
        return other_admin and not self.is_validator
# Image MIME types listed as supported by PIL.
SUPPORTED_BY_PIL = {
    'image/jpg',
    'image/jpeg',
    'image/pjpeg',
    'image/png',
    'image/x-png',
    'image/gif',
}
# Mapped class whose __mongometa__ names it 'fs', binds it to
# project_orm_session and declares an index on 'filename'.
class File(MappedClass):
class __mongometa__:
session = project_orm_session
name = 'fs'
indexes = [ 'filename' ]
_id = FieldProperty(schema.ObjectId)
file_id = FieldProperty(schema.ObjectId)
# Falls back to 'unknown' when no filename is supplied.
filename=FieldProperty(str, if_missing='unknown')
# No default here; __init__ guesses it from the filename when it is None.
content_type=FieldProperty(str)
def __init__(self, **kw):
    """Initialise the record, guessing ``content_type`` when none was given.

    All keyword arguments are forwarded to the parent constructor. If
    ``content_type`` is still ``None`` afterwards it is derived from
    ``filename`` via ``utils.guess_mime_type``.
    """
    super(File, self).__init__(**kw)
    if self.content_type is not None:
        return
    self.content_type = utils.guess_mime_type(self.filename)
@classmethod
def _fs(cls):
    """Return a ``GridFS`` built from this class's session db and root collection."""
    database = session(cls).impl.db
    root = cls._root_collection()
    return GridFS(database, root)
@classmethod
def _root_collection(cls):
unique_indexes = [ 'shortname' ]
_id = FieldProperty(S.ObjectId)
shortname = FieldProperty(str)
fullname = FieldProperty(str)
# Restricted to the three listed organization categories.
organization_type = FieldProperty(S.OneOf(
    'For-profit business',
    'Foundation or other non-profit organization',
    'Research and/or education institution'))
description = FieldProperty(str)
headquarters = FieldProperty(str)
# One of the listed sizes; 'Unknown' when missing.
dimension = FieldProperty(
    S.OneOf('Small', 'Medium', 'Large', 'Unknown'),
    if_missing='Unknown')
website = FieldProperty(str)
workfields = FieldProperty([S.ObjectId])
# Pass the callable, not its result: ``datetime.utcnow()`` would be evaluated
# once when the class body runs, stamping every new document with that single
# moment instead of its own creation time.
created = FieldProperty(S.DateTime, if_missing=datetime.utcnow)
memberships = RelationProperty('Membership')
project_involvements = RelationProperty('ProjectInvolvement')
def url(self):
    """Return ``/o/<shortname>/`` ASCII-encoded, with underscores turned into dashes.

    Characters that cannot be encoded as ASCII are dropped.
    """
    slug = self.shortname.replace('_', '-')
    path = '/o/' + slug + '/'
    return path.encode('ascii', 'ignore')
def project(self):
    """Look up the ``M.Project`` whose shortname is ``o/<shortname>``.

    Underscores in this object's shortname are replaced by dashes before
    the lookup.
    """
    project_shortname = 'o/' + self.shortname.replace('_', '-')
    return M.Project.query.get(shortname=project_shortname)
@classmethod
def register(cls, shortname, fullname, orgtype, user):
# Look up an existing record with this shortname; if one exists, return None.
# NOTE(review): fullname, orgtype and user are unused in the visible lines and
# nothing follows the duplicate check -- the body looks truncated; confirm
# against the full source before relying on this method.
o=cls.query.get(shortname=shortname)
if o is not None: return None
# Post model: declares its collection metadata, foreign ids and relations to
# Thread and Discussion, and edit/spam bookkeeping fields.
class Post(Message, VersionedArtifact, ActivityObject, ReactableArtifact):
class __mongometa__:
name = 'post'
history_class = PostHistory
indexes = [
# used in general lookups, last_post, etc
('discussion_id', 'status', 'timestamp'),
'thread_id'
]
type_s = 'Post'
thread_id = ForeignIdProperty(Thread)
discussion_id = ForeignIdProperty(Discussion)
# Declared with the Deprecated schema type.
subject = FieldProperty(schema.Deprecated)
# One of 'ok', 'pending' or 'spam'; 'pending' when missing.
status = FieldProperty(schema.OneOf('ok', 'pending', 'spam',
if_missing='pending'))
# None when missing.
last_edit_date = FieldProperty(datetime, if_missing=None)
last_edit_by_id = AlluraUserProperty()
# Starts at 0 when missing.
edit_count = FieldProperty(int, if_missing=0)
spam_check_id = FieldProperty(str, if_missing='')
text_cache = FieldProperty(MarkdownCache)
# meta comment - system generated, describes changes to an artifact
is_meta = FieldProperty(bool, if_missing=False)
thread = RelationProperty(Thread)
discussion = RelationProperty(Discussion)
def __json__(self):
author = self.author()
return dict(
# String fields below default to '' when missing unless noted otherwise.
css = FieldProperty(str, if_missing='')
homepage = FieldProperty(str, if_missing='')
homepage_cache = FieldProperty(MarkdownCache)
redirect = FieldProperty(str, if_missing='')
projects = RelationProperty('Project')
# Both display flags default to True when missing.
allow_browse = FieldProperty(bool, if_missing=True)
show_title = FieldProperty(bool, if_missing=True)
site_specific_html = FieldProperty(str, if_missing='')
project_template = FieldProperty(str, if_missing='')
tracking_id = FieldProperty(str, if_missing='')
project_list_url = FieldProperty(str, if_missing='')
# Declared with the Deprecated schema type.
level = FieldProperty(S.Deprecated)
allow_private = FieldProperty(S.Deprecated)
# Sub-document with four typed keys.
features = FieldProperty(dict(
private_projects=bool,
max_projects=S.Int,
css=str,
google_analytics=bool))
anchored_tools = FieldProperty(str, if_missing='')
prohibited_tools = FieldProperty(str, if_missing='')
# Defaults to False when missing.
use_wiki_page_as_root = FieldProperty(bool, if_missing=False)
def parent_security_context(self):
    """Return ``None``; this object supplies no parent security context."""
    return None
@LazyProperty
def neighborhood_project(self):
    """Return the project flagged as this neighborhood's own project.

    Looks up the ``Project`` whose ``neighborhood_id`` equals this object's
    ``_id`` and whose ``is_nbhd_project`` flag is ``True``.

    Raises:
        AssertionError: if the lookup yields nothing truthy.
    """
    # Imported inside the function rather than at module level
    # (presumably to avoid a circular import with .project -- confirm).
    from .project import Project
    p = Project.query.get(
        neighborhood_id=self._id,
        is_nbhd_project=True)
    assert p
    # Hand the project back; without this the property always evaluated
    # to None even though the lookup succeeded.
    return p
def deny(cls, role_id, permission, reason=None):
    """Build an access-control entry with ``access=cls.DENY``.

    The entry carries the given ``role_id`` and ``permission``; ``reason``
    is stored as passed (``None`` by default).
    """
    # NOTE(review): takes ``cls`` but no @classmethod decorator is visible
    # directly above this definition -- confirm it is decorated.
    return Object(
        access=cls.DENY,
        reason=reason,
        role_id=role_id,
        permission=permission)
@classmethod
def match(cls, ace, role_id, permission):
    """Tell whether ``ace`` applies to the given role and permission.

    The role matches when ``ace.role_id`` is ``role_id`` or ``EVERYONE``;
    the permission matches when ``ace.permission`` is ``permission`` or
    ``ALL_PERMISSIONS``. Both must match.
    """
    if ace.role_id not in (role_id, EVERYONE):
        return False
    return ace.permission in (permission, ALL_PERMISSIONS)
class ACL(S.Array):
'''
Access Control List. Is an array of :class:`ACE`
'''
def __init__(self, permissions=None, **kwargs):
    """Create an array schema whose ``field_type`` is ``ACE(permissions)``.

    Remaining keyword arguments are forwarded to the parent constructor.
    """
    entry_schema = ACE(permissions)
    super(ACL, self).__init__(field_type=entry_schema, **kwargs)
@classmethod
def contains(cls, ace, acl):
"""Test membership of ace in acl ignoring ace.reason field.
Return actual ACE with reason filled if ace is found in acl, None otherwise
e.g. `ACL.contains(ace, acl)` will return `{role_id=ObjectId(...), permission='read', access='DENY', reason='Spammer'}`
with following vars:
def index(self):
    """Extend ``Snapshot.index`` with title, type and text entries.

    The title combines this snapshot's version with the summary of the
    original artifact; the text is the summary stored in ``self.data``.
    """
    fields = Snapshot.index(self)
    title = 'Version %d of %s' % (self.version, self.original().summary)
    fields.update(
        title_s=title,
        type_s='Ticket Snapshot',
        text=self.data.summary)
    return fields
# Artifact subclass whose __mongometa__ names it 'bin'; holds a summary plus
# the search terms and sort string used when building its URL.
class Bin(Artifact):
class __mongometa__:
name = 'bin'
type_s = 'Bin'
_id = FieldProperty(schema.ObjectId)
# Required and may not be None.
summary = FieldProperty(str, required=True, allow_none=False)
# Both default to '' when missing.
terms = FieldProperty(str, if_missing='')
sort = FieldProperty(str, if_missing='')
def url(self):
    """Return the search URL for this bin.

    The result is ``self.app_config.url()`` followed by ``search/?`` and
    the URL-encoded query: ``q`` is always present (empty when ``terms``
    is falsy) and ``sort`` is added only when ``self.sort`` is truthy.
    """
    # ``urllib.urlencode`` exists only on Python 2; on Python 3 the function
    # lives in ``urllib.parse``. Resolve whichever is available so the
    # method works on both.
    try:
        from urllib.parse import urlencode
    except ImportError:
        from urllib import urlencode
    base = self.app_config.url() + 'search/?'
    params = dict(q=(self.terms or ''))
    if self.sort:
        params['sort'] = self.sort
    return base + urlencode(params)
def shorthand_id(self):
    """Return this object's ``summary``, which serves as its shorthand id."""
    return self.summary
def index(self):
result = Artifact.index(self)