Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ret += self.message
elif isinstance(self.message, list):
for r in self.message:
ret += struct.pack('>H', r)
elif isinstance(self.message, int):
ret += struct.pack('>H', self.message)
return ret
def decode(self, data):
''' Base decoder for a diagnostic request
:param data: The data to decode into the function code
'''
self.sub_function_code, self.message = struct.unpack('>HH', data)
class DiagnosticStatusResponse(ModbusResponse):
'''
This is a base class for all of the diagnostic response functions
It works by performing all of the encoding and decoding of variable
data and lets the higher classes define what extra data to append
and how to execute a request
'''
function_code = 0x08
def __init__(self):
'''
Base initializer for a diagnostic response
'''
ModbusResponse.__init__(self)
def encode(self):
:returns: The populated response
'''
information = DeviceInformationFactory.get(_MCB)
identifier = "-".join(information.values()).encode()
identifier = identifier or b'Pymodbus'
return ReportSlaveIdResponse(identifier)
def __str__(self):
''' Builds a representation of the request
:returns: The string representation of the request
'''
return "ResportSlaveIdRequest(%d)" % self.function_code
class ReportSlaveIdResponse(ModbusResponse):
'''
The format of a normal response is shown in the following example.
The data contents are specific to each type of device.
'''
function_code = 0x11
_rtu_byte_count_pos = 2
def __init__(self, identifier=b'\x00', status=True, **kwargs):
''' Initializes a new instance
:param identifier: The identifier of the slave
:param status: The status response to report
'''
ModbusResponse.__init__(self, **kwargs)
self.identifier = identifier
self.status = status
def get_response_pdu_size(self):
"""
Func_code (1 byte) + Output Address (2 byte) + Output Value (2 Bytes)
:return:
"""
return 1 + 2 + 2
def __str__(self):
''' Returns a string representation of the instance
:return: A string representation of the instance
'''
return "WriteCoilRequest(%d, %s) => " % (self.address, self.value)
class WriteSingleCoilResponse(ModbusResponse):
'''
The normal response is an echo of the request, returned after the coil
state has been written.
'''
function_code = 5
_rtu_frame_size = 8
def __init__(self, address=None, value=None, **kwargs):
''' Initializes a new instance
:param address: The variable address written to
:param value: The value written at address
'''
ModbusResponse.__init__(self, **kwargs)
self.address = address
self.value = value
def __init__(self, address=None, count=None, **kwargs):
''' Initializes a new instance
:param address: The address to start writing to
:param count: The number of registers to write to
'''
ModbusResponse.__init__(self, **kwargs)
self.address = address
self.count = count
if decoded[0] == 0x06: self.records.append(record)
def execute(self, context):
''' Run a read exeception status request against the store
:param context: The datastore to request from
:returns: The populated response
'''
# TODO do some new context operation here
# if file number, record number, or address + length
# is too big, return an error.
files = []
return ReadFileRecordResponse(files)
class ReadFileRecordResponse(ModbusResponse):
'''
The normal response is a series of 'sub-responses,' one for each
'sub-request.' The byte count field is the total combined count of
bytes in all 'sub-responses.' In addition, each 'sub-response'
contains a field that shows its own byte count.
'''
function_code = 0x14
_rtu_byte_count_pos = 2
def __init__(self, records=None, **kwargs):
''' Initializes a new instance
:param records: The requested file records
'''
ModbusResponse.__init__(self, **kwargs)
self.records = records or []
def __init__(self, values, **kwargs):
''' Initializes a new instance
:param values: The requested values to be returned
'''
ModbusResponse.__init__(self, **kwargs)
self.bits = values or []
if not context.validate(self.function_code, self.address, self.count):
return self.doException(merror.IllegalAddress)
context.setValues(self.function_code, self.address, self.values)
return WriteMultipleRegistersResponse(self.address, self.count)
def __str__(self):
''' Returns a string representation of the instance
:returns: A string representation of the instance
'''
params = (self.address, self.count)
return "WriteMultipleRegisterRequest %d => %d" % params
class WriteMultipleRegistersResponse(ModbusResponse):
'''
"The normal response returns the function code, starting address, and
quantity of registers written.
'''
function_code = 16
_rtu_frame_size = 8
def __init__(self, address=None, count=None, **kwargs):
''' Initializes a new instance
:param address: The address to start writing to
:param count: The number of registers to write to
'''
ModbusResponse.__init__(self, **kwargs)
self.address = address
self.count = count