Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"lgx-port-rack": "000000.00",
"lgx-port-shelf": "00"
}
},
"optic-type": "gray"
},
"due-date": "2016-11-28T00:00:01Z",
"operator-contact": "pw1234"
}
}
headers = {'content-type': 'application/json',
"Accept": "application/json"}
response = requests.request(
"POST", url, data=json.dumps(data), headers=headers,
auth=('admin', 'admin'))
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
res['output']['configuration-response-common']['response-message'])
time.sleep(self.WAITING)
"""
# Disable Throttling to speed testing
plugins.NotifyBase.request_rate_per_sec = 0
# Bot Token
bot_token = '123456789:abcdefg_hijklmnop'
invalid_bot_token = 'abcd:123'
# Chat ID
chat_ids = 'l2g, lead2gold'
# Prepare Mock
mock_get.return_value = requests.Request()
mock_post.return_value = requests.Request()
mock_post.return_value.status_code = requests.codes.ok
mock_get.return_value.status_code = requests.codes.ok
mock_get.return_value.content = '{}'
mock_post.return_value.content = '{}'
# Exception should be thrown about the fact no bot token was specified
with pytest.raises(TypeError):
plugins.NotifyTelegram(bot_token=None, targets=chat_ids)
# Invalid JSON while trying to detect bot owner
mock_get.return_value.content = '{'
mock_post.return_value.content = '}'
with pytest.raises(TypeError):
plugins.NotifyTelegram(bot_token=bot_token, targets=None)
# Invalid JSON while trying to detect bot owner + 400 error
mock_get.return_value.status_code = requests.codes.internal_server_error
mock_post.return_value.status_code = requests.codes.internal_server_error
def submit(self, target_language, bytecode):
    """
    POST a traversal to ``self.url`` and wrap the rows of the JSON reply.

    The request body is a JSON object with the keys ``gremlin``, ``source``,
    ``language`` and ``bindings``.

    :param target_language: value sent under the ``language`` key
    :param bytecode: object sent under ``gremlin``; its ``bindings``
        attribute is sent under ``bindings``
    :return: a ``RemoteResponse`` built from an iterator of
        ``Traverser(item, 1)`` objects (one per entry of
        ``result.data`` in the reply) and an empty dict
    :raises BaseException: carrying the response text when the status code
        is not ``requests.codes.ok``
    """
    payload = {
        "gremlin": bytecode,
        "source": self.traversal_source,
        "language": target_language,
        "bindings": bytecode.bindings,
    }
    response = requests.post(self.url, data=json.dumps(payload))

    # NOTE(review): BaseException is kept because callers may depend on it,
    # but it also escapes ``except Exception`` handlers - confirm intent.
    if response.status_code != requests.codes.ok:
        raise BaseException(response.text)

    rows = response.json()['result']['data']
    wrapped = [Traverser(row, 1) for row in rows]
    return RemoteResponse(iter(wrapped), {})
url_splitted = url.split("/", 4)
if len(url_splitted) > 4:
url = "/".join(url_splitted[0:4])
data = url_splitted[4]
else:
data = None
headers['Content-Type'] = "text/plain"
log_info("Calling URL: {} (username = {})".format(url, username))
response = requests.post(url, timeout=timeout_seconds, auth=(username, password), headers=headers,
data=data)
else:
raise ValueError('Unknown request_type: {}.'.format(request_type))
if response.status_code != requests.codes.ok:
exc = API_EXCEPTIONS[response.status_code]
raise exc(response.text)
return response
def pull(self):
    """
    Read data from the sync server's ``ajax/read.php`` endpoint.

    An empty body is POSTed with ``self.headers``; the TLS certificate is
    verified against ``self.certificate_filename`` located next to this file.

    :return: ``(True, result)`` when the reply has a truthy ``status`` and a
        ``result`` entry, ``(True, '')`` when ``status`` is truthy but
        ``result`` is absent, and ``(False, '')`` for a non-ok HTTP status
        or a missing/falsy ``status``
    :rtype: tuple
    """
    endpoint = self.server_url + "ajax/read.php"
    request_headers = self.headers
    module_dir = os.path.dirname(os.path.realpath(__file__))
    certificate_path = os.path.join(module_dir, self.certificate_filename)

    response = requests.post(endpoint,
                             data="",
                             headers=request_headers,
                             verify=certificate_path)

    if response.status_code != requests.codes.ok:
        return False, ''

    payload = json.loads(response.text)
    if not ('status' in payload and payload['status']):
        return False, ''

    if 'result' in payload:
        return True, payload['result']
    return True, ''
def delete(self, url):
    """
    Issue an HTTP DELETE against ``url`` using ``self.auth`` and
    ``self.timeout``.

    :param url: address to send the DELETE request to
    :return: dict with ``"ok"`` (True only when the status code is
        ``requests.codes.ok`` or ``requests.codes.no_content``) and
        ``"result"`` (the response object, or ``None`` when the connection
        failed)
    """
    response = None
    succeeded = False
    accepted = (requests.codes.ok, requests.codes.no_content)
    try:
        response = requests.delete(url, auth=self.auth, timeout=self.timeout)
        succeeded = response.status_code in accepted
    except requests.exceptions.ConnectionError:
        # Connection failures are reported through the "ok" flag instead of
        # propagating; ``response`` stays None in that case.
        pass
    return {"ok": succeeded, "result": response}
def parsePages(links):
    """
    Probe each URL and report whether it is taken or available.

    A URL that answers with ``requests.codes.ok`` is printed as TAKEN. Any
    other status is printed as AVAILABLE and the URL is appended, one per
    line, to ``available.txt`` in the current working directory.

    :param links: iterable of URL strings to check
    """
    for link in links:
        response = requests.get(link)
        if response.status_code == requests.codes.ok:
            print(link + " is " + colored('TAKEN', 'red', attrs=['bold']))
        else:
            print(link + " is " + colored('AVAILABLE', 'green', attrs=['bold']))
            # The context manager closes the file even if the write raises,
            # which the manual open()/close() pair did not guarantee.
            with open('available.txt', 'a') as available_file:
                available_file.write(link + "\n")
file_name = file_dict['FileDescription']
else:
file_name = item_dict['code'] + '-' + str(i)
if not isinstance(file_name, str):
file_name = item_dict['code'] + '-' + str(i)
act_dir = self.set_check_directory(self.cach_file_dir)
path = os.path.join(act_dir, file_name)
if os.path.exists(path):
pr_path = str(unidecode(path))
print('(' + str(item_index) + ') Uploading: ' + pr_path)
sleep(self.delay_before_request)
try:
r = item.upload_file(path,
key=file_name,
metadata=metadata)
if r.status_code == requests.codes.ok:
print('Yeah! Saved item: ' + item_id)
else:
print('HTTP code: ' + str(r.status_code))
print('Problem with: ' + item_id)
except:
print('Bad upload with: ' + item_id)
if i == 0:
# no file rows created, so only add the pre_row_item
row_items.append(pre_row_item)
dir = self.set_check_directory(self.cach_file_dir)
path = os.path.join(dir, 'irma-manifest.csv')
f = codecs.open(path, 'w', encoding='utf-8')
writer = csv.writer(f, dialect=csv.excel, quoting=csv.QUOTE_ALL)
writer.writerow(first_row)
for row_item in row_items:
row = []
# 7. Request to the auth service url
formdata = {
'j_username': user,
'j_password': pssw,
'submit': 'Login',
'redirect_uri': 'https://www.hbpneuromorphic.eu/complete/hbp-oauth2/&response_type=code&client_id=nmpi'
}
headers = {'accept': 'application/json'}
rNMPI2 = s.post("https://services.humanbrainproject.eu/oidc/j_spring_security_check",
data=formdata,
allow_redirects=True,
verify=False,
headers=headers)
# check good communication
if rNMPI2.status_code == requests.codes.ok:
# check success address
if rNMPI2.url == base_url:
# print rNMPI2.text
res = rNMPI2.json()
print res['username']
print res['access_token']
# unauthorized
else:
if 'error' in rNMPI2.url:
print "Authentication Failure: No token retrieved."
else:
print "Unhandeled error in Authentication"
else:
print "Communication error"
def fetch_data(url=LOC_DATA_URL):
    """
    GET ``url`` and return the body of the response as text.

    :param url: address to fetch; defaults to ``LOC_DATA_URL``
    :return: ``text`` of the response when the status is ``requests.codes.ok``
    :raises FetchError: constructed with the response object for any other
        status code
    """
    reply = requests.get(url)
    if reply.status_code != requests.codes.ok:
        raise FetchError(reply)
    return reply.text