Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_null_string(self):
    """Round-trip a null and an empty String variant through the binary codec.

    Each variant is encoded with ``variant_to_binary``, decoded again with
    ``variant_from_binary``, and the decoded value must equal the original.
    """
    for text in (None, ""):
        original = ua.Variant(text, ua.VariantType.String)
        encoded = variant_to_binary(original)
        decoded = variant_from_binary(ua.utils.Buffer(encoded))
        self.assertEqual(original.Value, decoded.Value)
def test_datavalue(self):
    """Check that ``ua.DataValue`` exposes its payload as a ``ua.Variant``.

    An int and a str payload are compared against the equivalent Variant;
    finally a source timestamp is assigned (only the assignment itself is
    exercised, nothing is asserted about it).
    """
    int_dv = ua.DataValue(123)
    self.assertEqual(int_dv.Value, ua.Variant(123))
    self.assertEqual(type(int_dv.Value), ua.Variant)

    str_dv = ua.DataValue('abc')
    self.assertEqual(str_dv.Value, ua.Variant('abc'))
    str_dv.SourceTimestamp = datetime.utcnow()
def test_extension_object(self):
    """Round-trip a ``UserNameIdentityToken`` as an extension object and as a Variant.

    First the token itself is encoded/decoded and its type, user name and
    password are compared; then the token is wrapped in a Variant, which is
    encoded/decoded and compared by Python type and ``VariantType``.
    """
    token = ua.UserNameIdentityToken()
    token.UserName = "admin"
    token.Password = b"pass"

    # Direct extension-object round trip.
    token_bytes = extensionobject_to_binary(token)
    decoded_token = extensionobject_from_binary(ua.utils.Buffer(token_bytes))
    self.assertEqual(type(token), type(decoded_token))
    self.assertEqual(token.UserName, decoded_token.UserName)
    self.assertEqual(token.Password, decoded_token.Password)

    # Same object carried inside a Variant.
    wrapped = ua.Variant(token)
    wrapped_bytes = variant_to_binary(wrapped)
    unwrapped = variant_from_binary(ua.utils.Buffer(wrapped_bytes))
    self.assertEqual(type(wrapped), type(unwrapped))
    self.assertEqual(wrapped.VariantType, unwrapped.VariantType)
def test_value(self):
    """Add a Double variable under the objects node and read it back.

    The plain value must equal 1.98, ``get_data_value`` must return a
    ``ua.DataValue``, and its ``Value`` must equal both a locally built
    DataValue's ``Value`` and the original Variant.
    """
    objects_node = self.opc.get_objects_node()
    expected_variant = ua.Variant(1.98, ua.VariantType.Double)
    variable = objects_node.add_variable(3, 'VariableValue', expected_variant)

    self.assertEqual(1.98, variable.get_value())

    expected_dv = ua.DataValue(expected_variant)
    actual_dv = variable.get_data_value()
    self.assertEqual(ua.DataValue, type(actual_dv))
    self.assertEqual(expected_dv.Value, actual_dv.Value)
    self.assertEqual(expected_dv.Value, expected_variant)
# Excerpt from a longer routine: `value`, `attrs`, `node` and `server` are
# created before these lines (not visible here).
# Append three EnumValueType entries ('Method', 'ReferenceType', 'View') to `value`.
extobj = ua.EnumValueType()
extobj.Value = 1466724
extobj.DisplayName.Text = 'Method'
extobj.Description.Text = 'All method attributes are specified.'
value.append(extobj)
extobj = ua.EnumValueType()
extobj.Value = 1371236
extobj.DisplayName.Text = 'ReferenceType'
extobj.Description.Text = 'All reference type attributes are specified.'
value.append(extobj)
extobj = ua.EnumValueType()
extobj.Value = 1335532
extobj.DisplayName.Text = 'View'
extobj.Description.Text = 'All view attributes are specified.'
value.append(extobj)
# Store the collected entries as the node's value (typed as ExtensionObject,
# ValueRank 1), attach the attributes and register the node with the server.
attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)
attrs.ValueRank = 1
node.NodeAttributes = attrs
server.add_nodes([node])
# Add a single forward reference of type i=37 from node i=11881 to node i=78.
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId.from_string("i=37")
ref.SourceNodeId = ua.NodeId.from_string("i=11881")
ref.TargetNodeClass = ua.NodeClass.DataType
ref.TargetNodeId = ua.NodeId.from_string("i=78")
refs.append(ref)
server.add_references(refs)
# Start describing the next node (i=376, browse name "AddNodesItem"); the rest
# of its definition continues beyond this excerpt.
node = ua.AddNodesItem()
node.RequestedNewNodeId = ua.NodeId.from_string("i=376")
node.BrowseName = ua.QualifiedName.from_string("AddNodesItem")
# Excerpt: read stored events between start_time and end_time and return them
# together with a continuation timestamp. `_c_read`, `table`, `clauses`,
# `clauses_str`, `order`, `limit` and the time bounds are set up before these
# lines (not visible here).
# Timestamp of every row read, kept parallel to `results`.
cont_timestamps = []
results = []
# select events from the database; SQL select clause is built from EventFilter and available fields
# The column list, table name and sort direction are interpolated with
# str.format; the time bounds and the limit are bound as `?` parameters.
try:
for row in _c_read.execute(
'SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {dir} LIMIT ?'
.format(cl=clauses_str, tn=table, dir=order), (start_time, end_time, limit)):
fdict = {}
cont_timestamps.append(row[0])
# row[0] is the timestamp; the remaining columns line up index-for-index with `clauses`.
for i, field in enumerate(row[1:]):
# Non-NULL columns are decoded from their stored binary form; NULL becomes an empty Variant.
if field is not None:
fdict[clauses[i]] = variant_from_binary(Buffer(field))
else:
fdict[clauses[i]] = ua.Variant(None)
results.append(events.Event.from_field_dict(fdict))
# A SQLite error is logged rather than re-raised.
except sqlite3.Error as e:
self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)
# When more rows than requested were read, use the timestamp of the first
# surplus row as the continuation point and trim the result to nb_values.
if nb_values:
if len(results) > nb_values: # start > ua.get_win_epoch() and
cont = cont_timestamps[nb_values]
results = results[:nb_values]
# NOTE(review): `cont` is only assigned above when results were trimmed —
# confirm it is initialised before this excerpt.
return results, cont
def to_variant(*args):
    """Wrap every positional argument in a ``ua.Variant``.

    Arguments that already are ``ua.Variant`` instances are kept as-is (the
    same object is returned); any other value is wrapped with
    ``ua.Variant(arg)``.

    Returns:
        A new list holding one Variant per argument, in call order. The list
        is empty when the function is called without arguments.
    """
    return [arg if isinstance(arg, ua.Variant) else ua.Variant(arg) for arg in args]
# Excerpt: `dt_node`, `type_name`, `name`, `description_node_id`,
# `bind_obj_node_id` and `data_type_node_id` are defined before these lines
# (not visible here).
# Attributes of the data type node: only the display name is set here.
dt_attributes = ua.DataTypeAttributes()
dt_attributes.DisplayName = ua.LocalizedText(type_name)
dt_node.NodeAttributes = dt_attributes
# create description node
# It is a Variable under self.dict_id, linked with HasComponent and typed as
# DataTypeDescriptionType.
desc_node = ua.AddNodesItem()
desc_node.RequestedNewNodeId = description_node_id
desc_node.BrowseName = ua.QualifiedName(name, self._idx)
desc_node.NodeClass = ua.NodeClass.Variable
desc_node.ParentNodeId = self.dict_id
desc_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent, 0)
desc_node.TypeDefinition = ua.NodeId(ua.ObjectIds.DataTypeDescriptionType, 0)
desc_attributes = ua.VariableAttributes()
desc_attributes.DisplayName = ua.LocalizedText(type_name)
desc_attributes.DataType = ua.NodeId(ua.ObjectIds.String)
# The variable's value is `name` (not `type_name`), stored as a String.
desc_attributes.Value = ua.Variant(name, ua.VariantType.String)
# NOTE(review): -1 presumably marks a scalar value — confirm against the ValueRank definition.
desc_attributes.ValueRank = -1
desc_node.NodeAttributes = desc_attributes
# create object node which the loaded python class should link to
# It is an Object named 'Default Binary' under the data type node, linked with
# HasEncoding and typed as DataTypeEncodingType.
obj_node = ua.AddNodesItem()
obj_node.RequestedNewNodeId = bind_obj_node_id
obj_node.BrowseName = ua.QualifiedName('Default Binary', 0)
obj_node.NodeClass = ua.NodeClass.Object
obj_node.ParentNodeId = data_type_node_id
obj_node.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasEncoding, 0)
obj_node.TypeDefinition = ua.NodeId(ua.ObjectIds.DataTypeEncodingType, 0)
obj_attributes = ua.ObjectAttributes()
obj_attributes.DisplayName = ua.LocalizedText('Default Binary')
obj_attributes.EventNotifier = 0
obj_node.NodeAttributes = obj_attributes
# Excerpt of a test-server script: `sUri`, `variantInfoList` and `args` are
# defined before these lines, and the `try:` below continues past this excerpt.
# Open server
print('Configuring FreeOpcUa test server')
#logging.basicConfig(level=logging.DEBUG)
server = Server()
server.set_endpoint(sUri)
# Certificate and key are loaded from ../tests/data/cert relative to this file.
cert_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'tests', 'data', 'cert')
server.load_certificate(os.path.join(cert_dir, 'server_2k_cert.der'))
server.load_private_key(os.path.join(cert_dir, 'server_2k_key.pem'))
# Nodes are created under the Objects node
objects = server.get_objects_node()
# Use the same nodes as in the Read test
# One writable variable per variantInfoList entry, with numeric ids 1001, 1002, ...
for i,(sTypName,typ,val,_) in enumerate(variantInfoList):
nid = 1000 + i + 1
node = objects.add_variable(ua.NodeId(nid, 0), sTypName, ua.Variant(val, typ))
node.set_writable()
# Add a node which increments, the target of an interesting subscription
nodeCnt = objects.add_variable(ua.NodeId('Counter', 0), 'Counter', ua.Variant(0, ua.VariantType.UInt64))
nodeCnt.set_writable()
# Add a writable node, so that a client can subscribe to it, while another one can modify it.
nodeStr = objects.add_variable(ua.NodeId('StatusString', 0), 'StatusString', ua.Variant('Everything is ok.', ua.VariantType.String))
nodeStr.set_writable()
# Starts the server
server.start()
try:
print('Server started. Stops in {:.2f} s'.format(args.msTimeout/1000.))
# t0 holds "now + timeout in seconds", i.e. a point in the future, not the start time.
t0 = time.time()+args.msTimeout/1000.
# The freeopcua toolkit is heavily an asyncio thing, which is run in another thread
i = 0
# Excerpt from a longer routine: `value`, `attrs`, `node` and `server` are
# created before these lines (not visible here).
# Append three EnumValueType entries ('ValueRank', 'WriteMask',
# 'ValueForVariableType') to `value`.
extobj = ua.EnumValueType()
extobj.Value = 524288
extobj.DisplayName.Text = 'ValueRank'
extobj.Description.Text = 'The value rank attribute is writable.'
value.append(extobj)
extobj = ua.EnumValueType()
extobj.Value = 1048576
extobj.DisplayName.Text = 'WriteMask'
extobj.Description.Text = 'The write mask attribute is writable.'
value.append(extobj)
extobj = ua.EnumValueType()
extobj.Value = 2097152
extobj.DisplayName.Text = 'ValueForVariableType'
extobj.Description.Text = 'The value attribute is writable.'
value.append(extobj)
# Store the collected entries as the node's value (typed as ExtensionObject,
# ValueRank 1), attach the attributes and register the node with the server.
attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)
attrs.ValueRank = 1
node.NodeAttributes = attrs
server.add_nodes([node])
# Add a single forward reference of type i=37 from node i=11882 to node i=78.
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId.from_string("i=37")
ref.SourceNodeId = ua.NodeId.from_string("i=11882")
ref.TargetNodeClass = ua.NodeClass.DataType
ref.TargetNodeId = ua.NodeId.from_string("i=78")
refs.append(ref)
server.add_references(refs)
# Start describing the next node (i=521, browse name "ContinuationPoint"); the
# rest of its definition continues beyond this excerpt.
node = ua.AddNodesItem()
node.RequestedNewNodeId = ua.NodeId.from_string("i=521")
node.BrowseName = ua.QualifiedName.from_string("ContinuationPoint")