Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testWriteMultipleRegisterRequest(self):
    """Check the exception codes and the success path of a register write.

    A ten-value write request is executed against a fresh ``MockContext``
    and must report an illegal address; two altered ``count`` values must
    each report an illegal value; once the context is flagged valid, a new
    request must produce a response with the request's function code.
    """
    store = MockContext()
    message = WriteMultipleRegistersRequest(0x00, [0x00]*10)
    outcome = message.execute(store)
    self.assertEqual(outcome.exception_code, ModbusExceptions.IllegalAddress)

    # 0x05: count no longer agrees with the ten supplied values
    # 0x800: count is outside of the accepted range
    for altered_count in (0x05, 0x800):
        message.count = altered_count
        outcome = message.execute(store)
        self.assertEqual(outcome.exception_code, ModbusExceptions.IllegalValue)

    store.valid = True
    message = WriteMultipleRegistersRequest(0x00, [0x00]*10)
    outcome = message.execute(store)
    self.assertEqual(outcome.function_code, message.function_code)
# if broadcasting then execute on all slave contexts, note response will be ignored
for unit_id in self.server.context.slaves():
response = request.execute(self.server.context[unit_id])
else:
context = self.server.context[request.unit_id]
response = request.execute(context)
except NoSuchSlaveException as ex:
_logger.debug("requested slave does "
"not exist: %s" % request.unit_id )
if self.server.ignore_missing_slaves:
return # the client will simply timeout waiting for a response
response = request.doException(merror.GatewayNoResponse)
except Exception as ex:
_logger.debug("Datastore unable to fulfill request: "
"%s; %s", ex, traceback.format_exc())
response = request.doException(merror.SlaveFailure)
# no response when broadcasting
if not broadcast:
response.transaction_id = request.transaction_id
response.unit_id = request.unit_id
self.send(response)
def _execute(self, request, **kwargs):
''' Executes the request and returns the result
:param request: The decoded request message
'''
try:
context = self.factory.store[request.unit_id]
response = request.execute(context)
except NoSuchSlaveException as ex:
_logger.debug("requested slave does not exist: %s" % request.unit_id )
if self.factory.ignore_missing_slaves:
return # the client will simply timeout waiting for a response
response = request.doException(merror.GatewayNoResponse)
except Exception as ex:
_logger.debug("Datastore unable to fulfill request: %s" % ex)
response = request.doException(merror.SlaveFailure)
#self.framer.populateResult(response)
response.transaction_id = request.transaction_id
response.unit_id = request.unit_id
self._send(response)
def execute(self, context):
    """Answer a read FIFO queue request.

    The request is rejected with ``merror.IllegalValue`` when the address
    lies outside 0x0000-0xffff or when more than 31 values are held;
    otherwise the request's own values are returned in the response.

    :param context: The datastore to request from (not consulted yet)
    :returns: A ``ReadFifoQueueResponse``, or an exception response
    """
    address_ok = 0x0000 <= self.address <= 0xffff
    if not address_ok or len(self.values) > 31:
        return self.doException(merror.IllegalValue)
    # TODO pull the values from some context
    return ReadFifoQueueResponse(self.values)
def execute(self, request):
    """Execute a decoded request and send back its response.

    The request is remembered on ``self._request`` and executed against
    ``self.request_callback.store``. The response is stamped with the
    request's transaction and unit ids and handed to ``self.send``. Any
    exception raised while executing is turned into a
    ``merror.SlaveFailure`` exception response.

    :param request: The decoded request message
    """
    self._request = request
    try:
        response = request.execute(self.request_callback.store)
    except Exception:
        # Broad on purpose: any failure is reported back as an exception
        # response instead of propagating to the caller.
        response = request.doException(merror.SlaveFailure)
    response.transaction_id = request.transaction_id
    # Previously misspelled ``uint_id``, so ``unit_id`` was never copied
    # from the request onto the response.
    response.unit_id = request.unit_id
    self.send(response)
def execute(self, request):
    """Execute a decoded request and send back its response.

    The request is remembered on ``self._request``. Its datastore context
    is looked up in ``self.application.store`` under the request's unit id
    and the request is executed against it. The response is stamped with
    the request's transaction and unit ids and handed to ``self.send``.
    Any exception raised during lookup or execution is logged at debug
    level and turned into a ``merror.SlaveFailure`` exception response.

    :param request: The decoded request message
    """
    self._request = request
    try:
        context = self.application.store[request.unit_id]
        response = request.execute(context)
    except Exception as ex:
        # ``except Exception, ex`` is a syntax error on Python 3; the
        # ``as`` form is accepted by Python 2.6+ as well.
        # The exception is passed as a logging argument so the message is
        # only formatted when DEBUG is enabled.
        _logger.debug("Datastore unable to fulfill request: %s", ex)
        response = request.doException(merror.SlaveFailure)
    response.transaction_id = request.transaction_id
    response.unit_id = request.unit_id
    self.send(response)
def _execute(self, request, addr):
''' Executes the request and returns the result
:param request: The decoded request message
'''
try:
context = self.store[request.unit_id]
response = request.execute(context)
except NoSuchSlaveException as ex:
_logger.debug("requested slave does not exist: %s" % request.unit_id )
if self.ignore_missing_slaves:
return # the client will simply timeout waiting for a response
response = request.doException(merror.GatewayNoResponse)
except Exception as ex:
_logger.debug("Datastore unable to fulfill request: %s" % ex)
response = request.doException(merror.SlaveFailure)
#self.framer.populateResult(response)
response.transaction_id = request.transaction_id
response.unit_id = request.unit_id
self._send(response, addr)
def execute(self, context):
    """Build the response to a read FIFO queue request.

    Two checks guard the response: the address must fall within
    0x0000-0xffff and at most 31 values may be held. Failing either one
    produces a ``merror.IllegalValue`` exception response.

    :param context: The datastore to request from (not consulted yet)
    :returns: A ``ReadFifoQueueResponse`` carrying the request's values,
        or an exception response
    """
    if not (0x0000 <= self.address <= 0xffff):
        return self.doException(merror.IllegalValue)

    queued = self.values
    if len(queued) > 31:
        return self.doException(merror.IllegalValue)

    # TODO pull the values from some context
    return ReadFifoQueueResponse(queued)
def execute(self, context):
    """Run a read discrete inputs request against a datastore.

    The requested count must lie in 0x001-0x7d0, otherwise a
    ``merror.IllegalValue`` exception response is returned. The datastore
    is then asked to validate the function code, address and count; a
    rejection yields a ``merror.IllegalAddress`` exception response.

    :param context: The datastore to request from
    :returns: A ``ReadDiscreteInputsResponse`` holding the values read,
        or an exception response
    """
    count_in_range = 1 <= self.count <= 0x7d0
    if not count_in_range:
        return self.doException(merror.IllegalValue)

    query = (self.function_code, self.address, self.count)
    if context.validate(*query):
        return ReadDiscreteInputsResponse(context.getValues(*query))
    return self.doException(merror.IllegalAddress)
def __str__(self):
    """Render the exception response as a human-readable string.

    :returns: A string holding the function code, the original code and
        the decoded form of the exception code
    """
    description = ModbusExceptions.decode(self.exception_code)
    return "Exception Response(%d, %d, %s)" % (
        self.function_code,
        self.original_code,
        description,
    )