Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@api.response(200, model=return_schema)
def get(self, title, session=None):
    """Get a list of IMDB search results by name or ID.

    The reply is always a JSON list: empty when nothing matched, and a
    one-element list when the search yields a single, non-list result.
    """
    matches = ImdbSearch().smart_match(title, single_match=False)
    if not matches:
        return jsonify([])
    # Wrap a lone result so callers can rely on a list-shaped reply.
    payload = matches if isinstance(matches, list) else [matches]
    return jsonify(payload)
# Schema models built from the object definitions held on ObjectsContainer.
base_entry_schema = api.schema_model(
    'base_entry_schema', ObjectsContainer.base_entry_object
)
pending_list_entry_base_schema = api.schema_model(
    'pending_list.entry_base_schema', ObjectsContainer.pending_list_entry_base_object
)
pending_lists_entries_return_schema = api.schema_model(
    'pending_list.entry_return_schema', ObjectsContainer.pending_lists_entries_return_object
)

# Sort keys offered by the entries parser; 'title' is passed as its default.
sort_choices = ('id', 'added', 'title', 'original_url', 'list_id', 'approved')
entries_parser = api.pagination_parser(sort_choices=sort_choices, default='title')
entries_parser.add_argument('filter', help='Filter by title name')
# Resource exposing the entries of one pending list, addressed by list_id.
# NOTE(review): the route string has no list_id placeholder even though get()
# takes list_id and the api.doc params describe it -- the URL converter appears
# to be missing from '//entries/'; confirm against the upstream file.
@pending_list_api.route('//entries/')
@api.doc(params={'list_id': 'ID of the list'}, parser=entries_parser)
@api.response(NotFoundError)
class PendingListEntriesAPI(APIResource):
@etag
@api.response(200, model=pending_lists_entries_return_schema)
def get(self, list_id, session=None):
""" Get entries by list ID """
try:
# NOTE(review): this local name shadows the builtin `list`.
list = get_list_by_id(list_id=list_id, session=session)
except NoResultFound:
# An unknown list ID is reported to the client as NotFoundError.
raise NotFoundError('list_id %d does not exist' % list_id)
args = entries_parser.parse_args()
# Pagination and sorting params
page = args['page']
per_page = args['per_page']
sort_by = args['sort_by']
# (The remainder of this method is not visible in this excerpt.)
# Handler that accepts a base64-encoded config in the 'raw_config' field of the
# JSON request body.
@api.validate(raw_config_schema)
@api.response(200, model=base_message_schema, description='Successfully updated config')
@api.response(BadRequest)
@api.response(APIError)
@api.doc(
description='Config file must be base64 encoded. A backup will be created, and if successful config will'
' be loaded and saved to original file.'
)
def post(self, session=None):
""" Update config """
data = request.json
try:
# Decode the submitted payload; only decoding errors are handled here.
raw_config = base64.b64decode(data['raw_config'])
except (TypeError, binascii.Error):
# Undecodable input is a client error, reported as BadRequest.
raise BadRequest(message='payload was not a valid base64 encoded string')
try:
# (The remainder of this method is not visible in this excerpt.)
from flexget.api import APIResource, api
from flexget.api.app import base_message_schema, success_response
# API namespace registered under the name 'format_check'.
schema_api = api.namespace(
    'format_check',
    description='Test Flexget custom schema format validations',
)
# Holder for schema dictionaries. format_checker_input describes an object whose
# visible properties are all strings, each tagged with a 'format' keyword of the
# same name as the property.
# (The definition continues beyond this excerpt.)
class ObjectContainer:
format_checker_input = {
'type': 'object',
'properties': {
'quality': {'type': 'string', 'format': 'quality'},
'quality_requirements': {'type': 'string', 'format': 'quality_requirements'},
'time': {'type': 'string', 'format': 'time'},
'interval': {'type': 'string', 'format': 'interval'},
'size': {'type': 'string', 'format': 'size'},
'percent': {'type': 'string', 'format': 'percent'},
'regex': {'type': 'string', 'format': 'regex'},
'file': {'type': 'string', 'format': 'file'},
@api.doc(parser=lookup_parser)
def get(self, session=None):
    """Trakt movie lookup.

    Parses the lookup arguments, resolves the movie through ApiTrakt and
    returns it as JSON, optionally extended with actors and translations.
    Raises NotFoundError when the lookup fails with LookupError.
    """
    params = lookup_parser.parse_args()
    # The two include_* flags only shape the reply; whatever is left in
    # params is forwarded to the lookup as keyword arguments.
    want_actors = params.pop('include_actors')
    want_translations = params.pop('include_translations')
    try:
        movie = ApiTrakt.lookup_movie(session=session, **params)
    except LookupError as e:
        raise NotFoundError(e.args[0])

    payload = movie.to_dict()
    if want_actors:
        payload['actors'] = db.list_actors(movie.actors)
    if want_translations:
        payload['translations'] = db.get_translations_dict(movie.translations, 'movie')
    return jsonify(payload)
@api.doc(parser=cached_parser)
def get(self, session=None):
    """Cache remote resources.

    Resolves the requested URL through cached_resource and sends the
    resulting file back with its MIME type. Raises BadRequest on a
    RequestException and APIError on an OSError.
    """
    params = cached_parser.parse_args()
    target_url = params.get('url')
    force_refresh = params.get('force')
    try:
        file_path, mime_type = cached_resource(
            target_url, self.manager.config_base, force=force_refresh
        )
    except RequestException as e:
        # Must stay ahead of the OSError handler so it is matched first.
        raise BadRequest('Request Error: {}'.format(e.args[0]))
    except OSError as e:
        reason = str(e)
        raise APIError('Error: {}'.format(reason))
    else:
        return send_file(file_path, mimetype=mime_type)
@api.doc(parser=plugin_parser, params={'plugin_name': 'Name of the plugin to return'})
def get(self, plugin_name, session=None):
    """Return plugin data by name.

    The plugin's schema is attached to the reply only when the
    include_schema argument is set. Raises BadRequest when the plugin
    lookup fails with DependencyError.
    """
    options = plugin_parser.parse_args()
    try:
        plugin = get_plugin_by_name(plugin_name, issued_by='plugins API')
    except DependencyError as e:
        raise BadRequest(e.message)

    details = plugin_to_dict(plugin)
    if options['include_schema']:
        details['schema'] = plugin.schema
    return jsonify(details)
@api.validate(model=user_password_input_schema, description='Password change schema')
@api.response(BadRequest)
@api.response(200, 'Success', model=base_message_schema)
@api.doc(
    # Adjacent literals are concatenated verbatim, so the separating space
    # must be spelled out or the text reads "needed.See https://...".
    description='Change user password. A score of at least 3 is needed. '
    'See https://github.com/dropbox/zxcvbn for details'
)
def put(self, session=None):
    """Change user password.

    Reads the new password from the JSON request body and applies it to
    the currently logged-in user.

    Raises BadRequest when change_password rejects the value with
    WeakPassword.
    """
    user = current_user
    data = request.json
    try:
        change_password(username=user.name, password=data.get('password'), session=session)
    except WeakPassword as e:
        # A rejected password is a client error; pass the reason through.
        raise BadRequest(e.value)
    return success_response('Successfully changed user password')
from __future__ import unicode_literals, division, absolute_import
from builtins import * # pylint: disable=unused-import, redefined-builtin
import datetime
from math import ceil
from flask import jsonify, request
from flask_restplus import inputs
from flexget.api import api, APIResource
from flexget.plugins.api.series import NoResultFound
from flexget.plugins.filter import movie_queue as mq
from flexget.utils import qualities
movie_queue_api = api.namespace(
    'movie_queue', description='Movie Queue operations (DEPRECATED)'
)

# Error payload schema: an object with two string fields, status and message.
default_error_schema = api.schema(
    'default_error_schema',
    {
        'type': 'object',
        'properties': {
            'status': {'type': 'string'},
            'message': {'type': 'string'},
        },
    },
)
# Schema for replies described only as a JSON object.
empty_response = api.schema('empty', {'type': 'object'})
movie_object = {
'type': 'object',
'properties': {