Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
entry_object = get_entry_by_title(list_id=list_id, title=title, session=session)
if entry_object:
raise Conflict('entry with title \'%s\' already exists' % title)
entry_object = PendingListEntry(entry=data, pending_list_id=list_id)
if data.get('approved'):
entry_object.approved = data['approved']
session.add(entry_object)
session.commit()
response = jsonify(entry_object.to_dict())
response.status_code = 201
return response
@pending_list_api.route('//entries//')
@api.doc(params={'list_id': 'ID of the list', 'entry_id': 'ID of the entry'})
@api.response(NotFoundError)
class PendingListEntryAPI(APIResource):
@etag
@api.response(200, model=pending_list_entry_base_schema)
def get(self, list_id, entry_id, session=None):
""" Get an entry by list ID and entry ID """
try:
entry = get_entry_by_id(list_id=list_id, entry_id=entry_id, session=session)
except NoResultFound:
raise NotFoundError('could not find entry with id %d in list %d' % (entry_id, list_id))
return jsonify(entry.to_dict())
@api.response(200, model=base_message_schema)
def delete(self, list_id, entry_id, session=None):
""" Delete an entry by list ID and entry ID """
try:
@api.response(BadRequest)
@api.response(200, model=plugin_schema)
@api.doc(parser=plugin_parser, params={'plugin_name': 'Name of the plugin to return'})
def get(self, plugin_name, session=None):
""" Return plugin data by name"""
args = plugin_parser.parse_args()
try:
plugin = get_plugin_by_name(plugin_name, issued_by='plugins API')
except DependencyError as e:
raise BadRequest(e.message)
p = plugin_to_dict(plugin)
if args['include_schema']:
p['schema'] = plugin.schema
return jsonify(p)
@api.response(200, model=return_schema)
@api.response(NotFoundError)
@api.response(BadRequest)
@api.doc(parser=tmdb_parser)
def get(self, session=None):
""" Get TMDB movie data """
args = tmdb_parser.parse_args()
title = args.get('title')
tmdb_id = args.get('tmdb_id')
imdb_id = args.get('imdb_id')
posters = args.pop('include_posters', False)
backdrops = args.pop('include_backdrops', False)
if not (title or tmdb_id or imdb_id):
raise BadRequest(description)
except NoResultFound:
return {'status': 'error',
'message': 'Show with ID %s not found' % show_id
}, 404
for episode in show.episodes:
try:
series.forget_episodes_by_id(show.id, episode.id)
except ValueError as e:
return {'status': 'error',
'message': e.args[0]
}, 500
return {}
@api.response(404, 'Show ID not found', default_error_schema)
@api.response(414, 'Episode ID not found', default_error_schema)
@api.response(400, 'Episode with ep_ids does not belong to show with show_id', default_error_schema)
@series_api.route('//episodes/')
@api.doc(params={'show_id': 'ID of the show', 'ep_id': 'Episode ID'},
description='Use this endpoint to get or delete a specific episode for a show')
class SeriesEpisodeAPI(APIResource):
@api.response(200, 'Episode retrieved successfully for show', episode_schema)
def get(self, show_id, ep_id, session):
""" Get episode by show ID and episode ID"""
try:
show = series.show_by_id(show_id, session=session)
except NoResultFound:
return {'status': 'error',
'message': 'Show with ID %s not found' % show_id
}, 404
try:
@api.response(200, 'Season retrieved successfully for show', season_schema)
@api.doc(description='Get a specific season via its ID and show ID')
def get(self, show_id, season_id, session):
""" Get season by show ID and season ID"""
try:
db.show_by_id(show_id, session=session)
except NoResultFound:
raise NotFoundError('show with ID %s not found' % show_id)
try:
season = db.season_by_id(season_id, session)
except NoResultFound:
raise NotFoundError('season with ID %s not found' % season_id)
if not db.season_in_show(show_id, season_id):
raise BadRequest('season with id %s does not belong to show %s' % (season_id, show_id))
rsp = jsonify(season.to_dict())
@api.response(200, model=base_message_schema)
def delete(self, list_id, movie_id, session=None):
""" Delete a movie by list ID and movie ID """
try:
movie = db.get_movie_by_id(list_id=list_id, movie_id=movie_id, session=session)
except NoResultFound:
raise NotFoundError('could not find movie with id %d in list %d' % (movie_id, list_id))
logger.debug('deleting movie {}', movie.id)
session.delete(movie)
return success_response('successfully deleted movie %d' % movie_id)
@api.response(NotFoundError)
@api.response(200, model=history_list_schema)
def get(self, session=None):
""" List of previously accepted entries """
args = history_parser.parse_args()
# Pagination and sorting params
page = args['page']
per_page = args['per_page']
sort_by = args['sort_by']
sort_order = args['order']
# Hard limit results per page to 100
if per_page > 100:
per_page = 100
# Filter param
@api.response(200, 'Episode retrieved successfully for show', episode_schema)
def get(self, show_id, ep_id, session):
""" Get episode by show ID and episode ID"""
try:
show = series.show_by_id(show_id, session=session)
except NoResultFound:
return {'status': 'error',
'message': 'Show with ID %s not found' % show_id
}, 404
try:
episode = series.episode_by_id(ep_id, session)
except NoResultFound:
return {'status': 'error',
'message': 'Episode with ID %s not found' % ep_id
}, 414
if not series.episode_in_show(show_id, ep_id):
return {'status': 'error',
@api.response(
200, 'Successfully reset all downloaded releases for season', model=base_message_schema
)
@api.doc(
description='Resets all of the downloaded releases of an season, clearing the quality to be downloaded '
'again,'
)
def put(self, show_id, season_id, session):
""" Marks all downloaded season releases as not downloaded """
try:
db.show_by_id(show_id, session=session)
except NoResultFound:
raise NotFoundError('show with ID %s not found' % show_id)
try:
season = db.season_by_id(season_id, session)
except NoResultFound:
raise NotFoundError('season with ID %s not found' % season_id)
@api.response(201, model=api_schedule_schema)
def put(self, schedule_id, session=None):
""" Update schedule """
new_schedule = request.json
schedules = self.manager.config.get('schedules', [])
# Checks for boolean config
if schedules is True:
self.manager.config['schedules'] = DEFAULT_SCHEDULES
elif schedules is False:
raise Conflict('Schedules are disables in config')
schedule, idx = _schedule_by_id(schedule_id, self.manager.config['schedules'])
if not schedule:
raise NotFoundError('schedule %d not found' % schedule_id)