Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
headers=HA_HEADERS)
self.assertEqual(422, req.status_code)
# Setup a real one
req = requests.post(
_url(const.URL_API_EVENT_FORWARD),
data=json.dumps({
'api_password': API_PASSWORD,
'host': '127.0.0.1',
'port': SERVER_PORT
}),
headers=HA_HEADERS)
self.assertEqual(200, req.status_code)
# Delete it again..
req = requests.delete(
_url(const.URL_API_EVENT_FORWARD),
data=json.dumps({}),
headers=HA_HEADERS)
self.assertEqual(400, req.status_code)
req = requests.delete(
_url(const.URL_API_EVENT_FORWARD),
data=json.dumps({
'host': '127.0.0.1',
'port': 'abcd'
}),
headers=HA_HEADERS)
self.assertEqual(422, req.status_code)
req = requests.delete(
_url(const.URL_API_EVENT_FORWARD),
print('request failed')
sys.exit(1)
for queue in r.json():
if queue['consumers'] == 0:
try:
agtq = re.search('mig.sched.(.+?)$', queue['name']).group(1)
except AttributeError:
continue
r2 = requests.get('http://localhost:15672/api/queues/mig/mig.agt.'+agtq,
auth=(mqadmin, mqpasswd))
if r2.status_code != 200:
continue
if r2.json()['consumers'] == 0:
print('%s has no consumer' % (agtq))
for loc in ['agt', 'sched']:
r3 = requests.delete('http://localhost:15672/api/queues/mig/mig.'+loc+'.'+agtq,
auth=(mqadmin, mqpasswd))
if r3.status_code == 204:
print('deleted mig.'+loc+'.'+agtq)
else:
print('failed to delete mig.'+loc+'.'+agtq)
headers = {'host': domain_name}
rsp = requests.get(req, headers=headers)
if rsp.status_code == 200:
root_json = json.loads(rsp.text)
if rsp.status_code != 200 and mode in ('r', 'r+'):
# file must exist
raise IOError(rsp.reason)
if rsp.status_code == 200 and mode in ('w-', 'x'):
# Fail if exists
raise IOError("domain already exists")
if rsp.status_code == 200 and mode == 'w':
# delete existing domain
rsp = requests.delete(req, headers=headers)
if rsp.status_code != 200:
# failed to delete
raise IOError(rsp.reason)
root_json = None
if root_json is None:
# create the domain
rsp = requests.put(req, headers=headers)
if rsp.status_code != 201:
raise IOError(rsp.reason)
root_json = json.loads(rsp.text)
if 'root' not in root_json:
raise IOError("Unexpected error")
if 'created' not in root_json:
raise IOError("Unexpected error")
if 'lastModified' not in root_json:
def delete_event(self, post_id):
""" Delete event.
:param str post_id: WordPress post ID
"""
# Delete tribe event
tribe_endpoint = "{}/events/{}".format(self.service.tribe_endpoint,
post_id)
self.logger.info('DELETE %s', tribe_endpoint)
requests.delete(tribe_endpoint, headers=self.basic_auth())
# Delete WordPress post
return self.delete_post(post_id)
def http_request_for_delete(url_path,**payload):
response = requests.delete("https://"+api_host+":"+api_port+url_path, auth=(api_user_name,api_user_password), verify=False, data=json.dumps(payload),headers={'content-type': 'application/json'})
return response
def delete(self, path):
config = self.instance()
url = config.get('cenit_url') + API_PATH + path
headers = self.headers(config)
try:
_logger.info("[DEL] %s ? {%s}", url, headers)
r = requests.delete(url, headers=headers)
except Exception as e:
_logger.error(e)
raise exceptions.AccessError("Error trying to connect to Cenit.")
if 200 <= r.status_code < 300:
return True
try:
error = r.json()
_logger.error(error)
except Exception as e:
_logger.error(e)
raise exceptions.ValidationError("Cenit returned with errors")
if 400 <= error.get('code', 400) < 500:
raise exceptions.AccessError("Error trying to connect to Cenit.")
def delete(self, path):
url = '%s/%s' % (self.request_base(), path)
req = requests.delete(url)
if req.status_code == 200:
return json.loads(req.text)
raise ValueError('response error %d:%s' % (req.status_code, req.text))
method=requests.get,
headers={'content-type': 'application/json'}):
try:
if payload is None:
response = method('/'.join((root_uri, str(path))),
headers=headers)
else:
response = method('/'.join((root_uri, str(path))),
data=json.dumps(payload),
headers=headers)
except requests.ConnectionError:
log.error('** Dashboard is probably down')
return {}
log.debug('Server response: {} in {}s'.format(response.status_code, response.elapsed.total_seconds()))
if method == requests.delete:
return {}
if not response.ok:
log.warning('** Server responded with {}::{}'.format(response.status_code, response.reason))
return {}
try:
return response.json()
except ValueError:
log.error('** Could not decode json response')
return {}
return {}
def delete_policy_id(self, id):
'''**Description**
Delete the policy with the given id
**Arguments**
- id: the id of the policy to delete
**Success Return Value**
The JSON object representing the now-deleted policy.
**Example**
`examples/delete_policy.py `_
'''
res = requests.delete(self.url + '/api/v2/policies/{}'.format(id), headers=self.hdrs, verify=self.ssl_verify)
return self._request_result(res)
def http_request_for_delete(url_path,**payload):
response = requests.delete("https://"+api_host+":"+api_port+url_path, auth=(api_user_name,api_user_password), verify=False, data=json.dumps(payload),headers={'content-type': 'application/json'})
return response