Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_value_federate_runFederateTestDouble(vFed):
    """Round-trip double values through a publication/input pair on one federate.

    The assertions pin three things: a value published before any time
    request is not yet readable (the input still returns its default), the
    value becomes readable after the next time grant, and a second publish
    is likewise delivered on the following grant.

    Args:
        vFed: value federate handle (presumably supplied by a pytest fixture
            defined outside this block -- TODO confirm).
    """
    default_value = 1.0
    first_value = 2.0
    second_value = first_value + 1

    publication = h.helicsFederateRegisterGlobalPublication(
        vFed, "pub1", h.helics_data_type_double, ""
    )
    subscription = h.helicsFederateRegisterSubscription(vFed, "pub1", "")
    h.helicsInputSetDefaultDouble(subscription, default_value)

    h.helicsFederateEnterExecutingMode(vFed)

    # Publish the first double before requesting any time; the input must
    # still report the configured default.
    h.helicsPublicationPublishDouble(publication, first_value)
    assert h.helicsInputGetDouble(subscription) == default_value

    # The test expects a grant of 0.01 even though 1.0 was requested
    # (presumably due to the federate's timing configuration -- TODO confirm).
    granted = h.helicsFederateRequestTime(vFed, 1.0)
    assert granted == 0.01
    assert h.helicsInputGetDouble(subscription) == first_value

    # Publish a second double and advance again; the input must follow.
    h.helicsPublicationPublishDouble(publication, second_value)
    granted = h.helicsFederateRequestTime(vFed, 2.0)
    assert granted == 0.02
    assert h.helicsInputGetDouble(subscription) == second_value
def test_message_federate_send(mFed):
    """Send a raw message from endpoint "ep1" to "ep2" and check its delivery.

    After the time request, the federate as a whole and the destination
    endpoint must report a pending message while the sending endpoint
    reports none; the retrieved message's data, length and original
    destination are then checked.

    Args:
        mFed: message federate handle (presumably supplied by a pytest
            fixture defined outside this block -- TODO confirm).
    """
    source_ep = h.helicsFederateRegisterEndpoint(mFed, "ep1", None)
    dest_ep = h.helicsFederateRegisterGlobalEndpoint(mFed, "ep2", "random")

    h.helicsFederateSetTimeProperty(mFed, h.helics_property_time_delta, 1.0)
    h.helicsFederateEnterExecutingMode(mFed)

    payload = "random-data"
    h.helicsEndpointSendEventRaw(source_ep, "ep2", payload, 1.0)

    # 2.0 is requested, but the test expects the grant to stop at 1.0.
    assert h.helicsFederateRequestTime(mFed, 2.0) == 1.0

    assert h.helicsFederateHasMessage(mFed) == 1
    assert h.helicsEndpointHasMessage(source_ep) == 0
    assert h.helicsEndpointHasMessage(dest_ep) == 1

    received = h.helicsEndpointGetMessage(dest_ep)
    assert received.data == "random-data"
    assert received.length == 11
    assert received.original_dest == ""
# NOTE(review): fragment -- the enclosing function header and the setup of
# vFed, pubid, subid, testValue1/testVal1 and testValue2/testVal2 are not
# visible here, and the original indentation has been lost.
# Read the named point currently available on the input.
val2 = h.helicsInputGetNamedPoint(subid)
# It must equal the first pair.
assert val2 == [testValue1, testVal1]
# Publish a second named point.
h.helicsPublicationPublishNamedPoint(pubid, testValue2, testVal2)
# No time has been requested since the publish, so the input is expected
# to still return the first pair.
val3 = h.helicsInputGetNamedPoint(subid)
assert val3 == [testValue1, testVal1]
# Advance time; the test expects a grant of 0.02 for a request of 2.0.
grantedtime = h.helicsFederateRequestTime(vFed, 2.0)
assert grantedtime == 0.02
# After the grant the input must return the second pair.
val4 = h.helicsInputGetNamedPoint(subid)
assert val4 == [testValue2, testVal2]
def test_value_federate_single_transfer(vFed):
    """Publish one string and read it back through a subscription.

    The string is published right after entering executing mode and is
    asserted to be readable on the input once a time has been granted.

    Args:
        vFed: value federate handle (presumably supplied by a pytest fixture
            defined outside this block -- TODO confirm).
    """
    publication = h.helicsFederateRegisterGlobalPublication(
        vFed, "pub1", h.helics_data_type_string, ""
    )
    subscription = h.helicsFederateRegisterSubscription(vFed, "pub1", "")

    h.helicsFederateEnterExecutingMode(vFed)
    h.helicsPublicationPublishString(publication, "string1")

    # The test expects a grant of 0.01 for a request of 1.0.
    granted = h.helicsFederateRequestTime(vFed, 1.0)
    assert granted == 0.01

    received = h.helicsInputGetString(subscription)
    assert received == "string1"
# NOTE(review): fragment -- the enclosing function header and the
# definitions of vFed, rDefaultValue and iDefaultValue are not visible
# here, and the original indentation has been lost.
# Real and imaginary parts of the value that will be published.
rTestValue = 2.0
iTestValue = 2.0
pubid = h.helicsFederateRegisterGlobalPublication (vFed, "pub1", h.helics_data_type_complex, "")
subid = h.helicsFederateRegisterSubscription (vFed, "pub1", "")
# Value the input should return until a published value is delivered.
h.helicsInputSetDefaultComplex(subid, rDefaultValue, iDefaultValue)
h.helicsFederateEnterExecutingMode (vFed)
# Publish the complex test value before requesting any time.
h.helicsPublicationPublishComplex(pubid, rTestValue, iTestValue)
# No time has been requested yet, so the default is expected.
value1, value2 = h.helicsInputGetComplex(subid)
assert value1 == rDefaultValue
assert value2 == iDefaultValue
# The test expects a grant of 0.01 for a request of 1.0.
grantedtime = h.helicsFederateRequestTime (vFed, 1.0)
assert grantedtime == 0.01
# After the grant the published value is expected on the input.
value1, value2 = h.helicsInputGetComplex(subid)
assert value1 == rTestValue
assert value2 == iTestValue
###################### Entering Execution Mode ##########################################################
# NOTE(review): script fragment -- fed, subid, subkeys_count and the `time`
# module come from code outside this view, and the original indentation
# (hence the exact loop nesting) has been lost.
h.helicsFederateEnterExecutingMode(fed)
hours = 24
# 60 * 60 * hours: presumably the simulated horizon in seconds -- TODO confirm.
total_inteval = int(60 * 60 * hours)
# Start below the first requested time (0) so the wait loop runs at least once.
grantedtime = -1
# Loop step: 5 * 60 (presumably seconds, i.e. five minutes).
update_interval = 5*60
feeder_limit_upper = 4 * (1000*1000)
feeder_limit_lower = 2.7 * (1000*1000)
k = 0
data ={};
# Series filled in by the loop below.
time_sim = []; feeder_real_power = []; feeder_imag_power = []
for t in range(0, total_inteval, update_interval):
# Keep requesting until the granted time reaches t, pausing 0.1 s per request.
while grantedtime < t:
grantedtime = h.helicsFederateRequestTime (fed, t)
time.sleep(0.1)
# t / 3600: presumably seconds converted to hours.
time_sim.append(t/3600)
############################# Subscribing to Feeder Load from GridLAB-D ##############################################
key =[]; Real_demand = []; Imag_demand = [];
# Subscriptions are looked up in subid under the keys "m0", "m1", ...
for i in range(0,subkeys_count):
sub = subid["m{}".format(i)]
rload, iload = h.helicsInputGetComplex(sub)
sub_key = h.helicsSubscriptionGetKey(sub)
print(sub_key)
# The subscription whose key contains "totalLoad" is treated as the feeder load.
if "totalLoad" in str(sub_key):
key_feeder_load = sub_key
# Text before "/totalLoad" in the key is taken as the federate name.
distribution_fed_name = str(key_feeder_load.split('/totalLoad')[0])
Real_feeder_load = rload
Imag_feeder_load = iload
# Scaled by 1/1000 (presumably W -> kW -- TODO confirm units).
feeder_real_power.append(rload/1000)
# NOTE(review): script fragment -- fed, fid, delay, pubid, subid, epid and
# get_input are defined outside this view, and the original indentation
# (hence the exact nesting) has been lost.
# The HELICS calls here are unpacked as (status, value) pairs or return a
# bare status, and every status is asserted to be 0.
print("Entering execution mode")
h.helicsFederateEnterExecutionMode(fed)
# NOTE(review): the filter delay is set to the literal 2.0 rather than to
# `delay` -- confirm this is intended.
if delay is not None:
h.helicsFilterSet(fid, "delay", 2.0)
grantedtime = -1
# Main loop: get a target time and value from get_input, advance to that
# time, then publish/send the value. Ctrl-C during get_input ends the loop.
while True:
try:
stop_at_time, value_to_send = get_input(grantedtime)
except KeyboardInterrupt:
print("")
break
# Keep requesting until the granted time reaches the requested stop time.
while grantedtime < stop_at_time:
print(">>>>>>>> Requesting time = {}".format(stop_at_time))
status, grantedtime = h.helicsFederateRequestTime(fed, stop_at_time)
assert status == 0
# A grant different from the requested time is reported as an interrupt,
# and the current subscription value is printed.
if grantedtime != stop_at_time:
status, value = h.helicsSubscriptionGetString(subid)
assert status == 0
print("Interrupt value '{}' from Federate 2".format(value))
print("<<<<<<<< Granted Time = {}".format(grantedtime))
assert grantedtime == stop_at_time, "stop_at_time = {}, grantedtime = {}".format(stop_at_time, grantedtime)
# Only non-empty input is sent: once as a publication, once as a raw
# message to "endpoint2".
if value_to_send is not None and value_to_send != '':
print("Sending '{}' to Federate 2".format(value_to_send))
status = h.helicsPublicationPublishString(pubid, str(value_to_send))
assert status == 0
status = h.helicsEndpointSendMessageRaw(epid, "endpoint2", str(value_to_send))
assert status == 0
# Read back and print whatever value the subscription currently holds.
status, value = h.helicsSubscriptionGetString(subid)
assert status == 0
print("Received value '{}' from Federate 2".format(value))
# NOTE(review): script fragment -- x, voltages, pf_time, opf_time,
# real_demand, LMP_solved, grantedtime, fed, logger and destroy_federate
# are defined outside this view, and the original indentation has been lost.
# Advance the counter x (presumably the last statement of a loop body that
# begins outside this view).
x=x+1
########################## Creating headers and Printing results to CSVs #####################################
# Build the header "Time(in Hours),Bus1,Bus2,..." with one "BusN" entry
# per column of `voltages`.
head = str('Time(in Hours)')
for i in range(voltages.shape[1]):
head = head+','+('Bus'+str(i+1))
# Each file gets a time column followed by the data columns; the same
# header is reused for all three files, and comments='' stops numpy from
# prefixing it with '# '.
numpy.savetxt("Transmission_Voltages.csv", numpy.column_stack((pf_time, voltages)), delimiter=",", fmt='%s',header = head, comments='')
numpy.savetxt("Transmission_MW_demand.csv", numpy.column_stack((pf_time, real_demand)), delimiter=",", fmt='%s',header = head, comments='')
numpy.savetxt("Transmission_LMP.csv", numpy.column_stack((opf_time, LMP_solved)), delimiter=",", fmt='%s',header = head, comments='')
############################## Terminating Federate ########################################################
# Keep requesting time until the grant reaches 60 * 60 * 24 (presumably
# the end of a 24-hour horizon in seconds), then tear the federate down.
t = 60 * 60 * 24
while grantedtime < t:
grantedtime = h.helicsFederateRequestTime (fed, t)
logger.info("Destroying federate")
destroy_federate(fed)
logger.info("Done!")
# NOTE(review): script fragment -- ppc, cosim_bus, total_inteval,
# pf_interval, pubid, pubkeys_count, subid, subkeys_count, grantedtime,
# fed, logger, peak_demand, bus_profiles, x and load_amplification_factor
# are defined outside this view, and the original indentation (hence the
# exact loop nesting) has been lost.
# Product of columns 7 and 9 of the co-simulation bus row, scaled by 1.043
# (presumably voltage magnitude in p.u. times base kV -- TODO confirm the
# column meanings and the 1.043 factor).
voltage_cosim_bus = (ppc['bus'][cosim_bus,7]*ppc['bus'][cosim_bus,9])*1.043
######################################### Starting Co-simulation ####################################################
for t in range(0, total_inteval, pf_interval):
############################ Publishing Voltage to GridLAB-D #######################################################
# Scaled by 1000 before publishing; the log line divides by 1000 again
# and labels the result kV.
voltage_gld = complex(voltage_cosim_bus*1000)
logger.info("Voltage value = {} kV".format(abs(voltage_gld)/1000))
# Publications are looked up in pubid under the keys "m0", "m1", ...
for i in range(0,pubkeys_count):
pub = pubid["m{}".format(i)]
status = h.helicsPublicationPublishComplex(pub, voltage_gld.real, voltage_gld.imag)
# status = h.helicsEndpointSendEventRaw(epid, "fixed_price", 10, t)
# Keep requesting until the granted time reaches t, pausing 0.1 s per request.
while grantedtime < t:
grantedtime = h.helicsFederateRequestTime (fed, t)
time.sleep(0.1)
############################# Subscribing to Feeder Load from GridLAB-D ##############################################
# rload/iload are overwritten on each pass, so the values from the last
# subscription read are what the statements after the loop use.
for i in range(0,subkeys_count):
sub = subid["m{}".format(i)]
rload, iload = h.helicsInputGetComplex(sub)
logger.info("Python Federate grantedtime = {}".format(grantedtime))
logger.info("Load value = {} kW".format(complex(rload, iload)/1000))
#print(voltage_plot,real_demand)
# Scale the peak demand by row x of the bus profiles and write it to
# column 2 of every bus.
actual_demand=peak_demand*bus_profiles[x,:]
ppc['bus'][:,2]=actual_demand
# Column 3 is derived from column 2 via tan(acos(0.85)) (presumably
# reactive demand at a 0.85 power factor).
ppc['bus'][:,3]=actual_demand*math.tan(math.acos(.85))
# Override the co-simulation bus with the received load, amplified and
# divided by 1e6 (presumably W/var -> MW/Mvar -- TODO confirm units).
ppc['bus'][cosim_bus,2]=rload*load_amplification_factor/1000000
ppc['bus'][cosim_bus,3]=iload*load_amplification_factor/1000000
# NOTE(review): script fragment -- data, fig, i, time_sim,
# feeder_real_power, plt, pd, grantedtime, fed, logger and destroy_federate
# are defined outside this view, and the original indentation has been lost.
# One subplot per key of `data`, placed on a 2 x 3 grid. The subplot index
# i is advanced manually; its initial value is set outside this view.
for keys in data:
ax = fig.add_subplot(2, 3, i)
ax.plot(time_sim, data[keys])
ax.set_ylabel('EV Output in kW')
ax.set_xlabel('Time ')
ax.set_title(keys)
i=i+1
# block=True is passed to plt.show (presumably so the script waits until
# the figure window is closed).
plt.show(block=True)
# Add the time axis and the feeder real power to `data` after plotting,
# then write everything to CSV.
data['time'] = time_sim
data['feeder_load(real)'] = feeder_real_power
pd.DataFrame.from_dict(data=data).to_csv('EV_Outputs.csv', header=True)
# Keep requesting time until the grant reaches 60 * 60 * 24 (presumably
# the end of a 24-hour horizon in seconds), then tear the federate down.
t = 60 * 60 * 24
while grantedtime < t:
grantedtime = h.helicsFederateRequestTime (fed, t)
logger.info("Destroying federate")
destroy_federate(fed)