Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testUdpServerInitialize(self):
    ''' Check that the udp protocol picks up the identity it is constructed with '''
    # Without an identity the vendor name is expected to be the empty string.
    default_protocol = ModbusUdpProtocol(store=None)
    self.assertEqual(default_protocol.control.Identity.VendorName, '')

    # With an explicit identity the vendor name must show up on the control block.
    vendor_identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})
    identified_protocol = ModbusUdpProtocol(store=None, identity=vendor_identity)
    self.assertEqual(
        identified_protocol.control.Identity.VendorName,
        'VendorName',
    )
# NOTE(review): this definition is truncated in this excerpt -- the coroutine body
# and the nested BasicClient class continue beyond the last visible line, and the
# original indentation has been lost.
def testUdpServerSendData(self):
''' Test that the modbus udp asyncio server correctly sends data outbound '''
# `identity` is not used in the visible lines; any later use is outside this excerpt.
identity = ModbusDeviceIdentification(info={0x00: 'VendorName'})
# NOTE(review): the payload starts with a literal 'x' followed by the octal escape
# \01 (two bytes), not the single byte \x01 -- possibly a typo for b'\x01...'; confirm.
data = b'x\01\x00\x00\x00\x00\x06\x01\x03\x00\x00\x00\x19'
# Port 0 is requested here; the port actually bound is read back into random_port below.
server = yield from StartUdpServer(context=self.context,address=("127.0.0.1", 0))
# asyncio.create_task only exists from Python 3.7; older interpreters use ensure_future.
if PYTHON_VERSION >= (3, 7):
server_task = asyncio.create_task(server.serve_forever())
else:
server_task = asyncio.ensure_future(server.serve_forever())
# Wait on server.serving before touching the socket (presumably signals readiness -- confirm).
yield from server.serving
random_port = server.protocol._sock.getsockname()[1]
# Replace datagram_received with a Mock that wraps the real method, so calls are
# recorded while still being forwarded to the original implementation.
received = server.endpoint.datagram_received = Mock(wraps=server.endpoint.datagram_received)
# `done` and `received_value` are only initialised here; their consumers are not
# visible in this excerpt.
done = self.loop.create_future()
received_value = None
# Minimal datagram client: keeps the transport and sends `data` as soon as the
# connection is made.
class BasicClient(asyncio.DatagramProtocol):
def connection_made(self, transport):
self.transport = transport
self.transport.sendto(data)
def run_updating_server():
    """Start a Modbus TCP server whose context is handed to a periodic writer.

    Builds a single-slave context with four sequential data blocks (100
    values of 17 each), fills in the device identification, schedules
    ``updating_writer`` through a ``LoopingCall`` with a 5 second interval
    (not fired immediately) and finally starts the TCP server on
    ``localhost:5020``.
    """
    # ----------------------------------------------------------------------- #
    # initialize your data store
    # ----------------------------------------------------------------------- #
    # Each table gets its own freshly built list of values.
    data_blocks = {
        table: ModbusSequentialDataBlock(0, [17]*100)
        for table in ('di', 'co', 'hr', 'ir')
    }
    slave_store = ModbusSlaveContext(**data_blocks)
    server_context = ModbusServerContext(slaves=slave_store, single=True)

    # ----------------------------------------------------------------------- #
    # initialize the server information
    # ----------------------------------------------------------------------- #
    device_identity = ModbusDeviceIdentification()
    identity_fields = (
        ('VendorName', 'pymodbus'),
        ('ProductCode', 'PM'),
        ('VendorUrl', 'http://github.com/bashwork/pymodbus/'),
        ('ProductName', 'pymodbus Server'),
        ('ModelName', 'pymodbus Server'),
        ('MajorMinorRevision', '2.2.0'),
    )
    for attr_name, attr_value in identity_fields:
        setattr(device_identity, attr_name, attr_value)

    # ----------------------------------------------------------------------- #
    # run the server you want
    # ----------------------------------------------------------------------- #
    interval_seconds = 5  # delay between writer runs, also used as initial delay
    writer_loop = LoopingCall(f=updating_writer, a=(server_context,))
    writer_loop.start(interval_seconds, now=False)
    StartTcpServer(
        server_context,
        identity=device_identity,
        address=("localhost", 5020),
    )
def run_custom_db_server():
    """Start a Modbus TCP server backed by a single shared ``CustomDataBlock``.

    One ``CustomDataBlock`` of 100 zeros is used for all four tables of a
    single-slave context; the device identification is filled in and the
    TCP server is started on ``localhost:5020``.
    """
    # ----------------------------------------------------------------------- #
    # initialize your data store
    # ----------------------------------------------------------------------- #
    # The same block instance is deliberately shared by every table.
    shared_block = CustomDataBlock([0]*100)
    slave_store = ModbusSlaveContext(
        di=shared_block,
        co=shared_block,
        hr=shared_block,
        ir=shared_block,
    )
    server_context = ModbusServerContext(slaves=slave_store, single=True)

    # ----------------------------------------------------------------------- #
    # initialize the server information
    # ----------------------------------------------------------------------- #
    device_identity = ModbusDeviceIdentification()
    identity_fields = (
        ('VendorName', 'pymodbus'),
        ('ProductCode', 'PM'),
        ('VendorUrl', 'http://github.com/bashwork/pymodbus/'),
        ('ProductName', 'pymodbus Server'),
        ('ModelName', 'pymodbus Server'),
        ('MajorMinorRevision', '2.2.0'),
    )
    for attr_name, attr_value in identity_fields:
        setattr(device_identity, attr_name, attr_value)

    # ----------------------------------------------------------------------- #
    # run the server you want
    # ----------------------------------------------------------------------- #
    # NOTE: the original kept a disabled hook here that would start a separate
    # writer process before serving; it remains disabled.
    StartTcpServer(
        server_context,
        identity=device_identity,
        address=("localhost", 5020),
    )
:param identity: An optional identify structure
"""
self.decoder = ServerDecoder()
if isinstance(framer, IModbusFramer):
self.framer = framer
else: self.framer = ModbusSocketFramer
if isinstance(store, ModbusServerContext):
self.store = store
else: self.store = ModbusServerContext()
self.control = ModbusControlBlock()
self.access = ModbusAccessControl()
if isinstance(identity, ModbusDeviceIdentification):
self.control.Identity.update(identity)
# ----------------------------------------------------------------------- #
# Data store: one sequential block per standard table, plus an extra block
# registered under the custom request's function code.
store = ModbusSlaveContext(**{
    table: ModbusSequentialDataBlock(0, [17]*100)
    for table in ('di', 'co', 'hr', 'ir')
})
custom_block = ModbusSequentialDataBlock(0, [17] * 100)
store.register(CustomModbusRequest.function_code, 'cm', custom_block)
context = ModbusServerContext(slaves=store, single=True)

# ----------------------------------------------------------------------- #
# initialize the server information
# ----------------------------------------------------------------------- #
# Fields that are never set default to empty strings.
# ----------------------------------------------------------------------- #
identity = ModbusDeviceIdentification()
for attr_name, attr_value in (
    ('VendorName', 'Pymodbus'),
    ('ProductCode', 'PM'),
    ('VendorUrl', 'http://github.com/bashwork/pymodbus/'),
    ('ProductName', 'Pymodbus Server'),
    ('ModelName', 'Pymodbus Server'),
    ('MajorMinorRevision', '2.2.0'),
):
    setattr(identity, attr_name, attr_value)

# ----------------------------------------------------------------------- #
# run the server you want
# ----------------------------------------------------------------------- #
# TCP server, with the custom request type made known to it
StartTcpServer(
    context,
    identity=identity,
    address=("localhost", 5020),
    custom_functions=[CustomModbusRequest],
)
# Data store: one sequential block per standard table, plus an extra block
# registered under the custom request's function code.
store = ModbusSlaveContext(**{
    table: ModbusSequentialDataBlock(0, [17] * 100)
    for table in ('di', 'co', 'hr', 'ir')
})
custom_block = ModbusSequentialDataBlock(0, [17] * 100)
store.register(CustomModbusRequest.function_code, 'cm', custom_block)
context = ModbusServerContext(slaves=store, single=True)

# ----------------------------------------------------------------------- #
# initialize the server information
# ----------------------------------------------------------------------- #
# Fields that are never set default to empty strings.
# ----------------------------------------------------------------------- #
identity = ModbusDeviceIdentification()
for attr_name, attr_value in (
    ('VendorName', 'Pymodbus'),
    ('ProductCode', 'PM'),
    ('VendorUrl', 'http://github.com/riptideio/pymodbus/'),
    ('ProductName', 'Pymodbus Server'),
    ('ModelName', 'Pymodbus Server'),
    ('MajorMinorRevision', '2.3.0'),
):
    setattr(identity, attr_name, attr_value)

# ----------------------------------------------------------------------- #
# run the server you want
# ----------------------------------------------------------------------- #
# Tcp, with the custom request type made known to the server:
StartTcpServer(
    context,
    identity=identity,
    address=("localhost", 5020),
    custom_functions=[CustomModbusRequest],
)
# NOTE(review): this snippet is an excerpt -- its enclosing definition and the code
# that follows the last line are not visible here.
# initialize your data store
# ----------------------------------------------------------------------- #
# Each table is seeded with its own run of 100 consecutive values:
# di 1..100, co 101..200, hr 201..300, ir 301..400.
store = ModbusSlaveContext(
di=ModbusSequentialDataBlock(0,range(1,101)),
co=ModbusSequentialDataBlock(0,range(101,201)),
hr=ModbusSequentialDataBlock(0,range(201,301)),
ir=ModbusSequentialDataBlock(0,range(301,401)))
context = ModbusServerContext(slaves=store, single=True)
# ----------------------------------------------------------------------- #
# initialize the server information
# ----------------------------------------------------------------------- #
identity = ModbusDeviceIdentification()
identity.VendorName = 'pymodbus'
identity.ProductCode = 'PM'
identity.VendorUrl = 'http://github.com/bashwork/pymodbus/'
identity.ProductName = 'pymodbus Server'
identity.ModelName = 'pymodbus Server'
identity.MajorMinorRevision = '1.0'
# connect to simulation
# Opens a TCP (SOCK_STREAM) connection to 127.0.0.1:55555.
# NOTE(review): the socket is not closed within the visible lines -- confirm that
# the code following this excerpt releases it.
HOST = '127.0.0.1'
PORT = 55555
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
# ----------------------------------------------------------------------- #
# run the server you want
# ----------------------------------------------------------------------- #
time = 1 # delay of 1 (the earlier comment said 5 seconds; presumably seconds -- confirm at the use site)