Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param structures: list with ids names or string with ids name of which to retrieve the info
if None, then all structures are returned
:return: ods
'''
if not structures:
structures = sorted(list(dict_structures(imas_version).keys()))
elif isinstance(structures, str):
structures = [structures]
# caching
if imas_version not in _info_structures:
from omas import ODS
_info_structures[imas_version] = ODS(imas_version=imas_version, consistency_check=False)
ods = _info_structures[imas_version]
for structure in structures:
if structure not in ods:
tmp = load_structure(structure, imas_version)[0]
lst = sorted(tmp.keys())
for k, item in enumerate(lst):
if re.match('.*_error_(index|lower|upper)$', item.split('.')[-1]):
continue
parent = False
for item1 in lst[k + 1:]:
if l2u(item1.split('.')[:-1]).rstrip('[:]') == item:
parent = True
break
if parent:
continue
def summary_taue(ods, update=True):
"""
Calculates Energy confinement time estimated from the IPB98(y,2) scaling for each time slice and stores them in the summary ods
:param ods: input ods
:param update: operate in place
:return: updated ods
"""
ods_n = ods
if not update:
from omas import ODS
ods_n = ODS().copy_attrs_from(ods)
# update ODS with stored energy from equilibrium
ods.physics_equilibrium_stored_energy()
tau_e_scaling = []
tau_e_MHD = []
for time_index in ods['equilibrium']['time_slice']:
equilibrium_ods = ods['equilibrium']['time_slice'][time_index]
a = (equilibrium_ods['profiles_1d']['r_outboard'][-1] - equilibrium_ods['profiles_1d']['r_inboard'][-1]) / 2
r_major = (equilibrium_ods['profiles_1d']['r_outboard'][-1] + equilibrium_ods['profiles_1d']['r_inboard'][-1]) / 2
bt = ods['equilibrium']['vacuum_toroidal_field']['b0'][time_index] * ods['equilibrium']['vacuum_toroidal_field']['r0'] / r_major
ip = equilibrium_ods['global_quantities']['ip']
aspect = r_major / a
psi = ods['equilibrium']['time_slice'][time_index]['profiles_1d']['psi']
rho_tor_norm_equi = equilibrium_ods['profiles_1d']['rho_tor_norm']
rho_tor_norm_core = ods['core_profiles']['profiles_1d'][time_index]['grid']['rho_tor_norm']
:return: ojbect
"""
from omas import ODS
object_pairs = list(map(lambda o: (convert_int(o[0]), o[1]), object_pairs))
dct = cls()
# for ODSs we can use the setraw() method which does
# not peform any sort of check, nor tries to parse
# special OMAS syntaxes and is thus much faster
if isinstance(dct, ODS):
for x, y in object_pairs:
if null_to is not None and y is None:
y = null_to
if isinstance(y, list):
if len(y) and isinstance(y[0], ODS):
dct.setraw(x, cls())
for k in range(len(y)):
dct[x].setraw(k, y[k])
else:
if null_to is not None:
for k in range(len(y)):
if y[k] is None:
y[k] = null_to
y = numpy.array(y) # to handle objects_encode=None as used in OMAS
dct.setraw(x, y)
else:
dct.setraw(x, y)
else:
for x, y in object_pairs:
if null_to is not None and y is None:
y = null_to
def core_profiles_densities(ods, update=True):
'''
Density, density_thermal, and density_fast for electrons and ions are filled and are self-consistent
:param ods: input ods
:param update: operate in place
:return: updated ods
'''
ods_n = ods
if not update:
from omas import ODS
ods_n = ODS().copy_attrs_from(ods)
def consistent_density(loc):
if 'density' in loc:
# if there is no thermal nor fast, assume it is thermal
if 'density_thermal' not in loc and 'density_fast' not in loc:
loc['density_thermal'] = loc['density']
# if there is no thermal calculate it
elif 'density_thermal' not in loc and 'density_fast' in loc:
loc['density_thermal'] = loc['density'] - loc['density_fast']
# if there is no fast calculate it
elif 'density_thermal' in loc and 'density_fast' not in loc:
loc['density_fast'] = loc['density'] - loc['density_thermal']
def summary_total_powers(ods, update=True):
"""
Integrate power densities to the total and heating and current drive systems and fills summary.globalquantities
:param ods: input ods
:param update: operate in place
:return: updated ods
"""
ods_n = ods
if not update:
from omas import ODS
ods_n = ODS().copy_attrs_from(ods)
import copy
sources = ods_n['core_sources']['source']
index_dict = {2: 'nbi', 3: 'ec', 4: 'lh', 5: 'ic'}
power_dict = {'total': [], 'nbi': [], 'ec': [], 'lh': [], 'ic': []}
q_init = numpy.zeros(len(sources[0]['profiles_1d'][0]['grid']['rho_tor_norm']))
q_dict = {'total': copy.deepcopy(q_init), 'nbi': copy.deepcopy(q_init),
'ec': copy.deepcopy(q_init), 'lh': copy.deepcopy(q_init),
'ic': copy.deepcopy(q_init)}
ignore_indices = list(range(100, 108)) + list(range(900,910))
for time_index in sources[0]['profiles_1d']:
vol = sources[0]['profiles_1d'][time_index]['grid']['volume']
for source in sources:
if sources[source]['identifier.index'] in ignore_indices:
NOTE: the fast particles ion pressures are read, not set by this function:
`core_profiles.profiles_1d.:.ion.:.pressure_fast_parallel` #Pressure (thermal) associated with random motion ~average((v-average(v))^2)
`core_profiles.profiles_1d.:.ion.:.pressure_fast_perpendicular` #Pressure (thermal+non-thermal)
:param ods: input ods
:param update: operate in place
:return: updated ods
'''
ods_p = ods
if not update:
from omas import ODS
ods_p = ODS().copy_attrs_from(ods)
for time_index in ods['core_profiles']['profiles_1d']:
prof1d = ods['core_profiles']['profiles_1d'][time_index]
prof1d_p = ods_p['core_profiles']['profiles_1d'][time_index]
if not update:
prof1d_p['grid']['rho_tor_norm'] = prof1d['grid']['rho_tor_norm']
__zeros__ = 0. * prof1d['grid']['rho_tor_norm']
prof1d_p['pressure_thermal'] = copy.deepcopy(__zeros__)
prof1d_p['pressure_ion_total'] = copy.deepcopy(__zeros__)
prof1d_p['pressure_perpendicular'] = copy.deepcopy(__zeros__)
prof1d_p['pressure_parallel'] = copy.deepcopy(__zeros__)
# electrons
:param ignore_type: ignore object type differences
:param ignore_empty: ignore emptry nodes
:return: string with reason for difference, or False otherwise
"""
from omas import ODS, CodeParameters
ods1 = ods1.flat(return_empty_leaves=True, traverse_code_parameters=True)
ods2 = ods2.flat(return_empty_leaves=True, traverse_code_parameters=True)
k1 = set(ods1.keys())
k2 = set(ods2.keys())
differences = []
for k in k1.difference(k2):
if not k.startswith('info.') and not (ignore_empty and isinstance(ods1[k], ODS) and not len(ods1[k])):
differences.append('DIFF: key `%s` missing in 2nd ods' % (prepend_path_string + k))
for k in k2.difference(k1):
if not k.startswith('info.') and not (ignore_empty and isinstance(ods2[k], ODS) and not len(ods2[k])):
differences.append('DIFF: key `%s` missing in 1st ods' % (prepend_path_string + k))
for k in k1.intersection(k2):
try:
if ods1[k] is None and ods2[k] is None:
pass
elif isinstance(ods1[k], str) and isinstance(ods2[k], str):
if ods1[k] != ods2[k]:
differences.append('DIFF: `%s` differ in value' % (prepend_path_string + k))
elif not ignore_type and type(ods1[k]) != type(ods2[k]):
differences.append('DIFF: `%s` differ in type (%s,%s)' % ((prepend_path_string + k), type(ods1[k]), type(ods2[k])))
elif numpy.atleast_1d(is_uncertain(ods1[k])).any() or numpy.atleast_1d(is_uncertain(ods2[k])).any():
if not numpy.allclose(nominal_values(ods1[k]), nominal_values(ods2[k]), equal_nan=True) or not numpy.allclose(std_devs(ods1[k]), std_devs(ods2[k]), equal_nan=True):
differences.append('DIFF: `%s` differ in value' % (prepend_path_string + k))
ods['gyrokinetics'].load(sample_filename, consistency_check='warn_drop')
# show content
pprint(ods.pretty_paths())
# save a copy
try:
__file__
except NameError:
import inspect
__file__ = inspect.getfile(lambda: None)
filename = omas_testdir(__file__) + '/gkdb_linear_initialvalue.json'
ods['gyrokinetics'].save(filename)
# load the newly saved copy
ods1 = ODS()
ods1['gyrokinetics'].load(filename)
# look for differences between original GKDB json and OMAS json
differences = ods.diff(ods1, ignore_type=True)
if not differences:
print('\nPrint no differences found: save/load of GKDB json file worked\n')
else:
pprint(differences)
raise RuntimeError('Save/Load of GKDB on json file failed')
# raise error if trying to run GKDB under Python2x
try:
import gkdb.core.model
except ImportError as _excp:
print('Could not import gkdb library: %s' % repr(_excp))
else: