How to use the opcua.Server class in opcua

To help you get started, we’ve selected a few opcua examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github FreeOpcUa / python-opcua / tests / tests_crypto_connect.py View on Github external
    @classmethod
    def setUpClass(cls):
        """Start the two servers shared by the tests of this class.

        ``srv_crypto`` is given a certificate and a private key, which enables
        endpoints with signing and encryption; ``srv_no_crypto`` is started
        without either. Both listen on 127.0.0.1, on ``port_num1`` and
        ``port_num2`` respectively, and their endpoint URIs are stored on the
        class as ``uri_crypto`` and ``uri_no_crypto``.

        If the second server cannot be configured or started, the first one is
        stopped again before the exception propagates.
        """
        # start our own server
        cls.srv_crypto = Server()
        cls.uri_crypto = 'opc.tcp://127.0.0.1:{0:d}'.format(port_num1)
        cls.srv_crypto.set_endpoint(cls.uri_crypto)
        # load server certificate and private key. This enables endpoints
        # with signing and encryption.
        cls.srv_crypto.load_certificate("examples/certificate-example.der")
        cls.srv_crypto.load_private_key("examples/private-key-example.pem")
        cls.srv_crypto.start()

        # start a server without crypto
        try:
            cls.srv_no_crypto = Server()
            cls.uri_no_crypto = 'opc.tcp://127.0.0.1:{0:d}'.format(port_num2)
            cls.srv_no_crypto.set_endpoint(cls.uri_no_crypto)
            cls.srv_no_crypto.start()
        except Exception:
            # unittest does not run tearDownClass when setUpClass raises, so
            # the already-running crypto server would otherwise keep its port
            # and threads alive for the rest of the test run.
            cls.srv_crypto.stop()
            raise
github FreeOpcUa / opcua-asyncio / tests / tests_server.py View on Github external
return # FIXME broken
        tmpfile = NamedTemporaryFile()
        path = tmpfile.name
        tmpfile.close()

        # create cache file
        server = Server(shelffile=path)

        # modify cache content
        id = ua.NodeId(ua.ObjectIds.Server_ServerStatus_SecondsTillShutdown)
        s = shelve.open(path, "w", writeback=True)
        s[id.to_string()].attributes[ua.AttributeIds.Value].value = ua.DataValue(123)
        s.close()

        # ensure that we are actually loading from the cache
        server = Server(shelffile=path)
        self.assertEqual(server.get_node(id).get_value(), 123)

        os.remove(path)
github FreeOpcUa / opcua-asyncio / tests / tests.py View on Github external
def setUpClass(self):
        """Start the server under test and a separate discovery server.

        The main server listens on localhost at ``port_num2`` and is reachable
        as both ``srv`` and ``opc``; the discovery server listens on
        ``port_discovery`` and is stored as ``discovery``.
        """
        # Main server: publish it first, then configure and start it.
        self.srv = main_server = Server()
        main_server.set_endpoint('opc.tcp://localhost:%d' % port_num2)
        add_server_methods(main_server)
        main_server.start()
        # ``opc`` is a second name for the very same server object.
        self.opc = main_server

        # Discovery server with its own application URI.
        self.discovery = discovery_server = Server()
        discovery_server.set_application_uri("urn:freeopcua:python:discovery")
        discovery_server.set_endpoint('opc.tcp://localhost:%d' % port_discovery)
        discovery_server.start()
github FreeOpcUa / python-opcua / tests / tests_server.py View on Github external
    @classmethod
    def setUpClass(cls):
        """Bring up the server under test plus a discovery server.

        The main server (``cls.srv``, also exposed as ``cls.opc``) listens on
        127.0.0.1 at ``port_num``; the discovery server (``cls.discovery``)
        listens on 127.0.0.1 at ``port_discovery``.
        """
        server = Server()
        cls.srv = server
        server_url = 'opc.tcp://127.0.0.1:{0:d}'.format(port_num)
        server.set_endpoint(server_url)
        add_server_methods(server)
        server.start()
        # Same object under a second name.
        cls.opc = server

        discovery = Server()
        cls.discovery = discovery
        discovery.set_application_uri("urn:freeopcua:python:discovery")
        discovery_url = 'opc.tcp://127.0.0.1:{0:d}'.format(port_discovery)
        discovery.set_endpoint(discovery_url)
        discovery.start()
github FreeOpcUa / python-opcua / tests / tests_history.py View on Github external
    @classmethod
    def start_server_and_client(cls):
        """Start a server on 127.0.0.1:``port_num1`` and connect a client to it.

        The server is stored as ``cls.srv`` and the connected client as
        ``cls.clt``.
        """
        cls.srv = server = Server()
        # Server and client share one endpoint URL.
        endpoint = 'opc.tcp://127.0.0.1:{0:d}'.format(port_num1)
        server.set_endpoint(endpoint)
        server.start()

        cls.clt = client = Client(endpoint)
        client.connect()
github akselov / digital-twin-opcua / opcua-server / opcua-server01.py View on Github external
print ("Check the code and uncomment the lines related to your python version.")
            self.disconnect()
            raise SystemExit
        elif ID == 3:
            print ("Error in write() statement.")
            print ("Variable value is not defined.")

# Script entry point: creates a socket and a KUKA robot object, then builds an
# OPC UA server whose address space holds one object with per-axis variables.
if __name__ == "__main__":
    # Create an IPv4 TCP (stream) socket. It is only created here; nothing in
    # the visible lines connects or uses it.
    # NOTE(review): the original comment called this the "OPC-UA Client
    # connection", but it is a plain socket -- confirm how it is used later.
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

    # Setup robot object
    # (KUKA is defined outside this excerpt; the argument is presumably the
    # robot controller's IP address -- verify against the class.)
    robot = KUKA('192.168.250.16')

    # Setup OPC-UA Server and the endpoint URL it will listen on
    server = Server()
    server.set_endpoint("opc.tcp://192.168.250.200:50895")

    # Setup namespace: register the URI and keep the returned namespace index,
    # which is passed to every add_object/add_variable call below
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # Get objects node
    objects = server.get_objects_node()

    # Populating address space: one object under the Objects node
    myObject = objects.add_object(idx, "KUKA_KR16_2")

    # Adding desired variables, each created with the initial value 6.7
    # (A1..A3 presumably correspond to robot axes -- TODO confirm)
    A1 = myObject.add_variable(idx, "A1", 6.7)
    A2 = myObject.add_variable(idx, "A2", 6.7)
    A3 = myObject.add_variable(idx, "A3", 6.7)
github FreeOpcUa / python-opcua / examples / server-with-encryption.py View on Github external
import sys
import time

sys.path.insert(0, "..")

from opcua import ua, Server


# Script entry point: configures an OPC UA server with a certificate and a
# private key, registers a namespace and adds one object to the address space.
if __name__ == "__main__":

    # set up our server and the endpoint URL it will listen on
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")

    # load server certificate and private key. This enables endpoints
    # with signing and encryption.
    # Both paths are relative, so they are resolved against the current
    # working directory.
    server.load_certificate("certificate-example.der")
    server.load_private_key("private-key-example.pem")

    # set up our own namespace; not strictly necessary, but the specification
    # says we should. The returned index is used when adding nodes below.
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # get the Objects node; this is where we should put our custom stuff
    objects = server.get_objects_node()

    # populate our address space with a single object
    myobj = objects.add_object(idx, "MyObject")
github systerel / S2OPC / validation / server.py View on Github external
stopFlag = True


# Script entry point: parses the command line, installs signal handlers and
# configures a FreeOpcUa test server with a fixed set of variable nodes.
if __name__=='__main__':
    # One optional positional argument: msTimeout, a float defaulting to 10000.
    parser = argparse.ArgumentParser(description='FreeOpcUa test server')
    parser.add_argument('msTimeout', nargs='?', default=10000., type=float,
                        help='Server timeout (ms)')
    args = parser.parse_args()

    # Route SIGINT and SIGTERM to signal_handler (defined outside this excerpt).
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Open server
    print('Configuring FreeOpcUa test server')
    #logging.basicConfig(level=logging.DEBUG)
    server = Server()
    # sUri is defined outside this excerpt.
    server.set_endpoint(sUri)

    # Certificate and key are looked up relative to this file's directory:
    # <dir of this file>/../tests/data/cert
    cert_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'tests', 'data', 'cert')
    server.load_certificate(os.path.join(cert_dir, 'server_2k_cert.der'))
    server.load_private_key(os.path.join(cert_dir, 'server_2k_key.pem'))

    # Nodes are created under the Objects node
    objects = server.get_objects_node()
    # Use the same nodes as in the Read test
    # Each variantInfoList entry is unpacked as a 4-tuple (name, variant type,
    # value, ignored); the list itself is defined outside this excerpt.
    for i,(sTypName,typ,val,_) in enumerate(variantInfoList):
        # Numeric identifiers start at 1001; the second NodeId argument is 0
        # (presumably the namespace index -- confirm against ua.NodeId).
        nid = 1000 + i + 1
        node = objects.add_variable(ua.NodeId(nid, 0), sTypName, ua.Variant(val, typ))
        node.set_writable()

    # Add a node which increments, the target of an interesting subscription
    # It is created here with the initial UInt64 value 0.
    nodeCnt = objects.add_variable(ua.NodeId('Counter', 0), 'Counter', ua.Variant(0, ua.VariantType.UInt64))
github FreeOpcUa / python-opcua / examples / server-custom-object.py View on Github external
1) create a basic object
   2) create a new object type and an instance of the new object type
   3) import a new object from xml address space and create an instance of the new object type
'''
import sys
sys.path.insert(0, "..")
import time


from opcua import ua, Server


# Script entry point: sets up an OPC UA server, registers a namespace and walks
# through the examples listed in the module docstring.
if __name__ == "__main__":

    # set up our server and the endpoint URL it will listen on
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")

    # set up our own namespace; not strictly necessary, but the specification
    # says we should. The returned index is used when adding nodes below.
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # get the Objects node; this is where we should put our custom stuff
    objects = server.get_objects_node()

    # Example 1 - create a basic object
    #-------------------------------------------------------------------------------
    myobj = objects.add_object(idx, "MyObject")    
    #-------------------------------------------------------------------------------


    # Example 2 - create a new object type and an instance of the new object type
github FreeOpcUa / python-opcua / examples / server-instantiate-object.py View on Github external
import sys
sys.path.insert(0, "..")
import time
from IPython import embed


from opcua import ua, Server, instantiate


# Script entry point: defines a custom object type on an OPC UA server and
# creates one instance of it under the Objects node.
if __name__ == "__main__":

    # set up our server and the endpoint URL it will listen on
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")

    # set up our own namespace; not strictly necessary, but the specification
    # says we should
    # NOTE(review): idx is not used in the visible lines -- the type nodes below
    # are added with index 0 and the instance name hard-codes "2:".
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # create our custom object type: a "MyDevice" type with one variable, one
    # property and a child "controller" object that has its own property
    dev = server.nodes.base_object_type.add_object_type(0, "MyDevice")
    dev.add_variable(0, "sensor1", 1.0)
    dev.add_property(0, "device_id", "0340")
    ctrl = dev.add_object(0, "controller")
    ctrl.add_property(0, "state", "Idle")

    # instantiate our new object type under the Objects node
    # (the "2:" prefix in bname presumably selects namespace index 2 -- confirm
    # that it matches the index returned by register_namespace above)
    mydevice = instantiate(server.nodes.objects, dev, bname="2:Device0001")
    #mydevice = server.nodes.objects.add_object(2, "Device0001", objecttype=dev)  # specifying objecttype to add_object also instantiates a node type