How to use the opcua.Client class in opcua

To help you get started, we’ve selected a few opcua examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github FreeOpcUa / freeopcua / tests / test_highlevel.py View on Github external
def setUpClass(self):
        """Start the test server and connect a client to it.

        The server process is stored on ``srv``; the connected client is
        stored on both ``clt`` and ``opc``.
        """
        # The server runs in its own process; block until it signals that
        # it has finished initializing.
        server = ServerProcess()
        self.srv = server
        server.start()
        server.started.wait()

        # Bring up the client against the local test endpoint.
        client = opcua.Client()
        self.clt = client
        client.set_endpoint("opc.tcp://localhost:4841")
        client.connect()
        self.opc = client
github FreeOpcUa / python-opcua / tests / tests_crypto_connect.py View on Github external
def test_nocrypto(self):
        """Connect to the unencrypted endpoint and browse the Objects node."""
        client = Client(self.uri_no_crypto)
        client.connect()
        try:
            objects_node = client.get_objects_node()
            # The call itself is the check: it must complete without raising.
            objects_node.get_children()
        finally:
            # Always close the connection, even when browsing fails.
            client.disconnect()
github FreeOpcUa / python-opcua / tests / tests_server.py View on Github external
def test_discovery(self):
        """Check that registering the server adds exactly one discovery entry.

        The server list is read before and after registration; the new
        application URI must be absent from the first list and present in
        the second.
        """
        client = Client(self.discovery.endpoint.geturl())
        client.connect()
        try:
            servers = client.find_servers()
            new_app_uri = "urn:freeopcua:python:server:test_discovery"
            self.srv.set_application_uri(new_app_uri)
            with RegistrationService() as regService:
                regService.register_to_discovery(
                    self.srv,
                    self.discovery.endpoint.geturl(),
                    period=0,
                )
                # Short pause so the registration is in place before re-querying.
                time.sleep(0.1)
                new_servers = client.find_servers()

                added = len(new_servers) - len(servers)
                self.assertEqual(added, 1)

                uris_before = [s.ApplicationUri for s in servers]
                self.assertNotIn(new_app_uri, uris_before)
                uris_after = [s.ApplicationUri for s in new_servers]
                self.assertIn(new_app_uri, uris_after)
        finally:
            client.disconnect()
github FreeOpcUa / opcua-asyncio / tests / tests.py View on Github external
def test_discovery(self):
        """Check that registering the server adds exactly one discovery entry.

        The server list is read before and after registration; the new
        application URI must be absent from the first list and present in
        the second.
        """
        client = Client(self.discovery.endpoint.geturl())
        client.connect()
        try:
            servers = client.find_servers()
            new_app_uri = "urn:freeopcua:python:server:test_discovery"
            self.srv.application_uri = new_app_uri
            self.srv.register_to_discovery(self.discovery.endpoint.geturl(), 0)
            # Short pause so the registration is in place before re-querying.
            time.sleep(0.1)
            new_servers = client.find_servers()

            added = len(new_servers) - len(servers)
            self.assertEqual(added, 1)

            uris_before = [s.ApplicationUri for s in servers]
            self.assertNotIn(new_app_uri, uris_before)
            uris_after = [s.ApplicationUri for s in new_servers]
            self.assertIn(new_app_uri, uris_after)
        finally:
            client.disconnect()
github FreeOpcUa / python-opcua / test_external_server.py View on Github external
def test_find_endpoints(self):
        """The server at ``URL`` must report at least one endpoint."""
        endpoints = Client(URL).connect_and_get_server_endpoints()
        self.assertGreater(len(endpoints), 0)
github FreeOpcUa / python-opcua / tests / tests_crypto_connect.py View on Github external
def test_basic256sha56_encrypt_success(self):
        """Connect with Basic256Sha256 in SignAndEncrypt mode and browse Objects.

        The test passes when the Objects node returns a non-empty list of
        children over the secured connection.
        """
        client = Client(self.uri_crypto)
        try:
            client.set_security(
                security_policies.SecurityPolicyBasic256Sha256,
                'examples/certificate-example.der',
                'examples/private-key-example.pem',
                None,
                ua.MessageSecurityMode.SignAndEncrypt,
            )
            client.connect()
            children = client.get_objects_node().get_children()
            self.assertTrue(children)
        finally:
            client.disconnect()
github FreeOpcUa / python-opcua / test_external_server.py View on Github external
def test_connect_anonymous(self):
        """Open and close a connection to ``URL`` without setting credentials."""
        client = Client(URL)
        # Passing means both calls complete without raising.
        client.connect()
        client.disconnect()
github FreeOpcUa / python-opcua / opcua / tools.py View on Github external
dest="array",
                        default="guess",
                        choices=["guess", "true", "false"],
                        help="Value is an array")
    parser.add_argument("-t",
                        "--datatype",
                        dest="datatype",
                        default="guess",
                        choices=["guess", 'byte', 'sbyte', 'nodeid', 'expandednodeid', 'qualifiedname', 'browsename', 'string', 'float', 'double', 'int16', 'int32', "int64", 'uint16', 'uint32', 'uint64', "bool", "string", 'datetime', 'bytestring', 'xmlelement', 'statuscode', 'localizedtext'],
                        help="Data type to return")
    parser.add_argument("value",
                        help="Value to be written",
                        metavar="VALUE")
    args = parse_args(parser, requirenodeid=True)

    client = Client(args.url, timeout=args.timeout)
    _configure_client_with_args(client, args)
    client.connect()
    try:
        node = get_node(client, args)
        val = _val_to_variant(args.value, args)
        node.set_attribute(args.attribute, ua.DataValue(val))
    finally:
        client.disconnect()
    sys.exit(0)
    print(args)
github FreeOpcUa / opcua-client-gui / uaclient / uaclient.py View on Github external
def get_endpoints(uri):
        """Query the server at *uri* for its endpoints, log them, and return them.

        Each endpoint is logged at INFO level as a numbered header followed
        by one ``name: value`` line per pair yielded by
        ``endpoint_to_strings``, then an empty line.

        :param uri: address of the server to query.
        :return: the endpoints returned by the server.
        """
        client = Client(uri, timeout=2)
        # Query the server once. The previous version issued this call twice
        # and discarded the first result, doubling the network round trip.
        edps = client.connect_and_get_server_endpoints()
        for i, ep in enumerate(edps, start=1):
            logger.info('Endpoint %s:', i)
            for (n, v) in endpoint_to_strings(ep):
                logger.info('  %s: %s', n, v)
            logger.info('')
        return edps
github FreeOpcUa / python-opcua / examples / client_to_kepware.py View on Github external
"""
    Client to subscription. It will receive events from server
    """

    def datachange_notification(self, node, val, data):
        """Print the node and new value of a data change; ``data`` is unused."""
        prefix = "Python: New data change event"
        print(prefix, node, val)

    def event_notification(self, event):
        """Print the received event."""
        prefix = "Python: New event"
        print(prefix, event)


if __name__ == "__main__":
    #from IPython import embed
    logging.basicConfig(level=logging.WARN)
    client = Client("opc.tcp://192.168.56.100:49320/OPCUA/SimulationServer/")
    #client = Client("opc.tcp://192.168.56.100:4840/OPCUA/SimulationServer/")
    #client = Client("opc.tcp://olivier:olivierpass@localhost:53530/OPCUA/SimulationServer/")
    try:
        client.connect()
        root = client.get_root_node()
        print("Root is", root)
        print("childs of root are: ", root.get_children())
        print("name of root is", root.get_browse_name())
        objects = client.get_objects_node()
        print("childs og objects are: ", objects.get_children())


        tag1 = client.get_node("ns=2;s=Channel1.Device1.Tag1")
        print("tag1 is: {0} with value {1} ".format(tag1, tag1.get_value()))
        tag2 = client.get_node("ns=2;s=Channel1.Device1.Tag2")
        print("tag2 is: {0} with value {1} ".format(tag2, tag2.get_value()))