# (snippet: test helper that injects a single release into a task run)
def inject_series(self, execute_task, release_name):
    execute_task(
        'inject_series',
        options={'inject': [Entry(title=release_name, url='')], 'disable_tracking': True},
    )
# (separate snippet: FlexGet's exec plugin)
import subprocess

from loguru import logger

from flexget import plugin
from flexget.config_schema import one_or_more
from flexget.entry import Entry
from flexget.event import event
from flexget.utils.template import RenderError, render_from_entry, render_from_task
from flexget.utils.tools import io_encoding

logger = logger.bind(name='exec')


class EscapingEntry(Entry):
    """Helper class; same as an Entry, but returns all string values with quotes escaped."""

    def __init__(self, entry):
        super().__init__(entry)

    def __getitem__(self, key):
        value = super().__getitem__(key)
        # TODO: May need to be different depending on OS
        if isinstance(value, str):
            value = value.replace('"', '\\"')
        return value
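A minimal usage sketch for the escaping wrapper above (the title value is invented for illustration):

entry = Entry(title='A "quoted" title', url='http://example.com/item')
escaped = EscapingEntry(entry)
# Double quotes come back escaped, so the value can be interpolated into a
# double-quoted shell command without breaking the quoting.
assert escaped['title'] == 'A \\"quoted\\" title'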
class PluginExec:
    """Execute commands."""


# (separate snippet: FlexGet's best_quality filter plugin)
from loguru import logger

from flexget import plugin
from flexget.entry import Entry
from flexget.event import event
from flexget.utils.tools import group_entries

logger = logger.bind(name='best_quality')
entry_actions = {'accept': Entry.accept, 'reject': Entry.reject}


class FilterBestQuality:
    # (the schema was truncated in the original; the closing braces are restored)
    schema = {
        'type': 'object',
        'properties': {
            'identified_by': {'type': 'string', 'default': 'auto'},
            'on_best': {
                'type': 'string',
                'enum': ['accept', 'reject', 'do_nothing'],
                'default': 'do_nothing',
            },
            'on_lower': {
                'type': 'string',
                'enum': ['accept', 'reject', 'do_nothing'],
                'default': 'reject',
            },
        },
        'additionalProperties': False,
    }
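Dispatching through entry_actions is then a plain dict lookup over unbound Entry methods; a minimal sketch (the entry, action, and reason strings are illustrative):

entry = Entry(title='Example.S01E01.1080p', url='http://example.com/1')
action = 'reject'  # e.g. the configured 'on_lower' value
if action != 'do_nothing':
    # The table stores unbound methods, so the entry is passed explicitly.
    entry_actions[action](entry, 'lower quality than the best in its group')
assert entry.rejected  # the lookup dispatched to Entry.reject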
# (separate snippet: result mapping from FlexGet's t411 plugin;
# T411RestClient and T411ObjectMapper come from the same plugin module)
from datetime import datetime


def map_search_result_entry(json_entry, download_auth=None):
    """
    Parse a JSON object describing a torrent into a FlexGet Entry.

    :param download_auth: Requests authenticator
    """
    result = Entry()
    result['t411_torrent_id'] = int(json_entry['id'])
    result['title'] = json_entry['name']
    result['url'] = T411RestClient.download_url(json_entry['id'])
    result['t411_category'] = int(json_entry['category'])
    result['seeders'] = int(json_entry['seeders'])
    result['leechers'] = int(json_entry['leechers'])
    result['t411_comments'] = int(json_entry['comments'])
    # equality ('=='), not identity ('is'), when comparing to a string literal
    result['t411_verified'] = json_entry['isVerified'] == '1'
    result['t411_pubdate'] = datetime.strptime(json_entry['added'], T411ObjectMapper.date_format)
    result['content_size'] = int(json_entry['size']) / (1024 ** 2)  # bytes -> MiB
    result['t411_times_completed'] = int(json_entry['times_completed'])
    result['t411_category_name'] = json_entry['categoryname']
    result['t411_category_image'] = json_entry['categoryimage']
    result['t411_privacy'] = json_entry['privacy']
    result['t411_owner_id'] = int(json_entry['owner'])
    result['t411_owner_username'] = json_entry['username']
    # consumed by the download plugin when fetching the URL
    result['download_auth'] = download_auth
    return result
# (separate snippet: deserializing a stored Entry; 'json' here is
# flexget.utils.json, whose loads() accepts decode_datetime)
def getter(self):
    return Entry(json.loads(getattr(self, name), decode_datetime=True))
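The matching setter presumably serializes the same way; a sketch assuming flexget.utils.json's dumps() and the same closed-over column name:

def setter(self, entry):
    # encode_datetime mirrors decode_datetime in the getter, so datetime
    # fields survive the JSON round-trip.
    setattr(self, name, json.dumps(entry, encode_datetime=True))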
# (separate snippet: input handler from a Xunlei Lixian plugin)
def on_feed_input(self, feed, config):
    entries = []
    client = self.get_xunlei_client(config)
    tasks = client.get_task_list(config['limit'], 2)
    for task in tasks:
        if task['status'] != "finished":
            continue
        elif task['lixian_url']:
            entry = Entry(title=task['taskname'],
                          url=task['lixian_url'],
                          cookie="gdriveid=%s;" % client.gdriveid,
                          taskname=".",
                          size=task['size'],
                          format=task['format'],
                          fields=config['fields'],
                          )
            entries.append(entry)
        elif task['task_type'] in ("bt", "magnet"):
            files = client.get_bt_list(task['task_id'], task['cid'])
            for file in files:
                if not file['lixian_url']:
                    continue
                entry = Entry(title=file['dirtitle'],
                              url=file['lixian_url'],
                              cookie="gdriveid=%s;" % client.gdriveid,
                              # (constructor truncated in the original; the
                              # remaining keyword arguments presumably mirror
                              # the branch above)
                              )
                entries.append(entry)
    return entries
# (separate snippet: body fragment of an RSS-fetching search handler; 'url',
# 'log', 'task', 'feedparser', and 'RequestException' come from truncated
# surrounding code)
entries = []
log.verbose('Fetching %s' % url)
try:
    r = task.requests.get(url)
except RequestException as e:
    log.error("Failed fetching '%s': %s" % (url, e))
    return entries  # bail out; 'r' would be unbound below otherwise
rss = feedparser.parse(r.content)
log.debug("Raw RSS: %s" % rss)
if not rss.entries:
    log.info('No results returned')
for rss_entry in rss.entries:
    new_entry = Entry()
    for key in list(rss_entry.keys()):
        new_entry[key] = rss_entry[key]
    # FlexGet entries require 'url'; feedparser provides 'link'
    new_entry['url'] = new_entry['link']
    if rss_entry.enclosures:
        size = int(rss_entry.enclosures[0]['length'])  # bytes
        new_entry['content_size'] = size / (2 ** 20)  # MB
    entries.append(new_entry)
return entries
# (separate snippet: building a list from TheTVDB favorites; the 'try/except'
# framing below is restored to give the original dangling 'raise' its context)
try:
    # (the favorites fetch is truncated in the original; it yields series_ids)
    ...
except RequestException as e:
    raise PluginError('Error retrieving favorites from thetvdb: %s' % str(e))
self._items = []
for series_id in series_ids:
    # Look up the series name from the id
    try:
        series = plugin_api_tvdb.lookup_series(
            tvdb_id=series_id, language=self.config.get('language')
        )
    except LookupError as e:
        log.error('Error looking up %s from thetvdb: %s' % (series_id, e.args[0]))
    else:
        series_name = series.name
        if self.config.get('strip_dates'):
            # Remove year from end of series name if present
            series_name, _ = split_title_year(series_name)
        entry = Entry()
        entry['title'] = entry['series_name'] = series_name
        entry['url'] = 'http://thetvdb.com/index.php?tab=series&id={}'.format(series.id)
        entry['tvdb_id'] = str(series.id)
        self._items.append(entry)
return self._items
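For reference, split_title_year (from flexget.utils.tools) peels a trailing year off a name; a hedged illustration with an invented title:

from flexget.utils.tools import split_title_year

name, year = split_title_year('Some Series (2014)')
# name -> 'Some Series'; year holds the parsed year, or an empty value
# when the name has no trailing year.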
# (separate snippet: FlexGet's plex input plugin; the original began
# mid-branch, so the 'episode' branch header is assumed from context)
if viewgroup == "episode":
    thumbtag = "thumb"
    arttag = "art"
    seasoncovertag = "parentThumb"
    covertag = "grandparentThumb"
elif viewgroup == "movie":
    domroot = "Video"
    titletag = "title"
    arttag = "art"
    seasoncovertag = "thumb"
    covertag = "thumb"
    if config['fetch'] == "thumb":
        raise plugin.PluginError(
            "Movie sections do not have any thumbnails to download!"
        )
for node in dom.getElementsByTagName(domroot):
    e = Entry()
    e['plex_server'] = config['plexserver']
    e['plex_port'] = config['port']
    e['plex_section'] = config['section']
    e['plex_section_name'] = plexsectionname
    e['plex_episode_thumb'] = ''
    title = node.getAttribute(titletag)
    if config['strip_year']:
        title = re.sub(r'^(.*)\(\d{4}\)(.*)', r'\1\2', title)
    if config['strip_parens']:
        title = re.sub(r'\(.*?\)', r'', title)
        title = title.strip()
    if config['strip_non_alpha']:
        title = re.sub(r'[\(\)]', r'', title)
        title = re.sub(r'&', r'And', title)
        title = re.sub(r"[^A-Za-z0-9- ']", r'', title)
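A standalone walk-through of the title-cleaning rules above, with a made-up title:

import re

title = "Breaking Sample (2015) & Friends!"
title = re.sub(r'^(.*)\(\d{4}\)(.*)', r'\1\2', title)  # strip_year: drop "(2015)"
title = re.sub(r'[\(\)]', r'', title)                  # strip_non_alpha: drop parens
title = re.sub(r'&', r'And', title)                    # replace '&' with 'And'
title = re.sub(r"[^A-Za-z0-9- ']", r'', title)         # drop remaining symbols
print(title.strip())  # -> "Breaking Sample  And Friends" (inner double space kept)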
# (separate snippet: a list-API delete handler)
def delete(self, session):
    data = request.json
    # .items(), not the Python 2-only .iteritems()
    for plugin_name, plugin_config in data['config'].items():
        thelist = plugin.get_plugin_by_name(plugin_name).instance.get_list(plugin_config)
        thelist.discard(Entry(data['item']))