Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testTcpTwistedClient(self):
    """
    Construct an AsyncModbusTCPClient on the twisted reactor scheduler.

    ``twisted.internet.reactor`` is patched while the client is built.
    The test makes no assertions: it passes as long as construction with
    a socket framer and no-op callback/errback does not raise.
    :return:
    """
    from twisted.internet import reactor

    # Both hooks deliberately do nothing; they only need to be callables.
    def _ignore_connected(client):
        pass

    def _ignore_failed(client):
        pass

    with patch("twisted.internet.reactor") as mock_reactor:
        AsyncModbusTCPClient(schedulers.REACTOR,
                             framer=ModbusSocketFramer(ClientDecoder()),
                             callback=_ignore_connected,
                             errback=_ignore_failed)
def testSerialClientDisconnect(self, mock_serial, mock_seriostream, mock_ioloop):
    """ Test the tornado serial client disconnect

    The three mock arguments are presumably injected by patch decorators
    that are not visible in this excerpt; none of them is used directly.
    """
    rtu_framer = ModbusRtuFramer(ClientDecoder())
    client = AsyncModbusSerialClient(ioloop=schedulers.IO_LOOP,
                                     framer=rtu_framer,
                                     port=SERIAL_PORT)

    client.connect()
    self.assertTrue(client._connected)

    # Done-callback for the pending response: whatever it finished with
    # must be a ConnectionException.
    def expect_connection_error(finished):
        raised = finished.exception()
        self.assertTrue(isinstance(raised, ConnectionException))

    pending = client._build_response(0x00)
    pending.add_done_callback(expect_connection_error)

    client.close()
    self.assertFalse(client._connected)
def ascii_framer():
    """Return a new ModbusAsciiFramer built around a new ClientDecoder."""
    decoder = ClientDecoder()
    return ModbusAsciiFramer(decoder)
""" The main runner function """
# NOTE(review): the enclosing `def` line is not visible in this excerpt and the
# original indentation has been lost; the statements below are kept verbatim.
# Fetch the run options; the attributes read below are debug, query, output,
# host and port.
options = get_options()
# Debug logging is opt-in. Any failure while configuring it is reported on
# stdout and otherwise ignored.
if options.debug:
try:
log.setLevel(logging.DEBUG)
logging.basicConfig()
except Exception as ex:
print("Logging is not supported on this system")
# split the query on ':' and convert every piece to int
# (presumably "start:end" -- confirm against ScraperFactory)
query = [int(p) for p in options.query.split(':')]
try:
log.debug("Initializing the client")
framer = ModbusSocketFramer(ClientDecoder())
reader = LoggingContextReader(options.output)
factory = ScraperFactory(framer, reader, query)
# how to connect based on TCP vs Serial clients
# NOTE(review): framer is always constructed as a ModbusSocketFramer just
# above, so as written only the TCP branch can be taken.
if isinstance(framer, ModbusSocketFramer):
reactor.connectTCP(options.host, options.port, factory)
else:
SerialModbusClient(factory, options.port, reactor)
log.debug("Starting the client")
# The "Finished" message below is only logged after reactor.run() returns.
reactor.run()
log.debug("Finished scraping the client")
# Any exception raised during setup or the run is printed, not re-raised.
except Exception as ex:
print(ex)
def __implementation(method, client):
    """ Build the framer selected by name

    :method: The framer name ('ascii', 'rtu', 'binary' or 'socket'),
             matched case-insensitively
    :client: Forwarded as the second argument to the framer constructor
    :returns: A new framer of the requested kind wrapping a new ClientDecoder
    :raises ParameterException: If the name matches none of the framers
    """
    framer_by_name = {
        'ascii': ModbusAsciiFramer,
        'rtu': ModbusRtuFramer,
        'binary': ModbusBinaryFramer,
        'socket': ModbusSocketFramer,
    }
    framer_cls = framer_by_name.get(method.lower())
    if framer_cls is None:
        raise ParameterException("Invalid framer method requested")
    return framer_cls(ClientDecoder(), client)
"""
Closes the underlying transport layer, essentially closing the client
:return:
"""
# NOTE(review): the enclosing `def` line is not visible in this excerpt and the
# original indentation has been lost; the statements below are kept verbatim.
# close() is only called when a transport is set and has a `close` attribute.
if self.transport and hasattr(self.transport, "close"):
self.transport.close()
# Record the client as disconnected.
# NOTE(review): with indentation lost it cannot be told from here whether this
# runs unconditionally or only inside the `if` -- confirm against the source.
self._connected = False
class ModbusTcpClientProtocol(ModbusClientProtocol):
    """
    Twisted-based asynchronous Modbus client protocol for TCP.

    Default framer: ModbusSocketFramer
    """

    # Built once, when the class body executes, and stored as a class
    # attribute rather than created per instance.
    framer = ModbusSocketFramer(ClientDecoder())
class ModbusSerClientProtocol(ModbusClientProtocol):
    """
    Twisted-based asynchronous Modbus client protocol for serial links.

    Default framer: ModbusRtuFramer
    """

    def __init__(self, framer=None, **kwargs):
        """ Initialise the protocol, defaulting to an RTU framer

        :param framer: Framer to use; a falsy value selects a new
                       ModbusRtuFramer wrapping a new ClientDecoder
        :param kwargs: Forwarded unchanged to the base class initialiser
        """
        if not framer:
            framer = ModbusRtuFramer(ClientDecoder())
        super(ModbusSerClientProtocol, self).__init__(framer, **kwargs)
# --------------------------------------------------------------------------- #
# Not Connected Client Protocol
# --------------------------------------------------------------------------- #
def decode(self, message):
    """ Attempt to decode the supplied message

    The message is offered first to a server-side and then to a
    client-side decoder, each wrapped in ``self.framer``. A complete
    frame is processed with ``self.report`` as the callback; an
    incomplete frame, or any exception raised on the way, is passed to
    ``self.check_errors`` instead.

    :param message: The message to decode
    """
    # Text shown in the banner: the message itself when self.encode is
    # set, otherwise its hex encoding.
    if self.encode:
        value = message
    elif IS_PYTHON3:
        value = c.encode(message, 'hex_codec')
    else:
        value = message.encode('hex')

    heavy_rule = "="*80
    light_rule = "-"*80
    print(heavy_rule)
    print("Decoding Message %s" % value)
    print(heavy_rule)

    # Both framers are built up front, server-side first.
    framers = [self.framer(decoder_cls())
               for decoder_cls in (ServerDecoder, ClientDecoder)]
    for framer in framers:
        print("%s" % framer.decoder.__class__.__name__)
        print(light_rule)
        try:
            framer.addToFrame(message)
            if not framer.checkFrame():
                self.check_errors(framer, message)
            else:
                framer.advanceFrame()
                framer.processIncomingPacket(message, self.report)
        except Exception:
            self.check_errors(framer, message)
@staticmethod
def __implementation(method):
    """ Build the serial framer selected by name

    :method: The framer name ('ascii', 'rtu' or 'binary'), matched
             case-insensitively
    :returns: A new framer of the requested kind wrapping a new ClientDecoder
    :raises ParameterException: If the name matches none of the framers
    """
    name = method.lower()
    if name == 'ascii':
        return ModbusAsciiFramer(ClientDecoder())
    if name == 'rtu':
        return ModbusRtuFramer(ClientDecoder())
    if name == 'binary':
        return ModbusBinaryFramer(ClientDecoder())
    raise ParameterException("Invalid framer method requested")