Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_url_prefix():
    """Hit the local service root and assert the parsed URL prefix is empty."""
    base = "http://127.0.0.1:5000"
    response = requests.get(base, headers={'Accept': 'application/json'})
    prefix, __, __ = parse_meta(response.json())
    assert prefix == ""
def get_chrome_url():
    """Resolve the Chrome remote-debugging websocket URL.

    Splits ``settings.CHROME_REMOTE_URL`` into scheme, ``host:port`` and
    path, substitutes the hostname with its resolved IP address, then asks
    the DevTools HTTP endpoint for the websocket debugger URL.
    """
    scheme, _, netloc, path = settings.CHROME_REMOTE_URL.split("/", 3)
    host, port = netloc.split(":")
    ip = socket.gethostbyname(host)
    resolved = "%s//%s:%s/%s" % (scheme, ip, port, path)
    payload = requests.get(resolved).json()
    return payload["webSocketDebuggerUrl"]
def task_mapping_function(profiler_ips, execution_ips, node_names):
    """Poll the home service (via kubectl proxy) until a task-to-node
    mapping is available, then build the schedule and DAG from it.

    Make sure you run ``kubectl proxy --port=8080`` on a terminal;
    ``line`` below is the proxied link that returns the mapping.
    """
    # NOTE(review): the docstring mentions port 8080 but the URL below
    # targets 8089 -- confirm which proxy port is intended.
    line = "http://localhost:8089/api/v1/namespaces/"
    line = line + jupiter_config.MAPPER_NAMESPACE + "/services/home:" + str(jupiter_config.FLASK_SVC) + "/proxy"
    time.sleep(5)
    print(line)
    # Retry forever: the mapper may not have produced a result yet.
    while 1:
        try:
            r = requests.get(line)
            mapping = r.json()
            data = json.dumps(mapping)
            # A non-empty payload without a "status" field is the final mapping.
            if len(mapping) != 0:
                if "status" not in data:
                    break
        except Exception as e:
            print(e)
            print("Will retry to get the mapping!")
    pprint(mapping)
    # NOTE(review): path1 and path2 are not defined in this fragment --
    # presumably module-level globals; confirm before reuse.
    schedule = utilities.k8s_get_hosts(path1, path2, mapping)
    dag = utilities.k8s_read_dag(path1)
    dag.append(mapping)
    print("Printing DAG:")
def get_all(cls, parent=None, **params):
# Fetch all resources of this type; with *parent* given, the request is
# scoped under the parent's route and authenticated with the parent's key.
if parent is not None:
route = copy(parent.route)
else:
route = {}
if cls.ID_NAME is not None:
# Empty string triggers "get all resources"
route[cls.ID_NAME] = ""
# NOTE(review): parent.key / parent.config are read unconditionally --
# this raises AttributeError when parent is None despite the default.
base_obj = cls(key=parent.key, route=route, config=parent.config)
"""Perform a read request against the resource"""
start = datetime.now()
r = requests.get(
base_obj._url(), auth=(base_obj.key, ""), params=params)
# Honor API rate limits relative to when the request was started.
cls._delay_for_ratelimits(start)
if r.status_code not in cls.TRUTHY_CODES:
return base_obj._handle_request_exception(r)
response = r.json()
# NOTE(review): `base_obj.ENVELOPE or base_obj` looks suspicious -- the
# object itself is used as a dict key when ENVELOPE is falsy; confirm.
objects_data = response.get(base_obj.ENVELOPE or base_obj, [])
return_objects = []
for data in objects_data:
# Note that this approach does not get meta data
return_objects.append(
cls.get(
parent=parent,
id=data.get(cls.ID_NAME, data.get("id")),
# NOTE(review): fragment truncated here -- the cls.get() call is cut off.
"""
Make HTTP GET requests
:param url:
:param req_params:
:param req_headers:
:param verify_ssl:
:return:
"""
if req_params is None:
req_params = {}
if req_headers is None:
req_headers = {}
try:
raw_response = requests.get(url, params=req_params, headers=req_headers, verify=verify_ssl)
print_verbose_details(raw_response)
return True, raw_response.status_code, raw_response.json()
except requests.exceptions.Timeout as e:
print "[ERROR] Timeout. [url] %s [verify ssl] %s" % (url, verify_ssl)
return False, None, "Timeout: " + e.message
except Exception as e:
print "[ERROR] Other error. %s, [url] %s [verify ssl] %s" % (e, url, verify_ssl)
return False, None, "Timeout: " + e.message
def purge_geofence_all():
"""Purge all existing GeoFence cache rules via the GeoServer REST API."""
# Only act when GeoFence security is enabled in the OGC server settings.
if settings.OGC_SERVER['default']['GEOFENCE_SECURITY_ENABLED']:
try:
# GeoServer base URL plus admin credentials from settings.
url = settings.OGC_SERVER['default']['LOCATION']
user = settings.OGC_SERVER['default']['USER']
passwd = settings.OGC_SERVER['default']['PASSWORD']
"""
curl -X GET -u admin:geoserver -H "Content-Type: application/json" \
http://:/geoserver/rest/geofence/rules.json
"""
headers = {'Content-type': 'application/json'}
# NOTE(review): verify=False disables TLS certificate checking.
r = requests.get(url + 'rest/geofence/rules.json',
headers=headers,
auth=HTTPBasicAuth(user, passwd),
timeout=10,
verify=False)
if (r.status_code < 200 or r.status_code > 201):
logger.warning("Could not Retrieve GeoFence Rules")
else:
try:
rules_objs = json.loads(r.text)
rules_count = rules_objs['count']
rules = rules_objs['rules']
if rules_count > 0:
# Delete GeoFence Rules associated to the Layer
# curl -X DELETE -u admin:geoserver http://:/geoserver/rest/geofence/rules/id/{r_id}
for i, rule in enumerate(rules):
# NOTE(review): fragment truncated -- the delete call below is cut off.
r = requests.delete(url + 'rest/geofence/rules/id/' + str(rule['id']),
def get(url, retries=10):
    """GET *url*, retrying with exponential backoff.

    :param url: URL to fetch.
    :param retries: number of re-fetch attempts after the initial request.
    :return: the successful ``requests`` response (HTTP 200).
    :raises Exception: if no attempt returns HTTP 200.
    """
    r = requests.get(url)
    sleep = .1
    for _ in range(retries):
        if r.status_code == 200:
            return r
        sleep *= 2
        print("retrying in {0}".format(sleep))
        time.sleep(sleep)
        r = requests.get(url)
    # Bug fix: the response fetched on the final loop iteration was never
    # status-checked -- inspect it before giving up.
    if r.status_code == 200:
        return r
    raise Exception("GET failed.\nstatus: {0}\nurl: {1}".format(r.status_code, url))
def check_fixie_url(url):
    """Validate *url*; if it does not return HTTP 200, flip its trailing
    ``dir`` query parameter between ``a`` and ``w`` and return the new URL.

    :param url: URL expected to end in ``dir=a`` or ``dir=w``.
    :return: *url* unchanged if it responds 200, otherwise the URL with
        the opposite ``dir`` value.
    :raises ValueError: if the URL has no trailing ``dir=[aw]`` parameter.
    """
    print("INFO: checking %s to make sure it's valid" % url)
    r = requests.get(url)
    if r.status_code == 200:
        return url
    # what directory are we in?
    match = re.search('.*dir=([aw])$', url)
    if match is None:
        # Robustness fix: previously .group(1) raised a bare AttributeError
        # when the URL lacked the expected dir parameter.
        raise ValueError("URL has no trailing dir=[aw] parameter: %s" % url)
    bad_dir = match.group(1)
    good_dir = 'w' if bad_dir == 'a' else 'a'
    return re.sub("dir=" + bad_dir, "dir=" + good_dir, url)
# NOTE(review): fragment begins mid-query -- the select() opening and the
# enclosing function are outside this view.
gtable.lat_max,
gtable.lon_min,
gtable.lon_max,
limitby = (0, 1)
).first()
# Bias Google geocoding toward NYC using the stored bounding box.
bounds = "%s,%s|%s,%s" % (NYC.lat_min, NYC.lon_min, NYC.lat_max, NYC.lon_max)
NYC = NYC.id
import requests
params = {"address": location.addr_street,
"key":settings.get_gis_api_google(),
"bounds": bounds,
}
r = requests.get("https://maps.googleapis.com/maps/api/geocode/json", params=params)
if r.status_code == requests.codes.ok:
results = r.json()
if results["status"] == "OK":
# Use the first (best-ranked) geocoding match.
results = results["results"][0]
loc = results["geometry"]["location"]
postcode = None
parent = None
# Walk the address components for the postcode and borough name.
for c in results["address_components"]:
types = c["types"]
if "postal_code" in types:
postcode = c["short_name"]
elif "sublocality_level_1" in types:
L3 = c["short_name"]
if L3 == "Bronx":
# Google sometimes returns just 'Bronx'
L3 = "The Bronx"