Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@cached('text')
@plugin.internet(log)
def on_task_input(self, task, config):
url = config['url']
if '://' in url:
lines = task.requests.get(url).text.split('\n')
else:
lines = Path(url).lines(encoding=config.get('encoding', 'utf-8'))
entry_config = config.get('entry')
format_config = config.get('format', {})
entries = []
# keep track what fields have been found
used = {}
entry = Entry()
# now parse text
@plugin.internet(log)
def search(self, task, entry, config=None):
"""
Search for name from piratebay.
"""
if not isinstance(config, dict):
config = {}
self.set_urls(config.get('url', URL))
sort = SORT.get(config.get('sort_by', 'seeds'))
if config.get('sort_reverse'):
sort += 1
if isinstance(config.get('category'), int):
category = config['category']
else:
category = CATEGORIES.get(config.get('category', 'all'))
filter_url = '/0/%d/%d' % (sort, category)
@plugin.internet(log)
def parse_download_page(self, url, task):
log.verbose('Descargas2020 URL: %s', url)
try:
page = self.session.get(url)
except requests.RequestException as e:
raise UrlRewritingError(e)
try:
soup = get_soup(page.text)
except Exception as e:
raise UrlRewritingError(e)
torrent_id = None
url_format = DESCARGAS2020_TORRENT_FORMAT
torrent_id_prog = re.compile(
@plugin.internet(log)
def url_from_page(self, url):
"""Parses torrent url from newtorrents download page"""
try:
page = requests.get(url)
data = page.text
except requests.RequestException:
raise UrlRewritingError('URLerror when retrieving page')
p = re.compile(r"copy\(\'(.*)\'\)", re.IGNORECASE)
f = p.search(data)
if not f:
# the link in which plugin relies is missing!
raise UrlRewritingError(
'Failed to get url from download page. Plugin may need a update.'
)
else:
return f.group(1)
@plugin.internet(log)
def entries_from_search(self, name, url=None):
"""Parses torrent download url from search results"""
name = normalize_unicode(name)
if not url:
url = 'http://www.newtorrents.info/search/%s' % quote(
name.encode('utf-8'), safe=b':/~?=&%'
)
log.debug('search url: %s' % url)
html = requests.get(url).text
# fix so that BS does not crash
# TODO: should use beautifulsoup massage
html = re.sub(r'()', r'\1\2', html)
soup = get_soup(html)
@plugin.internet(log)
def entries_from_search(self, name, url=None):
"""Parses torrent download url from search results"""
name = normalize_unicode(name)
if not url:
url = 'http://www.newtorrents.info/search/%s' % quote(
name.encode('utf-8'), safe=b':/~?=&%'
)
log.debug('search url: %s' % url)
html = requests.get(url).text
# fix so that BS does not crash
# TODO: should use beautifulsoup massage
html = re.sub(r'()', r'\1\2', html)
soup = get_soup(html)
@plugin.internet(log)
def search(self, task, entry, config):
"""
Search for entries on MoreThanTV
"""
params = {}
if 'category' in config:
categories = (
config['category']
if isinstance(config['category'], list)
else [config['category']]
)
for category in categories:
params[CATEGORIES[category]] = 1
if 'tags' in config:
@plugin.internet(log)
def on_task_input(self, task, config):
proxy = T411Proxy()
proxy.set_credential()
query = T411InputPlugin.build_request_from(config)
try:
return proxy.search(query)
except ApiError as e:
log.warning("Server send an error message : %d - %s", e.code, e.message)
return []
@plugin.internet(log)
# urlrewriter API
def url_rewrite(self, task, entry):
soup = self._get_soup(task, entry['url'])
link_re = re.compile('rarefile\.net.*\.rar$')
# grab links from the main entry:
blog_entry = soup.find('div', class_="entry")
num_links = 0
link_list = None
for paragraph in blog_entry.find_all('p'):
links = paragraph.find_all('a', href=link_re)
if len(links) > num_links:
link_list = links
num_links = len(links)
if 'urls' in entry: