Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.aspace = AddressSpace()
self.attribute_service = AttributeService(self.aspace)
self.view_service = ViewService(self.aspace)
self.method_service = MethodService(self.aspace)
self.node_mgt_service = NodeManagementService(self.aspace)
self.asyncio_transports = []
self.subscription_service: SubscriptionService = SubscriptionService(self.loop, self.aspace)
self.history_manager = HistoryManager(self)
if user_manager is None:
logger.warning("No user manager specified. Using default permissive manager instead.")
user_manager = PermissiveUserManager()
self.user_manager = user_manager
# create a session to use on server side
self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal",
user=User(role=UserRole.Admin))
self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
def _get_node(isession, whatever):
    """
    Return a Node for ``whatever``.

    A Node instance is handed back unchanged. A ua.NodeId is wrapped
    directly; any other value is first passed to ua.NodeId() and the
    result is wrapped in a Node bound to ``isession``.
    """
    if isinstance(whatever, Node):
        return whatever
    # Only non-NodeId inputs need converting before they can be wrapped.
    nodeid = whatever if isinstance(whatever, ua.NodeId) else ua.NodeId(whatever)
    return Node(isession, nodeid)
def __init__(self, server):
    """
    Create Node shortcuts for a fixed set of ObjectIds entries.

    :param server: object passed as the first argument to every Node
        created here
    """
    # Root-level folders and the Server object
    self.root = Node(server, ObjectIds.RootFolder)
    self.objects = Node(server, ObjectIds.ObjectsFolder)
    self.server = Node(server, ObjectIds.Server)
    # "types" used to be assigned twice with the same value; once is enough.
    self.types = Node(server, ObjectIds.TypesFolder)

    # Base type nodes
    self.base_object_type = Node(server, ObjectIds.BaseObjectType)
    self.base_data_type = Node(server, ObjectIds.BaseDataType)
    self.base_event_type = Node(server, ObjectIds.BaseEventType)
    self.base_variable_type = Node(server, ObjectIds.BaseVariableType)
    self.folder_type = Node(server, ObjectIds.FolderType)
    self.enum_data_type = Node(server, ObjectIds.Enumeration)

    # Type folders
    self.data_types = Node(server, ObjectIds.DataTypesFolder)
    self.event_types = Node(server, ObjectIds.EventTypesFolder)
    self.reference_types = Node(server, ObjectIds.ReferenceTypesFolder)
    self.variable_types = Node(server, ObjectIds.VariableTypesFolder)
    self.object_types = Node(server, ObjectIds.ObjectTypesFolder)

    # Remaining individual nodes
    self.namespace_array = Node(server, ObjectIds.Server_NamespaceArray)
    self.opc_binary = Node(server, ObjectIds.OPCBinarySchema_TypeSystem)
    self.base_structure_type = Node(server, ObjectIds.Structure)
    self.server_state = Node(server, ObjectIds.Server_ServerStatus_State)
async def setup_nodes(self):
"""
Set up some nodes as defined by spec
"""
uries = ['http://opcfoundation.org/UA/']
ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
await ns_node.write_value(uries)
params = ua.WriteParameters()
for nodeid in (ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRead,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadData,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadEvents,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerWrite,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateData,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateEvents,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerMethodCall,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerBrowse,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRegisterNodes,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerTranslateBrowsePathsToNodeIds,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerNodeManagement,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxMonitoredItemsPerCall):
attr = ua.WriteValue()
def _to_nodeid(nodeid):
    """
    Resolve ``nodeid`` to a node id object.

    Accepted inputs:
      * int           -> ua.TwoByteNodeId(nodeid)
      * Node          -> its ``nodeid`` attribute
      * ua.NodeId     -> returned unchanged
      * str or bytes  -> ua.NodeId.from_string(nodeid)

    :raises ua.UaError: if ``nodeid`` is none of the above
    """
    if isinstance(nodeid, int):
        return ua.TwoByteNodeId(nodeid)
    if isinstance(nodeid, Node):
        return nodeid.nodeid
    if isinstance(nodeid, ua.NodeId):
        return nodeid
    # isinstance (rather than an exact type() match) also accepts
    # str/bytes subclasses, which previously fell through to the error.
    if isinstance(nodeid, (str, bytes)):
        return ua.NodeId.from_string(nodeid)
    raise ua.UaError(f"Could not resolve '{nodeid}' to a type id")
If numvalues is > 0 and number of events in period is > numvalues
then result will be truncated
"""
details = ua.ReadEventDetails()
if starttime:
details.StartTime = starttime
else:
details.StartTime = ua.get_win_epoch()
if endtime:
details.EndTime = endtime
else:
details.EndTime = ua.get_win_epoch()
details.NumValuesPerNode = numvalues
if not isinstance(evtypes, (list, tuple)):
evtypes = [evtypes]
evtypes = [Node(self.server, evtype) for evtype in evtypes]
evfilter = await get_filter_from_event_type(evtypes)
details.Filter = evfilter
result = await self.history_read_events(details)
result.StatusCode.check()
event_res = []
for res in result.HistoryData.Events:
event_res.append(
Event.from_event_fields(evfilter.SelectClauses, res.EventFields)
)
return event_res
def __eq__(self, other):
    """
    Two nodes are equal when both are Node instances with equal nodeid.

    Anything that is not a Node compares unequal.
    """
    if not isinstance(other, Node):
        return False
    # bool() keeps the result a strict True/False, as before.
    return bool(self.nodeid == other.nodeid)
def get_node(self, nodeid: Union[ua.NodeId, str]) -> Node:
    """
    Build a Node for the given id.

    :param nodeid: a NodeId object or a string representing a NodeId
    :return: a Node created from ``self.uaclient`` and ``nodeid``
    """
    client = self.uaclient
    return Node(client, nodeid)
async def get_referenced_nodes(self, refs=ua.ObjectIds.References, direction=ua.BrowseDirection.Both,
                               nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
    """
    Return the nodes referenced by this node, filtered as requested.

    Parameters are the same as for get_references; every returned
    reference description is turned into a Node built from
    ``self.server`` and the description's NodeId.
    """
    descriptions = await self.get_references(refs, direction, nodeclassmask, includesubtypes)
    return [Node(self.server, description.NodeId) for description in descriptions]
def __init__(self, server):
    """
    Create Node shortcuts for a fixed set of ObjectIds entries.

    :param server: object passed as the first argument to every Node
        created here
    """
    # Root-level folders and the Server object
    self.root = Node(server, ObjectIds.RootFolder)
    self.objects = Node(server, ObjectIds.ObjectsFolder)
    self.server = Node(server, ObjectIds.Server)
    # "types" used to be assigned twice with the same value; once is enough.
    self.types = Node(server, ObjectIds.TypesFolder)

    # Base type nodes
    self.base_object_type = Node(server, ObjectIds.BaseObjectType)
    self.base_data_type = Node(server, ObjectIds.BaseDataType)
    self.base_event_type = Node(server, ObjectIds.BaseEventType)
    self.base_variable_type = Node(server, ObjectIds.BaseVariableType)
    self.folder_type = Node(server, ObjectIds.FolderType)
    self.enum_data_type = Node(server, ObjectIds.Enumeration)

    # Type folders
    self.data_types = Node(server, ObjectIds.DataTypesFolder)
    self.event_types = Node(server, ObjectIds.EventTypesFolder)
    self.reference_types = Node(server, ObjectIds.ReferenceTypesFolder)
    self.variable_types = Node(server, ObjectIds.VariableTypesFolder)
    self.object_types = Node(server, ObjectIds.ObjectTypesFolder)

    # Remaining individual nodes
    self.namespace_array = Node(server, ObjectIds.Server_NamespaceArray)
    self.opc_binary = Node(server, ObjectIds.OPCBinarySchema_TypeSystem)
    self.base_structure_type = Node(server, ObjectIds.Structure)
    self.server_state = Node(server, ObjectIds.Server_ServerStatus_State)