Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def check_custom_type(test, type, base_type, node_class=None):
    """Assert that ``type`` is a custom subtype of ``base_type`` with two properties.

    ``test`` is the running test case (it supplies the assert methods and the
    server under ``test.opc``), ``type`` is the node being checked and
    ``base_type`` is the identifier of the expected parent type.  When
    ``node_class`` is truthy the node class of ``type`` is compared as well.
    """
    base = opcua.Node(test.opc.iserver.isession, ua.NodeId(base_type))
    test.assertTrue(type in base.get_children())

    # Walking the HasSubtype reference backwards must lead to the base type.
    parents = type.get_referenced_nodes(
        refs=ua.ObjectIds.HasSubtype,
        direction=ua.BrowseDirection.Inverse,
        includesubtypes=True,
    )
    test.assertEqual(base, parents[0])

    properties = type.get_properties()
    if node_class:
        test.assertEqual(node_class, type.get_node_class())
    test.assertIsNot(properties, None)
    test.assertEqual(len(properties), 2)

    # Each expected property must be listed and carry the right variant type.
    expected_properties = (
        ("2:PropertyNum", ua.VariantType.Int32),
        ("2:PropertyString", ua.VariantType.String),
    )
    for browse_path, variant_type in expected_properties:
        test.assertTrue(type.get_child(browse_path) in properties)
        test.assertEqual(type.get_child(browse_path).get_data_value().Value.VariantType, variant_type)
def test_data_type_to_variant_type(self):
    """Check ``ua_utils.data_type_to_variant_type`` for a set of data type nodes."""
    # Maps a data type node identifier to the variant type expected for it.
    expected_by_data_type = {
        ua.ObjectIds.Boolean: ua.VariantType.Boolean,
        ua.ObjectIds.Byte: ua.VariantType.Byte,
        ua.ObjectIds.String: ua.VariantType.String,
        ua.ObjectIds.Int32: ua.VariantType.Int32,
        ua.ObjectIds.UInt32: ua.VariantType.UInt32,
        ua.ObjectIds.NodeId: ua.VariantType.NodeId,
        ua.ObjectIds.LocalizedText: ua.VariantType.LocalizedText,
        ua.ObjectIds.Structure: ua.VariantType.ExtensionObject,
        ua.ObjectIds.EnumValueType: ua.VariantType.ExtensionObject,
        ua.ObjectIds.Enumeration: ua.VariantType.Int32,  # enumeration
        ua.ObjectIds.AttributeWriteMask: ua.VariantType.UInt32,
        ua.ObjectIds.AxisScaleEnumeration: ua.VariantType.Int32,  # enumeration
    }
    for data_type_id, expected_variant in expected_by_data_type.items():
        data_type_node = self.opc.get_node(ua.NodeId(data_type_id))
        actual_variant = ua_utils.data_type_to_variant_type(data_type_node)
        self.assertEqual(actual_variant, expected_variant)
def runTest(self):
    """Check that a server can be restored from a modified shelf cache file.

    The scenario is marked as broken, so the test reports itself as skipped
    instead of returning early (a bare ``return`` is counted as a pass and
    hides the fact that nothing was verified).  The intended steps are kept
    below for when the test is re-enabled.
    """
    self.skipTest("FIXME broken")

    # Only the unique name is wanted here: the temporary file is deleted
    # again on close, leaving a free path for the shelf
    # (presumably created by Server itself -- confirm when re-enabling).
    tmpfile = NamedTemporaryFile()
    path = tmpfile.name
    tmpfile.close()
    # create cache file
    server = Server(shelffile=path)
    # modify cache content
    node_id = ua.NodeId(ua.ObjectIds.Server_ServerStatus_SecondsTillShutdown)
    # The context manager closes the shelf even if the update raises.
    with shelve.open(path, "w", writeback=True) as shelf:
        shelf[node_id.to_string()].attributes[ua.AttributeIds.Value].value = ua.DataValue(123)
    # ensure that we are actually loading from the cache
    server = Server(shelffile=path)
    self.assertEqual(server.get_node(node_id).get_value(), 123)
    os.remove(path)
def test_eventgenerator_double_customEvent(self):
    """Check an event generator for a custom event type derived from another custom type.

    ``MyEvent2`` is created as a subtype of ``MyEvent1``; the generated event
    is expected to expose the properties of both, each with its default value.
    """
    base_properties = [('PropertyNum', ua.VariantType.Int32), ('PropertyString', ua.VariantType.String)]
    event1 = self.opc.create_custom_event_type(3, 'MyEvent1', ua.ObjectIds.BaseEventType, base_properties)

    derived_properties = [('PropertyBool', ua.VariantType.Boolean), ('PropertyInt', ua.VariantType.Int32)]
    event2 = self.opc.create_custom_event_type(4, 'MyEvent2', event1, derived_properties)

    evgen = self.opc.get_event_generator(event2, ua.ObjectIds.Server)
    check_eventgenerator_CustomEvent(self, evgen, event2)
    check_eventgenerator_SourceServer(self, evgen)

    expected_defaults = (
        # Properties from MyEvent1
        ('PropertyNum', 0),
        ('PropertyString', None),
        # Properties from MyEvent2
        ('PropertyBool', False),
        ('PropertyInt', 0),
    )
    for attribute, default in expected_defaults:
        self.assertEqual(getattr(evgen.event, attribute), default)
def run(self):
    """Keep-alive loop: renew the secure channel and read the server state periodically.

    Each round waits on ``self._cond`` for ``self.timeout`` milliseconds (the
    wait takes seconds, hence the division by 1000).  The stop flag is checked
    before and after the wait, so a stop request ends the loop without
    another renewal.
    """
    self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
    state_node_id = ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State)
    server_state = self.client.get_node(state_node_id)

    while True:
        if self._do_stop:
            break
        with self._cond:
            self._cond.wait(self.timeout / 1000)
        if self._do_stop:
            # Stop was requested while waiting: leave without renewing.
            break

        self.logger.debug("renewing channel")
        self.client.open_secure_channel(renew=True)
        current_state = server_state.get_value()
        self.logger.debug("server state is: %s ", current_state)

    self.logger.debug("keepalive thread has stopped")
# NOTE(review): excerpt from a longer run of address-space population
# statements; `server`, `refs` and the first `ref` are created outside this
# excerpt, and the enclosing definition is not visible here.

# Finish the reference item started above (i=12169 -> i=120, reference type
# i=46) and register the pending references with the server.
ref.ReferenceTypeId = ua.NodeId.from_string("i=46")
ref.SourceNodeId = ua.NodeId.from_string("i=12169")
ref.TargetNodeClass = ua.NodeClass.DataType
ref.TargetNodeId = ua.NodeId.from_string("i=120")
refs.append(ref)
server.add_references(refs)

# Add the "NodeVersion" variable node (i=3068) with String data type.
node = ua.AddNodesItem()
node.RequestedNewNodeId = ua.NodeId.from_string("i=3068")
node.BrowseName = ua.QualifiedName.from_string("NodeVersion")
node.NodeClass = ua.NodeClass.Variable
node.TypeDefinition = ua.NodeId.from_string("i=68")
attrs = ua.VariableAttributes()
attrs.Description = ua.LocalizedText("The version number of the node (used to indicate changes to references of the owning node).")
attrs.DisplayName = ua.LocalizedText("NodeVersion")
attrs.DataType = ua.NodeId(ua.ObjectIds.String)
# NOTE(review): -2 presumably is the OPC UA "any" value rank -- confirm against the spec.
attrs.ValueRank = -2
node.NodeAttributes = attrs
server.add_nodes([node])

# Link the new node (i=3068) to i=68, the same id used as its TypeDefinition
# above, through a forward reference of type i=40.
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId.from_string("i=40")
ref.SourceNodeId = ua.NodeId.from_string("i=3068")
ref.TargetNodeClass = ua.NodeClass.DataType
ref.TargetNodeId = ua.NodeId.from_string("i=68")
refs.append(ref)
server.add_references(refs)

# Start of the next node item, "ViewVersion" (i=12170); the remainder of its
# definition lies outside this excerpt.
node = ua.AddNodesItem()
node.RequestedNewNodeId = ua.NodeId.from_string("i=12170")
node.BrowseName = ua.QualifiedName.from_string("ViewVersion")
# NOTE(review): body fragment -- the enclosing `def` (apparently
# `_value_to_etree`, given the recursive call below) is outside this excerpt,
# and the original indentation is lost.
# Sequences are written under one list wrapper element, one child per item.
if isinstance(val, (list, tuple)):
# Namespace-0 identifiers up to 21 get a typed "ListOf<type_name>" wrapper.
if dtype.NamespaceIndex == 0 and dtype.Identifier <= 21:
elname = "uax:ListOf" + type_name
else: # this is an extension object:
elname = "uax:ListOfExtensionObject"
list_el = Et.SubElement(el, elname)
for nval in val:
# Recurse so that every item is serialised like a scalar value.
self._value_to_etree(list_el, type_name, dtype, nval)
# Scalar value: resolve the base data type first.
else:
dtype_base = get_base_data_type(self.server.get_node(dtype))
dtype_base = dtype_base.nodeid
# Enumerations are written as Int32.
if dtype_base == ua.NodeId(ua.ObjectIds.Enumeration):
dtype_base = ua.NodeId(ua.ObjectIds.Int32)
type_name = ua.ObjectIdNames[dtype_base.Identifier]
# Same namespace-0 / identifier <= 21 test as above, applied to the base type.
if dtype_base.NamespaceIndex == 0 and dtype_base.Identifier <= 21:
type_name = ua.ObjectIdNames[dtype_base.Identifier]
val_el = Et.SubElement(el, "uax:" + type_name)
self._val_to_etree(val_el, dtype_base, val)
# Everything else is handed to the extension-object serialiser.
else:
self._extobj_to_etree(el, type_name, dtype, val)
def _get_datatype_from_string(self, idx, name):
    """Return the data type node whose browse name is ``idx:name``, or ``None``.

    Only the children of the direct children of ``BaseDataType`` are
    searched; the first match wins.
    """
    # FIXME: this is very heavy and missing recursion, what is the correct way to do that?
    base_data_type = self.server_mgr.get_node(ua.ObjectIds.BaseDataType)
    for candidate_parent in base_data_type.get_children():
        try:
            return candidate_parent.get_child(f'{idx}:{name}')
        except ua.UaError:
            # No such child under this parent: try the next one.
            pass
    return None
def _rdesc_from_node(parent, node):
    """Build a ``ua.ReferenceDescription`` for ``node`` as seen from ``parent``.

    The reference type is ``Organizes`` when the parent's type definition is
    ``FolderType`` and ``HasComponent`` otherwise.  The type definition is
    only set when the node reports a truthy one.
    """
    wanted = [ua.AttributeIds.NodeClass, ua.AttributeIds.BrowseName, ua.AttributeIds.DisplayName]
    node_class, browse_name, display_name = (result.Value.Value for result in node.get_attributes(wanted))

    description = ua.ReferenceDescription()
    description.NodeId = node.nodeid
    description.BrowseName = browse_name
    description.DisplayName = display_name
    description.NodeClass = node_class

    parent_is_folder = parent.get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType)
    description.ReferenceTypeId = ua.NodeId(
        ua.ObjectIds.Organizes if parent_is_folder else ua.ObjectIds.HasComponent
    )

    type_definition = node.get_type_definition()
    if type_definition:
        description.TypeDefinition = type_definition
    return description
def get_node_subtypes(node, nodes=None):
    """Return ``node`` followed by all of its subtypes, depth first.

    When ``nodes`` is given, the subtypes are appended to that list (and
    ``node`` itself is not added); the same list is returned.
    """
    collected = [node] if nodes is None else nodes

    # One iterator per level currently being walked; the innermost is on top,
    # so a subtype's own children are visited before its next sibling.
    levels = [iter(node.get_children(refs=ua.ObjectIds.HasSubtype))]
    while levels:
        try:
            subtype = next(levels[-1])
        except StopIteration:
            levels.pop()
            continue
        collected.append(subtype)
        levels.append(iter(subtype.get_children(refs=ua.ObjectIds.HasSubtype)))
    return collected