Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _error_handling(self, res):
"""
Helper function to do the error handling
:params res: the response from a request
"""
# making the error handling generic so if an status_code starting with 2 doesn't exist, we raise the error
if res.status_code // 100 != 2:
error = self._get_response_json(res)
raise exceptions.KeenApiError(error)
if milestone:
milestone_number = pr_data["milestone"].get("number", None)
else:
milestone_number = None
print("----------------------------------------")
# print('milestone data :', pr_data['milestone'])
print("----------------------------------------")
if not target_branch.strip():
milestone_title = pr_data["milestone"]["title"]
parts = milestone_title.split(".")
parts[-1] = "x"
infered_target_branch = ".".join(parts)
print("inferring branch....", infered_target_branch)
target_branch = infered_target_branch
keen.add_event("backport_infering_branch", {"infering_remove_x": 1})
if milestone_number:
milestone_number = int(milestone_number)
labels_names = []
try:
label_names = [l["name"] for l in pr_data["labels"]]
if not label_names and ("issue" in payload.keys()):
labels_names = [l["name"] for l in payload["issue"]["labels"]]
except KeyError:
print("Did not find labels|", pr_data)
# clone locally
# this process can take some time, regen token
atk = session.token()
# FORK it.
fork_epoch = time.time()
"Oops, something went wrong applying the patch... Please have a look at my logs.",
)
print(e.stderr)
print("----")
print(e.stdout)
print("----")
s_reason = "Unknown error line 491"
keen_stats()
return
except Exception as e:
session.post_comment(
comment_url, "Hum, I actually crashed, that should not have happened."
)
print("\n" + e.stderr.decode("utf8", "replace"), file=sys.stderr)
print("\n" + repo.git.status(), file=sys.stderr)
keen.add_event("error", {"git_crash": 1})
s_reason = "Unknown error line 501"
keen_stats()
return
# write the commit message
repo.git.commit("--amend", "-m", msg)
print("== PR #%i applied, with msg:" % prnumber)
print()
print(msg)
print("== ")
# Push the backported work
print("== Pushing work....:")
try:
with requests.Session() as s:
res = s.send(prepared)
return res
except:
import traceback
traceback.print_exc()
pool.submit(fn, self.request, self.config.forward_staging_url)
except:
print(red + "failure to forward")
import traceback
traceback.print_exc()
if "X-Hub-Signature" not in self.request.headers:
keen.add_event("attack", {"type": "no X-Hub-Signature"})
return self.error("WebHook not configured with secret")
if not verify_signature(
self.request.body,
self.request.headers["X-Hub-Signature"],
self.config.webhook_secret,
):
keen.add_event("attack", {"type": "wrong signature"})
return self.error(
"Cannot validate GitHub payload with provided WebHook secret"
)
payload = tornado.escape.json_decode(self.request.body)
org = payload.get("repository", {}).get("owner", {}).get("login")
if not org:
org = (
def update_access_key_full(self, access_key_id, name, is_active, permitted, options):
"""
Replaces the 'name', 'is_active', 'permitted', and 'options' values of a given key.
A master key must be set first.
:param access_key_id: the 'key' value of the access key for which the values will be replaced
:param name: the new name desired for this access key
:param is_active: whether the key should become enabled (True) or revoked (False)
:param permitted: the new list of permissions desired for this access key
:param options: the new dictionary of options for this access key
"""
url = "{0}/{1}/projects/{2}/keys/{3}".format(self.base_url, self.api_version,
self.project_id, access_key_id)
headers = utilities.headers(self.master_key)
payload_dict = {
"name": name,
"is_active": is_active,
"permitted": permitted,
"options": options
}
payload = json.dumps(payload_dict)
response = self.fulfill(HTTPMethods.POST, url, data=payload, headers=headers, timeout=self.get_timeout)
self._error_handling(response)
return response.json()
def get_access_key(self, access_key_id):
"""
Returns details on a particular access key. A master key must be set first.
:param access_key_id: the 'key' value of the access key to retreive data from
"""
url = "{0}/{1}/projects/{2}/keys/{3}".format(self.base_url, self.api_version, self.project_id,
access_key_id)
headers = utilities.headers(self.master_key)
response = self.fulfill(HTTPMethods.GET, url, headers=headers, timeout=self.get_timeout)
self._error_handling(response)
return response.json()
def _get_json(self, http_method, url, key, *args, **kwargs):
response = self.api.fulfill(
http_method,
url,
headers=headers(key),
*args,
**kwargs)
self.api._error_handling(response)
try:
response = response.json()
except ValueError:
response = "No JSON available."
return response
def list_access_keys(self):
"""
Returns a list of all access keys in this project. A master key must be set first.
"""
url = "{0}/{1}/projects/{2}/keys".format(self.base_url, self.api_version, self.project_id)
headers = utilities.headers(self.master_key)
response = self.fulfill(HTTPMethods.GET, url, headers=headers, timeout=self.get_timeout)
self._error_handling(response)
return response.json()
def post_events(self, events):
"""
Posts a single event to the Keen IO API. The write key must be set first.
:param events: an Event to upload
"""
url = "{0}/{1}/projects/{2}/events".format(self.base_url, self.api_version,
self.project_id)
headers = utilities.headers(self.write_key)
payload = json.dumps(events)
response = self.fulfill(HTTPMethods.POST, url, data=payload, headers=headers, timeout=self.post_timeout)
self._error_handling(response)
return self._get_response_json(response)
def __init__(self, commands, config):
keen.add_event("status", {"state": "starting"})
self.commands = commands
self.application = None
self.config = config
self.port = config.port
self.auth = Authenticator(
self.config.integration_id,
self.config.key,
self.config.personnal_account_token,
self.config.personnal_account_name,
)
self.auth._build_auth_id_mapping()