Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Set core type from string #
status = h.helicsFederateInfoSetCoreTypeFromString(fedinfo, "zmq")
# Federate init string #
status = h.helicsFederateInfoSetCoreInitString(fedinfo, fedinitstring)
# Set the message interval (timedelta) for federate. Note th#
# HELICS minimum message time interval is 1 ns and by default
# it uses a time delta of 1 second. What is provided to the
# setTimedelta routine is a multiplier for the default timedelta.
# Set one second message interval #
status = h.helicsFederateInfoSetTimeDelta(fedinfo, deltat)
status = h.helicsFederateInfoSetLoggingLevel(fedinfo, 1)
# Create value federate #
vfed = h.helicsCreateValueFederate(fedinfo)
print("PI SENDER: Value federate created")
# Register the publication #
pub = h.helicsFederateRegisterGlobalPublication(vfed, "testA", "double", "")
print("PI SENDER: Publication registered")
# Enter execution mode #
status = h.helicsFederateEnterExecutionMode(vfed)
print("PI SENDER: Entering execution mode")
# This federate will be publishing deltat*pi for numsteps steps #
this_time = 0.0
value = pi
fedinfo = h.helicsFederateInfoCreate()
status = h.helicsFederateInfoSetFederateName(fedinfo, "TestB Federate")
assert status == 0
status = h.helicsFederateInfoSetCoreTypeFromString(fedinfo, "zmq")
assert status == 0
status = h.helicsFederateInfoSetCoreInitString(fedinfo, fedinitstring)
assert status == 0
status = h.helicsFederateInfoSetTimeDelta(fedinfo, deltat)
assert status == 0
status = h.helicsFederateInfoSetLoggingLevel(fedinfo, 1)
assert status == 0
fed = h.helicsCreateCombinationFederate(fedinfo)
return fed
fedinfo = h.helicsFederateInfoCreate()
status = h.helicsFederateInfoSetFederateName(fedinfo, "TestA Federate")
assert status == 0
status = h.helicsFederateInfoSetCoreTypeFromString(fedinfo, "zmq")
assert status == 0
status = h.helicsFederateInfoSetCoreInitString(fedinfo, fedinitstring)
assert status == 0
status = h.helicsFederateInfoSetTimeDelta(fedinfo, deltat)
assert status == 0
status = h.helicsFederateInfoSetLoggingLevel(fedinfo, 1)
assert status == 0
fed = h.helicsCreateCombinationFederate(fedinfo)
return fed
# Federate init string
print("PI RECEIVER: Setting Federate Info Init String")
status = h.helicsFederateInfoSetCoreInitString(fedinfo, fedinitstring)
# Set the message interval (timedelta) for federate. Note that
# HELICS minimum message time interval is 1 ns and by default
# it uses a time delta of 1 second. What is provided to the
# setTimedelta routine is a multiplier for the default timedelta.
# Set one second message interval
print("PI RECEIVER: Setting Federate Info Time Delta")
status = h.helicsFederateInfoSetTimeDelta(fedinfo, deltat)
print("PI RECEIVER: Setting Federate Info Logging")
status = h.helicsFederateInfoSetLoggingLevel(fedinfo, 1)
# Create value federate
print("PI RECEIVER: Creating Value Federate")
vfed = h.helicsCreateValueFederate(fedinfo)
print("PI RECEIVER: Value federate created")
# Subscribe to PI SENDER's publication
sub = h.helicsFederateRegisterSubscription(vfed, "testA", "double", "")
print("PI RECEIVER: Subscription registered")
status = h.helicsFederateEnterExecutionMode(vfed)
print("PI RECEIVER: Entering execution mode")
value = 0.0
prevtime = 0
fedinfo = h.helicsFederateInfoCreate()
status = h.helicsFederateInfoSetFederateName(fedinfo, "Combination Federate")
assert status == 0
status = h.helicsFederateInfoSetCoreTypeFromString(fedinfo, "zmq")
assert status == 0
status = h.helicsFederateInfoSetCoreInitString(fedinfo, fedinitstring)
assert status == 0
status = h.helicsFederateInfoSetTimeDelta(fedinfo, deltat)
assert status == 0
status = h.helicsFederateInfoSetLoggingLevel(fedinfo, 1)
assert status == 0
fed = h.helicsCreateCombinationFederate(fedinfo)
return fed
fedinfo = h.helicsFederateInfoCreate()
status = h.helicsFederateInfoSetFederateName(fedinfo, "Combination Federate")
assert status == 0
status = h.helicsFederateInfoSetCoreTypeFromString(fedinfo, "zmq")
assert status == 0
status = h.helicsFederateInfoSetCoreInitString(fedinfo, fedinitstring)
assert status == 0
status = h.helicsFederateInfoSetTimeDelta(fedinfo, deltat)
assert status == 0
status = h.helicsFederateInfoSetLoggingLevel(fedinfo, 1)
assert status == 0
fed = h.helicsCreateCombinationFederate(fedinfo)
return fed
fedinfo = h.helicsFederateInfoCreate()
status = h.helicsFederateInfoSetFederateName(fedinfo, "Combination Federate")
assert status == 0
status = h.helicsFederateInfoSetCoreTypeFromString(fedinfo, "zmq")
assert status == 0
status = h.helicsFederateInfoSetCoreInitString(fedinfo, fedinitstring)
assert status == 0
status = h.helicsFederateInfoSetTimeDelta(fedinfo, deltat)
assert status == 0
status = h.helicsFederateInfoSetLoggingLevel(fedinfo, 1)
assert status == 0
fed = h.helicsCreateCombinationFederate(fedinfo)
return fed