Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment of a method body (the enclosing `def` is not visible here).
# Normalizes the `resources` value into a dict keyed by resource name, then
# binds provenance event handlers and a REST route for each resource.
# A truthy `resources` is treated as a string: commas are replaced by spaces
# and the result is split on whitespace. A falsy value yields no names.
if resources:
resources = resources.replace(',', ' ').strip().split()
else:
resources = []
# dict.fromkeys collapses duplicate names; every value is None.
resources = dict.fromkeys(resources)
# Always include item
resources['item'] = None
# Exclude resources that should never have provenance
for disallowedResource in ('model_base', 'notification', 'password',
'token'):
if disallowedResource in resources:
del resources[disallowedResource]
# NOTE(review): presumably unbinds previously bound resources that are no
# longer listed -- unbindModels is not visible here, confirm.
self.unbindModels(resources)
for resource in resources:
# Resources already recorded in self.boundResources are not bound again.
if resource not in self.boundResources:
# Both handlers are registered under the 'provenance' handler name.
events.bind('model.%s.save' % resource, 'provenance',
self.resourceSaveHandler)
events.bind('model.%s.copy.prepare' % resource,
'provenance', self.resourceCopyHandler)
# A GET <resource>/:id/provenance route is added only when the API root has
# an attribute named after the resource.
if hasattr(self.loadInfo['apiRoot'], resource):
getattr(self.loadInfo['apiRoot'], resource).route(
'GET', (':id', 'provenance'),
self.getGetHandler(resource))
# Record the resource as bound.
self.boundResources[resource] = True
def load(self, info):
    """
    Load the worker plugin.

    Loads the ``jobs`` plugin with the same ``info``, attaches a ``Worker``
    resource to the API root, registers the worker's job event handlers, and
    exposes the celery fields of jobs to site administrators.

    :param info: plugin load info; ``info['apiRoot']`` is the API root that
        receives the ``worker`` attribute.
    """
    def bindWorker(eventName, handler):
        # Every handler of this plugin is registered under the 'worker' name.
        events.bind(eventName, 'worker', handler)

    getPlugin('jobs').load(info)
    info['apiRoot'].worker = Worker()

    bindWorker('jobs.schedule', event_handlers.schedule)
    bindWorker('jobs.status.validate', event_handlers.validateJobStatus)
    bindWorker('jobs.status.validTransitions', event_handlers.validTransitions)
    bindWorker('jobs.cancel', event_handlers.cancel)
    bindWorker('model.job.save.after', event_handlers.attachJobInfoSpec)
    bindWorker('model.job.save', event_handlers.attachParentJob)

    celeryFields = {'celeryTaskId', 'celeryQueue'}
    Job().exposeFields(AccessType.SITE_ADMIN, celeryFields)
else:
'GET',
(':itemId', 'flow', ':jobId', 'output'),
flowRunOutput)
info['apiRoot'].item.route(
'GET',
(':itemId', 'flow', ':inputType', ':inputFormat',
':outputFormat'),
flowConvert)
info['apiRoot'].item.route(
'DELETE',
(':itemId', 'flow', ':jobId'),
flowStopRun)
events.bind('model.setting.validate', 'flow', validateSettings)
def load(info):
registerAccessFlag(constants.ACCESS_FLAG_EXECUTE_TASK, name='Execute analyses', admin=True)
TokenScope.describeScope(
constants.TOKEN_SCOPE_EXECUTE_TASK, name='Execute tasks', description='Execute item tasks.')
TokenScope.describeScope(
constants.TOKEN_SCOPE_AUTO_CREATE_CLI, 'Item task auto-creation',
'Create new CLIs via automatic introspection.', admin=True)
Item().ensureIndex(['meta.isItemTask', {'sparse': True}])
Item().exposeFields(level=AccessType.READ, fields='createdByJob')
Job().exposeFields(level=AccessType.READ, fields={'itemTaskId', 'itemTaskBindings'})
events.bind('jobs.job.update', 'item_tasks', _onJobSave)
events.bind('data.process', 'item_tasks', _onUpload)
info['apiRoot'].item_task = ItemTask()
info['apiRoot'].item.route('POST', (':id', 'item_task_slicer_cli_description'),
runSlicerCliTasksDescriptionForItem)
info['apiRoot'].item.route('PUT', (':id', 'item_task_slicer_cli_xml'),
configureItemTaskFromSlicerCliXml)
info['apiRoot'].item.route('POST', (':id', 'item_task_json_description'),
runJsonTasksDescriptionForItem)
info['apiRoot'].item.route('PUT', (':id', 'item_task_json_specs'),
configureItemTaskFromJson)
info['apiRoot'].item.route('POST', (':id', 'item_task_celery'),
describeCeleryTaskItem)
info['apiRoot'].folder.route('POST', (':id', 'item_task_slicer_cli_description'),
def load(self, info):
    """
    Load the pvwproxy plugin.

    Registers the plugin's setting validator and attaches a ``Proxy``
    resource to the API root as ``proxy``.

    :param info: plugin load info; ``info['apiRoot']`` is the API root.
    """
    events.bind('model.setting.validate', 'pvwproxy', validate_settings)
    proxyResource = Proxy()
    info['apiRoot'].proxy = proxyResource
def load(info):
    """
    Load the HistomicsTK plugin.

    Builds a HistomicsTK web root from ``_template``, seeds its HTML
    variables from the existing server root, attaches both roots to the
    server root, and registers the annotation upload handler.

    :param info: plugin load info; ``info['serverRoot']`` is the server root.
    """
    serverRoot = info['serverRoot']

    histomicsWebroot = Webroot(_template)
    # Start from the server root's variables, then override the page title.
    for htmlVars in (serverRoot.vars, {'title': 'HistomicsTK'}):
        histomicsWebroot.updateHtmlVars(htmlVars)

    serverRoot.histomicstk = histomicsWebroot
    # The server root also gets a reference to itself under ``girder``.
    serverRoot.girder = serverRoot

    # auto-ingest annotations into database when a .anot file is uploaded
    events.bind('data.process', 'HistomicsTK', process_annotations)
'annotation.name': 10,
'annotation.description': 1,
})
self.exposeFields(AccessType.READ, (
'annotation', '_version', '_elementQuery', '_active',
) + self.baseFields)
events.bind('model.item.remove', 'large_image_annotation', self._onItemRemove)
events.bind('model.item.copy.prepare', 'large_image_annotation', self._prepareCopyItem)
events.bind('model.item.copy.after', 'large_image_annotation', self._handleCopyItem)
self._historyEnabled = Setting().get(
constants.PluginSettings.LARGE_IMAGE_ANNOTATION_HISTORY)
# Listen for changes to our relevant settings
events.bind('model.setting.save.after', 'large_image_annotation', self._onSettingChange)
events.bind('model.setting.remove', 'large_image_annotation', self._onSettingChange)
# Fragment of a function body (the enclosing `def` is not visible here).
# Builds the web root, attaches the API to it, wires up the events daemon
# and static routes, loads plugins, and returns (root, appconf).
root = webroot.Webroot()
api_main.addApiToNode(root)
# Start and stop the girder events daemon on the CherryPy engine's
# 'start' and 'stop' channels.
cherrypy.engine.subscribe('start', girder.events.daemon.start)
cherrypy.engine.subscribe('stop', girder.events.daemon.stop)
# With no explicit plugin list, read PLUGINS_ENABLED from the setting model,
# defaulting to an empty tuple.
if plugins is None:
settings = model_importer.ModelImporter().model('setting')
plugins = settings.get(constants.SettingKey.PLUGINS_ENABLED, default=())
# NOTE(review): presumably orders plugins so dependencies come first and
# skips unknown names -- getToposortedPlugins is not visible here, confirm.
plugins = list(plugin_utilities.getToposortedPlugins(plugins, ignoreMissing=True))
_configureStaticRoutes(root, plugins)
# The same static route configuration is bound to 'model.setting.save.after'
# with root and plugins pre-applied via functools.partial.
girder.events.bind('model.setting.save.after', '_updateStaticRoutesIfModified',
functools.partial(_configureStaticRoutes, root, plugins))
# loadPlugins returns a 3-tuple; the third element is discarded.
root, appconf, _ = plugin_utilities.loadPlugins(
plugins, root, appconf, root.api.v1, buildDag=False)
return root, appconf
password=setting.get(SettingKey.SMTP_PASSWORD)
)
logger.info('Sending email to %s through %s', ', '.join(recipients), smtp.host)
with smtp:
smtp.send(msg['From'], recipients, msg.as_string())
def _sendmail(event):
    """
    Event handler that submits the email carried by ``event``.

    :param event: event whose ``info`` mapping holds the prepared
        ``'message'`` and its ``'recipients'``.
    """
    payload = event.info
    _submitEmail(payload['message'], payload['recipients'])


# Register the handler for the '_sendmail' event under the 'core.email' name.
events.bind('_sendmail', 'core.email', _sendmail)
def sendMailSync(subject, text, to, bcc=None):
    """
    Send an email synchronously.

    The message is built with ``_createMessage`` and submitted directly,
    in the calling thread, via ``_submitEmail``.

    :param subject: passed through to ``_createMessage``.
    :param text: passed through to ``_createMessage``.
    :param to: passed through to ``_createMessage``.
    :param bcc: passed through to ``_createMessage``; defaults to None.
    """
    message, addressees = _createMessage(subject, text, to, bcc)
    _submitEmail(message, addressees)
def sendMail(subject, text, to, bcc=None):
"""
Send an email asynchronously.
:param subject: The subject line of the email.
:type subject: str
:param text: The body of the email.
def load(self, info):
    """
    Load the oauth plugin.

    Ensures a compound index on the user model's OAuth provider and id
    fields, registers the password-less login hook, and attaches the OAuth
    REST resource to the API root as ``oauth``.

    :param info: plugin load info; ``info['apiRoot']`` is the API root.
    """
    # Compound key: provider first, then the provider-specific id.
    oauthIndexKeys = (
        ('oauth.provider', SortDir.ASCENDING),
        ('oauth.id', SortDir.ASCENDING),
    )
    # The empty dict supplies no extra index options.
    User().ensureIndex((oauthIndexKeys, {}))

    events.bind('no_password_login_attempt', 'oauth', checkOauthUser)

    oauthResource = rest.OAuth()
    info['apiRoot'].oauth = oauthResource