Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def scene_get(proj, sc):
    """Look up a single scene by project id and scene name.

    ``proj`` and ``sc`` are each decoded as UTF-8 and used, in that order,
    as the ``proj_id`` and ``name`` filters on the ``Scene`` query.

    Raises:
        SPAMDBNotFound: ``one()`` raised ``NoResultFound``.
        SPAMDBError: ``one()`` raised ``MultipleResultsFound``.
    """
    scenes = session_get().query(Scene)
    try:
        in_project = scenes.filter_by(proj_id=proj.decode('utf-8'))
        matching = in_project.filter_by(name=sc.decode('utf-8'))
        return matching.one()
    except NoResultFound:
        not_found = 'Scene "%s" could not be found.' % sc
        raise SPAMDBNotFound(not_found)
    except MultipleResultsFound:
        ambiguous = 'Error when searching scene "%s".' % sc
        raise SPAMDBError(ambiguous)
def annotable_get(annotable_id):
    """Fetch the annotable whose ``id`` equals the decoded ``annotable_id``.

    ``annotable_id`` is decoded as UTF-8 before being used as the filter.

    Raises:
        SPAMDBNotFound: ``one()`` raised ``NoResultFound``.
        SPAMDBError: ``one()`` raised ``MultipleResultsFound``.
    """
    db = session_get()
    candidates = db.query(Annotable).filter_by(
        id=annotable_id.decode('utf-8'))
    try:
        found = candidates.one()
    except NoResultFound:
        message = 'Annotable "%s" could not be found.' % annotable_id
        raise SPAMDBNotFound(message)
    except MultipleResultsFound:
        message = 'Error when searching annotable "%s".' % annotable_id
        raise SPAMDBError(message)
    else:
        return found
def project_get(proj):
    """Fetch one project through the query returned by ``query_projects()``.

    ``proj`` is decoded as UTF-8 and matched against the ``id`` column.
    (The previous docstring called the result "lazyloaded"; that depends on
    how ``query_projects()`` is configured, which is not visible here.)

    Raises:
        SPAMDBNotFound: ``one()`` raised ``NoResultFound``.
        SPAMDBError: ``one()`` raised ``MultipleResultsFound``.
    """
    try:
        projects = query_projects()
        selected = projects.filter_by(id=proj.decode('utf-8'))
        project = selected.one()
    except NoResultFound:
        message = 'Project "%s" could not be found.' % proj
        raise SPAMDBNotFound(message)
    except MultipleResultsFound:
        message = 'Error when searching project "%s".' % proj
        raise SPAMDBError(message)
    else:
        return project
def group_get(group_id):
    """Return the group identified by ``group_id``.

    The lookup is first attempted with ``group_id`` as given. If that finds
    nothing, it is retried once with ``config.auth_domain`` prepended, in
    the form ``<domain>-<group_id>``.

    Args:
        group_id: UTF-8 encoded identifier of the group; it is decoded
            exactly once before being used in either lookup.

    Raises:
        SPAMDBNotFound: both the plain and the domain-qualified lookup
            raised ``NoResultFound``. The message names the
            domain-qualified id, i.e. the last one that was tried.
        SPAMDBError: either lookup raised ``MultipleResultsFound``.
    """
    query = session_get().query(Group)
    try:
        # Decode once up front; every later use works on text so no value
        # is ever decoded twice or mixed with raw bytes.
        group_name = group_id.decode('utf-8')
        return query.filter_by(group_id=group_name).one()
    except NoResultFound:
        try:
            domain = config.auth_domain.decode('utf-8')
            # Both parts are already text, so the joined id must be used
            # as-is: decoding it again forces an implicit ASCII round-trip
            # that fails for any non-ASCII character.
            qualified_id = '%s-%s' % (domain, group_name)
            return query.filter_by(group_id=qualified_id).one()
        except NoResultFound:
            raise SPAMDBNotFound(
                'Group "%s" could not be found.' % qualified_id)
        except MultipleResultsFound:
            raise SPAMDBError(
                'Error when searching group "%s".' % qualified_id)
    except MultipleResultsFound:
        raise SPAMDBError('Error when searching group "%s".' % group_id)
def libgroup_get(proj, libgroup_id):
    """Fetch one libgroup belonging to a project.

    ``proj`` is decoded as UTF-8 and used as the ``proj_id`` filter;
    ``libgroup_id`` is decoded as UTF-8 and used as the ``id`` filter.

    Raises:
        SPAMDBNotFound: ``one()`` raised ``NoResultFound``.
        SPAMDBError: ``one()`` raised ``MultipleResultsFound``.
    """
    db = session_get()
    project_libgroups = db.query(Libgroup).filter_by(
        proj_id=proj.decode('utf-8'))
    try:
        wanted = project_libgroups.filter_by(id=libgroup_id.decode('utf-8'))
        return wanted.one()
    except NoResultFound:
        message = 'Libgroup "%s" could not be found.' % libgroup_id
        raise SPAMDBNotFound(message)
    except MultipleResultsFound:
        message = 'Error when searching Librarygroup "%s".' % libgroup_id
        raise SPAMDBError(message)
def assetversion_get(proj, assetver_id):
    """Fetch the asset version whose ``id`` equals the decoded ``assetver_id``.

    ``assetver_id`` is decoded as UTF-8 before filtering. ``proj`` is
    accepted but not used by this function's body.

    Raises:
        SPAMDBNotFound: ``one()`` raised ``NoResultFound``.
        SPAMDBError: ``one()`` raised ``MultipleResultsFound``.
    """
    versions = session_get().query(AssetVersion)
    try:
        selected = versions.filter_by(id=assetver_id.decode('utf-8'))
        return selected.one()
    except NoResultFound:
        message = 'AssetVersion "%s" could not be found.' % assetver_id
        raise SPAMDBNotFound(message)
    except MultipleResultsFound:
        message = 'Error when searching asset version "%s".' % assetver_id
        raise SPAMDBError(message)
def note_get(note_id):
    """Fetch the note whose ``id`` equals the decoded ``note_id``.

    ``note_id`` is decoded as UTF-8 before being used as the filter.

    Raises:
        SPAMDBNotFound: ``one()`` raised ``NoResultFound``.
        SPAMDBError: ``one()`` raised ``MultipleResultsFound``.
    """
    notes = session_get().query(Note)
    selected = notes.filter_by(id=note_id.decode('utf-8'))
    try:
        note = selected.one()
    except NoResultFound:
        message = 'Note "%s" could not be found.' % note_id
        raise SPAMDBNotFound(message)
    except MultipleResultsFound:
        message = 'Error when searching note "%s".' % note_id
        raise SPAMDBError(message)
    else:
        return note
def category_get(category_id):
    """Fetch the asset category whose ``id`` equals the decoded ``category_id``.

    ``category_id`` is decoded as UTF-8 before being used as the filter.

    Raises:
        SPAMDBNotFound: ``one()`` raised ``NoResultFound``.
        SPAMDBError: ``one()`` raised ``MultipleResultsFound``.
    """
    db = session_get()
    categories = db.query(Category)
    selected = categories.filter_by(id=category_id.decode('utf-8'))
    try:
        category = selected.one()
    except NoResultFound:
        message = 'Category "%s" could not be found.' % category_id
        raise SPAMDBNotFound(message)
    except MultipleResultsFound:
        message = 'Error when searching category "%s".' % category_id
        raise SPAMDBError(message)
    else:
        return category
def asset_get(proj, asset_id):
    """Fetch the asset whose ``id`` equals the decoded ``asset_id``.

    ``asset_id`` is decoded as UTF-8 before filtering. ``proj`` is accepted
    but not used by this function's body.

    Raises:
        SPAMDBNotFound: ``one()`` raised ``NoResultFound``.
        SPAMDBError: ``one()`` raised ``MultipleResultsFound``.
    """
    assets = session_get().query(Asset)
    try:
        selected = assets.filter_by(id=asset_id.decode('utf-8'))
        return selected.one()
    except NoResultFound:
        message = 'Asset "%s" could not be found.' % asset_id
        raise SPAMDBNotFound(message)
    except MultipleResultsFound:
        message = 'Error when searching asset "%s".' % asset_id
        raise SPAMDBError(message)
def container_get(proj, container_type, container_id):
    """Fetch the asset container whose ``id`` equals the decoded ``container_id``.

    ``container_id`` is decoded as UTF-8 before filtering. ``container_type``
    only appears in the error messages, and ``proj`` is not used by this
    function's body.

    Raises:
        SPAMDBNotFound: ``one()`` raised ``NoResultFound``.
        SPAMDBError: ``one()`` raised ``MultipleResultsFound``.
    """
    containers = session_get().query(AssetContainer)
    try:
        selected = containers.filter_by(id=container_id.decode('utf-8'))
        return selected.one()
    except NoResultFound:
        label = (container_type, container_id)
        raise SPAMDBNotFound('Container "%s %s" could not be found.' % label)
    except MultipleResultsFound:
        label = (container_type, container_id)
        raise SPAMDBError('Error when searching container "%s %s".' % label)