Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_basic256_encrypt_fail(srv_crypto_all_certs):
# FIXME: how to make it fail???
_, cert = srv_crypto_all_certs
clt = Client(uri_crypto)
with pytest.raises(ua.UaError):
await clt.set_security(
security_policies.SecurityPolicyBasic256Sha256,
f"{EXAMPLE_PATH}certificate-example.der",
f"{EXAMPLE_PATH}private-key-example.pem",
None,
None,
mode=ua.MessageSecurityMode.None_
)
def _delete_monitored_items(self, mid: int):
if mid not in self._monitored_items:
return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
for k, v in self._monitored_events.items():
if mid in v:
v.remove(mid)
if not v:
self._monitored_events.pop(k)
break
for k, v in self._monitored_datachange.items():
if v == mid:
self.aspace.delete_datachange_callback(k)
self._monitored_datachange.pop(k)
break
self._monitored_items.pop(mid)
return ua.StatusCode()
async def history_read(self, params):
self.logger.info("history_read")
request = ua.HistoryReadRequest()
request.Parameters = params
data = await self.protocol.send_request(request)
response = struct_from_binary(ua.HistoryReadResponse, data)
self.logger.debug(response)
response.ResponseHeader.ServiceResult.check()
return response.Results
el = self.elements[index]
# ops = [self._eval_op(op, event) for op in el.FilterOperands]
ops = el.FilterOperands # just to make code more readable
if el.FilterOperator == ua.FilterOperator.Equals:
return self._eval_op(ops[0], event) == self._eval_op(ops[1], event)
if el.FilterOperator == ua.FilterOperator.IsNull:
return self._eval_op(ops[0], event) is None # FIXME: might be too strict
if el.FilterOperator == ua.FilterOperator.GreaterThan:
return self._eval_op(ops[0], event) > self._eval_op(ops[1], event)
if el.FilterOperator == ua.FilterOperator.LessThan:
return self._eval_op(ops[0], event) < self._eval_op(ops[1], event)
if el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
return self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
if el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
return self._eval_op(ops[0], event) <= self._eval_op(ops[1], event)
if el.FilterOperator == ua.FilterOperator.Like:
return self._like_operator(self._eval_op(ops[0], event), self._eval_op(ops[1], event))
if el.FilterOperator == ua.FilterOperator.Not:
return not self._eval_op(ops[0], event)
if el.FilterOperator == ua.FilterOperator.Between:
return self._eval_op(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_op(ops[1], event)
if el.FilterOperator == ua.FilterOperator.InList:
return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
if el.FilterOperator == ua.FilterOperator.And:
self.elements(ops[0].Index)
return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
if el.FilterOperator == ua.FilterOperator.Or:
return self._eval_op(ops[0], event) or self._eval_op(ops[1], event)
if el.FilterOperator == ua.FilterOperator.Cast:
self.logger.warn("Cast operand not implemented, assuming True")
return True
if el.FilterOperator == ua.FilterOperator.OfType:
def _get_sdef(self, node, obj):
if not obj.definitions:
return None
sdef = ua.StructureDefinition()
if obj.parent:
sdef.BaseDataType = obj.parent
for data in obj.refs:
if data.reftype == "HasEncoding":
# looks likebinary encodingisthe firt one...can someone confirm?
sdef.DefaultEncodingId = data.target
break
optional = False
for field in obj.definitions:
f = ua.StructureField()
f.Name = field.name
f.DataType = field.datatype
f.ValueRank = field.valuerank
f.IsOptional = field.optional
if f.IsOptional:
optional = True
f.ArrayDimensions = field.arraydim
f.Description = ua.LocalizedText(text=field.desc)
sdef.Fields.append(f)
if optional:
sdef.StructureType = ua.StructureType.StructureWithOptionalFields
else:
sdef.StructureType = ua.StructureType.Structure
return sdef
def _guess_datatype(variant):
if variant.VariantType == ua.VariantType.ExtensionObject:
if variant.Value is None:
raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
if type(variant.Value) in (list, tuple):
if len(variant.Value) == 0:
raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
extobj = variant.Value[0]
else:
extobj = variant.Value
classname = extobj.__class__.__name__
if not hasattr(ua.ObjectIds, classname):
raise ua.UaError(f"Cannot guess DataType of {variant} of python type {type(variant)}")
return ua.NodeId(getattr(ua.ObjectIds, classname))
else:
return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
idtokens.append(idtoken)
if "Basic256Sha256" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'certificate_basic256sha256'
idtoken.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken)
if "Username" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = "username"
idtoken.TokenType = ua.UserTokenType.UserName
idtokens.append(idtoken)
appdesc = ua.ApplicationDescription()
appdesc.ApplicationName = ua.LocalizedText(self.name)
appdesc.ApplicationUri = self._application_uri
appdesc.ApplicationType = self.application_type
appdesc.ProductUri = self.product_uri
appdesc.DiscoveryUrls.append(self.endpoint.geturl())
edp = ua.EndpointDescription()
edp.EndpointUrl = self.endpoint.geturl()
edp.Server = appdesc
if self.certificate:
edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
edp.SecurityMode = mode
edp.SecurityPolicyUri = policy.URI
edp.UserIdentityTokens = idtokens
edp.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
edp.SecurityLevel = 0
self.iserver.add_endpoint(edp)
self._policies = [ua.SecurityPolicyFactory()]
if self._security_policy != [ua.SecurityPolicyType.NoSecurity]:
if not (self.certificate and self.iserver.private_key):
self.logger.warning("Endpoints other than open requested but private key and certificate are not set.")
return
if ua.SecurityPolicyType.NoSecurity in self._security_policy:
self.logger.warning(
"Creating an open endpoint to the server, although encrypted endpoints are enabled.")
if ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(
ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
ua.MessageSecurityMode.SignAndEncrypt, self.certificate,
self.iserver.private_key))
if ua.SecurityPolicyType.Basic256Sha256_Sign in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256, ua.MessageSecurityMode.Sign)
self._policies.append(
ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
ua.MessageSecurityMode.Sign, self.certificate, self.iserver.private_key))
def unpack_uatype(vtype, data):
if hasattr(Primitives, vtype.name):
st = getattr(Primitives, vtype.name)
return st.unpack(data)
elif vtype.value > 25:
return Primitives.Bytes.unpack(data)
elif vtype == ua.VariantType.ExtensionObject:
return extensionobject_from_binary(data)
elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
return nodeid_from_binary(data)
elif vtype == ua.VariantType.Variant:
return variant_from_binary(data)
else:
if hasattr(ua, vtype.name):
cls = getattr(ua, vtype.name)
return struct_from_binary(cls, data)
else:
raise UaError(f'Cannot unpack unknown variant type {vtype}')