Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main():
    """Load the FMU named by ``options.fmu_filename`` and initialize it.

    The visible part of this routine reads the model description, extracts
    the archive, creates an ``FMU2Slave`` for the co-simulation interface,
    takes it through initialization mode and sets up the timing and
    accumulator locals.
    """
    model_description = read_model_description(options.fmu_filename)
    unzipdir = extract(options.fmu_filename)

    slave_kwargs = {
        'guid': model_description.guid,
        'unzipDirectory': unzipdir,
        'modelIdentifier': model_description.coSimulation.modelIdentifier,
        'instanceName': 'instance1',
    }
    fmu = FMU2Slave(**slave_kwargs)

    # initialize: instantiate, configure the experiment, then enter and
    # leave initialization mode
    fmu.instantiate()
    experiment_kwargs = dict(tolerance=1E-4, startTime=0.0, stopTime=options.stop_time)
    fmu.setupExperiment(**experiment_kwargs)
    fmu.enterInitializationMode()
    fmu.exitInitializationMode()

    # wall-clock reference taken after initialization has finished
    start = datetime.datetime.now()

    # starting values; `sum` keeps its original name and therefore shadows
    # the builtin inside this function
    t, sum = 0.0, 0.0
def test_extracted_fmu(self):
    """ Simulate an extracted FMU

    Downloads the CoupledClutches test FMU, extracts it, reads the model
    description from the extracted directory and simulates that directory.
    The extracted directory is removed even when reading the model
    description, the simulation or the assertion fails.
    """

    download_test_file('2.0', 'CoSimulation', 'MapleSim', '2017', 'CoupledClutches', 'CoupledClutches.fmu')

    # extract the FMU
    tempdir = extract('CoupledClutches.fmu')

    try:
        # load the model description before the simulation
        model_description = read_model_description(tempdir)

        result = simulate_fmu(tempdir, model_description=model_description)

        self.assertIsNotNone(result)
    finally:
        # clean up unconditionally so a failing test does not leave the
        # extracted directory behind
        shutil.rmtree(tempdir)
def main():
    """Initialize the FMU described by the second entry of ``options``.

    The visible part of this routine picks ``options[1]``, reads the model
    description, extracts the archive, creates an ``FMU2Slave``, takes it
    through initialization mode and sets up the counter and time locals.
    """
    option = options[1]

    model_description = read_model_description(option.fmu_filename)
    unzipdir = extract(option.fmu_filename)

    slave_kwargs = {
        'guid': model_description.guid,
        'unzipDirectory': unzipdir,
        'modelIdentifier': model_description.coSimulation.modelIdentifier,
        'instanceName': 'instance1',
    }
    fmu = FMU2Slave(**slave_kwargs)

    # initialize: instantiate, configure the experiment, then enter and
    # leave initialization mode
    fmu.instantiate()
    experiment_kwargs = dict(tolerance=1E-4, startTime=0.0, stopTime=option.stop_time)
    fmu.setupExperiment(**experiment_kwargs)
    fmu.enterInitializationMode()
    fmu.exitInitializationMode()

    # wall-clock reference taken after initialization has finished
    start = datetime.datetime.now()

    # step counter and simulation time both start at zero
    i, t = 0, 0.0
# NOTE(review): fragment without its enclosing `def`; `filename` is presumably
# a parameter of that function -- confirm against the full source.
# Ask which platforms the FMU reports as supported.
platforms = supported_platforms(filename)
# Require "win32" to be listed ...
if 'win32' not in platforms:
raise Exception("The FMU does not support the platform \"win32\".")
# ... and refuse FMUs that already list "win64".
if 'win64' in platforms:
raise Exception("The FMU already supports \"win64\".")
model_description = read_model_description(filename)
# The remoting files are looked up in a 'remoting' folder next to this module.
current_dir = os.path.dirname(__file__)
client = os.path.join(current_dir, 'remoting', 'client.dll')
server = os.path.join(current_dir, 'remoting', 'server.exe')
license = os.path.join(current_dir, 'remoting', 'license.txt')
tempdir = extract(filename)
# Prefer the co-simulation model identifier; fall back to model exchange.
if model_description.coSimulation is not None:
model_identifier = model_description.coSimulation.modelIdentifier
else:
model_identifier = model_description.modelExchange.modelIdentifier
# copy the binaries & license
# os.mkdir creates only the last path component, so 'binaries' must already
# exist in the extracted tree.
os.mkdir(os.path.join(tempdir, 'binaries', 'win64'))
# client.dll is stored under the model identifier's name.
copyfile(client, os.path.join(tempdir, 'binaries', 'win64', model_identifier + '.dll'))
copyfile(server, os.path.join(tempdir, 'binaries', 'win64', 'server.exe'))
licenses_dir = os.path.join(tempdir, 'documentation', 'licenses')
# os.mkdir is non-recursive: 'documentation' must exist if 'licenses' does not.
if not os.path.isdir(licenses_dir):
os.mkdir(licenses_dir)
copyfile(license, os.path.join(tempdir, 'documentation', 'licenses', 'fmpy-remoting-binaries.txt'))
# create a new archive from the existing files + remoting binaries
# NOTE(review): fragment without its enclosing `def`; `sync`, `fmu_filename`,
# `I_DC`, `V_AC` and `n_chunks` are defined outside the visible lines.
if sync:
dask.config.set(scheduler='synchronous') # synchronized scheduler
# download the FMU
download_test_file('2.0', 'CoSimulation', 'Dymola', '2017', 'Rectifier', fmu_filename)
# read the model description
model_description = read_model_description(fmu_filename)
# collect the value references for the variables to read / write
# (maps every variable name to its value reference)
vrs = {}
for variable in model_description.modelVariables:
vrs[variable.name] = variable.valueReference
# extract the FMU
unzipdir = fmpy.extract(fmu_filename)
# arguments handed to simulate_fmu together with the value references below
fmu_args = {'guid': model_description.guid,
'modelIdentifier': model_description.coSimulation.modelIdentifier,
'unzipDirectory': unzipdir}
# get the value references for the start and output values
start_vrs = [vrs['VAC'], vrs['IDC']]
result_vrs = [vrs['uDC'], vrs['Losses']]
# every index tuple of an array with the shape of I_DC
indices = list(np.ndindex(I_DC.shape))
print("Running %d simulations (%d chunks)..." % (V_AC.size, n_chunks))
with ProgressBar():
# calculate the losses for every chunk
# the indices are split into n_chunks partitions; simulate_fmu is called
# per partition with fmu_args, start_vrs and result_vrs as extra arguments
b = bag.from_sequence(indices, npartitions=n_chunks)
results = b.map_partitions(simulate_fmu, fmu_args, start_vrs, result_vrs).compute()
# NOTE(review): fragment without its enclosing `def`; `experiment`, `step_size`,
# `start_time`, `stop_time`, `output_interval`, `fmi_type`, `filename`,
# `use_remoting` and `model_description` are defined outside the visible lines.
relative_tolerance = experiment.tolerance
# Default step size: the power of ten nearest to the total time (in log10),
# scaled down by a factor of 1000.
if step_size is None:
total_time = stop_time - start_time
step_size = 10 ** (np.round(np.log10(total_time)) - 3)
# For co-simulation without an explicit output interval, start from the
# experiment's step size and double it until at most 1000 intervals fit
# between start_time and stop_time.
if output_interval is None and fmi_type == 'CoSimulation' and experiment is not None and experiment.stepSize is not None:
output_interval = experiment.stepSize
while (stop_time - start_time) / output_interval > 1000:
output_interval *= 2
# A directory that contains modelDescription.xml is used as is; anything else
# is extracted. tempdir stays None when nothing was extracted here.
if os.path.isfile(os.path.join(filename, 'modelDescription.xml')):
unzipdir = filename
tempdir = None
else:
tempdir = extract(filename)
unzipdir = tempdir
if use_remoting:
# start 32-bit server
from subprocess import Popen
# server.exe is looked up in a 'remoting' folder next to this module
server_path = os.path.dirname(__file__)
server_path = os.path.join(server_path, 'remoting', 'server.exe')
if fmi_type == 'ModelExchange':
model_identifier = model_description.modelExchange.modelIdentifier
else:
model_identifier = model_description.coSimulation.modelIdentifier
# the server process receives the path of the FMU's win32 DLL as its argument
dll_path = os.path.join(unzipdir, 'binaries', 'win32', model_identifier + '.dll')
server = Popen([server_path, dll_path])
else:
server = None
Parameters:
filename: filename of the source code FMU
output_filename: filename of the FMU with the compiled binary (None: overwrite existing FMU)
"""
# NOTE(review): fragment without its enclosing `def`; `filename` and
# `output_filename` are presumably its parameters -- confirm against the full source.
from . import read_model_description, extract, platform, platform_tuple
import zipfile
from shutil import copyfile, rmtree
# first extraction: its 'sources' folder is passed to compile_dll
unzipdir = extract(filename)
model_description = read_model_description(filename)
binary = compile_dll(model_description=model_description, sources_dir=os.path.join(unzipdir, 'sources'))
# second extraction: this tree receives the compiled binary and is archived below
unzipdir2 = extract(filename)
# FMI 1.0 and 2.0 use `platform` as the folder name, other versions `platform_tuple`
platform_dir = os.path.join(unzipdir2, 'binaries', platform if model_description.fmiVersion in ['1.0', '2.0'] else platform_tuple)
if not os.path.exists(platform_dir):
os.makedirs(platform_dir)
# the binary keeps its file name inside the platform folder
copyfile(src=binary, dst=os.path.join(platform_dir, os.path.basename(binary)))
if output_filename is None:
output_filename = filename # overwrite the existing archive
# create a new archive from the existing files + compiled binary
with zipfile.ZipFile(output_filename, 'w', zipfile.ZIP_DEFLATED) as zf:
base_path = os.path.normpath(unzipdir2)
# walk the second extraction tree; directory names are visited in sorted order
# NOTE(review): the loop body is cut off after the next line
for dirpath, dirnames, filenames in os.walk(unzipdir2):
for name in sorted(dirnames):
def compile_platform_binary(filename, output_filename=None):
    """ Compile the binary of an FMU for the current platform and add it to the FMU

    Parameters:
        filename:         filename of the source code FMU
        output_filename:  filename of the FMU with the compiled binary (None: overwrite existing FMU)
    """

    from . import read_model_description, extract, platform, platform_tuple
    import zipfile
    from shutil import copyfile, rmtree

    # first extraction: provides the 'sources' folder for the compiler
    unzipdir = extract(filename)

    model_description = read_model_description(filename)

    sources_dir = os.path.join(unzipdir, 'sources')
    binary = compile_dll(model_description=model_description, sources_dir=sources_dir)

    # second extraction: the tree that receives the compiled binary
    unzipdir2 = extract(filename)

    # FMI 1.0 and 2.0 use `platform` as the folder name, other versions
    # use `platform_tuple`
    if model_description.fmiVersion in ['1.0', '2.0']:
        binaries_subdir = platform
    else:
        binaries_subdir = platform_tuple

    platform_dir = os.path.join(unzipdir2, 'binaries', binaries_subdir)

    if not os.path.exists(platform_dir):
        os.makedirs(platform_dir)

    # the binary keeps its file name inside the platform folder
    target = os.path.join(platform_dir, os.path.basename(binary))
    copyfile(src=binary, dst=target)

    if output_filename is None:
        output_filename = filename  # overwrite the existing archive
def instantiate_fmu(component, ssp_unzipdir, start_time, parameter_set=None):
""" Instantiate an FMU

Visible steps: the FMU at `component.source` (relative to `ssp_unzipdir`) is
extracted and the extraction directory is stored on `component.unzipdir`; the
model description is read with validation disabled; an Exception is raised
when it has no co-simulation interface; the model variables are stored by name
on `component.variables`.

NOTE(review): the block is cut off inside the `fmu_kwargs` literal, and
`start_time` and `parameter_set` are not used in the visible lines.
"""
fmu_filename = os.path.join(ssp_unzipdir, component.source)
component.unzipdir = extract(fmu_filename)
# read the model description
model_description = read_model_description(fmu_filename, validate=False)
if model_description.coSimulation is None:
raise Exception("%s does not support co-simulation." % component.source)
# collect the value references
# (the whole variable object is stored per name, not just its value reference)
component.variables = {}
for variable in model_description.modelVariables:
# component.vrs[variable.name] = variable.valueReference
component.variables[variable.name] = variable
# keyword arguments assembled from the model description and the extraction directory
fmu_kwargs = {'guid': model_description.guid,
'unzipDirectory': component.unzipdir,
'modelIdentifier': model_description.coSimulation.modelIdentifier,
def main():
    """Initialize the FMU described by the second entry of ``options``.

    The visible part of this routine picks ``options[1]``, reads the model
    description, extracts the archive, creates an ``FMU2Slave``, takes it
    through initialization mode and sets up the counter and time locals.
    """
    option = options[1]

    model_description = read_model_description(option.fmu_filename)
    unzipdir = extract(option.fmu_filename)

    constructor_args = dict(
        guid=model_description.guid,
        unzipDirectory=unzipdir,
        modelIdentifier=model_description.coSimulation.modelIdentifier,
        instanceName='instance1',
    )
    fmu = FMU2Slave(**constructor_args)

    # initialize: instantiate, configure the experiment, then enter and
    # leave initialization mode
    fmu.instantiate()
    fmu.setupExperiment(
        tolerance=1E-4,
        startTime=0.0,
        stopTime=option.stop_time,
    )
    fmu.enterInitializationMode()
    fmu.exitInitializationMode()

    # wall-clock reference taken after initialization has finished
    start = datetime.datetime.now()

    # step counter and simulation time both start at zero
    i = 0
    t = 0.0