Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_03_rdm_portmapping(self):
url = ("{}/config/transportpce-portmapping:network/"
"nodes/ROADMA01"
.format(self.restconf_baseurl))
headers = {'content-type': 'application/json'}
response = requests.request(
"GET", url, headers=headers, auth=('admin', 'admin'))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn(
{'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
res['nodes'][0]['mapping'])
self.assertIn(
{'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
'logical-connection-point': 'SRG1-PP7-TXRX', 'port-direction': 'bidirectional'},
res['nodes'][0]['mapping'])
#Delete in the topology-netconf
url = ("{}/config/network-topology:"
"network-topology/topology/topology-netconf/node/ROADMC01"
.format(self.restconf_baseurl))
data = {}
headers = {'content-type': 'application/json'}
response = requests.request(
"DELETE", url, data=json.dumps(data), headers=headers,
auth=('admin', 'admin'))
self.assertEqual(response.status_code, requests.codes.ok)
#Delete in the clli-network
url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeC"
.format(self.restconf_baseurl))
data = {}
headers = {'content-type': 'application/json'}
response = requests.request(
"DELETE", url, data=json.dumps(data), headers=headers,
auth=('admin', 'admin'))
self.assertEqual(response.status_code, requests.codes.ok)
url (str): URL to issue the request against.
timeout (int | tuple): the timeout period for the request.
**kwargs (dict): additional arguments for the request.
(see `requests.request` for supported kwargs)
Returns:
requests.Response: the `requests.Response` object returned by the request.
Raises:
VaporRequestError: a requests error was encountered when sending the
HTTP request.
VaporError: an unexpected exception was encountered when sending the
HTTP request.
"""
try:
response = requests.request(method=method, url=url, timeout=timeout, **kwargs)
except requests.exceptions.RequestException as e:
logger.error('Failed to issue request ({}): {}'.format(url, e))
raise VaporRequestError(e)
except Exception as e:
logger.error('Unexpected exception encountered ({}): {}'.format(url, e))
raise VaporError(e)
else:
return response
def uninstall_trackers():
if requests.request == _request:
requests.request = _original_methods['request']
if requests.get == _get:
requests.get = _original_methods['get']
if requests.post == _post:
requests.post = _original_methods['post']
if requests.patch == _patch:
requests.patch = _original_methods['patch']
if requests.put == _put:
requests.put = _original_methods['put']
def http_request(method: str, path: str, params: dict = None, data: dict = None) -> dict:
"""
Sends an HTTP request using the provided arguments
:param method: HTTP method
:param path: URL path
:param params: URL query params
:param data: Request body
:return: JSON response
"""
params: dict = params if params is not None else {}
data: dict = data if data is not None else {}
try:
res: Response = requests.request(
method,
BASE_URL + path,
auth=(USERNAME, PASSWORD),
verify=USE_SSL,
params=params,
data=json.dumps(data),
headers=HEADERS)
except requests.exceptions.SSLError:
ssl_error = 'Could not connect to PhishLabs IOC Feed: Could not verify certificate.'
if RAISE_EXCEPTION_ON_ERROR:
raise Exception(ssl_error)
return return_error(ssl_error)
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout,
requests.exceptions.TooManyRedirects, requests.exceptions.RequestException) as e:
connection_error = 'Could not connect to PhishLabs IOC Feed: {}'.format(str(e))
if RAISE_EXCEPTION_ON_ERROR:
def get_xml(self, url, method="GET", **parameters):
"""Get XML for the given url.
:param url: The url to get XML for.
:param method: Default ``'GET'``. The HTTP method to use.
:param parameters: Query parameters.
"""
headers = {"X-TrackerToken": self.token}
if method != "GET":
headers["Content-Length"] = '0'
response = request(method, url, params=parameters, headers=headers,
verify=False)
xml = minidom.parseString(response.content)
return xml.firstChild
def __call__(self, url, method='GET', parse_json_resp=True,
api='https://api.github.com', **kwargs):
url = '%s/repos/%s/%s%s' % (api, self.owner, self.repo, url)
headers = kwargs.pop('headers', {})
headers.update({'Authorization': 'token %s' % self.token})
print "Will make %s request to %s: %s" % (method, url, kwargs)
resp = requests.request(method, url, headers=headers, **kwargs)
if not resp.ok:
print resp.status_code
print resp.text
raise Exception(resp.status_code)
if parse_json_resp:
try:
return resp.json()
except Exception:
print "Error decoding json response"
print resp.text
raise
else:
return resp
def CLIP(self, method, resource, body, addUsername=True):
reply = ""
if type(body) == dict:
body = json.dumps(body)
url = '/api'
if addUsername == True:
url += '/{0}'.format(self.username)
url += resource
self.log('> {0}, {1}, {2}\n'.format(method, url, body))
try:
r = requests.request(method, "http://{0}{1}".format(self.ip, url), data=body, timeout=1.5)
reply = r.json()
except requests.exceptions.ConnectTimeout as e:
self.log("Connection timeout! IP:{0}, URL:{1}".format(self.ip, url))
except Exception as e:
self.log('E {0}\n'.format(traceback.format_exc()))
raise e
else:
self.log('< {0}\n'.format(reply))
return reply
def getToken(client_id, client_secret):
""" This proc will return username/password for Stage CSSM for APIs"""
logm = ""
# For Production CSSM
url = "https://cloudsso.cisco.com/as/token.oauth2"
querystring = {"client_id":client_id,"client_secret":client_secret,"grant_type":"client_credentials"}
headers = {'Cache-Control': "no-cache", 'Postman-Token': "7738e018-f7da-8777-3db5-821f52dd6352"}
response = requests.request("POST", url, headers=headers, params=querystring)
try:
token_type = response.json()['token_type']
access_token = response.json()['access_token']
except:
logm = 'Unable to get the usr/pwd for Stage CSSM. Error: %s' % response.text
return logm
usr_pwd = ' '.join((token_type, access_token))
return str(usr_pwd)
full_url = full_url + '?' +param
sig_str = method + full_url + timestamp
elif method == 'POST':
sig_str = method + full_url + timestamp + param
signature = self.get_signed(bytes(sig_str, 'utf-8'))
headers = {
'FC-ACCESS-KEY': self.key,
'FC-ACCESS-SIGNATURE': signature,
'FC-ACCESS-TIMESTAMP': timestamp
}
try:
r = requests.request(method, full_url, headers = headers, json=payload)
r.raise_for_status()
except requests.exceptions.HTTPError as err:
print(err)
print(r.text)
if r.status_code == 200:
return r.json()