Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def FINISH_test_connect_basic256(self):
c = Client(URL)
c.set_security_string("basic256,sign,XXXX")
c.connect()
c.disconnect()
async def test_basic256_encrypt(srv_crypto_all_certs):
_, cert = srv_crypto_all_certs
clt = Client(uri_crypto)
await clt.set_security_string(
f"Basic256Sha256,SignAndEncrypt,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem,{cert}")
async with clt:
assert await clt.nodes.objects.get_children()
def test_find_servers(self):
c = Client(URL)
res = c.connect_and_find_servers()
assert len(res) > 0
def test_find_endpoints(self):
c = Client(URL)
res = c.connect_and_get_server_endpoints()
assert len(res) > 0
def wrapper(self):
try:
client = Client(URL)
client.connect()
func(self, client)
finally:
client.disconnect()
return wrapper
async def task():
url = "opc.tcp://localhost:4840/freeopcua/server/"
# url = "opc.tcp://admin@localhost:4840/freeopcua/server/" #connect using a user
async with Client(url=url) as client:
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
root = client.get_root_node()
_logger.info("Objects node is: %r", root)
# Now getting a variable node using its browse path
obj = await root.get_child(["0:Objects", "2:MyObject"])
_logger.info("MyObject is: %r", obj)
myevent = await root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "2:MyFirstEvent"])
_logger.info("MyFirstEventType is: %r", myevent)
msclt = SubHandler()
sub = await client.create_subscription(100, msclt)
handle = await sub.subscribe_events(obj, myevent)
await asyncio.sleep(10)
await sub.unsubscribe(handle)
async def main():
url = 'opc.tcp://localhost:4840/freeopcua/server/'
async with Client(url=url) as client:
uri = 'http://examples.freeopcua.github.io'
idx = await client.get_namespace_index(uri)
var = await client.nodes.root.get_child(["0:Objects", f"{idx}:MyObject", f"{idx}:MyVariable"])
nb = 4000
start = time.time()
attr = ua.WriteValue()
attr.NodeId = var.nodeid
attr.AttributeId = ua.AttributeIds.Value
attr.Value = ua.DataValue(ua.Variant(1.0, ua.VariantType.Float))
params = ua.WriteParameters()
params.NodesToWrite = [attr]
for i in range(nb):
params.NodesToWrite[0].Value.Value.Value = i
result = await client.uaclient.write(params)
#result[0].check()
async def task(loop):
url = "opc.tcp://0.0.0.0:4840/freeopcua/server/"
client = Client(url=url)
await client.set_security(
SecurityPolicyBasic256Sha256,
certificate_path=cert,
private_key_path=private_key,
server_certificate_path="certificate-example.der"
)
async with client:
objects = client.nodes.objects
child = await objects.get_child(['0:MyObject', '0:MyVariable'])
print(await child.get_value())
await child.set_value(42)
print(await child.get_value())
async def main():
url = 'opc.tcp://localhost:4840/freeopcua/server/'
# url = 'opc.tcp://commsvr.com:51234/UA/CAS_UA_Server'
async with Client(url=url) as client:
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
# Node objects have methods to read and write node attributes as well as browse or populate address space
_logger.info('Children of root are: %r', await client.nodes.root.get_children())
uri = 'http://examples.freeopcua.github.io'
idx = await client.get_namespace_index(uri)
# get a specific node knowing its node id
# var = client.get_node(ua.NodeId(1002, 2))
# var = client.get_node("ns=3;i=2002")
var = await client.nodes.root.get_child(["0:Objects", f"{idx}:MyObject", f"{idx}:MyVariable"])
print("My variable", var, await var.read_value())
# print(var)
async def main():
logging.basicConfig(level=logging.INFO)
#client = Client("opc.tcp://opcua.demo-this.com:51210/UA/SampleServer")
client = Client("opc.tcp://opcuaserver.com:48010")
client.name = "TOTO"
client.application_uri = "urn:freeopcua:clientasync"
async with client:
struct = client.get_node("ns=3;s=ControllerConfigurations")
#struct = client.get_node("ns=2;i=10239")
before = await struct.read_value()
#data = await client.load_type_definitions() # scan server for custom structures and import them. legacy code
data = await client.load_data_type_definitions() # scan server for custom structures and import them
after = await struct.read_value()
print("BEFORE", before)
print("AFTER", after)
embed()