Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def calculate_omp_num_threads(values_dict: dict) -> int:
    """
    Derive the OMP_NUM_THREADS value from the CPU resources requested in a template's values.yaml.

    The CPU request is taken from the first of these entries that is present:
    top-level "cpu" (skipped when falsy or equal to the string "null"),
    "resources.limits.cpu", then "worker_resources.limits.cpu".

    :param values_dict: Dictionary with the contents of the template's values.yaml file
    :return: Number of threads to use, always at least 1
    :raises ValueError: when none of the "cpu", "resources" or "worker_resources" entries is set
    :raises TypeError, KeyError: may propagate from the nested lookup or the CPU conversion
    """
    direct_cpu = values_dict.get("cpu")
    if direct_cpu and direct_cpu != "null":
        requested_cpu = direct_cpu
    else:
        # Fall back to the nested resource sections, in priority order.
        for section in ("resources", "worker_resources"):
            if values_dict.get(section):
                requested_cpu = dutil.get(values_dict, section + ".limits.cpu", separator='.')
                break
        else:
            raise ValueError('Unable to find requested CPUs count.')

    # The CPU request may be given as an absolute count or in millicpu notation.
    # convert_k8s_cpu_resource normalises it to millicpus; integer-dividing by 1000
    # gives whole CPUs, and the result is clamped so at least one thread is used.
    whole_cpus = convert_k8s_cpu_resource(requested_cpu) // 1000
    return int(max(whole_cpus, 1))
def test_types_get_glob_multiple():
    """A glob that matches more than one leaf must make dpath.util.get raise ValueError."""
    # Build the nested mapping bottom-up: two sibling branches ("c" and "e")
    # under a/b, each holding a "d" leaf, so '*' matches both.
    branch_c = TestMapping({"d": 0})
    branch_e = TestMapping({"d": 0})
    level_b = TestMapping({"c": branch_c, "e": branch_e})
    level_a = TestMapping({"b": level_b})
    ehash = TestMapping({"a": level_a})

    # Same expectation for the string form and the list form of the path.
    for ambiguous_path in ('/a/b/*/d', ['a', 'b', '*', 'd']):
        assert_raises(ValueError, dpath.util.get, ehash, ambiguous_path)
def nova_ceph(env):
    """Skip the current test when the environment settings report a truthy ephemeral_ceph value."""
    settings = env.get_settings_data()
    ephemeral_ceph_value = dpath.util.get(settings, '*/storage/**/ephemeral_ceph/value')
    if ephemeral_ceph_value:
        pytest.skip("Nova Ceph RBD should be disabled")
def test_get_explicit_single():
    """An exact (glob-free) path resolves to its single leaf, in string and list form alike."""
    leaves = {"d": 0, "e": 1, "f": 2}
    ehash = {"a": {"b": {"c": leaves}}}

    for explicit_path in ('/a/b/c/f', ['a', 'b', 'c', 'f']):
        assert dpath.util.get(ehash, explicit_path) == 2
for c,w in elem.items():
lido(data,temp,c,w)
target[k].append(temp)
elif isinstance(v,str):
lido(data,target,k,v)
#generate @id
if "genre" in target:
target["genre"]["@type"]="Text"
_id=baseuri+str(target["identifier"].rsplit('-')[-1])
target["@id"]=_id
#bnodes 1:n
target['mentions']=[]
try:
for i in get(data,"lido:descriptiveMetadata/lido:objectRelationWrap/lido:subjectWrap/lido:subjectSet/lido:subject/lido:subjectConcept"):
tag={}
tag['sameAs']=get(i,"lido:conceptID/_")
tag['name']=get(i,"lido:term")
target['mentions'].append(tag)
except:
pass
target=checkids(target)
lock.acquire()
sys.stdout.write(json.dumps(target)+"\n"),
sys.stdout.flush()
lock.release()
def lido(record, target, attribut, path):
    """
    Best-effort copy of the value at ``path`` in ``record`` into ``target[attribut]``.

    Nothing is written when ``attribut`` is already a key of ``target``.
    If ``target`` already has an "@id" key, the value is stored as a string
    prefixed with "hcn-"; otherwise the looked-up value is stored unchanged.

    Any ordinary error raised while looking up or storing the value is
    deliberately ignored, so a missing path simply leaves ``target`` untouched.

    :param record: source structure handed to ``get``
    :param target: mapping that receives the value
    :param attribut: key to set in ``target``
    :param path: lookup path handed to ``get``
    :return: None
    """
    try:
        if attribut in target:
            return
        value = get(record, path)
        if "@id" in target:
            value = "hcn-" + str(value)
        target[attribut] = value
    except Exception:
        # Still best-effort, but no longer a bare ``except``: KeyboardInterrupt
        # and SystemExit must be able to stop the program.
        pass
def get(self, path):
    """
    Return the value at ``path`` inside ``self.data``, caching the result per path.

    :param path: iterable of path segments; it is turned into a list before
        being handed to ``dpath.util.get``
    :return: the value found by ``dpath.util.get`` (served from
        ``self._get_cached`` on repeated calls)
    """
    # Materialise the path once so a one-shot iterator is not exhausted by the
    # key computation, and key the cache on the segment tuple: joining the
    # segments into one string made ['a', 'bc'] and ['ab', 'c'] share an entry.
    key = tuple(path)
    if key not in self._get_cached:
        self._get_cached[key] = dpath.util.get(self.data, list(key))
    return self._get_cached[key]
# if not self.log:
# utils.restart_line()
print "Try = [%d]%s" % (i, age_string) # add a comma at the end to remove newline
try:
param_dic = {'entity': entity_name}
r = requests.get(self.base_url() + '/config', params=param_dic)
if self.log:
print "LOG: Get config: /config with params " + json.dumps(param_dic) + ", response = ", r
print " LOG: response text = ", r.text
print " LOG: url: ", r.url
if r.json()['value'] is not None:
age = dpath.util.get(r.json(), 'value.age', '.')
parameter = dpath.util.get(r.json(), 'value.' + param_path, '.')
if parameter == value:
if self.log:
print "LOG: ... parameter: " + entity_name + "." + param_path + ", has achieved value: " + \
str(value) + "."
break
except requests.exceptions.ConnectionError:
print "Oops, ConnectionError exception"
except requests.exceptions.RequestException:
print "Oops, request exception"
time.sleep(wait_period) # sleep for n seconds
print " -> success, parameter reached value" + age_string