Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from .types import MarkdownCache
log = logging.getLogger(__name__)
class Discussion(Artifact, ActivityObject):
class __mongometa__:
name = 'discussion'
type_s = 'Discussion'
parent_id = FieldProperty(schema.Deprecated)
shortname = FieldProperty(str)
name = FieldProperty(str)
description = FieldProperty(str, if_missing='')
description_cache = FieldProperty(MarkdownCache)
num_topics = FieldProperty(int, if_missing=0)
num_posts = FieldProperty(int, if_missing=0)
subscriptions = FieldProperty({str: bool})
threads = RelationProperty('Thread', via='discussion_id')
posts = RelationProperty('Post', via='discussion_id')
def __json__(self, limit=None, posts_limit=None, is_export=False):
return dict(
_id=str(self._id),
shortname=self.shortname,
name=self.name,
description=self.description,
threads=[t.__json__(limit=posts_limit, is_export=is_export) for t
in self.thread_class().query.find(dict(discussion_id=self._id)).limit(limit or 0)]
)
def delete(self):
try:
if os.path.exists(self.repo_dir):
shutil.rmtree(self.repo_dir)
except:
log.exception('Error deleting %s', self.repo_dir)
Artifact.delete(self)
mapper(Commit).remove(dict(app_config_id=self.app_config_id))
class Commit(Artifact):
class __mongometa__:
name='commit'
type_s = 'ForgeSCM Commit'
_id = FieldProperty(schema.ObjectId)
hash = FieldProperty(str)
rev = FieldProperty(int) # only relevant for hg and svn repos
repository_id = ForeignIdProperty(Repository)
summary = FieldProperty(str)
diff = FieldProperty(str)
date = FieldProperty(datetime)
parents = FieldProperty([str])
tags = FieldProperty([str])
user = FieldProperty(str)
branch = FieldProperty(str)
repository = RelationProperty(Repository, via='repository_id')
def index(self):
result = Artifact.index(self)
result.update(
result.update(
type_s='Post Snapshot',
text=self.data.text)
return result
class Post(Message, VersionedArtifact):
class __mongometa__:
name='post'
history_class = PostHistory
indexes = [ 'discussion_id', 'thread_id' ]
type_s = 'Post'
thread_id = ForeignIdProperty(Thread)
discussion_id = ForeignIdProperty(Discussion)
subject = FieldProperty(schema.Deprecated)
status = FieldProperty(schema.OneOf('ok', 'pending', 'spam', if_missing='pending'))
flagged_by = FieldProperty([schema.ObjectId])
flags = FieldProperty(int, if_missing=0)
last_edit_date = FieldProperty(datetime, if_missing=None)
last_edit_by_id = ForeignIdProperty(User)
edit_count = FieldProperty(int, if_missing=0)
thread = RelationProperty(Thread)
discussion = RelationProperty(Discussion)
def __json__(self):
author = self.author()
return dict(
_id=str(self._id),
thread_id=self.thread_id,
slug=self.slug,
subject=self.subject,
name = 'thread'
indexes = [
('artifact_id',),
('ref_id',),
(('app_config_id', pymongo.ASCENDING),
('last_post_date', pymongo.DESCENDING),
('mod_date', pymongo.DESCENDING)),
('discussion_id',),
]
type_s = 'Thread'
_id = FieldProperty(str, if_missing=lambda: h.nonce(10))
discussion_id = ForeignIdProperty(Discussion)
ref_id = ForeignIdProperty('ArtifactReference')
subject = FieldProperty(str, if_missing='')
num_replies = FieldProperty(int, if_missing=0)
num_views = FieldProperty(int, if_missing=0)
subscriptions = FieldProperty({str: bool})
first_post_id = ForeignIdProperty('Post')
last_post_date = FieldProperty(datetime, if_missing=datetime(1970, 1, 1))
artifact_reference = FieldProperty(schema.Deprecated)
artifact_id = FieldProperty(schema.Deprecated)
discussion = RelationProperty(Discussion)
posts = RelationProperty('Post', via='thread_id')
first_post = RelationProperty('Post', via='first_post_id')
ref = RelationProperty('ArtifactReference')
def should_update_index(self, old_doc, new_doc):
"""Skip index update if only `num_views` has changed.
Value of `num_views` is updated whenever user loads thread page.
artifact_ref=artifact_ref,
tag=t)
@classmethod
def remove(cls, artifact_ref, user, tags):
cls.query.remove(dict(
user_id=user._id,
artifact_ref=artifact_ref,
tag={'$in':tags}))
class TagEvent(MappedClass):
class __mongometa__:
session = main_orm_session
name='tag_event'
_id = FieldProperty(S.ObjectId)
when = FieldProperty(datetime, if_missing=datetime.utcnow)
user_id = ForeignIdProperty('User', if_missing=lambda:c.user._id)
artifact_ref = FieldProperty(ArtifactReferenceType)
added_tags = FieldProperty([str])
removed_tags = FieldProperty([str])
user = RelationProperty('User', via='user_id')
def as_message(self):
aref = self.artifact_ref
aref['project_id'] = str(aref['project_id'])
aref['artifact_id'] = str(aref['artifact_id'])
d = dict(
when=self.when,
user_id=str(self.user_id),
project_id=aref['project_id'],
artifact_ref=dict(aref),
return MyArtifactComment.query.find(dict(artifact_id=self._id, parent_id=None))
def reply(self):
while True:
try:
c = MyArtifactComment(artifact_id=self._id)
return c
except OperationFailure:
sleep(0.1)
continue
class MyArtifactComment(Message):
class __mongometa__:
name='my_artifact_comment'
type_s = 'MyArtifact Comment'
artifact_id=FieldProperty(schema.ObjectId)
def index(self):
result = Message.index(self)
author = self.author()
result.update(
title_s='Comment on %s by %s' % (
self.artifact.shorthand_id(), author.get_pref('display_name')),
type_s=self.type_s)
return result
@property
def artifact(self):
return MyArtifact.query.get(_id=self.artifact_id)
@property
def posted_ago(self):
class DiscussionAttachment(BaseAttachment):
DiscussionClass = Discussion
ThreadClass = Thread
PostClass = Post
ArtifactClass = Post
thumbnail_size = (100, 100)
class __mongometa__:
polymorphic_identity = 'DiscussionAttachment'
indexes = ['filename', 'discussion_id', 'thread_id', 'post_id']
discussion_id = FieldProperty(schema.ObjectId)
thread_id = FieldProperty(str)
post_id = FieldProperty(str)
artifact_id = FieldProperty(str)
attachment_type = FieldProperty(str, if_missing='DiscussionAttachment')
@property
def discussion(self):
return self.DiscussionClass.query.get(_id=self.discussion_id)
@property
def thread(self):
return self.ThreadClass.query.get(_id=self.thread_id)
@property
def post(self):
return self.PostClass.query.get(_id=self.post_id)
@classmethod
def metadata_for(cls, post):
class DiscussionAttachment(BaseAttachment):
DiscussionClass = Discussion
ThreadClass = Thread
PostClass = Post
ArtifactClass = Post
thumbnail_size = (100, 100)
class __mongometa__:
polymorphic_identity = 'DiscussionAttachment'
indexes = ['filename', 'discussion_id', 'thread_id', 'post_id']
discussion_id = FieldProperty(schema.ObjectId)
thread_id = FieldProperty(str)
post_id = FieldProperty(str)
artifact_id = FieldProperty(str)
attachment_type = FieldProperty(str, if_missing='DiscussionAttachment')
@property
def discussion(self):
return self.DiscussionClass.query.get(_id=self.discussion_id)
@property
def thread(self):
return self.ThreadClass.query.get(_id=self.thread_id)
@property
def post(self):
return self.PostClass.query.get(_id=self.post_id)
@classmethod
def metadata_for(cls, post):
return dict(
from allura.lib.security import require_access, has_access
from .artifact import Artifact, VersionedArtifact, Snapshot, Message, Feed
from .attachments import BaseAttachment
from .auth import User
log = logging.getLogger(__name__)
class Discussion(Artifact):
class __mongometa__:
name='discussion'
type_s = 'Discussion'
parent_id = FieldProperty(schema.Deprecated)
shortname = FieldProperty(str)
name = FieldProperty(str)
description = FieldProperty(str, if_missing='')
num_topics = FieldProperty(int, if_missing=0)
num_posts = FieldProperty(int, if_missing=0)
subscriptions = FieldProperty({str:bool})
threads = RelationProperty('Thread')
posts = RelationProperty('Post')
def __json__(self):
return dict(
_id=str(self._id),
shortname=self.shortname,
name=self.name,
description=self.description,
threads=[dict(_id=t._id, subject=t.subject)
for t in self.threads ])
from .auth import User, ProjectRole, AlluraUserProperty
from .timeline import ActivityObject
from .types import MarkdownCache
log = logging.getLogger(__name__)
class Discussion(Artifact, ActivityObject):
class __mongometa__:
name = 'discussion'
type_s = 'Discussion'
parent_id = FieldProperty(schema.Deprecated)
shortname = FieldProperty(str)
name = FieldProperty(str)
description = FieldProperty(str, if_missing='')
description_cache = FieldProperty(MarkdownCache)
num_topics = FieldProperty(int, if_missing=0)
num_posts = FieldProperty(int, if_missing=0)
subscriptions = FieldProperty({str: bool})
threads = RelationProperty('Thread', via='discussion_id')
posts = RelationProperty('Post', via='discussion_id')
def __json__(self, limit=None, posts_limit=None, is_export=False):
return dict(
_id=str(self._id),
shortname=self.shortname,
name=self.name,
description=self.description,
threads=[t.__json__(limit=posts_limit, is_export=is_export) for t