Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for e in range(7, length + 1):
self.events.append(byte2int(data[e]))
def __str__(self):
''' Builds a representation of the response
:returns: The string representation of the response
'''
arguments = (self.function_code, self.status, self.message_count, self.event_count)
return "GetCommEventLogResponse(%d, %d, %d, %d)" % arguments
#---------------------------------------------------------------------------#
# TODO Make these only work on serial
#---------------------------------------------------------------------------#
class ReportSlaveIdRequest(ModbusRequest):
'''
This function code is used to read the description of the type, the
current status, and other information specific to a remote device.
'''
function_code = 0x11
_rtu_frame_size = 4
def __init__(self, **kwargs):
''' Initializes a new instance
'''
ModbusRequest.__init__(self, **kwargs)
def encode(self):
''' Encodes the message
'''
return b''
:param data: The packet data to decode
'''
self.address, value = struct.unpack('>HH', data)
self.value = (value != 0)
def __str__(self):
''' Returns a string representation of the instance
:returns: A string representation of the instance
'''
return "WriteCoilResponse(%d)" % self.address, self.value
#---------------------------------------------------------------------------#
# TODO Fix this so we can write more than false to multiple variables
#---------------------------------------------------------------------------#
class WriteMultipleCoilsRequest(ModbusRequest):
'''
"This function code is used to force each coil in a sequence of coils to
either ON or OFF in a remote device. The Request PDU specifies the coil
references to be forced. Coils are addressed starting at zero. Therefore
coil numbered 1 is addressed as 0.
The requested ON/OFF states are specified by contents of the request
data field. A logical '1' in a bit position of the field requests the
corresponding output to be ON. A logical '0' requests it to be OFF."
'''
function_code = 15
def __init__(self, address=None, count=None):
''' Initializes a new instance
:param address: The starting request address
def __init__(self):
''' Initializes a new instance
'''
ModbusRequest.__init__(self)
:param data: The data to decode into the address
'''
count, self.records = 1, []
byte_count = byte2int(data[0])
while count < byte_count:
decoded = struct.unpack('>BHHH', data[count:count+7])
response_length = decoded[3] * 2
count += response_length + 7
record = FileRecord(record_length=decoded[3],
file_number=decoded[1], record_number=decoded[2],
record_data=data[count - response_length:count])
if decoded[0] == 0x06: self.records.append(record)
class ReadFifoQueueRequest(ModbusRequest):
'''
This function code allows to read the contents of a First-In-First-Out
(FIFO) queue of register in a remote device. The function returns a
count of the registers in the queue, followed by the queued data.
Up to 32 registers can be read: the count, plus up to 31 queued data
registers.
The queue count register is returned first, followed by the queued data
registers. The function reads the queue contents, but does not clear
them.
'''
function_code = 0x18
_rtu_frame_size = 6
def __init__(self, address=0x0000, **kwargs):
''' Initializes a new instance
Currently not all implemented
'''
import struct
from pymodbus.constants import ModbusStatus
from pymodbus.pdu import ModbusRequest
from pymodbus.pdu import ModbusResponse
from pymodbus.device import ModbusControlBlock, DeviceInformationFactory
from pymodbus.compat import byte2int, int2byte
_MCB = ModbusControlBlock()
#---------------------------------------------------------------------------#
# TODO Make these only work on serial
#---------------------------------------------------------------------------#
class ReadExceptionStatusRequest(ModbusRequest):
'''
This function code is used to read the contents of eight Exception Status
outputs in a remote device. The function provides a simple method for
accessing this information, because the Exception Output references are
known (no output reference is needed in the function).
'''
function_code = 0x07
_rtu_frame_size = 4
def __init__(self, **kwargs):
''' Initializes a new instance
'''
ModbusRequest.__init__(self, **kwargs)
def encode(self):
''' Encodes the message
def __ne__(self, relf):
''' Compares the left object to the right
'''
return not self.__eq__(relf)
def __repr__(self):
''' Gives a representation of the file record
'''
params = (self.file_number, self.record_number, self.record_length)
return 'FileRecord(file=%d, record=%d, length=%d)' % params
#---------------------------------------------------------------------------#
# File Requests/Responses
#---------------------------------------------------------------------------#
class ReadFileRecordRequest(ModbusRequest):
'''
This function code is used to perform a file record read. All request
data lengths are provided in terms of number of bytes and all record
lengths are provided in terms of registers.
A file is an organization of records. Each file contains 10000 records,
addressed 0000 to 9999 decimal or 0x0000 to 0x270f. For example, record
12 is addressed as 12. The function can read multiple groups of
references. The groups can be separating (non-contiguous), but the
references within each group must be sequential. Each group is defined
in a seperate 'sub-request' field that contains seven bytes::
The reference type: 1 byte (must be 0x06)
The file number: 2 bytes
The starting record number within the file: 2 bytes
The length of the record to be read: 2 bytes
def __init__(self, read_code=None, object_id=0x00, **kwargs):
''' Initializes a new instance
:param read_code: The device information read code
:param object_id: The object to read from
'''
ModbusRequest.__init__(self, **kwargs)
self.read_code = read_code or DeviceInformation.Basic
self.object_id = object_id
def __init__(self, address=None, value=None):
''' Initializes a new instance
:param address: The address to start writing add
:param value: The values to write
'''
ModbusRequest.__init__(self)
self.address = address
self.value = value