How to use the asyncua.ua.StatusCodes function in asyncua

To help you get started, we’ve selected a few asyncua examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github FreeOpcUa / opcua-asyncio / tests / test_common.py View on Github external
def func2(parent, methodname, value):
        """Dispatch on ``methodname`` and return a sine value or an OPC UA error.

        ``parent`` is accepted but never read.

        * ``"sin"``   -> ``math.sin(value)``
        * ``"panic"`` -> a bare ``ua.StatusCode`` built from ``BadOutOfMemory``
        * otherwise   -> a ``ua.CallMethodResult`` whose ``StatusCode`` is
          ``BadInvalidArgument`` and whose ``InputArgumentResults`` holds a
          ``BadNotSupported`` status followed by a default ``ua.StatusCode()``
        """
        # Happy path first: the only method name that yields a numeric result.
        if methodname == "sin":
            return math.sin(value)

        if methodname == "panic":
            return ua.StatusCode(ua.StatusCodes.BadOutOfMemory)

        # Every other name is rejected with a fully populated call result.
        failure = ua.CallMethodResult()
        failure.StatusCode = ua.StatusCode(ua.StatusCodes.BadInvalidArgument)
        failure.InputArgumentResults = [
            ua.StatusCode(ua.StatusCodes.BadNotSupported),
            ua.StatusCode(),
        ]
        return failure
github FreeOpcUa / opcua-asyncio / tests / tests_common.py View on Github external
def test_method_array(self):
        """Call ``2:ServerMethodArray`` with "sin", "cos" and "panic".

        "sin" must return a value below 0.01; "cos" and "panic" must raise
        ``ua.UaStatusCodeError`` carrying ``BadInvalidArgument`` and
        ``BadOutOfMemory`` respectively.
        """
        objects = self.opc.get_objects_node()
        method = objects.get_child("2:ServerMethodArray")

        sine = objects.call_method(method, "sin", ua.Variant(math.pi))
        self.assertTrue(sine < 0.01)

        # Each failing method name is paired with the status code the raised
        # exception is expected to carry.
        failing_calls = (
            ("cos", ua.StatusCodes.BadInvalidArgument),
            ("panic", ua.StatusCodes.BadOutOfMemory),
        )
        for method_name, expected_code in failing_calls:
            with self.assertRaises(ua.UaStatusCodeError) as caught:
                objects.call_method(method, method_name, ua.Variant(math.pi))
            self.assertEqual(caught.exception.code, expected_code)
github FreeOpcUa / opcua-asyncio / tests / test_unit.py View on Github external
def test_string_to_variant_status_code():
    """Check that "Good" and "Uncertain" convert to the matching status codes.

    Each string is passed to ``string_to_val`` with
    ``ua.VariantType.StatusCode`` and compared against a ``ua.StatusCode``
    built from the corresponding ``ua.StatusCodes`` member.
    """
    expected_good = ua.StatusCode(ua.StatusCodes.Good)
    expected_uncertain = ua.StatusCode(ua.StatusCodes.Uncertain)

    parsed_good = string_to_val("Good", ua.VariantType.StatusCode)
    assert expected_good == parsed_good

    parsed_uncertain = string_to_val("Uncertain", ua.VariantType.StatusCode)
    assert expected_uncertain == parsed_uncertain
github FreeOpcUa / opcua-asyncio / tests / tests_common.py View on Github external
def test_method_array(self):
        """Exercise the ``2:ServerMethodArray`` method for success and failure.

        The "sin" call is expected to produce a result below 0.01. The "cos"
        and "panic" calls are expected to raise ``ua.UaStatusCodeError`` with
        codes ``BadInvalidArgument`` and ``BadOutOfMemory``.
        """
        objects_node = self.opc.get_objects_node()
        array_method = objects_node.get_child("2:ServerMethodArray")

        # Successful invocation.
        outcome = objects_node.call_method(array_method, "sin", ua.Variant(math.pi))
        self.assertTrue(outcome < 0.01)

        # Unsupported method name is reported as an invalid argument.
        with self.assertRaises(ua.UaStatusCodeError) as bad_argument:
            objects_node.call_method(array_method, "cos", ua.Variant(math.pi))
        self.assertEqual(
            bad_argument.exception.code, ua.StatusCodes.BadInvalidArgument
        )

        # "panic" is reported as out of memory.
        with self.assertRaises(ua.UaStatusCodeError) as out_of_memory:
            objects_node.call_method(array_method, "panic", ua.Variant(math.pi))
        self.assertEqual(
            out_of_memory.exception.code, ua.StatusCodes.BadOutOfMemory
        )
github FreeOpcUa / opcua-asyncio / tests / tests_common.py View on Github external
def func2(parent, methodname, value):
        """Return ``sin(value)`` for "sin", or an OPC UA error object otherwise.

        ``parent`` is not used. For "panic" a ``ua.StatusCode`` created from
        ``BadOutOfMemory`` is returned. For any other name that is not "sin",
        a ``ua.CallMethodResult`` is returned with ``StatusCode`` set to
        ``BadInvalidArgument`` and ``InputArgumentResults`` set to a
        ``BadNotSupported`` status plus a default-constructed ``ua.StatusCode``.
        """
        if methodname == "panic":
            out_of_memory = ua.StatusCode(ua.StatusCodes.BadOutOfMemory)
            return out_of_memory

        if methodname == "sin":
            return math.sin(value)

        # Anything else is an unknown method name: build the rejection result.
        rejection = ua.CallMethodResult()
        rejection.StatusCode = ua.StatusCode(ua.StatusCodes.BadInvalidArgument)
        argument_statuses = [ua.StatusCode(ua.StatusCodes.BadNotSupported), ua.StatusCode()]
        rejection.InputArgumentResults = argument_statuses
        return rejection
github FreeOpcUa / opcua-asyncio / asyncua / server / subscription_service.py View on Github external
async def delete_subscriptions(self, ids):
        """Remove the given subscription ids and stop the ones that existed.

        Each id is popped from ``self.subscriptions``. Ids that were not
        present yield ``ua.StatusCode(BadSubscriptionIdInvalid)``; ids that
        were present yield a default ``ua.StatusCode()`` and their
        subscription's ``stop()`` is awaited (all together, via
        ``asyncio.gather``) once every id has been processed.

        Returns a list with one status code per entry of ``ids``, in order.
        """
        self.logger.info("delete subscriptions: %s", ids)

        statuses = []
        removed = []
        for sub_id in ids:
            subscription = self.subscriptions.pop(sub_id, None)
            if subscription is not None:
                removed.append(subscription)
                statuses.append(ua.StatusCode())
                continue
            # Unknown id: nothing to stop, only report the failure.
            statuses.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))

        # Stop every removed subscription concurrently before returning.
        pending_stops = [subscription.stop() for subscription in removed]
        await asyncio.gather(*pending_stops)
        return statuses
github FreeOpcUa / opcua-asyncio / asyncua / server / address_space.py View on Github external
# If the Identifier of the requested NodeId is null, we generate a new NodeId using
            # the namespace of the nodeid; this is an extension of the spec that allows
            # requesting the server to generate a new nodeid in a specified namespace
            # self.logger.debug("RequestedNewNodeId has null identifier, generating Identifier")
            item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
        else:
            if item.RequestedNewNodeId in self._aspace:
                self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
                return result

        if item.ParentNodeId.is_null():
            # self.logger.info("add_node: while adding node %s, requested parent node is null %s %s",
            # item.RequestedNewNodeId, item.ParentNodeId, item.ParentNodeId.is_null())
            if check:
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
                return result

        parentdata = self._aspace.get(item.ParentNodeId)
        if parentdata is None and not item.ParentNodeId.is_null():
            self.logger.info("add_node: while adding node %s, requested parent node %s does not exists",
                             item.RequestedNewNodeId, item.ParentNodeId)
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
            return result

        nodedata = NodeData(item.RequestedNewNodeId)

        self._add_node_attributes(nodedata, item, add_timestamps=check)

        # now add our node to db
        self._aspace[nodedata.nodeid] = nodedata
github FreeOpcUa / opcua-asyncio / asyncua / server / address_space.py View on Github external
def _add_node(self, item, user, check=True):
        # self.logger.debug("Adding node %s %s", item.RequestedNewNodeId, item.BrowseName)
        result = ua.AddNodesResult()

        if not user.role == UserRole.Admin:
            result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
            return result

        if item.RequestedNewNodeId.has_null_identifier():
            # If the Identifier of the requested NodeId is null, we generate a new NodeId using
            # the namespace of the nodeid; this is an extension of the spec that allows
            # requesting the server to generate a new nodeid in a specified namespace
            # self.logger.debug("RequestedNewNodeId has null identifier, generating Identifier")
            item.RequestedNewNodeId = self._aspace.generate_nodeid(item.RequestedNewNodeId.NamespaceIndex)
        else:
            if item.RequestedNewNodeId in self._aspace:
                self.logger.warning("AddNodesItem: Requested NodeId %s already exists", item.RequestedNewNodeId)
                result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
                return result

        if item.ParentNodeId.is_null():
            # self.logger.info("add_node: while adding node %s, requested parent node is null %s %s",
github FreeOpcUa / opcua-asyncio / asyncua / server / uaprocessor.py View on Github external
self._connection.close()
            response = ua.CloseSecureChannelResponse()
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
            return False

        elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
            _logger.info("call request")
            params = struct_from_binary(ua.CallParameters, body)
            results = await self.session.call(params.MethodsToCall)
            response = ua.CallResponse()
            response.Results = results
            self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)

        else:
            _logger.warning("Unknown message received %s", typeid)
            raise ServiceError(ua.StatusCodes.BadServiceUnsupported)

        return True
github FreeOpcUa / opcua-asyncio / asyncua / server / uaprocessor.py View on Github external
return False
            elif header.MessageType == ua.MessageType.SecureMessage:
                return await self.process_message(msg.SequenceHeader(), msg.body())
        elif isinstance(msg, ua.Hello):
            ack = ua.Acknowledge()
            ack.ReceiveBufferSize = msg.ReceiveBufferSize
            ack.SendBufferSize = msg.SendBufferSize
            data = uatcp_to_binary(ua.MessageType.Acknowledge, ack)
            self._transport.write(data)
        elif isinstance(msg, ua.ErrorMessage):
            _logger.warning("Received an error message type")
        elif msg is None:
            pass  # msg is a ChunkType.Intermediate of an ua.MessageType.SecureMessage
        else:
            _logger.warning("Unsupported message type: %s", header.MessageType)
            raise ServiceError(ua.StatusCodes.BadTcpMessageTypeInvalid)
        return True