Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@with_session
def lookup_series(name=None, session=None):
series = None
res = session.query(TVRageLookup).filter(TVRageLookup.name == name.lower()).first()
if res and not res.series:
# The lookup failed in the past for this series, retry every week
# TODO: 1.2 this should also retry with --retry or whatever flag imdb lookup is using for that
if res.failed_time and res.failed_time > datetime.datetime.now() - datetime.timedelta(days=7):
raise LookupError('Could not find show %s' % name)
elif res:
series = res.series
# if too old result, clean the db and refresh it
interval = parse_timedelta(UPDATE_INTERVAL)
if datetime.datetime.now() > series.last_update + interval:
log.debug('Refreshing tvrage info for %s', name)
else:
@with_session
def load_user(username, session=None):
    """Return the first ``User`` row whose ``name`` equals ``username``.

    :param username: Value compared for equality against ``User.name``.
    :param session: DB session used to run the query.
        NOTE(review): presumably injected by ``with_session`` when omitted - confirm.
    :return: Whatever ``Query.first()`` yields: the first match, or ``None`` when there is none.
    """
    name_matches = User.name == username
    return session.query(User).filter(name_matches).first()
def __init__(self, endpoint, caller):
    """Remember the endpoint path and the callable that performs requests.

    :param endpoint: Path this object stands for; child lookups append to it.
    :param caller: Callable invoked as ``caller(endpoint, data=..., method=...)``
        when the object itself is called.
    """
    self.endpoint, self.caller = endpoint, caller
def __getattr__(self, item):
    """Build a sibling object of the same class for the sub-path ``item``.

    Only reached for attributes that normal lookup does not find. The new
    object's endpoint is this one's endpoint, a ``/`` and ``item``; the
    caller is shared.

    :param item: Path segment to append (must concatenate with ``str``).
    :return: New instance of ``self.__class__`` for the extended endpoint.
    """
    child_endpoint = self.endpoint + '/' + item
    return self.__class__(child_endpoint, self.caller)


# Subscript access (``obj['x']``) behaves exactly like attribute access (``obj.x``).
__getitem__ = __getattr__
def __call__(self, data=None, method=None):
    """Delegate to the stored caller for this object's endpoint.

    :param data: Forwarded unchanged as the ``data`` keyword argument.
    :param method: Forwarded unchanged as the ``method`` keyword argument.
    :return: Whatever ``self.caller`` returns.
    """
    dispatch = self.caller
    return dispatch(self.endpoint, data=data, method=method)
class APIResource(Resource):
    """All api resources should subclass this class.

    Each instance exposes the module-level ``manager.manager`` object as
    ``self.manager``.
    """

    # NOTE(review): presumably read by the ``Resource`` base class to wrap every
    # handler method with ``with_session`` - confirm against the base class.
    method_decorators = [with_session]

    def __init__(self, api, *args, **kwargs):
        """Attach the manager, then run the base-class initialiser.

        :param api: Passed through to ``Resource.__init__`` as its first argument.
        :param args: Extra positional arguments forwarded to the base class.
        :param kwargs: Extra keyword arguments forwarded to the base class.
        """
        # Set before delegating so the attribute exists while the base initialiser runs.
        self.manager = manager.manager
        parent = super(APIResource, self)
        parent.__init__(api, *args, **kwargs)
@with_session
def get_entries_by_list_id(list_id, start=None, stop=None, order_by='title', descending=False, approved=False,
                           filter=None, session=None):
    """Query the entries of one pending list, with optional filtering, sorting and slicing.

    :param list_id: ID of the pending list whose entries are wanted.
    :param start: Slice start passed to ``Query.slice``.
    :param stop: Slice stop passed to ``Query.slice``.
    :param order_by: Name of the ``PendingListEntry`` attribute to sort on.
    :param descending: Sort in descending order when truthy, ascending otherwise.
    :param approved: When truthy, return only entries whose ``approved`` column is true.
        When falsy, no approval filter is applied at all.
    :param filter: Optional substring matched case-insensitively against the entry title.
    :param session: DB session used to run the query.
    :return: List of matching ``PendingListEntry`` rows.
    :raises AttributeError: If ``order_by`` is not an attribute of ``PendingListEntry``.
    """
    log.debug('querying entries from pending list with id %d', list_id)
    query = session.query(PendingListEntry).filter(PendingListEntry.list_id == list_id)
    if filter:
        query = query.filter(func.lower(PendingListEntry.title).contains(filter.lower()))
    if approved:
        # A Python ``is`` between a column and a value is an identity test that
        # evaluates to plain ``False`` before SQLAlchemy ever sees it, which
        # filtered out every row. Use the column operator so a real SQL
        # comparison is emitted.
        query = query.filter(PendingListEntry.approved.is_(True))
    sort_column = getattr(PendingListEntry, order_by)
    query = query.order_by(sort_column.desc() if descending else sort_column)
    return query.slice(start, stop).all()
@with_session
def get_series_by_id(list_id, series_id, session=None):
    """Fetch exactly one ``SeriesListSeries`` row by its ID within a given list.

    :param list_id: ID of the list the series must belong to.
    :param series_id: ID of the series row.
    :param session: DB session used to run the query.
    :return: The single matching row, via ``Query.one()`` (which raises when
        the match is not exactly one row).
    """
    log.debug('fetching series with id %d from list id %d', series_id, list_id)
    same_series = SeriesListSeries.id == series_id
    same_list = SeriesListSeries.list_id == list_id
    lookup = session.query(SeriesListSeries).filter(and_(same_series, same_list))
    return lookup.one()
@with_session
def forget_by_id(entry_id, session=None):
    """
    Delete a SeenEntry identified by its ID.

    :param entry_id: SeenEntry ID
    :param session: DB Session
    :raises ValueError: If no SeenEntry with that ID is found.
    """
    seen = session.query(SeenEntry).filter(SeenEntry.id == entry_id).first()
    if seen:
        log.debug('Deleting seen entry with ID {0}'.format(entry_id))
        session.delete(seen)
        return
    raise ValueError('Could not find entry with ID {0}'.format(entry_id))
@with_session
def get_entry_by_title(list_id, title, session=None):
    """Find the first pending-list entry with an exact title inside a given list.

    :param list_id: ID of the list to search; it is first looked up with ``get_list_by_id``.
    :param title: Title compared for equality against ``PendingListEntry.title``.
    :param session: DB session used for both lookups.
    :return: First matching ``PendingListEntry``, or ``None`` when the list lookup
        is falsy or no entry matches.
    """
    if not get_list_by_id(list_id=list_id, session=session):
        return None
    log.debug('fetching entry with title `%s` from list id %d', title, list_id)
    criteria = and_(PendingListEntry.title == title, PendingListEntry.list_id == list_id)
    return session.query(PendingListEntry).filter(criteria).first()
@with_session
def lookup(
title=None,
year=None,
tmdb_id=None,
imdb_id=None,
smart_match=None,
only_cached=False,
session=None,
language='en',
):
"""
Do a lookup from TMDb for the movie matching the passed arguments.
Any combination of criteria can be passed, the most specific criteria specified will be used.
:param int tmdb_id: tmdb_id of desired movie
@with_session
def season_lookup(session=None, only_cached=False, **lookup_params):
    """Validate season-lookup parameters and resolve the series they refer to.

    :param session: DB session forwarded to ``APITVMaze.series_lookup``.
    :param only_cached: Forwarded unchanged to ``APITVMaze.series_lookup``.
    :param lookup_params: Search criteria. The keys read here are ``series_name``
        (falling back to ``title``), ``tvmaze_id`` (falling back to ``tvdb_id``)
        and ``series_season``. All of them are also forwarded to the series lookup.
    :raises LookupError: If none of the criteria above is truthy, or if the
        series lookup returns a falsy result.
    """
    name = lookup_params.get('series_name') or lookup_params.get('title')
    show_id = lookup_params.get('tvmaze_id') or lookup_params.get('tvdb_id')
    season_number = lookup_params.get('series_season')

    # A single truthy criterion is enough to pass this check.
    # NOTE(review): the message says "episode" although this is the season lookup.
    if not (name or show_id or season_number):
        raise LookupError('Not enough parameters to lookup episode')

    # Resolve the series first.
    series = APITVMaze.series_lookup(session=session, only_cached=only_cached, **lookup_params)
    if not series:
        message = 'Could not find series with the following parameters: {0}'.format(lookup_params)
        raise LookupError(message)
@with_session
def do_cli(manager, options, session=None):
    """Handle the user-management CLI; the visible code covers the ``passwd`` action.

    :param manager: Not used by the code shown here.
    :param options: Parsed CLI options. ``action`` is read always; ``user`` is
        lower-cased in place when present; ``password`` is read for ``passwd``.
    :param session: DB session forwarded to ``user_exist`` and ``change_password``.
    """
    # Normalise the user name once so every later comparison uses lower case.
    if hasattr(options, 'user'):
        options.user = options.user.lower()

    if options.action == 'passwd':
        account = user_exist(name=options.user, session=session)
        if not account:
            console('User %s does not exist' % options.user)
            return
        try:
            change_password(user_name=account.name, password=options.password, session=session)
        except WeakPassword as err:
            # Report why the password was rejected and stop without a success message.
            console(err.value)
            return
        console('Updated password for user %s' % options.user)