Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def FreeFederate(fed, fedinfo):
h.helicsFederateFinalize(fed)
state = h.helicsFederateGetState(fed)
assert state == 3 # TODO: should this be 3?
h.helicsFederateInfoFree(fedinfo)
h.helicsFederateFree(fed)
h.helicsFederateInfoSetIntegerProperty(fedinfo, h.helics_property_int_log_level, 1)
vFed = h.helicsCreateValueFederate("TestA Federate", fedinfo)
yield vFed
h.helicsFederateFinalize(vFed)
state = h.helicsFederateGetState(vFed)
assert state == 3
while (h.helicsBrokerIsConnected(broker)):
time.sleep(1)
h.helicsFederateInfoFree(fedinfo)
h.helicsFederateFree(vFed)
h.helicsCloseLibrary()
counter -= 1
time.sleep(1)
# Broker should be connected at this point
assert h.helicsBrokerIsConnected(helicsBroker) == 1, "Broker should still be connected"
h.helicsFederateFinalize(vFed)
state = h.helicsFederateGetState(vFed)
assert state == 3
while (h.helicsBrokerIsConnected(helicsBroker)):
time.sleep(1)
h.helicsFederateInfoFree(fedinfo)
h.helicsFederateFree(vFed)
h.helicsFederateInfoSetIntegerProperty(fedinfo, h.helics_property_int_log_level, 1)
mFed = h.helicsCreateMessageFederate("TestA Federate", fedinfo)
yield mFed
h.helicsFederateFinalize(mFed)
state = h.helicsFederateGetState(mFed)
assert state == 3
while (h.helicsBrokerIsConnected(broker)):
time.sleep(1)
h.helicsFederateInfoFree(fedinfo)
h.helicsFederateFree(mFed)
h.helicsCloseLibrary()
def destroy_federate(fed):
h.helicsFederateFinalize(fed)
# status, state = h.helicsFederateGetState(fed)
# assert state == 3
while (h.helicsBrokerIsConnected(broker)):
time.sleep(1)
h.helicsFederateFree(fed)
h.helicsCloseLibrary()
def destroy_value_federate(fed, broker):
status = h.helicsFederateFinalize(fed)
assert status == 0
status, state = h.helicsFederateGetState(fed)
assert status == 0
assert state == 3
while (h.helicsBrokerIsConnected(broker)):
time.sleep(1)
h.helicsFederateFree(fed)
h.helicsCloseLibrary()
val = value
currenttime = h.helicsFederateRequestTime(vfed, t)
status = h.helicsPublicationPublishDouble(pub, val)
print("PI SENDER: Sending value pi = {} at time {} to PI RECEIVER".format(val, currenttime[-1]))
time.sleep(1)
status = h.helicsFederateFinalize(vfed)
print("PI SENDER: Federate finalized")
while (h.helicsBrokerIsConnected(broker)):
time.sleep(1)
h.helicsFederateFree(vfed)
h.helicsCloseLibrary()
print("PI SENDER: Broker disconnected")
def destroy_federate(fed):
h.helicsFederateFinalize(fed)
h.helicsFederateFree(fed)
h.helicsCloseLibrary()
def destroy_value_federate(fed):
status = h.helicsFederateFinalize(fed)
assert status == 0
status, state = h.helicsFederateGetState(fed)
assert state == 3
assert status == 0
h.helicsFederateFree(fed)
h.helicsCloseLibrary()