Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, address=None, values=None, **kwargs):
    """Build the request from a start address and the values to write.

    :param address: The starting request address
    :param values: The values to write. A falsy argument is replaced by an
        empty list; an argument without ``__iter__`` is wrapped in a
        one-element list.
    :param kwargs: Forwarded unchanged to ``ModbusRequest.__init__``
    """
    ModbusRequest.__init__(self, **kwargs)
    self.address = address

    # Normalise ``values`` so that it always supports len().
    if not values:
        values = []
    elif not hasattr(values, '__iter__'):
        values = [values]
    self.values = values

    # Ceiling division by 8: the number of bytes needed when eight values
    # share one byte (presumably one bit per value - confirm against encoder).
    value_total = len(self.values)
    self.byte_count = (value_total + 7) // 8
def testSyncTcpClientInstantiation(self):
    """Check that a default-constructed ``ModbusTcpClient`` is a real object."""
    client = ModbusTcpClient()
    # Identity check: unlike ``!=``, it cannot be affected by a custom
    # ``__eq__``/``__ne__`` defined on the client class.
    self.assertIsNotNone(client)
def testBasicSyncTcpClient(self, mock_select):
    """Exercise send/receive, connect/close and ``str()`` on the sync TCP client.

    :param mock_select: Mock injected by the test's patch decorator; its
        ``select`` call is configured to return ``[True]``
    """
    mock_select.select.return_value = [True]

    tcp_client = ModbusTcpClient()
    tcp_client.socket = mockSocket()

    # --- send / receive through the fake socket ---
    self.assertEqual(0, tcp_client._send(None))
    self.assertEqual(1, tcp_client._send(b'\x00'))
    self.assertEqual(b'\x00', tcp_client._recv(1))

    # --- connect, then disconnect ---
    self.assertTrue(tcp_client.connect())
    tcp_client.close()

    # --- closing again with a falsy socket must not raise ---
    tcp_client.socket = False
    tcp_client.close()

    self.assertEqual("ModbusTcpClient(127.0.0.1:502)", str(tcp_client))
def testTcpClientRegister(self):
    """Check that ``register`` hands the custom request class to the decoder.

    The framer is replaced by a ``Mock`` so the call reaching
    ``framer.decoder.register`` can be inspected.
    """
    class CustomRequest:
        function_code = 79

    client = ModbusTcpClient()
    client.framer = Mock()
    client.register(CustomRequest)

    # ``assert_called_once_with`` is the real assertion helper.  The former
    # ``assert mock.called_once_with(...)`` only touched an auto-created
    # (always truthy) child mock, so it could never fail.
    client.framer.decoder.register.assert_called_once_with(CustomRequest)
# -----------------------------------------------------------------------#
#MLE
23:0xF27C,#PDU22
24:0x50B0,
25:0x9A6B,
26:0xBFBF,
#MBE
27:0xBFBF,#PDU26
28:0x6B9A,
29:0xB050,
30:0x7CF2
})
store = ModbusSlaveContext(di=block, co=block, hr=block, ir=block)
context = ModbusServerContext(slaves=store, single=True)
# ----------------------------------------------------------------------- #
# initialize the server information
# ----------------------------------------------------------------------- #
# If you don't set this or any fields, they are defaulted to empty strings.
# ----------------------------------------------------------------------- #
identity = ModbusDeviceIdentification()
identity.VendorName = 'Pymodbus'
identity.ProductCode = 'PM'
identity.VendorUrl = 'http://github.com/riptideio/pymodbus/'
identity.ProductName = 'Pymodbus Server'
identity.ModelName = 'Pymodbus Server'
identity.MajorMinorRevision = '1.5'
# ----------------------------------------------------------------------- #
# run the server you want
def testWriteMultipleRegisterRequest(self):
    """Check the error and success paths of ``WriteMultipleRegistersRequest.execute``.

    The same request is executed against a ``MockContext`` three times with
    different ``count`` values, then a fresh request is executed once the
    context has been marked valid.
    """
    ctx = MockContext()
    req = WriteMultipleRegistersRequest(0x00, [0x00]*10)

    # Context still in its default state: the address is rejected.
    response = req.execute(ctx)
    self.assertEqual(response.exception_code, ModbusExceptions.IllegalAddress)

    # count no longer agrees with the ten values the request was built with.
    req.count = 0x05
    response = req.execute(ctx)
    self.assertEqual(response.exception_code, ModbusExceptions.IllegalValue)

    # count outside of the accepted range.
    req.count = 0x800
    response = req.execute(ctx)
    self.assertEqual(response.exception_code, ModbusExceptions.IllegalValue)

    # With the context marked valid, a fresh request succeeds and the
    # response carries the request's function code.
    ctx.valid = True
    req = WriteMultipleRegistersRequest(0x00, [0x00]*10)
    response = req.execute(ctx)
    self.assertEqual(response.function_code, req.function_code)
def testTcpTwistedClient(self):
    """
    Construct the TCP client with the Twisted reactor scheduler.

    The test makes no assertions of its own; it passes when building the
    client with a patched reactor does not raise.
    :return:
    """
    # NOTE(review): ``reactor`` is not referenced below; the import
    # presumably guarantees the attribute exists for patch() - confirm
    # before removing it.
    from twisted.internet import reactor

    def test_callback(client):
        pass

    def test_errback(client):
        pass

    with patch("twisted.internet.reactor") as mock_reactor:
        socket_framer = ModbusSocketFramer(ClientDecoder())
        AsyncModbusTCPClient(
            schedulers.REACTOR,
            framer=socket_framer,
            callback=test_callback,
            errback=test_errback,
        )
def testSerialClientDisconnect(self, mock_serial, mock_seriostream, mock_ioloop):
    """Test that the tornado serial client disconnects cleanly.

    A response future is requested while connected; its done-callback
    checks that the recorded exception is a ``ConnectionException``.

    :param mock_serial: Mock injected by the test's patch decorators (unused)
    :param mock_seriostream: Mock injected by the test's patch decorators (unused)
    :param mock_ioloop: Mock injected by the test's patch decorators (unused)
    """
    def handle_failure(failure):
        self.assertTrue(isinstance(failure.exception(), ConnectionException))

    rtu_framer = ModbusRtuFramer(ClientDecoder())
    client = AsyncModbusSerialClient(
        ioloop=schedulers.IO_LOOP,
        framer=rtu_framer,
        port=SERIAL_PORT,
    )

    client.connect()
    self.assertTrue(client._connected)

    # Register the check on a pending response before closing the client.
    pending = client._build_response(0x00)
    pending.add_done_callback(handle_failure)

    client.close()
    self.assertFalse(client._connected)
def testClientProtocolClose(self, protocol):
    """
    Test that closing a client protocol closes its transport.

    :param protocol: Protocol class under test; it is instantiated here
        with a socket framer
    :return:
    """
    instance = protocol(ModbusSocketFramer(ClientDecoder()))
    transport = mock.MagicMock()
    factory = mock.MagicMock()

    # Only the UDP protocol is given a factory before connecting.
    if isinstance(instance, ModbusUdpClientProtocol):
        instance.factory = factory

    instance.connection_made(transport)
    assert instance.transport == transport
    assert instance.connected

    instance.close()
    transport.close.assert_called_once_with()
    assert not instance.connected
def ascii_framer():
    """Return a new ``ModbusAsciiFramer`` built around a fresh ``ClientDecoder``."""
    decoder = ClientDecoder()
    return ModbusAsciiFramer(decoder)