Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _create_events_monitored_item(self, params: ua.MonitoredItemCreateRequest):
self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId,
params.ItemToMonitor.AttributeId)
result, mdata = self._make_monitored_item_common(params)
ev_notify_byte = self.aspace.read_attribute_value(params.ItemToMonitor.NodeId,
ua.AttributeIds.EventNotifier).Value.Value
if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
return result
# result.FilterResult = ua.EventFilterResult() # spec says we can ignore if not error
mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.filter.WhereClause)
self._commit_monitored_item(result, mdata)
if params.ItemToMonitor.NodeId not in self._monitored_events:
self._monitored_events[params.ItemToMonitor.NodeId] = []
self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
return result
async def read_data_value(self):
"""
Get value of a node as a DataValue object. Only variables (and properties) have values.
An exception will be generated for other node types.
DataValue contain a variable value as a variant as well as server and source timestamps
"""
return await self.read_attribute(ua.AttributeIds.Value)
async def read_node_class(self):
"""
get node class attribute of node
"""
result = await self.read_attribute(ua.AttributeIds.NodeClass)
return result.Value.Value
def _lsprint_long(pnode, depth, indent=""):
if not indent:
print("{0:30} {1:25} {2:25} {3:10} {4:30} {5:25}".format("DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"))
print("")
for node in pnode.get_children():
attrs = node.read_attributes([ua.AttributeIds.DisplayName,
ua.AttributeIds.BrowseName,
ua.AttributeIds.NodeClass,
ua.AttributeIds.WriteMask,
ua.AttributeIds.UserWriteMask,
ua.AttributeIds.DataType,
ua.AttributeIds.Value])
name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
update = attrs[-1].ServerTimestamp
if nclass == ua.NodeClass.Variable:
print("{0}{1:30} {2:25} {3:25} {4:10} {5!s:30} {6!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
else:
print("{0}{1:30} {2:25} {3:25}".format(indent, name.to_string(), bname.to_string(), node.nodeid.to_string()))
if depth:
_lsprint_long(node, depth - 1, indent + " ")
def _lsprint_long(pnode, depth, indent=""):
if not indent:
print("{0:30} {1:25} {2:25} {3:10} {4:30} {5:25}".format("DisplayName", "NodeId", "BrowseName", "DataType", "Timestamp", "Value"))
print("")
for node in pnode.get_children():
attrs = node.read_attributes([ua.AttributeIds.DisplayName,
ua.AttributeIds.BrowseName,
ua.AttributeIds.NodeClass,
ua.AttributeIds.WriteMask,
ua.AttributeIds.UserWriteMask,
ua.AttributeIds.DataType,
ua.AttributeIds.Value])
name, bname, nclass, mask, umask, dtype, val = [attr.Value.Value for attr in attrs]
update = attrs[-1].ServerTimestamp
if nclass == ua.NodeClass.Variable:
print("{0}{1:30} {2:25} {3:25} {4:10} {5!s:30} {6!s:25}".format(indent, name.to_string(), node.nodeid.to_string(), bname.to_string(), dtype.to_string(), update, val))
else:
print("{0}{1:30} {2:25} {3:25}".format(indent, name.to_string(), bname.to_string(), node.nodeid.to_string()))
if depth:
_lsprint_long(node, depth - 1, indent + " ")
async def set_event_notifier(self, values):
"""
Set the event notifier attribute.
:param values: an iterable of EventNotifier enum values.
"""
event_notifier_bitfield = ua.EventNotifier.to_bitfield(values)
await self.write_attribute(
ua.AttributeIds.EventNotifier,
ua.DataValue(ua.Variant(event_notifier_bitfield, ua.VariantType.Byte))
)
def _add_node_attributes(self, nodedata, item, add_timestamps):
# add common attrs
nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(
ua.DataValue(ua.Variant(nodedata.nodeid, ua.VariantType.NodeId))
)
nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(
ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName))
)
nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(
ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32))
)
# add requested attrs
self._add_nodeattributes(item.NodeAttributes, nodedata, add_timestamps)
async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
results = []
for item in params.ItemsToCreate:
if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
result = self._create_events_monitored_item(item)
else:
result = await self._create_data_change_monitored_item(item)
results.append(result)
return results
async def read_value_rank(self):
"""
Read and return ArrayDimensions attribute of node
"""
res = await self.read_attribute(ua.AttributeIds.ValueRank)
return res.Value.Value
async def read_browse_name(self):
"""
Get browse name of a node. A browse name is a QualifiedName object
composed of a string(name) and a namespace index.
"""
result = await self.read_attribute(ua.AttributeIds.BrowseName)
return result.Value.Value