Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def check_event_generator_object(test, evgen, obj, emitting_node=None):
    """Assert that an event generator is wired to ``obj`` as its source.

    Checks, in order:

    * the generator's event carries ``obj``'s browse name as ``SourceName``
      and ``obj.nodeid`` as ``SourceNode``;
    * the emitting node reports exactly ``{SubscribeToEvents}`` as its
      event notifier;
    * the event's ``EventType`` is among the node ids returned by the
      emitting node's forward ``GeneratesEvent`` references of class
      ``ObjectType``.

    :param test: test case object providing ``assertEqual`` / ``assertIn``
    :param evgen: event generator under test; only ``evgen.event`` is read
    :param obj: node expected to be the event source
    :param emitting_node: node expected to emit the event; when falsy
        (the default ``None``), ``obj`` itself is used as the emitter
    """
    test.assertEqual(evgen.event.SourceName, obj.get_browse_name().Name)
    test.assertEqual(evgen.event.SourceNode, obj.nodeid)

    # Both the notifier check and the reference lookup target the same
    # node: the explicit emitter when one is given, otherwise the source.
    emitter = emitting_node if emitting_node else obj

    test.assertEqual(
        emitter.get_event_notifier(),
        {ua.EventNotifier.SubscribeToEvents},
    )
    generated_types = emitter.get_referenced_nodes(
        ua.ObjectIds.GeneratesEvent,
        ua.BrowseDirection.Forward,
        ua.NodeClass.ObjectType,
        False,  # NOTE(review): presumably "include subtypes" -- confirm against Node API
    )
    test.assertIn(
        evgen.event.EventType,
        [type_node.nodeid for type_node in generated_types],
    )
def test_reference_generator_1(self):
    """reference_generator builds a forward reference between two node ids.

    Given two numeric node ids in namespace 2 and the ``HasEncoding``
    reference type, the generated item must be forward, carry the given
    reference type, source and target ids, and have ``DataType`` as its
    target node class.
    """
    # Arrange
    source_id = ua.NodeId(1, namespaceidx=2, nodeidtype=ua.NodeIdType.Numeric)
    target_id = ua.NodeId(2, namespaceidx=2, nodeidtype=ua.NodeIdType.Numeric)
    reference_type = ua.NodeId(ua.ObjectIds.HasEncoding, 0)

    # Act
    generated = reference_generator(source_id, target_id, reference_type)

    # Assert
    self.assertTrue(generated.IsForward)
    self.assertEqual(generated.ReferenceTypeId, reference_type)
    self.assertEqual(generated.SourceNodeId, source_id)
    self.assertEqual(generated.TargetNodeClass, ua.NodeClass.DataType)
    self.assertEqual(generated.TargetNodeId, target_id)
def __search_tags(self, node, recursion_level, sub=None):
try:
for childId in node.get_children():
ch = self.client.get_node(childId)
current_var_path = '.'.join(x.split(":")[1] for x in ch.get_path(20000, True))
if self.__interest_nodes:
if ch.get_node_class() == ua.NodeClass.Object:
for interest_node in self.__interest_nodes:
for int_node in interest_node:
if re.search(int_node.split('\\.')[recursion_level-2], ch.get_display_name().Text):
try:
methods = ch.get_methods()
for method in methods:
self.__available_object_resources[interest_node[int_node]["deviceName"]]["methods"].append({method.get_display_name().Text: method,
"node": ch})
except Exception as e:
log.exception(e)
self.__search_tags(ch, recursion_level+1, sub)
elif ch.get_node_class() == ua.NodeClass.Variable:
try:
for interest_node in self.__interest_nodes:
for int_node in interest_node:
if interest_node[int_node].get("attributes_updates"):
"""
node_class = node.get_node_class()
if node_class is ua.NodeClass.Object:
self.add_etree_object(node)
elif node_class is ua.NodeClass.ObjectType:
self.add_etree_object_type(node)
elif node_class is ua.NodeClass.Variable:
self.add_etree_variable(node)
elif node_class is ua.NodeClass.VariableType:
self.add_etree_variable_type(node)
elif node_class is ua.NodeClass.ReferenceType:
self.add_etree_reference_type(node)
elif node_class is ua.NodeClass.DataType:
self.add_etree_datatype(node)
elif node_class is ua.NodeClass.Method:
self.add_etree_method(node)
else:
self.logger.info("Exporting node class not implemented: %s ", node_class)
def _update_actions_state(self, current, previous):
    """Enable the "Call" action only when the current node is a Method.

    The action is always disabled first; it is re-enabled only if
    ``get_current_node(current)`` yields a truthy node whose node class
    is ``ua.NodeClass.Method``.

    :param current: selection handle passed through to ``get_current_node``
    :param previous: unused by this method
    """
    selected = self.get_current_node(current)

    # Start from the disabled state so a stale "enabled" never survives
    # a selection change.
    self.ui.actionCall.setEnabled(False)

    if not selected:
        return
    if selected.get_node_class() == ua.NodeClass.Method:
        self.ui.actionCall.setEnabled(True)
instantiate_optional=True):
"""
instantiate a node type under parent
"""
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = bname
addnode.ParentNodeId = parentid
addnode.ReferenceTypeId = rdesc.ReferenceTypeId
addnode.TypeDefinition = rdesc.TypeDefinition
if rdesc.NodeClass in (ua.NodeClass.Object, ua.NodeClass.ObjectType):
addnode.NodeClass = ua.NodeClass.Object
_read_and_copy_attrs(node_type, ua.ObjectAttributes(), addnode)
elif rdesc.NodeClass in (ua.NodeClass.Variable, ua.NodeClass.VariableType):
addnode.NodeClass = ua.NodeClass.Variable
_read_and_copy_attrs(node_type, ua.VariableAttributes(), addnode)
elif rdesc.NodeClass in (ua.NodeClass.Method, ):
addnode.NodeClass = ua.NodeClass.Method
_read_and_copy_attrs(node_type, ua.MethodAttributes(), addnode)
elif rdesc.NodeClass in (ua.NodeClass.DataType, ):
addnode.NodeClass = ua.NodeClass.DataType
_read_and_copy_attrs(node_type, ua.DataTypeAttributes(), addnode)
else:
logger.error("Instantiate: Node class not supported: %s", rdesc.NodeClass)
raise RuntimeError("Instantiate: Node class not supported")
return
if dname is not None:
addnode.NodeAttributes.DisplayName = dname
res = server.add_nodes([addnode])[0]
Instanciate a new node under 'parent' using a type
"""
print("Instanciating: node %s in %s" % (rdesc, parentid))
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = ua.NodeId()
addnode.BrowseName = rdesc.BrowseName
addnode.NodeClass = rdesc.NodeClass
addnode.ParentNodeId = parentid
addnode.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HasComponent)
addnode.TypeDefinition = rdesc.TypeDefinition
print("ADDNODE", addnode)
node_type = Node(server, rdesc.NodeId)
if rdesc.NodeClass in (ua.NodeClass.Object, ua.NodeClass.ObjectType):
print(node_type, " is object")
_read_and_copy_attrs(node_type, ua.ObjectAttributes(), addnode)
#_add_object_attrs(addnode, rdesc, node_type)
elif rdesc.NodeClass in (ua.NodeClass.Variable, ua.NodeClass.VariableType):
print(node_type, " is variable")
_read_and_copy_attrs(node_type, ua.VariableAttributes(), addnode)
#_add_variable_attrs(addnode, rdesc, node_type)
else:
print("Node class not supported: ", rdesc.NodeClass)
print("ADDNODE FINAL ", addnode)
server.add_nodes([addnode])
refs = []
def create_standard_address_space_Part13(server):
node = ua.AddNodesItem()
node.RequestedNewNodeId = NumericNodeId(11187, 0)
node.BrowseName = QualifiedName('AggregateConfigurationType', 0)
node.NodeClass = NodeClass.ObjectType
node.ParentNodeId = NumericNodeId(58, 0)
node.ReferenceTypeId = NumericNodeId(45, 0)
attrs = ua.ObjectTypeAttributes()
attrs.DisplayName = LocalizedText("AggregateConfigurationType")
attrs.IsAbstract = False
node.NodeAttributes = attrs
server.add_nodes([node])
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = True
ref.ReferenceTypeId = NumericNodeId(46, 0)
ref.SourceNodeId = NumericNodeId(11187, 0)
ref.TargetNodeClass = NodeClass.DataType
ref.TargetNodeId = NumericNodeId(11188, 0)
refs.append(ref)
ref = ua.AddReferencesItem()
bname,
dname=None,
recursive=True,
instantiate_optional=True):
"""
instantiate a node type under parent
"""
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = bname
addnode.ParentNodeId = parentid
addnode.ReferenceTypeId = rdesc.ReferenceTypeId
addnode.TypeDefinition = rdesc.TypeDefinition
if rdesc.NodeClass in (ua.NodeClass.Object, ua.NodeClass.ObjectType):
addnode.NodeClass = ua.NodeClass.Object
_read_and_copy_attrs(node_type, ua.ObjectAttributes(), addnode)
elif rdesc.NodeClass in (ua.NodeClass.Variable, ua.NodeClass.VariableType):
addnode.NodeClass = ua.NodeClass.Variable
_read_and_copy_attrs(node_type, ua.VariableAttributes(), addnode)
elif rdesc.NodeClass in (ua.NodeClass.Method, ):
addnode.NodeClass = ua.NodeClass.Method
_read_and_copy_attrs(node_type, ua.MethodAttributes(), addnode)
elif rdesc.NodeClass in (ua.NodeClass.DataType, ):
addnode.NodeClass = ua.NodeClass.DataType
_read_and_copy_attrs(node_type, ua.DataTypeAttributes(), addnode)
else:
logger.error("Instantiate: Node class not supported: %s", rdesc.NodeClass)
raise RuntimeError("Instantiate: Node class not supported")
return
if dname is not None:
def create_standard_address_space_Part10(server):
node = ua.AddNodesItem()
node.RequestedNewNodeId = ua.NodeId.from_string("i=2391")
node.BrowseName = ua.QualifiedName.from_string("ProgramStateMachineType")
node.NodeClass = ua.NodeClass.ObjectType
node.ParentNodeId = ua.NodeId.from_string("i=2771")
node.ReferenceTypeId = ua.NodeId.from_string("i=45")
attrs = ua.ObjectTypeAttributes()
attrs.Description = ua.LocalizedText("A state machine for a program.")
attrs.DisplayName = ua.LocalizedText("ProgramStateMachineType")
attrs.IsAbstract = False
node.NodeAttributes = attrs
server.add_nodes([node])
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId.from_string("i=47")
ref.SourceNodeId = ua.NodeId.from_string("i=2391")
ref.TargetNodeClass = ua.NodeClass.DataType
ref.TargetNodeId = ua.NodeId.from_string("i=3830")
refs.append(ref)