Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
idtoken.PolicyId = "username"
idtoken.TokenType = ua.UserTokenType.UserName
idtokens.append(idtoken)
appdesc = ua.ApplicationDescription()
appdesc.ApplicationName = ua.LocalizedText(self.name)
appdesc.ApplicationUri = self._application_uri
appdesc.ApplicationType = self.application_type
appdesc.ProductUri = self.product_uri
appdesc.DiscoveryUrls.append(self.endpoint.geturl())
edp = ua.EndpointDescription()
edp.EndpointUrl = self.endpoint.geturl()
edp.Server = appdesc
if self.certificate:
edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
edp.SecurityMode = mode
edp.SecurityPolicyUri = policy.URI
edp.UserIdentityTokens = idtokens
edp.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
edp.SecurityLevel = 0
self.iserver.add_endpoint(edp)
async def load_client_certificate(self, path: str):
    """
    Read the user certificate stored at ``path`` (pem or der) and keep it
    on this object as ``self.user_certificate``.
    """
    loaded = await uacrypto.load_certificate(path)
    self.user_certificate = loaded
def _add_certificate_auth(self, params, certificate, challenge):
    """
    Fill ``params`` with an X509 identity token and the matching signature.

    The token carries the DER encoding of ``certificate``. ``challenge`` is
    signed with ``self.user_private_key`` and the result, together with the
    URI of the algorithm used, is stored in ``params.UserTokenSignature``.
    """
    identity = ua.X509IdentityToken()
    params.UserIdentityToken = identity
    identity.PolicyId = self.server_policy_id(ua.UserTokenType.Certificate, "certificate_basic256")
    identity.CertificateData = uacrypto.der_from_x509(certificate)

    # specs part 4, 5.6.3.1: the data to sign is created by appending
    # the last serverNonce to the serverCertificate
    signature_data = ua.SignatureData()
    params.UserTokenSignature = signature_data

    # use signature algorithm that was used for certificate generation
    if certificate.signature_hash_algorithm.name == "sha256":
        signer = uacrypto.sign_sha256
        algorithm_uri = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"
    else:
        # anything that is not sha256 is signed with sha1
        signer = uacrypto.sign_sha1
        algorithm_uri = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"

    signed = signer(self.user_private_key, challenge)
    signature_data.Algorithm = algorithm_uri
    signature_data.Signature = signed
def verify(self, data, signature):
    """
    Check that ``signature`` is the HMAC-SHA256 of ``data`` under ``self.key``.

    Returns None when the signature matches.

    Raises:
        uacrypto.InvalidSignature: if the signature does not match, or is of
            a type that cannot be compared with the expected digest.
    """
    # Local import keeps this block self-contained; hmac is standard library.
    import hmac

    expected = uacrypto.hmac_sha256(self.key, data)
    try:
        # Constant-time comparison: a plain ``!=`` returns as soon as the
        # first differing byte is found, which leaks timing information
        # about the expected MAC to whoever supplies ``signature``.
        matches = hmac.compare_digest(signature, expected)
    except TypeError:
        # Non bytes-like input (e.g. None) could never have compared equal.
        matches = False
    if not matches:
        raise uacrypto.InvalidSignature
def cert_to_string(der):
    """
    Return a printable description of a DER-encoded certificate.

    An empty or missing ``der`` yields ``'[no certificate]'``. If the
    ``.crypto`` module cannot be imported, only the byte length is reported.
    Otherwise the certificate is parsed and rendered by ``uacrypto``.
    """
    if der:
        try:
            from .crypto import uacrypto
        except ImportError:
            # no crypto support importable: fall back to the size only
            return f"{len(der)} bytes"
        else:
            return uacrypto.x509_to_string(uacrypto.x509_from_der(der))
    return '[no certificate]'
def verify(self, data, signature):
    """
    Check ``signature`` over ``data`` against ``self.server_cert`` by
    delegating to ``uacrypto.verify_sha1``. Nothing is returned.
    """
    verifier = uacrypto.verify_sha1
    verifier(self.server_cert, data, signature)
idtoken.PolicyId = "username"
idtoken.TokenType = ua.UserTokenType.UserName
idtokens.append(idtoken)
appdesc = ua.ApplicationDescription()
appdesc.ApplicationName = ua.LocalizedText(self.name)
appdesc.ApplicationUri = self._application_uri
appdesc.ApplicationType = self.application_type
appdesc.ProductUri = self.product_uri
appdesc.DiscoveryUrls.append(self.endpoint.geturl())
edp = ua.EndpointDescription()
edp.EndpointUrl = self.endpoint.geturl()
edp.Server = appdesc
if self.certificate:
edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
edp.SecurityMode = mode
edp.SecurityPolicyUri = policy.URI
edp.UserIdentityTokens = idtokens
edp.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
edp.SecurityLevel = 0
self.iserver.add_endpoint(edp)
async def set_security(self,
                       policy,
                       certificate_path: str,
                       private_key_path: str,
                       private_key_password: str = None,
                       server_certificate_path: str = None,
                       mode: ua.MessageSecurityMode = ua.MessageSecurityMode.SignAndEncrypt):
    """
    Set SecureConnection mode.
    Call this before connect()

    The server certificate is loaded from ``server_certificate_path`` when
    one is given. Otherwise the server's endpoints are fetched and the
    certificate is taken from the endpoint matching ``mode`` and
    ``policy.URI``. The resulting policy object is stored in
    ``self.security_policy`` and handed to ``self.uaclient``.
    """
    if server_certificate_path is not None:
        remote_cert = await uacrypto.load_certificate(server_certificate_path)
    else:
        # load certificate from server's list of endpoints
        available = await self.connect_and_get_server_endpoints()
        chosen = Client.find_endpoint(available, mode, policy.URI)
        remote_cert = uacrypto.x509_from_der(chosen.ServerCertificate)

    own_cert = await uacrypto.load_certificate(certificate_path)
    private_key = await uacrypto.load_private_key(private_key_path, password=private_key_password)

    self.security_policy = policy(remote_cert, own_cert, private_key, mode)
    self.uaclient.set_security(self.security_policy)
async def load_certificate(self, path: str, format: str = None):
    """
    Read the server certificate stored at ``path`` (pem or der) and keep it
    on this object as ``self.certificate``. ``format`` is passed through to
    ``uacrypto.load_certificate`` unchanged.
    """
    loaded = await uacrypto.load_certificate(path, format)
    self.certificate = loaded
def encrypt_asymmetric(pubkey, data):
    """Return ``data`` encrypted for ``pubkey`` via ``uacrypto.encrypt_rsa_oaep``."""
    ciphertext = uacrypto.encrypt_rsa_oaep(pubkey, data)
    return ciphertext