Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# optional array index
if indx is not None:
request.propertyArrayIndex = indx
# optional priority
if priority is not None:
request.priority = priority
if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request)
# make an IOCB
iocb = IOCB(request)
if _debug: ReadWritePropertyConsoleCmd._debug(" - iocb: %r", iocb)
# give it to the application
deferred(this_application.request_io, iocb)
# wait for it to complete
iocb.wait()
# do something for success
if iocb.ioResponse:
sys.stdout.write("ack\n")
# do something for error/reject/abort
if iocb.ioError:
sys.stdout.write(str(iocb.ioError) + '\n')
except Exception as error:
ReadWritePropertyConsoleCmd._exception("exception: %r", error)
request = WhoIsRouterToNetwork()
if network:
request.wirtnNetwork = int(network)
if destination:
request.pduDestination = Address(destination)
self._log.debug(
"WhoIsRouterToNetwork Destination : {}".format(destination)
)
else:
request.pduDestination = LocalBroadcast()
except:
self._log.error("WhoIsRouterToNetwork : invalid arguments")
return
iocb = IOCB((self.this_application.nsap.local_adapter, request)) # make an IOCB
iocb.set_timeout(2)
deferred(self.this_application.nse.request_io, iocb)
iocb.wait()
try:
self.init_routing_table(str(self.this_application.nse._iartn.pop()))
except IndexError:
pass
request = WhoIsRouterToNetwork()
if not args:
request.pduDestination = LocalBroadcast()
elif args[0].isdigit():
request.pduDestination = LocalBroadcast()
request.wirtnNetwork = int(args[0])
else:
request.pduDestination = Address(args[0])
if len(args) > 1:
request.wirtnNetwork = int(args[1])
except:
self._log.error("WhoIsRouterToNetwork : invalid arguments")
return
iocb = IOCB((self.this_application.nsap.local_adapter, request)) # make an IOCB
iocb.set_timeout(2)
deferred(self.this_application.nse.request_io, iocb)
iocb.wait()
try:
self.init_routing_table(str(self.this_application.nse._iartn.pop()))
except IndexError:
pass
request = ReadPropertyRequest(
objectIdentifier=obj_id,
propertyIdentifier=prop_id,
)
request.pduDestination = Address(addr)
if len(args) == 1:
request.propertyArrayIndex = int(args[0])
if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request)
# make an IOCB
iocb = IOCB(request)
if _debug: ReadWritePropertyConsoleCmd._debug(" - iocb: %r", iocb)
# give it to the application
deferred(this_application.request_io, iocb)
# wait for it to complete
iocb.wait()
# do something for success
if iocb.ioResponse:
apdu = iocb.ioResponse
# should be an ack
if not isinstance(apdu, ReadPropertyACK):
if _debug: ReadWritePropertyConsoleCmd._debug(" - not an ack")
return
# find the datatype
datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
if _debug: ReadWritePropertyConsoleCmd._debug(" - datatype: %r", datatype)
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(ini=args.ini)
if _debug: _log.debug(" - this_device: %r", this_device)
# make a simple application
this_application = ReadPointListApplication(point_list, this_device, args.ini.address)
# fire off a request when the core has a chance
deferred(this_application.next_request)
_log.debug("running")
run()
# dump out the results
for request, response in zip(point_list, this_application.response_values):
print(request, response)
_log.debug("fini")
# create the VLAN router, bind it to the local network
router = VLANRouter(local_address, local_network)
# create a VLAN
vlan = Network(broadcast_address=LocalBroadcast())
# create a node for the router, address 1 on the VLAN
router_addr = Address(1)
router_node = Node(router_addr)
vlan.add_node(router_node)
# bind the router stack to the vlan network through this node
router.nsap.bind(router_node, vlan_network, router_addr)
# send network topology
deferred(router.nse.i_am_router_to_network)
# device identifier is assigned from the address
device_instance = vlan_network * 100 + int(args.addr2)
_log.debug(" - device_instance: %r", device_instance)
# make a vlan device object
vlan_device = \
LocalDeviceObject(
objectName="VLAN Node %d" % (device_instance,),
objectIdentifier=('device', device_instance),
maxApduLengthAccepted=1024,
segmentationSupported='noSegmentation',
vendorIdentifier=15,
)
_log.debug(" - vlan_device: %r", vlan_device)
parser.add_argument('net2', type=int,
help='network number of second network',
)
# now parse the arguments
args = parser.parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# create the router
router = IP2IPRouter(Address(args.addr1), args.net1, Address(args.addr2), args.net2)
if _debug: _log.debug(" - router: %r", router)
# send network topology
deferred(router.nse.i_am_router_to_network)
_log.debug("running")
run()
_log.debug("fini")
if _debug: _log.debug(" - property_list: %r", property_list)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# make a simple application
this_application = ReadPropertyApplication(this_device, args.ini.address)
# fire off a request when the core has a chance
deferred(this_application.next_request)
_log.debug("running")
run()
_log.debug("fini")
deferred(vlan1_app.who_is)
#
# Test 2
#
if args.test_id == 2:
# make a read request
read_property_request = ReadPropertyRequest(
destination=Address("2:2"),
objectIdentifier=("device", 202),
propertyIdentifier="objectName",
)
# ask the first device to send it
deferred(vlan1_app.request, read_property_request)
#
# Test 3
#
if args.test_id == 3:
# make a read request
read_property_request = ReadPropertyRequest(
destination=Address("3:2"),
objectIdentifier=("device", 302),
propertyIdentifier="objectName",
)
# ask the first device to send it
deferred(vlan1_app.request, read_property_request)
_log.debug(" - router2 bound to VLAN-4")
# make the application, add it to the network
vlan4_app = VLANApplication(
objectName="VLAN Node 402", deviceInstance=402, address=2
)
vlan4.add_node(vlan4_app.vlan_node)
_log.debug(" - vlan4_app: %r", vlan4_app)
#
# Test 1
#
if args.test_id == 1:
# ask the first device to Who-Is everybody
deferred(vlan1_app.who_is)
#
# Test 2
#
if args.test_id == 2:
# make a read request
read_property_request = ReadPropertyRequest(
destination=Address("2:2"),
objectIdentifier=("device", 202),
propertyIdentifier="objectName",
)
# ask the first device to send it
deferred(vlan1_app.request, read_property_request)