Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _mk_json_from_build_parameters(build_params, file_params=None):
"""make build parameters to json format."""
if not isinstance(build_params, dict):
raise exception.ParamTypeError("Build parameters must be a dict")
build_p = [{'name': k, 'value': str(v)}
for k, v in sorted(build_params.items())]
out = {'parameter': build_p}
if file_params:
file_p = [{'name': k, 'file': k}
for k in file_params.keys()]
out['parameter'].extend(file_p)
if len(out['parameter']) == 1:
out['parameter'] = out['parameter'][0]
return out
def __build_request(self, params=None, data=None, files=None, headers=None, **kwargs):
"""build kwargs for requests."""
requests_kwargs = kwargs
if self.username:
requests_kwargs["auth"] = (self.username, self.password)
if headers:
if not isinstance(headers, dict):
raise exception.ParamTypeError(
"unexpected type of parameter 'headers': %s. Expected (dict)" % type(data))
requests_kwargs["headers"] = headers
if params:
if not isinstance(params, dict):
raise exception.ParamTypeError(
"unexpected type of parameter 'params': %s. Expected (dict)" % type(data))
requests_kwargs["params"] = params
if data:
if not isinstance(data, (str, dict)):
raise exception.ParamTypeError(
"unexpected type of parameter 'data': %s. Expected (str, dict)" % type(data))
requests_kwargs["data"] = data
if files:
requests_kwargs["files"] = files
return requests_kwargs
def invoke(self, block=False, build_params=None, cause=None, files=None, delay=15):
"""trigger a new build.
Args:
block: block until the new build stopped.
build_params: trigger new build with params.
cause: set cause info.
files: send build params in files.
delay: if `block` is True, check status every `delay` seconds.
"""
if not isinstance(block, bool):
raise exception.ParamTypeError("block")
if build_params and not self.has_params:
raise exception.BadParams("%s does not support parameters" % str(self))
params = {} # Via Get string
# Either copy the params dict or make a new one.
build_params = dict(build_params.items()) if build_params else {}
if cause:
build_params['cause'] = cause
url = self.get_build_trigger_url(files)
# Build require params as form fields and as Json.
data = {'json': self.mk_json_from_build_parameters(build_params, files)}
if headers:
if not isinstance(headers, dict):
raise exception.ParamTypeError(
"unexpected type of parameter 'headers': %s. Expected (dict)" % type(data))
requests_kwargs["headers"] = headers
if params:
if not isinstance(params, dict):
raise exception.ParamTypeError(
"unexpected type of parameter 'params': %s. Expected (dict)" % type(data))
requests_kwargs["params"] = params
if data:
if not isinstance(data, (str, dict)):
raise exception.ParamTypeError(
"unexpected type of parameter 'data': %s. Expected (str, dict)" % type(data))
requests_kwargs["data"] = data
if files:
requests_kwargs["files"] = files
return requests_kwargs
import os
from cup.jenkinslib import internal
from cup.jenkinslib.internal import exception as _exception
from cup.jenkinslib.internal import jenkins as _jenkins
from cup.jenkinslib.internal import promotion as _promotion
# import Jenkins and Promotion
Jenkins = _jenkins.Jenkins
Promotion = _promotion.Promotion
# import exceptions
Error = _exception.Error
NotRunningOnJenkins = _exception.NotRunningOnJenkins
BadParam = _exception.BadParam
ParamTypeError = _exception.ParamTypeError
BadValue = _exception.BadValue
RunTimeout = _exception.RunTimeout
NotFound = _exception.NotFound
UnknownNode = _exception.UnknownNode
UnknownJob = _exception.UnknownJob
UnknownPromotion = _exception.UnknownPromotion
UnknownQueueItem = _exception.UnknownQueueItem
NotBuiltYet = _exception.NotBuiltYet
NoBuildData = _exception.NoBuildData
DeletedBuild = _exception.DeletedBuild
NoArtifacts = _exception.NoArtifacts
JenkinsAPIError = _exception.JenkinsAPIError
UnsupportedAPI = _exception.UnsupportedAPI
NotStopYet = _exception.NotStopYet
ImappropriateMethod = _exception.ImappropriateMethod
ImappropriateMethodInStaticMode = _exception.ImappropriateMethodInStaticMode
def post_and_confirm_status(self, url, params=None, data=None, files=None,
headers=None, valid=None, allow_redirects=True):
"""request url in POST method, and check status code.
Returns:
Response object.
"""
valid = valid or self.VALID_STATUS_CODES
if not isinstance(data, (str, dict)):
raise exception.ParamTypeError(
"unexpected type of parameter 'data': %s. Expected (str, dict)" % type(data))
if not headers and not files:
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
response = self.post(url, params, data, files, headers, allow_redirects)
if response.status_code not in valid:
raise exception.InvalidRequestStatus(
response.url,
method="POST",
status=response.status_code,
msg="operation failed.",
response=response)
return response