Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import itertools
import time
import six
import requests
from bugwarrior.services import IssueService, Issue, ServiceClient
from bugwarrior.config import die
import logging
log = logging.getLogger(__name__)
class ActiveCollab2Client(ServiceClient):
def __init__(self, url, key, user_id, projects, target):
self.url = url
self.key = key
self.user_id = user_id
self.projects = projects
self.target = target
def get_task_dict(self, project, key, task):
assigned_task = {
'project': project
}
if task[u'type'] == 'Ticket':
# Load Ticket data
# @todo Implement threading here.
ticket_data = self.call_api(
"/projects/" + six.text_type(task[u'project_id']) +
import six
import requests
from bugwarrior.config import die
from bugwarrior.services import Issue, IssueService, ServiceClient
import logging
log = logging.getLogger(__name__)
class TeamLabClient(ServiceClient):
def __init__(self, hostname, verbose=False):
self.hostname = hostname
self.verbose = verbose
self.token = None
def authenticate(self, login, password):
resp = self.call_api("/api/1.0/authentication.json", post={
"userName": six.text_type(login),
"password": six.text_type(password),
})
self.token = six.text_type(resp["token"])
def get_task_list(self):
resp = self.call_api("/api/1.0/project/task/@self.json")
return resp
return self.build_default_description(
title=self.record['subject'],
url=self.get_processed_url(self.record['url']),
number=self.record['number'],
cls='issue'
)
def get_priority(self):
return self.PRIORITY_MAP.get(
self.record.get('severity'),
self.origin['default_priority']
)
class BTSService(IssueService, ServiceClient):
ISSUE_CLASS = BTSIssue
CONFIG_PREFIX = 'bts'
def __init__(self, *args, **kw):
super(BTSService, self).__init__(*args, **kw)
self.email = self.config.get('email', default=None)
self.packages = self.config.get('packages', default=None)
self.udd = self.config.get(
'udd', default=False, to_type=asbool)
self.udd_ignore_sponsor = self.config.get(
'udd_ignore_sponsor', default=True, to_type=asbool)
self.ignore_pkg = self.config.get('ignore_pkg', default=None)
self.ignore_src = self.config.get('ignore_src', default=None)
self.ignore_pending = self.config.get(
'ignore_pending', default=True, to_type=asbool)
self.SUMMARY: self.record['subject'],
}
def get_tags(self):
return [x if isinstance(x, six.string_types) else x[0] for x in self.record['tags']]
def get_default_description(self):
return self.build_default_description(
title=self.record['subject'],
url=self.get_processed_url(self.extra['url']),
number=self.record['ref'],
cls='issue',
)
class TaigaService(IssueService, ServiceClient):
ISSUE_CLASS = TaigaIssue
CONFIG_PREFIX = 'taiga'
def __init__(self, *args, **kw):
super(TaigaService, self).__init__(*args, **kw)
self.url = self.config.get('base_uri')
self.include_tasks = self.config.get('include_tasks', default=False)
self.auth_token = self.get_password('auth_token')
self.label_template = self.config.get(
'label_template', default='{{label}}', to_type=six.text_type
)
self.session = requests.session()
self.session.headers.update({
'Accept': 'application/json',
'Authorization': 'Bearer %s' % self.auth_token,
})
tags.append(
label_template.render(context)
)
return tags
def get_default_description(self):
return self.build_default_description(
title=self.title,
url=self.get_processed_url(self.extra['issue_url']),
number=self.record.get('iid', ''),
cls=self.extra['type'],
)
class GitlabService(IssueService, ServiceClient):
ISSUE_CLASS = GitlabIssue
CONFIG_PREFIX = 'gitlab'
def __init__(self, *args, **kw):
super(GitlabService, self).__init__(*args, **kw)
host = self.config.get(
'host', default='gitlab.com', to_type=six.text_type)
self.login = self.config.get('login')
token = self.get_password('token', self.login)
self.auth = (host, token)
if self.config.get('use_https', default=True, to_type=asbool):
self.scheme = 'https'
else:
self.scheme = 'http'
context = self.record.copy()
tag_template = Template(self.origin['tag_template'])
for tag_dict in self.record.get('tag', []):
context.update({
'tag': re.sub(r'[^a-zA-Z0-9]', '_', tag_dict['value'])
})
tags.append(
tag_template.render(context)
)
return tags
class YoutrackService(IssueService, ServiceClient):
ISSUE_CLASS = YoutrackIssue
CONFIG_PREFIX = 'youtrack'
def __init__(self, *args, **kw):
super(YoutrackService, self).__init__(*args, **kw)
self.host = self.config.get('host')
if self.config.get('use_https', default=True, to_type=asbool):
self.scheme = 'https'
self.port = '443'
else:
self.scheme = 'http'
self.port = '80'
self.port = self.config.get('port', self.port)
self.base_url = '%s://%s:%s' % (self.scheme, self.host, self.port)
if self.config.get('incloud_instance', default=False, to_type=asbool):
self.CARDID: self.record['id'],
self.SHORTCARDID: self.record['idShort'],
self.DESCRIPTION: self.record['desc'],
self.BOARD: self.extra['boardname'],
self.LIST: self.extra['listname'],
self.SHORTLINK: self.record['shortLink'],
self.SHORTURL: self.record['shortUrl'],
self.URL: self.record['url'],
'annotations': self.extra.get('annotations', []),
}
if self.origin['import_labels_as_tags']:
twdict['tags'] = self.get_tags(twdict)
return twdict
class TrelloService(IssueService, ServiceClient):
ISSUE_CLASS = TrelloIssue
# What prefix should we use for this service's configuration values
CONFIG_PREFIX = 'trello'
@classmethod
def validate_config(cls, service_config, target):
def check_key(opt):
""" Check that the given key exist in the configuration """
if opt not in service_config:
die("[{}] has no 'trello.{}'".format(target, opt))
super(TrelloService, cls).validate_config(service_config, target)
check_key('token')
check_key('api_key')
@staticmethod
def get_keyring_service(service_config):
self.URL: self.extra['url'],
self.FOREIGN_ID: self.record['id'],
self.TITLE: self.record['title'],
}
def get_default_description(self):
return self.build_default_description(
title=self.record['title'],
url=self.get_processed_url(self.extra['url']),
number=self.record['id'],
cls='issue'
)
class BitbucketService(IssueService, ServiceClient):
ISSUE_CLASS = BitbucketIssue
CONFIG_PREFIX = 'bitbucket'
BASE_API2 = 'https://api.bitbucket.org/2.0'
BASE_URL = 'https://bitbucket.org/'
def __init__(self, *args, **kw):
super(BitbucketService, self).__init__(*args, **kw)
key = self.config.get('key')
secret = self.config.get('secret')
auth = {'oauth': (key, secret)}
refresh_token = self.config.data.get('bitbucket_refresh_token')
if not refresh_token:
import six
import requests
import re
from bugwarrior.config import die, asbool
from bugwarrior.services import Issue, IssueService, ServiceClient
from taskw import TaskWarriorShellout
import logging
log = logging.getLogger(__name__)
class RedMineClient(ServiceClient):
def __init__(self, url, key, auth, issue_limit, verify_ssl):
self.url = url
self.key = key
self.auth = auth
self.issue_limit = issue_limit
self.verify_ssl = verify_ssl
def find_issues(self, issue_limit=100, only_if_assigned=False):
args = {}
# TODO: if issue_limit is greater than 100, implement pagination to return all issues.
# Leave the implementation of this to the unlucky soul with >100 issues assigned to them.
if issue_limit is not None:
args["limit"] = issue_limit
if only_if_assigned:
args["assigned_to_id"] = 'me'
self.FOREIGN_ID: self.record['_number'],
self.SUMMARY: self.record['subject'],
self.BRANCH: self.record['branch'],
self.TOPIC: self.record.get('topic', 'notopic'),
}
def get_default_description(self):
return self.build_default_description(
title=self.record['subject'],
url=self.get_processed_url(self.extra['url']),
number=self.record['_number'],
cls='pull_request',
)
class GerritService(IssueService, ServiceClient):
ISSUE_CLASS = GerritIssue
CONFIG_PREFIX = 'gerrit'
def __init__(self, *args, **kw):
super(GerritService, self).__init__(*args, **kw)
self.url = self.config.get('base_uri').strip('/')
self.username = self.config.get('username')
self.password = self.get_password('password', self.username)
self.ssl_ca_path = self.config.get('ssl_ca_path', None)
self.session = requests.session()
self.session.headers.update({
'Accept': 'application/json',
'Accept-Encoding': 'gzip',
})
self.query_string = self.config.get(
'query',