Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
myuint64 = client.get_node(f"ns={static_idx};s=UInt64")
myint32 = client.get_node(f"ns={static_idx};s=Int32")
myuint32 = client.get_node(f"ns={static_idx};s=UInt32")
var = await client.nodes.objects.get_child(["3:Simulation", "3:Random"])
print("var is: ", var)
print("value of var is: ", await var.read_value())
#await var.write_value(ua.Variant([23], ua.VariantType.Double))
print("setting float value")
await myfloat.write_value(ua.Variant(1.234, ua.VariantType.Float))
print("reading float value: ", await myfloat.read_value())
device = await client.nodes.objects.get_child(["6:MyObjects", "6:MyDevice"])
method = await device.get_child("6:MyMethod")
result = await device.call_method(method, ua.Variant("sin"), ua.Variant(180, ua.VariantType.Double))
print("Mehtod result is: ", result)
handler = SubHandler()
sub = await client.create_subscription(500, handler)
handle = await sub.subscribe_data_change(var)
handle2 = await sub.subscribe_events(evtypes=2788)
cond = await client.nodes.root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "0:ConditionType"])
for _ in range(5):
# refresh server condition to force generation of events
await cond.call_method("0:ConditionRefresh", ua.Variant(sub.subscription_id, ua.VariantType.UInt32))
await asyncio.sleep(1)
await sub.unsubscribe(handle)
var = client.get_node(ua.NodeId("Random1", 5))
print("var is: ", var)
print("value of var is: ", var.read_value())
var.write_value(ua.Variant([23], ua.VariantType.Double))
print("setting float value")
myfloat.write_value(ua.Variant(1.234, ua.VariantType.Float))
print("reading float value: ", myfloat.read_value())
handler = SubHandler()
sub = client.create_subscription(500, handler)
handle = sub.subscribe_data_change(var)
device = objects.get_child(["2:MyObjects", "2:MyDevice"])
method = device.get_child("2:MyMethod")
result = device.call_method(method, ua.Variant("sin"), ua.Variant(180, ua.VariantType.Double))
print("Mehtod result is: ", result)
#embed()
time.sleep(3)
sub.unsubscribe(handle)
sub.delete()
#client.close_session()
finally:
client.disconnect()
stop_thread_loop()
async def trigger(self, time_attr=None, message=None):
"""
Trigger the event. This will send a notification to all subscribed clients
"""
self.event.EventId = ua.Variant(uuid.uuid4().hex.encode('utf-8'), ua.VariantType.ByteString)
if time_attr:
self.event.Time = time_attr
else:
self.event.Time = datetime.utcnow()
self.event.ReceiveTime = datetime.utcnow()
self.event.LocalTime = ua.uaprotocol_auto.TimeZoneDataType()
if sys.version_info.major > 2:
localtime = time.localtime(self.event.Time.timestamp())
self.event.LocalTime.Offset = localtime.tm_gmtoff//60
else:
localtime = time.localtime(time.mktime(self.event.Time.timetuple()))
self.event.LocalTime.Offset = -(time.altzone if localtime.tm_isdst else time.timezone)
self.event.LocalTime.DaylightSavingInOffset = bool(localtime.tm_isdst != -1)
if message:
return ua.Variant([
uuid.UUID(guid) for guid in obj.value
], getattr(ua.VariantType, obj.valuetype[6:]))
elif obj.valuetype.startswith("ListOf"):
vtype = obj.valuetype[6:]
if hasattr(ua.ua_binary.Primitives, vtype):
return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
elif vtype == "LocalizedText":
return ua.Variant([getattr(ua, vtype)(text=item["Text"], locale=item["Locale"]) for item in obj.value])
else:
return ua.Variant([getattr(ua, vtype)(v) for v in obj.value])
elif obj.valuetype == 'ExtensionObject':
extobj = self._make_ext_obj(obj.value)
return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
elif obj.valuetype == 'Guid':
return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, obj.valuetype))
elif obj.valuetype == 'LocalizedText':
ltext = ua.LocalizedText()
for name, val in obj.value:
if name == "Text":
ltext.Text = val
elif name == "Locale":
ltext.Locale = val
else:
_logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
return ua.Variant(ltext, ua.VariantType.LocalizedText)
elif obj.valuetype == 'NodeId':
return ua.Variant(ua.NodeId.from_string(obj.value))
else:
return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
async def write_value_rank(self, value):
"""
Set attribute ArrayDimensions of node
"""
v = ua.Variant(value, ua.VariantType.Int32)
await self.write_attribute(ua.AttributeIds.ValueRank, ua.DataValue(v))
[('MyNumericProperty', ua.VariantType.Float),
('MyStringProperty', ua.VariantType.String)])
# create second event
etype2 = server.create_custom_event_type(2, 'MySecondEvent', ua.ObjectIds.BaseEventType,
[('MyOtherProperty', ua.VariantType.Float)])
# get an event generator for the myobj node which generates custom events
myevgen = server.get_event_generator(etype, myobj)
myevgen.event.Severity = 500
myevgen.event.MyStringProperty = ua.Variant("hello world")
myevgen.event.MyNumericProperty = ua.Variant(-456)
# get another event generator for the myobj node which generates different custom events
myevgen2 = server.get_event_generator(etype2, myobj)
myevgen2.event.Severity = 123
myevgen2.event.MyOtherProperty = ua.Variant(1.337)
# get an event generator for the server node which generates BaseEventType
serverevgen = server.get_event_generator()
serverevgen.event.Severity = 111
# Configure server to use sqlite as history database (default is a simple in memory dict)
server.iserver.history_manager.set_storage(HistorySQLite("my_event_history.sql"))
# starting!
server.start()
# enable history for myobj events; must be called after start since it uses subscription
server.iserver.enable_history_event(myobj, period=None)
# enable history for server events; must be called after start since it uses subscription
server_node = server.get_node(ua.ObjectIds.Server)
def _add_node_attributes(self, nodedata, item, add_timestamps):
# add common attrs
nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(
ua.DataValue(ua.Variant(nodedata.nodeid, ua.VariantType.NodeId))
)
nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(
ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName))
)
nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(
ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32))
)
# add requested attrs
self._add_nodeattributes(item.NodeAttributes, nodedata, add_timestamps)
def _val_to_variant(val, args):
array = args.array
if args.datatype == "guess":
if val in ("true", "True", "false", "False"):
return _arg_to_variant(val, array, _arg_to_bool)
try:
return _arg_to_variant(val, array, int)
except ValueError:
try:
return _arg_to_variant(val, array, float)
except ValueError:
return _arg_to_variant(val, array, str)
elif args.datatype == "bool":
if val in ("1", "True", "true"):
return ua.Variant(True, ua.VariantType.Boolean)
else:
return ua.Variant(False, ua.VariantType.Boolean)
elif args.datatype == "sbyte":
return _arg_to_variant(val, array, int, ua.VariantType.SByte)
elif args.datatype == "byte":
return _arg_to_variant(val, array, int, ua.VariantType.Byte)
# elif args.datatype == "uint8":
# return _arg_to_variant(val, array, int, ua.VariantType.Byte)
elif args.datatype == "uint16":
return _arg_to_variant(val, array, int, ua.VariantType.UInt16)
elif args.datatype == "uint32":
return _arg_to_variant(val, array, int, ua.VariantType.UInt32)
elif args.datatype == "uint64":
return _arg_to_variant(val, array, int, ua.VariantType.UInt64)
# elif args.datatype == "int8":
# return ua.Variant(int(val), ua.VariantType.Int8)
def string_to_variant(string, vtype):
"""
convert back a string to an ua.Variant
"""
return ua.Variant(string_to_val(string, vtype), vtype)