Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Create device sutype
devd_t = dev_t.add_object_type(0, "MyDeviceDervived")
v_t = devd_t.add_variable(0, "childparam", 1.0)
v_t.set_modelling_rule(True)
p_t = devd_t.add_property(0, "sensorx_id", "0340")
p_t.set_modelling_rule(True)
# instanciate device
nodes = instantiate(self.opc.nodes.objects, dev_t, bname="2:Device0001")
mydevice = nodes[0]
self.assertEqual(mydevice.get_node_class(), ua.NodeClass.Object)
self.assertEqual(mydevice.get_type_definition(), dev_t.nodeid)
obj = mydevice.get_child(["0:controller"])
prop = mydevice.get_child(["0:controller", "0:state"])
with self.assertRaises(ua.UaError):
mydevice.get_child(["0:controller", "0:vendor"])
with self.assertRaises(ua.UaError):
mydevice.get_child(["0:controller", "0:model"])
self.assertEqual(prop.get_type_definition().Identifier, ua.ObjectIds.PropertyType)
self.assertEqual(prop.get_value(), "Running")
self.assertNotEqual(prop.nodeid, prop_t.nodeid)
# instanciate device subtype
nodes = instantiate(self.opc.nodes.objects, devd_t, bname="2:Device0002")
mydevicederived = nodes[0]
prop1 = mydevicederived.get_child(["0:sensorx_id"])
var1 = mydevicederived.get_child(["0:childparam"])
var_parent = mydevicederived.get_child(["0:sensor"])
prop_parent = mydevicederived.get_child(["0:sensor_id"])
def test_custom_variant():
with pytest.raises(ua.UaError):
v = ua.Variant(b"ljsdfljds", ua.VariantTypeCustom(89))
v = ua.Variant(b"ljsdfljds", ua.VariantTypeCustom(61))
v2 = variant_from_binary(ua.utils.Buffer(variant_to_binary(v)))
assert v.VariantType == v2.VariantType
assert v == v2
def test_bad_string():
with pytest.raises(ua.UaStringParsingError):
ua.NodeId.from_string("ns=r;s=yu")
with pytest.raises(ua.UaStringParsingError):
ua.NodeId.from_string("i=r;ns=1")
with pytest.raises(ua.UaStringParsingError):
ua.NodeId.from_string("ns=1")
with pytest.raises(ua.UaError):
ua.QualifiedName.from_string("i:yu")
with pytest.raises(ua.UaError):
ua.QualifiedName.from_string("i:::yu")
def _to_nodeid(nodeid):
if isinstance(nodeid, int):
return ua.TwoByteNodeId(nodeid)
elif isinstance(nodeid, Node):
return nodeid.nodeid
elif isinstance(nodeid, ua.NodeId):
return nodeid
elif type(nodeid) in (str, bytes):
return ua.NodeId.from_string(nodeid)
else:
raise ua.UaError(f"Could not resolve '{nodeid}' to a type id")
def _guess_datatype(variant):
if variant.VariantType == ua.VariantType.ExtensionObject:
if variant.Value is None:
raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
if type(variant.Value) in (list, tuple):
if len(variant.Value) == 0:
raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
extobj = variant.Value[0]
else:
extobj = variant.Value
classname = extobj.__class__.__name__
if not hasattr(ua.ObjectIds, classname):
raise ua.UaError(f"Cannot guess DataType of {variant} of python type {type(variant)}")
return ua.NodeId(getattr(ua.ObjectIds, classname))
else:
return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
def require_cryptography(obj):
"""
Raise exception if cryptography module is not available.
Call this function in constructors.
"""
if not CRYPTOGRAPHY_AVAILABLE:
raise UaError(f"Can't use {obj.__class__.__name__}, cryptography module is not installed")
params.EndpointUrl = self.server_url.geturl()
params.SessionName = f"{self.description} Session{self._session_counter}"
# Requested maximum number of milliseconds that a Session should remain open without activity
params.RequestedSessionTimeout = self.session_timeout
params.MaxResponseMessageSize = 0 # means no max size
response = await self.uaclient.create_session(params)
if self.security_policy.host_certificate is None:
data = nonce
else:
data = self.security_policy.host_certificate + nonce
self.security_policy.asymmetric_cryptography.verify(data, response.ServerSignature.Signature)
self._server_nonce = response.ServerNonce
if not self.security_policy.peer_certificate:
self.security_policy.peer_certificate = response.ServerCertificate
elif self.security_policy.peer_certificate != response.ServerCertificate:
raise ua.UaError("Server certificate mismatch")
# remember PolicyId's: we will use them in activate_session()
ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
self._policy_ids = ep.UserIdentityTokens
# Actual maximum number of milliseconds that a Session shall remain open without activity
if self.session_timeout != response.RevisedSessionTimeout:
_logger.warning("Requested session timeout to be %dms, got %dms instead",
self.secure_channel_timeout, response.RevisedSessionTimeout)
self.session_timeout = response.RevisedSessionTimeout
self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
return response
if self._allow_prev_token and security_hdr.TokenId == self.prev_security_token.TokenId:
# From spec, part 4, section 5.5.2.1: Clients should accept Messages secured by an
# expired SecurityToken for up to 25 % of the token lifetime. This should ensure that
# Messages sent by the Server before the token expired are not rejected because of
# network delays.
timeout = self.prev_security_token.CreatedAt + \
timedelta(milliseconds=self.prev_security_token.RevisedLifetime * 1.25)
if timeout < datetime.utcnow():
raise ua.UaError(f"Security token id {security_hdr.TokenId} has timed out "
f"({timeout} < {datetime.utcnow()})")
return
expected_tokens = [self.security_token.TokenId, self.next_security_token.TokenId]
if self._allow_prev_token:
expected_tokens.insert(0, self.prev_security_token.TokenId)
raise ua.UaError(f"Invalid security token id {security_hdr.TokenId}, expected one of: {expected_tokens}")