Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_datavalue():
    """Check that DataValue wraps plain values in a Variant and accepts a source timestamp."""
    # An int payload must end up wrapped in exactly a ua.Variant.
    int_dv = ua.DataValue(123)
    assert int_dv.Value == ua.Variant(123)
    assert type(int_dv.Value) is ua.Variant

    # Same wrapping for a string payload.
    str_dv = ua.DataValue('abc')
    assert str_dv.Value == ua.Variant('abc')

    # SourceTimestamp can be assigned after construction.
    stamp = datetime.utcnow()
    str_dv.SourceTimestamp = stamp
def add_value(history, age):
    """
    Store a default-constructed DataValue for NODE_ID, dated `age` hours ago.

    :param history: object exposing ``save_node_value(node_id, datavalue)``.
    :param age: number of hours to subtract from the current UTC time.
    :return: whatever ``history.save_node_value`` returns.
    """
    aged_value = ua.DataValue()
    source_time = datetime.utcnow() - timedelta(hours=age)
    aged_value.SourceTimestamp = source_time
    return history.save_node_value(NODE_ID, aged_value)
async def main():
    """
    Print how many Value writes per second the client completes.

    Connects to the server at ``url``, looks up ``MyVariable`` under ``MyObject``
    in the example namespace, then sends ``nb`` write requests one after another
    and prints ``nb`` divided by the elapsed time.
    """
    url = 'opc.tcp://localhost:4840/freeopcua/server/'
    async with Client(url=url) as client:
        uri = 'http://examples.freeopcua.github.io'
        idx = await client.get_namespace_index(uri)
        var = await client.nodes.root.get_child(["0:Objects", f"{idx}:MyObject", f"{idx}:MyVariable"])
        nb = 4000
        # perf_counter() is monotonic, so the measured interval cannot be skewed
        # by wall-clock adjustments the way time.time() can.
        start = time.perf_counter()
        # Build the write request once; only the payload changes per iteration.
        attr = ua.WriteValue()
        attr.NodeId = var.nodeid
        attr.AttributeId = ua.AttributeIds.Value
        attr.Value = ua.DataValue(ua.Variant(1.0, ua.VariantType.Float))
        params = ua.WriteParameters()
        params.NodesToWrite = [attr]
        for i in range(nb):
            params.NodesToWrite[0].Value.Value.Value = i
            # NOTE: the status codes returned by write() are not checked here.
            await client.uaclient.write(params)
        # Stop the clock right after the loop so only the writes are timed.
        elapsed = time.perf_counter() - start
        print("\n Write frequency: \n", nb / elapsed)
async def set_event_notifier(self, values):
    """
    Write the EventNotifier attribute.

    The given values are folded into a bitfield and written as a Byte variant.

    :param values: an iterable of EventNotifier enum values.
    """
    bitfield = ua.EventNotifier.to_bitfield(values)
    notifier_variant = ua.Variant(bitfield, ua.VariantType.Byte)
    notifier_value = ua.DataValue(notifier_variant)
    await self.write_attribute(ua.AttributeIds.EventNotifier, notifier_value)
def value_to_datavalue(val, varianttype=None):
    """
    Convert anything to a DataValue.

    A DataValue is returned unchanged. A Variant is wrapped as is; any other
    value is first wrapped in a Variant built with `varianttype`. Every newly
    created DataValue gets the current UTC time as its SourceTimestamp.
    """
    # Already a DataValue: hand it back without touching its timestamps.
    if isinstance(val, ua.DataValue):
        return val
    variant = val if isinstance(val, ua.Variant) else ua.Variant(val, varianttype)
    datavalue = ua.DataValue(variant)
    datavalue.SourceTimestamp = datetime.utcnow()
    return datavalue
# NOTE(review): excerpt of a larger script - the `try:` matching the `finally:`
# below, and the setup of `my_python_obj` and `server`, are outside this view.
# Write a single attribute to OPC UA: only 'MyVariable' is passed to write_value().
my_python_obj.MyVariable = 12.3
my_python_obj.MyProperty = 55 # this value will not be visible to clients because write is not called
my_python_obj.write_value('MyVariable')
time.sleep(3)
# Write all attributes of the object to OPC UA (write_value() called without a name).
my_python_obj.MyVariable = 98.1
my_python_obj.MyProperty = 99
my_python_obj.write_value()
time.sleep(3)
# Write directly to the OPC UA node of the object, first as a Double, then as a UInt64.
dv = ua.DataValue(ua.Variant(5.5, ua.VariantType.Double))
my_python_obj.nodes['MyVariable'].write_value(dv)
dv = ua.DataValue(ua.Variant(4, ua.VariantType.UInt64))
my_python_obj.nodes['MyVariable'].write_value(dv)
time.sleep(3)
finally:
# Close connection, remove subscriptions, etc.
server.stop()
def _add_node_attr(self, item, nodedata, name, vtype=None, add_timestamps=False):
    """
    Copy attribute `name` from `item` into `nodedata.attributes` when it is specified.

    Nothing is stored unless the `name` bit of ``ua.NodeAttributesMask`` is set in
    ``item.SpecifiedAttributes``.

    :param item: object carrying ``SpecifiedAttributes`` and the attribute `name`.
    :param nodedata: object whose ``attributes`` mapping receives the new entry.
    :param name: attribute name, looked up on the mask, on `item` and on ``ua.AttributeIds``.
    :param vtype: variant type passed to ``ua.Variant``.
    :param add_timestamps: when true, set SourceTimestamp to the current UTC time.
    """
    if not (item.SpecifiedAttributes & getattr(ua.NodeAttributesMask, name)):
        return
    variant = ua.Variant(getattr(item, name), vtype)
    data_value = ua.DataValue(variant)
    if add_timestamps:
        # ServerTimestamp is left unset: disabled until someone explains us it should be there.
        data_value.SourceTimestamp = datetime.utcnow()
    wrapped = AttributeValue(data_value)
    nodedata.attributes[getattr(ua.AttributeIds, name)] = wrapped
# NOTE(review): excerpt of a larger method - `params` and `self` are defined
# outside this view, and the original indentation is not visible.
# For each operation-limit node listed below, queue a Value write of
# UInt32 10000 with a Good status code and the current UTC ServerTimestamp.
for nodeid in (ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRead,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadData,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadEvents,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerWrite,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateData,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateEvents,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerMethodCall,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerBrowse,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRegisterNodes,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerTranslateBrowsePathsToNodeIds,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerNodeManagement,
ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxMonitoredItemsPerCall):
attr = ua.WriteValue()
attr.NodeId = ua.NodeId(nodeid)
attr.AttributeId = ua.AttributeIds.Value
attr.Value = ua.DataValue(ua.Variant(10000, ua.VariantType.UInt32), ua.StatusCode(ua.StatusCodes.Good))
attr.Value.ServerTimestamp = datetime.utcnow()
params.NodesToWrite.append(attr)
# Send the queued writes through the internal session.
# NOTE(review): presumably this runs once, after the loop - confirm against the full file.
result = await self.isession.write(params)
# Only the first result is checked here.
result[0].check()
async def read_node_history(self, node_id, start, end, nb_values):
    """
    Read stored DataValues of `node_id` whose SourceTimestamp lies in the requested range.

    Table name, time bounds, sort order and row limit come from
    ``self._get_table_name`` and ``self._get_bounds``. SQL errors are logged and
    whatever rows were read before the error are still returned.

    :param node_id: node whose history table is queried.
    :param start: passed to ``self._get_bounds``.
    :param end: passed to ``self._get_bounds``.
    :param nb_values: when truthy, the result is cut to this many values.
    :return: tuple ``(values, continuation)``; ``continuation`` is the
        SourceTimestamp of the first value cut off, or None.
    """
    table = self._get_table_name(node_id)
    start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
    query = (
        f'SELECT * FROM "{table}" WHERE "SourceTimestamp" BETWEEN ? AND ? '
        f'ORDER BY "_Id" {order} LIMIT ?'
    )
    history = []
    try:
        async with self._db.execute(query, (start_time, end_time, limit)) as cursor:
            async for row in cursor:
                # Rebuild the DataValue: the variant is decoded from the binary column,
                # timestamps and status code are taken from their own columns.
                data_value = ua.DataValue(variant_from_binary(Buffer(row[6])))
                data_value.ServerTimestamp = row[1]
                data_value.SourceTimestamp = row[2]
                data_value.StatusCode = ua.StatusCode(row[3])
                history.append(data_value)
    except aiosqlite.Error as exc:
        self.logger.error("Historizing SQL Read Error for %s: %s", node_id, exc)
    if not nb_values:
        return history, None
    # A value beyond the requested count marks where a follow-up read continues.
    continuation = history[nb_values].SourceTimestamp if len(history) > nb_values else None
    return history[:nb_values], continuation