Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def func2(parent, methodname, value):
    """Dispatch on ``methodname`` and return either a sine value or an error status.

    Args:
        parent: Supplied by the caller; not used by this function.
        methodname: Requested operation. Only ``"sin"`` produces a numeric result.
        value: Number passed to ``math.sin`` when ``methodname`` is ``"sin"``.

    Returns:
        ``math.sin(value)`` for ``"sin"``; a bare ``ua.StatusCode`` carrying
        ``BadOutOfMemory`` for ``"panic"``; for any other name a
        ``ua.CallMethodResult`` whose ``StatusCode`` is ``BadInvalidArgument``.
    """
    if methodname == "panic":
        # Error reported as a plain status code, not a full result object.
        return ua.StatusCode(ua.StatusCodes.BadOutOfMemory)
    if methodname != "sin":
        # Error reported as a complete CallMethodResult with per-argument statuses.
        res = ua.CallMethodResult()
        res.StatusCode = ua.StatusCode(ua.StatusCodes.BadInvalidArgument)
        res.InputArgumentResults = [ua.StatusCode(ua.StatusCodes.BadNotSupported), ua.StatusCode()]
        return res
    return math.sin(value)
def test_method_array(self):
    """Call ``2:ServerMethodArray`` with one valid and two failing method names.

    ``"sin"`` of pi must come back approximately zero; ``"cos"`` and ``"panic"``
    must each raise ``ua.UaStatusCodeError`` with the expected status code.
    """
    o = self.opc.get_objects_node()
    m = o.get_child("2:ServerMethodArray")
    result = o.call_method(m, "sin", ua.Variant(math.pi))
    # Two-sided tolerance: a bare ``result < 0.01`` would also accept any negative value.
    self.assertAlmostEqual(result, 0.0, delta=0.01)
    with self.assertRaises(ua.UaStatusCodeError) as cm:
        o.call_method(m, "cos", ua.Variant(math.pi))
    # Checked after the ``with`` block: statements following the raising call inside it never run.
    self.assertEqual(cm.exception.code, ua.StatusCodes.BadInvalidArgument)
    with self.assertRaises(ua.UaStatusCodeError) as cm:
        o.call_method(m, "panic", ua.Variant(math.pi))
    self.assertEqual(cm.exception.code, ua.StatusCodes.BadOutOfMemory)
def test_string_to_variant_status_code():
    """Check that status-code names convert to the matching ``ua.StatusCode`` values.

    ``string_to_val`` is called with ``ua.VariantType.StatusCode`` for the
    names ``"Good"`` and ``"Uncertain"`` and each result is compared with a
    ``ua.StatusCode`` built from the corresponding ``ua.StatusCodes`` member.
    """
    s_statuscode = "Good"
    statuscode = ua.StatusCode(ua.StatusCodes.Good)
    s_statuscode2 = "Uncertain"
    statuscode2 = ua.StatusCode(ua.StatusCodes.Uncertain)
    assert statuscode == string_to_val(s_statuscode, ua.VariantType.StatusCode)
    assert statuscode2 == string_to_val(s_statuscode2, ua.VariantType.StatusCode)
def test_method_array(self):
    """Exercise ``2:ServerMethodArray`` through its success path and both error paths.

    The ``"sin"`` call with pi must return a value close to zero. The ``"cos"``
    and ``"panic"`` calls must raise ``ua.UaStatusCodeError`` with
    ``BadInvalidArgument`` and ``BadOutOfMemory`` respectively.
    """
    o = self.opc.get_objects_node()
    m = o.get_child("2:ServerMethodArray")
    result = o.call_method(m, "sin", ua.Variant(math.pi))
    # ``result < 0.01`` alone passes for any negative number; bound it on both sides.
    self.assertAlmostEqual(result, 0.0, delta=0.01)
    with self.assertRaises(ua.UaStatusCodeError) as cm:
        o.call_method(m, "cos", ua.Variant(math.pi))
    # The captured exception is inspected once the context manager has exited.
    self.assertEqual(cm.exception.code, ua.StatusCodes.BadInvalidArgument)
    with self.assertRaises(ua.UaStatusCodeError) as cm:
        o.call_method(m, "panic", ua.Variant(math.pi))
    self.assertEqual(cm.exception.code, ua.StatusCodes.BadOutOfMemory)
def func2(parent, methodname, value):
    """Return ``sin(value)`` for ``"sin"`` and an error status for any other name.

    Args:
        parent: Supplied by the caller; not used by this function.
        methodname: Requested operation name.
        value: Number passed to ``math.sin`` when ``methodname`` is ``"sin"``.

    Returns:
        ``math.sin(value)`` when ``methodname`` is ``"sin"``. For ``"panic"`` a
        bare ``ua.StatusCode`` with ``BadOutOfMemory``. For anything else a
        ``ua.CallMethodResult`` with ``StatusCode`` set to ``BadInvalidArgument``
        and two entries in ``InputArgumentResults``.
    """
    if methodname == "panic":
        # This branch returns a status code directly instead of a result object.
        return ua.StatusCode(ua.StatusCodes.BadOutOfMemory)
    if methodname != "sin":
        # This branch returns a fully populated CallMethodResult.
        res = ua.CallMethodResult()
        res.StatusCode = ua.StatusCode(ua.StatusCodes.BadInvalidArgument)
        res.InputArgumentResults = [ua.StatusCode(ua.StatusCodes.BadNotSupported), ua.StatusCode()]
        return res
    return math.sin(value)
async def delete_subscriptions(self, ids):
    """Remove the subscriptions with the given ids and stop those that existed.

    Args:
        ids: Iterable of subscription ids to delete.

    Returns:
        A list with one ``ua.StatusCode`` per requested id, in input order:
        ``BadSubscriptionIdInvalid`` when the id is not present in
        ``self.subscriptions``, a default ``ua.StatusCode()`` otherwise.
    """
    self.logger.info("delete subscriptions: %s", ids)
    res = []
    existing_subs = []
    for i in ids:
        # pop() unregisters the entry right away, so an id repeated in ``ids``
        # is reported as invalid the second time.
        sub = self.subscriptions.pop(i, None)
        if sub is None:
            res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
        else:
            existing_subs.append(sub)
            res.append(ua.StatusCode())
    # Stop every removed subscription; the stop() awaitables are awaited together.
    await asyncio.gather(*[sub.stop() for sub in existing_subs])
    return res
# If the Identifier of the requested NodeId is null, we generate a new NodeId
# in the namespace of that NodeId. This is an extension of the spec that allows
# a client to request that the server generate a new NodeId in a specified namespace.
# self.logger.debug("RequestedNewNodeId has null identifier, generating Identifier")
item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
else:
if item.RequestedNewNodeId in self._aspace:
self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
return result
if item.ParentNodeId.is_null():
# self.logger.info("add_node: while adding node %s, requested parent node is null %s %s",
# item.RequestedNewNodeId, item.ParentNodeId, item.ParentNodeId.is_null())
if check:
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
return result
parentdata = self._aspace.get(item.ParentNodeId)
if parentdata is None and not item.ParentNodeId.is_null():
self.logger.info("add_node: while adding node %s, requested parent node %s does not exists",
item.RequestedNewNodeId, item.ParentNodeId)
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
return result
nodedata = NodeData(item.RequestedNewNodeId)
self._add_node_attributes(nodedata, item, add_timestamps=check)
# now add our node to db
self._aspace[nodedata.nodeid] = nodedata
def _add_node(self, item, user, check=True):
# self.logger.debug("Adding node %s %s", item.RequestedNewNodeId, item.BrowseName)
result = ua.AddNodesResult()
if not user.role == UserRole.Admin:
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
return result
if item.RequestedNewNodeId.has_null_identifier():
# If the Identifier of the requested NodeId is null, we generate a new NodeId
# in the namespace of that NodeId. This is an extension of the spec that allows
# a client to request that the server generate a new NodeId in a specified namespace.
# self.logger.debug("RequestedNewNodeId has null identifier, generating Identifier")
item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
else:
if item.RequestedNewNodeId in self._aspace:
self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
return result
if item.ParentNodeId.is_null():
# self.logger.info("add_node: while adding node %s, requested parent node is null %s %s",
self._connection.close()
response = ua.CloseSecureChannelResponse()
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
return False
elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
_logger.info("call request")
params = struct_from_binary(ua.CallParameters, body)
results = await self.session.call(params.MethodsToCall)
response = ua.CallResponse()
response.Results = results
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
else:
_logger.warning("Unknown message received %s", typeid)
raise ServiceError(ua.StatusCodes.BadServiceUnsupported)
return True
return False
elif header.MessageType == ua.MessageType.SecureMessage:
return await self.process_message(msg.SequenceHeader(), msg.body())
elif isinstance(msg, ua.Hello):
ack = ua.Acknowledge()
ack.ReceiveBufferSize = msg.ReceiveBufferSize
ack.SendBufferSize = msg.SendBufferSize
data = uatcp_to_binary(ua.MessageType.Acknowledge, ack)
self._transport.write(data)
elif isinstance(msg, ua.ErrorMessage):
_logger.warning("Received an error message type")
elif msg is None:
pass # msg is a ChunkType.Intermediate of an ua.MessageType.SecureMessage
else:
_logger.warning("Unsupported message type: %s", header.MessageType)
raise ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
return True