Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# rbtools will convert body_* to str() and insert "None" if we pass
# an argument.
args = {'public': public}
if review_flag:
args['ship_it'] = (review_flag == 'r+')
args['extra_data.p2rb.review_flag'] = review_flag
if body_bottom:
args['body_bottom'] = body_bottom
if body_top:
args['body_top'] = body_top
try:
r = reviews.create(**args)
except APIError as e:
print('API Error: %s: %s: %s' % (e.http_status, e.error_code,
e.rsp['err']['msg']))
return 1
print('created review %s' % r.rsp['review']['id'])
self.options.change_description
if (self.options.markdown and
tool.capabilities.has_capability('text', 'markdown')):
update_fields['changedescription_text_type'] = 'markdown'
else:
update_fields['changedescription_text_type'] = 'plain'
else:
logging.error(
'The change description field can only be set when '
'publishing an update.')
try:
draft = review_request.get_draft(only_fields='')
draft.update(**update_fields)
except APIError as e:
raise CommandError('Error publishing review request (it may '
'already be published): %s' % e)
print('Review request #%s is published.' % request_id)
def process_error(self, http_status, data):
"""Processes an error, raising an APIError with the information."""
try:
rsp = json_loads(data)
assert rsp['stat'] == 'fail'
debug("Got API Error %d (HTTP code %d): %s" %
(rsp['err']['code'], http_status, rsp['err']['msg']))
debug("Error data: %r" % rsp)
raise create_api_error(http_status, rsp['err']['code'], rsp)
except ValueError:
debug("Got HTTP error: %s: %s" % (http_status, data))
raise APIError(http_status, None, None, data)
self.error_code = error_code
self.rsp = rsp
def __str__(self):
code_str = "HTTP %d" % self.http_status
if self.error_code:
code_str += ', API Error %d' % self.error_code
if self.rsp and 'err' in self.rsp:
return '%s (%s)' % (self.rsp['err']['msg'], code_str)
else:
return code_str
class AuthorizationError(APIError):
pass
class BadRequestError(APIError):
def __str__(self):
lines = [super(BadRequestError, self).__str__()]
if self.rsp and 'fields' in self.rsp:
lines.append('')
for field, error in self.rsp['fields'].iteritems():
lines.append(' %s: %s' % (field, '; '.join(error)))
return '\n'.join(lines)
'%d of %d.') % (self.max_comments, len(self.comments))
self.body_top = "%s\n%s" % (self.body_top, warning)
del self.comments[self.max_comments:]
try:
batch_reviews = self.api_root.get_extension(
extension_name='mozreview.extension.MozReviewExtension'
).get_batch_reviews()
batch_reviews.create(
review_request_id=self.review_request_id,
ship_it=ship_it,
body_top=body_top,
body_bottom=body_bottom,
diff_comments=json.dumps(self.comments))
except APIError as e:
self.logger.error(
'batchreview: could not publish review: %s' % str(e))
return False
return True
return False
user = session.get_user()
username = user.username
# Now that we have proven ownership over the user, take the provided
# ldap_username and associate it with the account.
rbc = ReviewBoardClient(url, username=privileged_username,
password=privileged_password)
root = rbc.get_root()
ext = root.get_extension(
extension_name='mozreview.extension.MozReviewExtension')
ldap = ext.get_ldap_associations().get_item(username)
ldap.update(ldap_username=ldap_username)
except APIError:
return False
return True
def main(self, request_id):
"""Run the command."""
repository_info, tool = self.initialize_scm_tool(
client_name=self.options.repository_type)
server_url = self.get_server_url(repository_info, tool)
api_client, api_root = self.get_api(server_url)
request = get_review_request(request_id, api_root)
try:
draft = request.get_draft()
draft = draft.update(public=True)
except APIError as e:
raise CommandError("Error publishing review request (it may "
"already be published): %s" % e)
print('Review request #%s is published.' % request_id)
repo_name = repository['name']
# Repositories with a similar VOB tag get put at the beginning and
# the others at the end.
if repo_name == self.vobstag or repo_name in vob_tag_parts:
repository_scan_order.appendleft(repository)
else:
repository_scan_order.append(repository)
# Now try to find a matching uuid
for repository in repository_scan_order:
repo_name = repository['name']
try:
info = repository.get_info()
except APIError as e:
# If the current repository is not publicly accessible and the
# current user has no explicit access to it, the server will
# return error_code 101 and http_status 403.
if not (e.error_code == 101 and e.http_status == 403):
# We can safely ignore this repository unless the VOB tag
# matches.
if repo_name == self.vobstag:
raise SCMError('You do not have permission to access '
'this repository.')
continue
else:
# Bubble up any other errors
raise e
if not info or uuid != info['uuid']:
# Get only pending requests by the current user for this
# repository.
review_requests = api_root.get_review_requests(
repository=repository_id,
from_user=username,
status='pending',
expand='draft',
only_fields=','.join(only_fields),
only_links='diffs,draft',
show_all_unpublished=True)
if not review_requests:
raise ValueError('No existing review requests to update for '
'user %s'
% username)
except APIError as e:
raise ValueError('Error getting review requests for user %s: %s'
% (username, e))
summary = None
description = None
if not guess_summary or not guess_description:
try:
commit_message = tool.get_commit_message(revisions)
if commit_message:
if not guess_summary:
summary = commit_message['summary']
if not guess_description:
description = commit_message['description']