Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from_date=True,
basic_auth=True,
token_auth=True,
archive=True)
parser.parser.add_argument('origin')
return parser
class ClassifiedFieldsBackendCommand(MockedBackendCommand):
"""Mocked backend command for testing classified fields filtering"""
BACKEND = ClassifiedFieldsBackend
class NoArchiveBackendCommand(BackendCommand):
"""Mocked backend command class used for testing which does not support archive"""
BACKEND = CommandBackend
def __init__(self, *args):
super().__init__(*args)
def _pre_init(self):
setattr(self.parsed_args, 'pre_init', True)
def _post_init(self):
setattr(self.parsed_args, 'post_init', True)
@classmethod
def setup_cmd_parser(cls):
parser = BackendCommandArgumentParser(cls.BACKEND,
"""
if not self.archive:
raise ArchiveError(cause="Archive not provided")
data = self.archive.retrieve(method, args, None)
if isinstance(data, nntplib.NNTPTemporaryError):
raise data
return data
def quit(self):
self.handler.quit()
class NNTPCommand(BackendCommand):
"""Class to run NNTP backend from the command line."""
BACKEND = NNTP
@classmethod
def setup_cmd_parser(cls):
"""Returns the NNTP argument parser."""
parser = BackendCommandArgumentParser(cls.BACKEND,
offset=True,
archive=True)
# Required arguments
parser.parser.add_argument('host',
help="NNTP server host")
parser.parser.add_argument('group',
"""Returns whether it supports caching items on the fetch process.
:returns: this backend does not support items cache
"""
return False
@classmethod
def has_resuming(cls):
"""Returns whether it supports to resume the fetch process.
:returns: this backend supports items resuming
"""
return True
class GmaneCommand(BackendCommand):
"""Class to run Gmane backend from the command line."""
BACKEND = Gmane
def _pre_init(self):
"""Initialize mailing lists directory path"""
if not self.parsed_args.mboxes_path:
base_path = os.path.expanduser('~/.perceval/mailinglists/')
dirpath = os.path.join(base_path, self.parsed_args.mailing_list)
else:
dirpath = self.parsed_args.mboxes_path
setattr(self.parsed_args, 'dirpath', dirpath)
@staticmethod
the given command
"""
if res:
url = urijoin(self.base_url, res, res_id)
else:
url = urijoin(self.base_url, res_id)
url += self.TJSON
logger.debug("Discourse client calls resource: %s %s params: %s",
res, res_id, str(params))
r = self.fetch(url, payload=params)
return r.text
class DiscourseCommand(BackendCommand):
"""Class to run Discourse backend from the command line."""
BACKEND = Discourse
@classmethod
def setup_cmd_parser(cls):
"""Returns the Discourse argument parser."""
parser = BackendCommandArgumentParser(cls.BACKEND,
from_date=True,
token_auth=True,
archive=True)
# Required arguments
parser.parser.add_argument('url',
help="URL of the Discourse server")
:param from_archive: it tells whether to write/read the archive
:raises HTTPError: when an error occurs doing the request
"""
def __init__(self, url, archive=None, from_archive=False):
super().__init__(url, archive=archive, from_archive=from_archive)
def get_entries(self):
""" Retrieve all entries from a RSS feed"""
req = self.fetch(self.base_url)
return req.text
class RSSCommand(BackendCommand):
"""Class to run RSS backend from the command line."""
BACKEND = RSS
@classmethod
def setup_cmd_parser(cls):
"""Returns the RSS argument parser."""
parser = BackendCommandArgumentParser(cls.BACKEND,
archive=True)
# Required arguments
parser.parser.add_argument('url',
help="URL of the RSS feed")
return parser
cmd += " %s " % (filter_)
cmd += " --all-approvals --comments --format=JSON"
gerrit_version = self.version
if last_item is not None:
if gerrit_version[0] == 2 and gerrit_version[1] >= 9:
cmd += " --start=" + str(last_item)
else:
cmd += " resume_sortkey:" + last_item
return cmd
class GerritCommand(BackendCommand):
"""Class to run Gerrit backend from the command line."""
BACKEND = Gerrit
@classmethod
def setup_cmd_parser(cls):
"""Returns the Gerrit argument parser."""
parser = BackendCommandArgumentParser(cls.BACKEND.CATEGORIES,
from_date=True,
archive=True)
# Gerrit options
group = parser.parser.add_argument_group('Gerrit arguments')
group.add_argument('--user', dest='user',
help="Gerrit ssh user")
self.last_rate_limit_checked = self.rate_limit
except requests.exceptions.HTTPError as error:
if error.response.status_code == 404:
logger.warning("Rate limit not initialized: %s", error)
else:
raise error
def _set_extra_headers(self):
"""Set extra headers for session"""
headers = {}
headers.update({'Accept': 'application/vnd.github.squirrel-girl-preview'})
return headers
class GitHubCommand(BackendCommand):
"""Class to run GitHub backend from the command line."""
BACKEND = GitHub
@classmethod
def setup_cmd_parser(cls):
"""Returns the GitHub argument parser."""
parser = BackendCommandArgumentParser(cls.BACKEND,
from_date=True,
to_date=True,
token_auth=False,
archive=True)
# GitHub options
group = parser.parser.add_argument_group('GitHub arguments')
group.add_argument('--enterprise-url', dest='base_url',
buglist = self.parse_buglist(raw_csv)
return [bug for bug in buglist]
def __fetch_and_parse_bugs_details(self, *bug_ids):
logger.debug("Fetching and parsing bugs details")
raw_bugs = self.client.bugs(*bug_ids)
return self.parse_bugs_details(raw_bugs)
def __fetch_and_parse_bug_activity(self, bug_id):
logger.debug("Fetching and parsing bug #%s activity", bug_id)
raw_activity = self.client.bug_activity(bug_id)
activity = self.parse_bug_activity(raw_activity)
return [event for event in activity]
class BugzillaCommand(BackendCommand):
"""Class to run Bugzilla backend from the command line."""
BACKEND = Bugzilla
@classmethod
def setup_cmd_parser(cls):
"""Returns the Bugzilla argument parser."""
parser = BackendCommandArgumentParser(cls.BACKEND,
from_date=True,
basic_auth=True,
archive=True)
# Bugzilla options
group = parser.parser.add_argument_group('Bugzilla arguments')
group.add_argument('--max-bugs', dest='max_bugs',
else:
logger.info("No items were found for %s." % url)
def __init_session(self):
if (self.user and self.password) is not None:
self.session.auth = (self.user, self.password)
if self.cert:
self.session.cert = self.cert
if self.verify is not True:
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
self.session.verify = False
class JiraCommand(BackendCommand):
"""Class to run Jira backend from the command line."""
BACKEND = Jira
@classmethod
def setup_cmd_parser(cls):
"""Returns the Jira argument parser."""
parser = BackendCommandArgumentParser(cls.BACKEND,
from_date=True,
basic_auth=True,
archive=True)
# JIRA options
group = parser.parser.add_argument_group('JIRA arguments')
group.add_argument('--project',
if len(keywords) == 1:
query_str = keywords[0]
else:
query_str = ' '.join([k for k in keywords])
logger.info("Fetching hits for '%s'", query_str)
params = {'q': query_str}
# Make the request
req = self.fetch(GOOGLE_SEARCH_URL, payload=params)
return req.text
class GoogleHitsCommand(BackendCommand):
"""Class to run GoogleHits backend from the command line."""
BACKEND = GoogleHits
@classmethod
def setup_cmd_parser(cls):
"""Returns the GoogleHits argument parser."""
parser = BackendCommandArgumentParser(cls.BACKEND,
archive=True)
group = parser.parser.add_argument_group('GoogleHits arguments')
# Generic client options
group.add_argument('--max-retries', dest='max_retries',
default=MAX_RETRIES, type=int,
help="number of API call retries")