Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
assert json.loads(rsp.get_data(as_text=True)).get('title') == 'test_series_2'
new_data = {'quality': '720p',
'qualities': ['720p', '1080p'],
'timeframe': '2 days',
'upgrade': True,
'target': '1080p',
'propers': True,
'specials': True,
'tracking': False,
'exact': True,
'begin': 's01e01',
'from_group': ['group1', 'group2'],
'parse_only': True}
rsp = api_client.json_put('/series_list/1/', data=json.dumps(new_data))
assert rsp.status_code == 200, 'Response code is %s' % rsp.status_code
rsp = api_client.get('/series_list/1/series/')
assert rsp.status_code == 200, 'Response code is %s' % rsp.status_code
response = json.loads(rsp.get_data(as_text=True))
assert len(response.get('series')) == 2
for series in response.get('series'):
for attribute in new_data:
assert series[attribute] == new_data[attribute]
def test_secrets_put(self, api_client):
    """Exercise GET -> PUT -> GET on the ``/secrets/`` endpoint.

    The first GET must return an empty JSON object, the PUT must answer 201
    and echo ``self.secrets_dict``, and the second GET must return that same
    dictionary.
    """
    # Before anything is written the endpoint answers with an empty object.
    initial = api_client.get('/secrets/')
    assert initial.status_code == 200, 'Response code is %s' % initial.status_code
    initial_body = json.loads(initial.get_data(as_text=True))
    assert initial_body == {}

    # Upload the secrets; the response body echoes what was sent.
    payload = json.dumps(self.secrets_dict)
    created = api_client.json_put('/secrets/', data=payload)
    assert created.status_code == 201, 'Response code is %s' % created.status_code
    created_body = json.loads(created.get_data(as_text=True))
    assert created_body == self.secrets_dict

    # A subsequent read returns the uploaded secrets.
    fetched = api_client.get('/secrets/')
    assert fetched.status_code == 200, 'Response code is %s' % fetched.status_code
    fetched_body = json.loads(fetched.get_data(as_text=True))
    assert fetched_body == self.secrets_dict
def test_json_encode_dt(self):
    """Check that a datetime dumps back to the string it was parsed from.

    A ``YYYY-MM-DDTHH:MM:SSZ`` timestamp is parsed into a ``datetime`` and
    dumped with ``encode_datetime=True``; the result must be the original
    string wrapped in double quotes.
    """
    date_str = '2016-03-11T17:12:17Z'
    expected = '"%s"' % date_str
    parsed = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%SZ')
    # NOTE(review): stdlib json.dumps has no ``encode_datetime`` keyword, so
    # ``json`` is presumably a project wrapper -- confirm against the imports.
    assert json.dumps(parsed, encode_datetime=True) == expected
def setter(self, entry):
    """Store ``entry`` on ``self`` as a JSON string under the attribute ``name``.

    ``entry`` is copied into a plain ``dict``, passed through
    ``only_builtins`` and dumped with ``encode_datetime=True``.

    Raises:
        TypeError: if ``entry`` is neither an ``Entry`` nor a ``dict``.
    """
    # Reject unsupported types up front.
    if not isinstance(entry, (Entry, dict)):
        raise TypeError(f'{type(entry)!r} is not of type Entry or dict.')
    # ``name`` is not a parameter here; it is resolved from an outer scope.
    plain = only_builtins(dict(entry))
    setattr(self, name, json.dumps(plain, encode_datetime=True))
try:
dest = 1 if config.get('queue', self.DEFAULT_QUEUE) else 0 # Destination.Queue = 1
# Use the title of the entry, if no naming schema for the package is defined.
name = config.get('package', entry['title'])
# If name has jinja template, render it
try:
name = entry.render(name)
except RenderError as e:
name = entry['title']
logger.error('Error rendering jinja event: {}', e)
data = {
'name': json.dumps(name.encode('ascii', 'ignore').decode()),
'links': json.dumps(urls),
'dest': json.dumps(dest),
'session': session,
}
pid = api.post('addPackage', data=data).text
logger.debug('added package pid: {}', pid)
# Set Folder
folder = config.get('folder', self.DEFAULT_FOLDER)
folder = entry.get('path', folder)
if folder:
# If folder has jinja template, render it
try:
folder = entry.render(folder)
except RenderError as e:
return
if task.manager.options.test:
log.info('Not submitting to trakt.tv because of test mode.')
return
# Submit our found items to trakt
if config['type'] == 'series':
post_url = 'http://api.trakt.tv/show/episode/library/' + config['api_key']
else:
post_url = 'http://api.trakt.tv/movie/library/' + config['api_key']
for item in found.itervalues():
# Add username and password to the dict to submit
item.update({'username': config['username'], 'password': config['password']})
try:
result = task.requests.post(post_url, data=json.dumps(item), raise_status=False)
except RequestException as e:
log.error('Error submitting data to trakt.tv: %s' % e)
continue
if result.status_code == 404:
# Remove some info from posted json and print the rest to aid debugging
for key in ['username', 'password', 'episodes']:
item.pop(key, None)
log.warning('%s not found on trakt: %s' % (config['type'].capitalize(), item))
continue
elif result.status_code == 401:
log.error('Error authenticating with trakt. Check your username/password/api_key')
log.debug(result.text)
continue
elif result.status_code != 200:
log.error('Error submitting data to trakt.tv: %s' % result.text)
else:
logger.debug(
'Not submitting `{}`, no movie name or id found.', entry['title']
)
continue
found.setdefault('movies', []).append(movie)
if not (found.get('shows') or found.get('movies')):
logger.debug('Nothing to submit to trakt.')
return
url = db.get_api_url(self.get_list_endpoint(remove, submit=True))
logger.debug('Submitting data to trakt.tv ({}): {}', url, found)
try:
result = self.session.post(url, data=json.dumps(found), raise_status=False)
except RequestException as e:
logger.error('Error submitting data to trakt.tv: {}', e)
return
if 200 <= result.status_code < 300:
action = 'deleted' if remove else 'added'
res = result.json()
# Default to 0 for all categories, even if trakt response didn't include them
for cat in ('movies', 'shows', 'episodes', 'seasons'):
res[action].setdefault(cat, 0)
logger.info(
'Successfully {0} to/from list {1}: {movies} movie(s), {shows} show(s), {episodes} episode(s), '
'{seasons} season(s).',
action,
self.config['list'],
**res[action],
)
def setter(self, entry):
    """Dump ``entry`` to JSON and store the string on ``self`` under ``name``."""
    # ``name`` is not a parameter here; it is resolved from an outer scope.
    # NOTE(review): stdlib json.dumps has no ``encode_datetime`` keyword, so
    # ``json`` is presumably a project wrapper -- confirm against the imports.
    serialized = json.dumps(entry, encode_datetime=True)
    setattr(self, name, serialized)