Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update(self, id, updates, original):
    """Run the parent ``update`` and push an ``events:cancel`` notification.

    The ``reason`` and ``_cancelled_events`` keys are popped from ``updates``
    before the parent ``update`` is called, so they are not part of what the
    parent receives. The notification is pushed only when
    ``self.is_original_event(original)`` is truthy.

    :param id: identifier forwarded unchanged to the parent ``update``
    :param dict updates: changes to apply; mutated in place (two keys popped)
    :param dict original: the item as it was before this update
    :return: the value returned by the parent ``update``
    """
    cancel_reason = updates.pop('reason', None)
    cancelled_events = updates.pop('_cancelled_events', [])
    updated = super().update(id, updates, original)

    # Guard clause: nothing to broadcast unless this is the original event
    if not self.is_original_event(original):
        return updated

    user_id = get_user(required=True).get(config.ID_FIELD, '')
    session_id = get_auth().get(config.ID_FIELD, '')
    push_notification(
        'events:cancel',
        item=str(original[config.ID_FIELD]),
        user=str(user_id),
        session=str(session_id),
        occur_status=updates.get('occur_status'),
        etag=updated.get('_etag'),
        cancelled_items=cancelled_events,
        # A missing/empty reason is sent as an empty string
        reason=cancel_reason if cancel_reason else '',
        actioned_date=updates.get('actioned_date'),
    )
    return updated
def on_created(self, docs):
    """Push an ``event_planning_filters:created`` notification per new doc.

    :param docs: iterable of created documents; each one's ``config.ID_FIELD``
        value (``None`` when absent) is handed to ``self._push_notification``
    """
    notification_name = 'event_planning_filters:created'
    for created in docs:
        self._push_notification(created.get(config.ID_FIELD), notification_name)
def get_plannings_for_event(event):
    """Query the ``planning`` service for items linked to ``event``.

    :param dict event: event document; its ``config.ID_FIELD`` value is used
        as the ``event_item`` filter
    :return: whatever the ``planning`` service's ``find`` returns
    """
    find_plannings = get_resource_service('planning').find
    where_clause = {'event_item': event[config.ID_FIELD]}
    return find_plannings(where=where_clause)
# NOTE(review): excerpt from the middle of a function; the enclosing `def` and
# the `if` that pairs with the `else:` below are outside the visible source.
get_resource_service('events_post').post([post])
# Notify clients that a recurring series changed; the recurrence id sent is
# read from the first entry of `generated_events`.
push_notification(
'events:updated:recurring',
item=str(original[config.ID_FIELD]),
user=str(updates.get('version_creator', '')),
recurrence_id=str(generated_events[0]['recurrence_id'])
)
else:
# The event is marked complete only when the stored lock action is
# 'mark_completed' and the updates carry a truthy 'actioned_date'.
# NOTE(review): `original` is passed as both the 1st and 3rd argument -
# confirm against mark_event_complete's signature.
if original.get('lock_action') == 'mark_completed' and updates.get('actioned_date'):
self.mark_event_complete(original, updates, original, None)
# This updates Event metadata only
push_notification(
'events:updated',
item=str(original[config.ID_FIELD]),
user=str(updates.get('version_creator', ''))
)
# NOTE(review): excerpt from the body of a locking routine; the enclosing
# `def` and the definitions of `item`, `item_id`, `user_id`, `session_id`,
# `resource`, `action`, `item_service` and `lock_id` are not visible here.
try:
# can_lock returns a (allowed, message) pair; the message is only used when
# locking is refused.
can_user_lock, error_message = self.can_lock(item, user_id, session_id, resource)
if can_user_lock:
# following line executes handlers attached to function:
# on_lock_'resource' - ex. on_lock_planning, on_lock_event
getattr(self.app, 'on_lock_%s' % resource)(item, user_id)
# Lock metadata to store on the item: who locked it, in which session, when.
updates = {LOCK_USER: user_id, LOCK_SESSION: session_id, 'lock_time': utcnow()}
# 'lock_action' is recorded only when a truthy action was supplied
if action:
updates['lock_action'] = action
item_service.update(item.get(config.ID_FIELD), updates, item)
# NOTE(review): `updates` is built above without an '_etag' key, so the
# `updates['_etag']` read below relies on item_service.update() adding it to
# the dict in place - confirm.
push_notification(resource + ':lock',
item=str(item.get(config.ID_FIELD)),
user=str(user_id), lock_time=updates['lock_time'],
lock_session=str(session_id),
lock_action=updates.get('lock_action'),
etag=updates['_etag'])
else:
# Locking refused: surface can_lock's message as a forbidden error
raise SuperdeskApiError.forbiddenError(message=error_message)
# Re-fetch the item by id before running the on_locked handlers and returning it
item = item_service.find_one(req=None, _id=item_id)
# following line executes handlers attached to function:
# on_locked_'resource' - ex. on_locked_planning, on_locked_event
getattr(self.app, 'on_locked_%s' % resource)(item, user_id)
return item
finally:
# unlock the lock :)
# Being in `finally`, this runs on the return path and on the error path alike.
unlock(lock_id, remove=True)
# NOTE(review): excerpt from the body of an expiry routine; the enclosing
# `def` (which supplies `self` and `expiry_datetime`) is not visible here.
planning_service = get_resource_service('planning')
# Obtain the full list of Planning items that we're to process first
# As subsequent queries will change the list of returned items
plans = dict()
# get_expired_items is consumed batch by batch; every item of every batch is
# indexed by its id, so a repeated id keeps only the last copy seen.
for items in planning_service.get_expired_items(expiry_datetime):
plans.update({item[config.ID_FIELD]: item for item in items})
# Ids are split into two groups: skipped because locked, and flagged as expired
locked_plans = set()
plans_expired = set()
for plan_id, plan in plans.items():
# A truthy 'lock_user' means the item is currently locked: leave it untouched
if plan.get('lock_user'):
locked_plans.add(plan_id)
else:
planning_service.system_update(plan[config.ID_FIELD], {'expired': True}, plan)
plans_expired.add(plan_id)
# Log the skipped (locked) items only when there are any
if len(locked_plans) > 0:
logger.info('{} Skipping {} locked Planning items: {}'.format(
self.log_msg,
len(locked_plans),
list(locked_plans)
))
# A single 'planning:expired' notification carries all newly expired ids
if len(plans_expired) > 0:
push_notification(
'planning:expired',
items=list(plans_expired)
)
logger.info('{} {} Planning items expired: {}'.format(self.log_msg, len(plans_expired), list(plans_expired)))
def on_update(self, updates, original):
    """Check privileges, stamp the version creator and validate uniqueness.

    :param dict updates: pending changes; ``version_creator`` is set on it
        when the current user has a truthy ``config.ID_FIELD`` value
    :param dict original: the stored agenda before the update
    :raises SuperdeskApiError: forbidden error when ``name`` is being changed
        without the ``planning_agenda_management`` privilege
    """
    is_rename = 'name' in updates
    if is_rename and not current_user_has_privilege('planning_agenda_management'):
        raise SuperdeskApiError.forbiddenError('Insufficient privileges to update agenda.')

    current_user = get_user()
    if current_user:
        if current_user.get(config.ID_FIELD):
            updates['version_creator'] = current_user[config.ID_FIELD]

    self._validate_unique_agenda(updates, original)
def on_item_deleted(self, doc):
    """Call ``self.delete`` with a lookup on the deleted document's id.

    :param dict doc: the deleted document; its ``config.ID_FIELD`` value is
        used as the ``event_id`` of the lookup
    """
    deleted_id = doc[config.ID_FIELD]
    self.delete(lookup={'event_id': deleted_id})
# Builds a `domain` dict describing the class `cls_` as a resource.
# NOTE(review): the method is cut off in this excerpt (its final `if` has no
# visible body), so what it ultimately returns cannot be stated from here.
def __call__(self, cls_):
# Resource name: the one configured on this object, else the lower-cased class name
resource = self.resource or cls_.__name__.lower()
domain = {
resource: {
'schema': {},
'datasource': {'source': cls_.__name__},
'id_field': config.ID_FIELD,
'item_lookup': True,
'item_lookup_field': config.ID_FIELD,
# item ids in URLs must match one or more decimal digits
'item_url': 'regex("[0-9]+")'
}
}
# Chained assignment: `projection` and the datasource's 'projection' entry are
# the same dict object, so later writes to `projection` appear in `domain`.
projection = domain[resource]['datasource']['projection'] = {}
# Optional per-class settings on `cls_._eve_resource` are applied on top of the
# defaults above (presumably merged by dict_update - confirm).
if hasattr(cls_, '_eve_resource'):
dict_update(domain[resource], cls_._eve_resource)
# NOTE(review): `inspect(...).all_orm_descriptors` and `InstrumentedAttribute`
# look like SQLAlchemy's inspection API - confirm against the file's imports.
all_orm_descriptors = inspect(cls_).all_orm_descriptors
for desc in all_orm_descriptors:
# Only descriptors that are InstrumentedAttribute instances are examined
if isinstance(desc, InstrumentedAttribute):
prop = desc.property
# Test for the last-updated / date-created / etag field names; the branch
# body is not visible in this excerpt.
if prop.key in (config.LAST_UPDATED,
config.DATE_CREATED,
config.ETAG):
posted_events = historic + past + [original] + future
# First we want to validate that all events can be posted
for event in posted_events:
self.validate_post_state(post_to_state)
self.validate_item(event)
# Next we perform the actual post
updated_event = None
ids = []
items = []
for event in posted_events:
updated_event = self.post_event(event, post_to_state, doc.get('repost_on_update'))
ids.append(event[config.ID_FIELD])
items.append({
'id': event[config.ID_FIELD],
'etag': updated_event['_etag']
})
# Do not send push-notification if reposting as each event's post state is different
# The original action's notifications should refetch items
if not doc.get('repost_on_update'):
event_type = 'events:posted:recurring' if doc['pubstatus'] == POST_STATE.USABLE \
else 'events:unposted:recurring'
push_notification(
event_type,
item=original[config.ID_FIELD],
items=items,
recurrence_id=str(original.get('recurrence_id')),
pubstatus=updated_event['pubstatus'],
state=updated_event['state']