Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Set the message interval (timedelta) for the federate. Note that
# HELICS minimum message time interval is 1 ns and by default
# it uses a time delta of 1 second. What is provided to the
# setTimedelta routine is a multiplier for the default timedelta.
# Set one second message interval #
h.helicsFederateInfoSetTimeProperty(fedinfo, h.helics_property_time_delta, deltat)
h.helicsFederateInfoSetIntegerProperty(fedinfo, h.helics_property_int_log_level, 1)
mFed = h.helicsCreateMessageFederate("TestA Federate", fedinfo)
yield mFed
h.helicsFederateFinalize(mFed)
state = h.helicsFederateGetState(mFed)
assert state == 3
while (h.helicsBrokerIsConnected(broker)):
time.sleep(1)
h.helicsFederateInfoFree(fedinfo)
h.helicsFederateFree(mFed)
h.helicsCloseLibrary()
testValue = "String2"
pubid = h.helicsFederateRegisterGlobalPublication (vFed, "pub1", h.helics_data_type_string, "")
subid = h.helicsFederateRegisterSubscription (vFed, "pub1", "")
h.helicsInputSetDefaultString(subid, defaultValue)
h.helicsFederateEnterExecutingMode(vFed)
counter = 60
while counter > 0:
counter -= 1
time.sleep(1)
# Broker should be connected at this point
assert h.helicsBrokerIsConnected(helicsBroker) == 1, "Broker should still be connected"
h.helicsFederateFinalize(vFed)
state = h.helicsFederateGetState(vFed)
assert state == 3
while (h.helicsBrokerIsConnected(helicsBroker)):
time.sleep(1)
h.helicsFederateInfoFree(fedinfo)
h.helicsFederateFree(vFed)
def test_value_federate_runFederateTestDouble(vFed):
    """Publish doubles on a global publication and read them back via a subscription.

    Both the publication and the subscription are registered on ``vFed``
    under the key "pub1", so the federate reads back its own values.
    """
    initial = 1.0
    published = 2.0

    pub = h.helicsFederateRegisterGlobalPublication(
        vFed, "pub1", h.helics_data_type_double, ""
    )
    sub = h.helicsFederateRegisterSubscription(vFed, "pub1", "")
    h.helicsInputSetDefaultDouble(sub, initial)

    h.helicsFederateEnterExecutingMode(vFed)

    # Publish the first double before any time request is made.
    h.helicsPublicationPublishDouble(pub, published)

    # Before time advances the input is expected to still report the default.
    assert h.helicsInputGetDouble(sub) == initial

    # NOTE(review): the 0.01 grant presumably follows from the time delta
    # configured by the fixture, which is not visible here - confirm.
    assert h.helicsFederateRequestTime(vFed, 1.0) == 0.01
    assert h.helicsInputGetDouble(sub) == published

    # Publish a second, different double and advance time once more.
    h.helicsPublicationPublishDouble(pub, published + 1)
    assert h.helicsFederateRequestTime(vFed, 2.0) == 0.02
    assert h.helicsInputGetDouble(sub) == published + 1
def test_message_federate_send(mFed):
    """Send a raw event from endpoint "ep1" to global endpoint "ep2" and inspect it.

    Both endpoints are registered on ``mFed``; the message is sent with a
    timestamp of 1.0 and checked after the federate requests time 2.0.
    """
    sender = h.helicsFederateRegisterEndpoint(mFed, "ep1", None)
    receiver = h.helicsFederateRegisterGlobalEndpoint(mFed, "ep2", "random")

    h.helicsFederateSetTimeProperty(mFed, h.helics_property_time_delta, 1.0)
    h.helicsFederateEnterExecutingMode(mFed)

    payload = "random-data"
    h.helicsEndpointSendEventRaw(sender, "ep2", payload, 1.0)

    # Time 2.0 is requested, but the grant is expected at 1.0.
    assert h.helicsFederateRequestTime(mFed, 2.0) == 1.0

    # The federate as a whole has a pending message ...
    assert h.helicsFederateHasMessage(mFed) == 1
    # ... which is queued on the receiving endpoint only.
    assert h.helicsEndpointHasMessage(sender) == 0
    assert h.helicsEndpointHasMessage(receiver) == 1

    received = h.helicsEndpointGetMessage(receiver)
    assert received.data == payload
    assert received.length == 11
    assert received.original_dest == ""
val2 = h.helicsInputGetNamedPoint(subid)
# make sure the string is what we expect
# assert value2 == testValue1
assert val2 == [testValue1, testVal1]
# publish a second string
h.helicsPublicationPublishNamedPoint(pubid, testValue2, testVal2)
# make sure the value is still what we expect
val3 = h.helicsInputGetNamedPoint(subid)
# make sure the string is what we expect
# assert value3 == testValue1
assert val3 == [testValue1, testVal1]
# advance time
grantedtime = h.helicsFederateRequestTime(vFed, 2.0)
assert grantedtime == 0.02
# make sure the value was updated
val4 = h.helicsInputGetNamedPoint(subid)
# make sure the string is what we expect
# assert value4 == testValue2
assert val4 == [testValue2, testVal2]
def test_value_federate_single_transfer(vFed):
    """Publish one string on "pub1" and read it back after a single time request."""
    publication = h.helicsFederateRegisterGlobalPublication(
        vFed, "pub1", h.helics_data_type_string, ""
    )
    subscription = h.helicsFederateRegisterSubscription(vFed, "pub1", "")

    h.helicsFederateEnterExecutingMode(vFed)
    h.helicsPublicationPublishString(publication, "string1")

    # NOTE(review): the 0.01 grant presumably follows from the time delta
    # configured by the fixture, which is not visible here - confirm.
    assert h.helicsFederateRequestTime(vFed, 1.0) == 0.01
    assert h.helicsInputGetString(subscription) == "string1"
rTestValue = 2.0
iTestValue = 2.0
pubid = h.helicsFederateRegisterGlobalPublication (vFed, "pub1", h.helics_data_type_complex, "")
subid = h.helicsFederateRegisterSubscription (vFed, "pub1", "")
h.helicsInputSetDefaultComplex(subid, rDefaultValue, iDefaultValue)
h.helicsFederateEnterExecutingMode (vFed)
# publish the complex test value at time=0.0
h.helicsPublicationPublishComplex(pubid, rTestValue, iTestValue)
value1, value2 = h.helicsInputGetComplex(subid)
assert value1 == rDefaultValue
assert value2 == iDefaultValue
grantedtime = h.helicsFederateRequestTime (vFed, 1.0)
assert grantedtime == 0.01
value1, value2 = h.helicsInputGetComplex(subid)
assert value1 == rTestValue
assert value2 == iTestValue
def FreeFederate(fed, fedinfo):
    """Finalize ``fed``, check its resulting state, then free both handles.

    Args:
        fed: Federate handle to finalize and free.
        fedinfo: Federate-info handle to free.
    """
    h.helicsFederateFinalize(fed)

    # TODO: confirm that 3 is the state code expected after finalization.
    final_state = h.helicsFederateGetState(fed)
    assert final_state == 3

    # The info object is released before the federate itself.
    h.helicsFederateInfoFree(fedinfo)
    h.helicsFederateFree(fed)
h.helicsFederateInfoSetIntegerProperty(fedinfo, h.helics_property_int_log_level, 1)
vFed = h.helicsCreateValueFederate("TestA Federate", fedinfo)
yield vFed
h.helicsFederateFinalize(vFed)
state = h.helicsFederateGetState(vFed)
assert state == 3
while (h.helicsBrokerIsConnected(broker)):
time.sleep(1)
h.helicsFederateInfoFree(fedinfo)
h.helicsFederateFree(vFed)
h.helicsCloseLibrary()
counter -= 1
time.sleep(1)
# Broker should be connected at this point
assert h.helicsBrokerIsConnected(helicsBroker) == 1, "Broker should still be connected"
h.helicsFederateFinalize(vFed)
state = h.helicsFederateGetState(vFed)
assert state == 3
while (h.helicsBrokerIsConnected(helicsBroker)):
time.sleep(1)
h.helicsFederateInfoFree(fedinfo)
h.helicsFederateFree(vFed)