Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testClientProtocolClose(self, protocol):
    """
    Check that closing a connected client protocol closes its transport

    :param protocol: The client protocol class to instantiate and exercise
    :return:
    """
    fake_transport = mock.MagicMock()
    fake_factory = mock.MagicMock()
    client = protocol(ModbusSocketFramer(ClientDecoder()))

    # Only the UDP flavour of the protocol is handed a factory
    if isinstance(client, ModbusUdpClientProtocol):
        client.factory = fake_factory

    # Connecting must store the transport and flag the client as connected
    client.connection_made(fake_transport)
    assert client.transport == fake_transport
    assert client.connected

    # Closing must close the transport exactly once and clear the flag
    client.close()
    fake_transport.close.assert_called_once_with()
    assert not client.connected
def testClientProtocolConnect(self):
    ''' Verify that connectionMade flips the protocol to connected '''
    # A bare object stands in for the decoder handed to the framer
    client = ModbusClientProtocol(framer=ModbusSocketFramer(object()))

    self.assertFalse(client._connected)
    client.connectionMade()
    self.assertTrue(client._connected)
def testClientProtocolExecute(self, protocol):
    ''' Verify that execute returns the entry tracked for the request

    :param protocol: The client protocol class to instantiate and exercise
    '''
    client = protocol(framer=ModbusSocketFramer(None))
    client.connection_made(mock.MagicMock())
    client.transport.write = mock.Mock()

    message = ReadCoilsRequest(1, 1)
    result = client.execute(message)

    # The transaction id is read off the request after execute has run
    tracked = client.transaction.getTransaction(message.transaction_id)
    assert result == tracked
def __init__(self, framer=None, **kwargs):
    ''' Initializes the framer and the matching transaction manager

    :param framer: A framer instance or a framer class; when falsy a
        ModbusSocketFramer wrapping a ClientDecoder is used instead
    :param kwargs: Extra arguments forwarded to the transaction manager
    '''
    self._connected = False

    self.framer = framer if framer else ModbusSocketFramer(ClientDecoder())
    if isinstance(self.framer, type):
        # A framer class was supplied rather than an instance: build it
        self.framer = self.framer(ClientDecoder(), client=None)

    # Socket framers get the dict based manager, all others the fifo one
    uses_socket_framer = isinstance(self.framer, ModbusSocketFramer)
    manager = DictTransactionManager if uses_socket_framer else FifoTransactionManager
    self.transaction = manager(self, **kwargs)
pymodbus/pymodbus/client/sync.py). Let it run, then fix the self.framer
later... We know that self.transaction is OK, because framer isn't a
ModbusSocketFramer.
The methods to connect are::
- ascii
- rtu
- binary
'''
# If a 'framer' is supplied, use it (and come up with a self.method name)
super( modbus_client_rtu, self ).__init__( method=method, **kwargs )
if framer is not None:
assert not isinstance( self.framer, ModbusSocketFramer )
assert not isinstance( framer, ModbusSocketFramer )
self.method = framer.__name__
self.framer = framer( ClientDecoder() )
def __init__(self, address, port=MODBUS_PORT):
    ''' Builds the mock PLC datastore and identity and initializes
    the server factory with them

    :param address: Accepted but not used by this constructor
    :param port: Accepted but not used by this constructor
    '''
    # One data block is shared by all four register tables
    registers = ModbusSequentialDataBlock(0x00, [0] * 0x3ff)
    slave = ModbusSlaveContext(di=registers, co=registers, hr=registers, ir=registers)
    self.context = ModbusServerContext(slaves=slave, single=True)

    identity = ModbusDeviceIdentification()
    identity_fields = (
        ('VendorName', 'MockPLCs'),
        ('ProductCode', 'MP'),
        ('VendorUrl', 'http://github.com/bashwork/pymodbus/'),
        ('ProductName', 'MockPLC 3000'),
        ('ModelName', 'MockPLC Ultimate'),
        ('MajorMinorRevision', '1.0'),
    )
    for field_name, field_value in identity_fields:
        setattr(identity, field_name, field_value)

    ModbusServerFactory.__init__(self, self.context, ModbusSocketFramer, identity)
def StartServer(context, identity=None):
    ''' Helper method to start the Modbus Async TCP server

    The listening port is read from the ``options`` object, which is
    not defined inside this function.

    :param context: The server data context
    :param identity: The server identity to use
    '''
    application = ModbusApplication(
        store=context,
        framer=ModbusSocketFramer,
        identity=identity,
    )
    server = ModbusServer(application)
    server.listen(options.port)
    ioloop.IOLoop.instance().start()
def main():
    """ The main runner function

    Reads the options, turns on debug logging when requested, builds
    the selected framer and generates the messages with it.
    """
    option = get_options()

    if option.debug:
        try:
            modbus_log.setLevel(logging.DEBUG)
            logging.basicConfig()
        except Exception:
            # Best effort only: generation continues without debug logging
            print("Logging is not supported on this system")

    framers = {
        'tcp': ModbusSocketFramer,
        'rtu': ModbusRtuFramer,
        'binary': ModbusBinaryFramer,
        'ascii': ModbusAsciiFramer,
    }
    # An unrecognised framer name falls back to the socket (tcp) framer
    framer = framers.get(option.framer, ModbusSocketFramer)(None)

    generate_messages(framer, option)
def __init__(self, framer=None, **kwargs):
    ''' Sets up the connection flag, the framer and the transaction manager

    :param framer: A framer instance or a framer class; a falsy value
        selects a ModbusSocketFramer built around a ClientDecoder
    :param kwargs: Extra arguments passed through to the transaction manager
    '''
    self._connected = False

    self.framer = framer if framer else ModbusSocketFramer(ClientDecoder())
    if isinstance(self.framer, type):
        # Given a class instead of an instance, so instantiate it here
        self.framer = self.framer(ClientDecoder(), client=None)

    # The manager type depends on whether socket framing is in use
    if not isinstance(self.framer, ModbusSocketFramer):
        manager_type = FifoTransactionManager
    else:
        manager_type = DictTransactionManager
    self.transaction = manager_type(self, **kwargs)