Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_text_with_locale():
    """Check LocalizedText equality across text/locale combinations.

    Two instances must compare equal only when both the text and the locale
    match, and an instance must compare equal to itself after being
    serialized with ``struct_to_binary`` and read back with
    ``struct_from_binary``.
    """
    # Fixtures: same text with no / identical / different locales, plus two
    # instances that carry only a locale.
    text_only = ua.LocalizedText('Root')
    austrian = ua.LocalizedText('Root', 'de-AT')
    austrian_twin = ua.LocalizedText('Root', 'de-AT')
    german = ua.LocalizedText('Root', 'de-DE')
    locale_only = ua.LocalizedText(locale='de-DE')
    locale_only_twin = ua.LocalizedText(locale='de-DE')

    # Equality must consider the locale as well as the text.
    assert text_only != austrian
    assert austrian == austrian_twin
    assert austrian != german
    assert german != locale_only
    assert locale_only == locale_only_twin

    # Binary round trip must preserve equality.
    encoded = struct_to_binary(austrian)
    decoded = struct_from_binary(ua.LocalizedText, ua.utils.Buffer(encoded))
    assert austrian == decoded
elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
_logger.info("Close session request (%s)", user)
if self.session:
deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)
await self.session.close_session(deletesubs)
else:
_logger.info("Request to close non-existing session (%s)", user)
response = ua.CloseSessionResponse()
_logger.info("sending close session response (%s)", user)
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
_logger.info("Activate session request (%s)", user)
params = struct_from_binary(ua.ActivateSessionParameters, body)
if not self.session:
_logger.info("request to activate non-existing session (%s)", user)
raise ServiceError(ua.StatusCodes.BadSessionIdInvalid)
if self._connection.security_policy.host_certificate is None:
data = self.session.nonce
else:
data = self._connection.security_policy.host_certificate + self.session.nonce
self._connection.security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)
result = self.session.activate_session(params, self._connection.security_policy.peer_certificate)
response = ua.ActivateSessionResponse()
response.Parameters = result
# _logger.info("sending read response")
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
_logger.info("Read request (%s)", user)
self._connection.close()
response = ua.CloseSecureChannelResponse()
self.send_response(requesthdr.RequestHandle, seqhdr, response)
return False
elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
_logger.info("call request (%s)", user)
params = struct_from_binary(ua.CallParameters, body)
results = await self.session.call(params.MethodsToCall)
response = ua.CallResponse()
response.Results = results
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.SetMonitoringModeRequest_Encoding_DefaultBinary):
_logger.info("set monitoring mode request (%s)", user)
params = struct_from_binary(ua.SetMonitoringModeParameters, body)
# FIXME: Implement SetMonitoringMode
# For now send dummy results to keep clients happy
response = ua.SetMonitoringModeResponse()
results = ua.SetMonitoringModeResult()
ids = params.MonitoredItemIds
statuses = [ua.StatusCode(ua.StatusCodes.Good) for node_id in ids]
results.Results = statuses
response.Parameters = results
_logger.info("sending set monitoring mode response")
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.SetPublishingModeRequest_Encoding_DefaultBinary):
_logger.info("set publishing mode request (%s)", user)
params = struct_from_binary(ua.SetPublishingModeParameters, body)
# FIXME: Implement SetPublishingMode
# For now send dummy results to keep clients happy
response.Parameters.Results = results
_logger.info("sending delete references response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
_logger.info("create subscription request")
params = struct_from_binary(ua.CreateSubscriptionParameters, body)
result = await self.session.create_subscription(params, callback=self.forward_publish_response)
response = ua.CreateSubscriptionResponse()
response.Parameters = result
_logger.info("sending create subscription response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
_logger.info("delete subscriptions request")
params = struct_from_binary(ua.DeleteSubscriptionsParameters, body)
results = await self.session.delete_subscriptions(params.SubscriptionIds)
response = ua.DeleteSubscriptionsResponse()
response.Results = results
_logger.info("sending delete subscription response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
_logger.info("create monitored items request")
params = struct_from_binary(ua.CreateMonitoredItemsParameters, body)
results = await self.session.create_monitored_items(params)
response = ua.CreateMonitoredItemsResponse()
response.Results = results
_logger.info("sending create monitored items response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
def _receive(self, msg):
    """Buffer one incoming chunk and return the full message when complete.

    The chunk is first validated with ``self._check_incoming_chunk`` and
    appended to ``self._incoming_parts``. What happens next depends on the
    chunk type found in ``msg.MessageHeader.ChunkType``:

    * ``Intermediate`` - the chunk stays buffered and ``None`` is returned.
    * ``Abort`` - the decoded error is logged, the buffer is emptied and
      ``None`` is returned.
    * ``Single`` - the buffered chunks are wrapped in a ``ua.Message``, the
      buffer is emptied and the message is returned.

    Any other chunk type raises ``ua.UaError``.
    """
    self._check_incoming_chunk(msg)
    self._incoming_parts.append(msg)
    chunk_type = msg.MessageHeader.ChunkType

    if chunk_type == ua.ChunkType.Intermediate:
        # Nothing to hand back yet; keep accumulating.
        return None

    if chunk_type == ua.ChunkType.Abort:
        abort_error = struct_from_binary(ua.ErrorMessage, ua.utils.Buffer(msg.Body))
        logger.warning(f"Message {msg} aborted: {abort_error}")
        # specs Part 6, 6.7.3 say that aborted message shall be ignored
        # and SecureChannel should not be closed
        self._incoming_parts = []
        return None

    if chunk_type == ua.ChunkType.Single:
        # Build the message before clearing the buffer so the parts are only
        # dropped once the message object exists.
        assembled = ua.Message(self._incoming_parts)
        self._incoming_parts = []
        return assembled

    raise ua.UaError(f"Unsupported chunk type: {msg}")
async def find_servers(self, params):
    """Send a FindServers request and return the servers from the reply.

    ``params`` is assigned as the request's ``Parameters``. The raw reply
    obtained from ``self.protocol.send_request`` is decoded into a
    ``ua.FindServersResponse``, and ``check()`` is called on its service
    result before ``response.Servers`` is returned.
    NOTE(review): ``check()`` presumably raises on a bad status - confirm.
    """
    self.logger.debug("find_servers")

    find_request = ua.FindServersRequest()
    find_request.Parameters = params

    raw_reply = await self.protocol.send_request(find_request)
    reply = struct_from_binary(ua.FindServersResponse, raw_reply)
    self.logger.debug(reply)

    reply.ResponseHeader.ServiceResult.check()
    return reply.Servers
response.Endpoints = endpoints
_logger.info("sending get endpoints response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
_logger.info("find servers request")
params = struct_from_binary(ua.FindServersParameters, body)
servers = self.iserver.find_servers(params)
response = ua.FindServersResponse()
response.Servers = servers
_logger.info("sending find servers response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
_logger.info("register server request")
serv = struct_from_binary(ua.RegisteredServer, body)
self.iserver.register_server(serv)
response = ua.RegisterServerResponse()
_logger.info("sending register server response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
_logger.info("register server 2 request")
params = struct_from_binary(ua.RegisterServer2Parameters, body)
results = self.iserver.register_server2(params)
response = ua.RegisterServer2Response()
response.ConfigurationResults = results
_logger.info("sending register server 2 response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
_logger.info("translate browsepaths to nodeids request")
response.Results = results
_logger.info("sending add node response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
_logger.info("delete nodes request")
params = struct_from_binary(ua.DeleteNodesParameters, body)
results = await self.session.delete_nodes(params)
response = ua.DeleteNodesResponse()
response.Results = results
_logger.info("sending delete node response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary):
_logger.info("add references request")
params = struct_from_binary(ua.AddReferencesParameters, body)
results = await self.session.add_references(params.ReferencesToAdd)
response = ua.AddReferencesResponse()
response.Results = results
_logger.info("sending add references response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary):
_logger.info("delete references request")
params = struct_from_binary(ua.DeleteReferencesParameters, body)
results = await self.session.delete_references(params.ReferencesToDelete)
response = ua.DeleteReferencesResponse()
response.Parameters.Results = results
_logger.info("sending delete references response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
response = ua.FindServersResponse()
response.Servers = servers
# _logger.info("sending find servers response")
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
_logger.info("register server request %s", user)
serv = struct_from_binary(ua.RegisteredServer, body)
self.iserver.register_server(serv)
response = ua.RegisterServerResponse()
# _logger.info("sending register server response")
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
_logger.info("register server 2 request %s", user)
params = struct_from_binary(ua.RegisterServer2Parameters, body)
results = self.iserver.register_server2(params)
response = ua.RegisterServer2Response()
response.ConfigurationResults = results
# _logger.info("sending register server 2 response")
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
_logger.info("translate browsepaths to nodeids request (%s)", user)
params = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsParameters, body)
paths = await self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)
response = ua.TranslateBrowsePathsToNodeIdsResponse()
response.Results = paths
# _logger.info("sending translate browsepaths to nodeids response")
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
async def create_monitored_items(self, params):
    """Send a CreateMonitoredItems request and return the reply's results.

    ``params`` is assigned as the request's ``Parameters``. The raw reply
    obtained from ``self.protocol.send_request`` is decoded into a
    ``ua.CreateMonitoredItemsResponse``, and ``check()`` is called on its
    service result before ``response.Results`` is returned.
    NOTE(review): ``check()`` presumably raises on a bad status - confirm.
    """
    self.logger.info("create_monitored_items")

    create_request = ua.CreateMonitoredItemsRequest()
    create_request.Parameters = params

    raw_reply = await self.protocol.send_request(create_request)
    reply = struct_from_binary(ua.CreateMonitoredItemsResponse, raw_reply)
    self.logger.debug(reply)

    reply.ResponseHeader.ServiceResult.check()
    return reply.Results