Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
_check_slaveaddress(slaveaddress)
_check_mode(mode)
_check_functioncode(functioncode, None)
plainresponse = response
# Validate response length
if mode == MODE_ASCII:
if len(response) < MINIMAL_RESPONSE_LENGTH_ASCII:
raise InvalidResponseError(
"Too short Modbus ASCII response (minimum length {} bytes). Response: {!r}".format(
MINIMAL_RESPONSE_LENGTH_ASCII, response
)
)
elif len(response) < MINIMAL_RESPONSE_LENGTH_RTU:
raise InvalidResponseError(
"Too short Modbus RTU response (minimum length {} bytes). Response: {!r}".format(
MINIMAL_RESPONSE_LENGTH_RTU, response
)
)
if mode == MODE_ASCII:
# Validate the ASCII header and footer.
if response[_BYTEPOSITION_FOR_ASCII_HEADER] != _ASCII_HEADER:
raise InvalidResponseError(
"Did not find header "
+ "({!r}) as start of ASCII response. The plain response is: {!r}".format(
_ASCII_HEADER, response
)
)
elif response[-len(_ASCII_FOOTER):] != _ASCII_FOOTER:
calculate_checksum = _calculate_crc_string
number_of_checksum_bytes = NUMBER_OF_CRC_BYTES
received_checksum = response[-number_of_checksum_bytes:]
response_without_checksum = response[0:(len(response) - number_of_checksum_bytes)]
calculated_checksum = calculate_checksum(response_without_checksum)
if received_checksum != calculated_checksum:
template = (
"Checksum error in {} mode: {!r} instead of {!r} . The response "
+ "is: {!r} (plain response: {!r})"
)
text = template.format(
mode, received_checksum, calculated_checksum, response, plainresponse
)
raise InvalidResponseError(text)
# Check slave address
responseaddress = ord(response[_BYTEPOSITION_FOR_SLAVEADDRESS])
if responseaddress != slaveaddress:
raise InvalidResponseError(
"Wrong return slave address: {} instead of {}. The response is: {!r}".format(
responseaddress, slaveaddress, response
)
)
# Check if slave indicates error
_check_response_slaveerrorcode(response)
# Check function code
received_functioncode = ord(response[_BYTEPOSITION_FOR_FUNCTIONCODE])
minvalue=1,
maxvalue=max(
_MAX_NUMBER_OF_REGISTERS_TO_READ, _MAX_NUMBER_OF_REGISTERS_TO_WRITE
),
description="number of registers",
)
BYTERANGE_FOR_NUMBER_OF_REGISTERS = slice(2, 4)
bytes_for_mumber_of_registers = payload[BYTERANGE_FOR_NUMBER_OF_REGISTERS]
received_number_of_written_registers = _twobyte_string_to_num(
bytes_for_mumber_of_registers
)
if received_number_of_written_registers != number_of_registers:
raise InvalidResponseError(
"Wrong number of registers to write in the response: "
+ "{0}, but commanded is {1}. The data payload is: {2!r}".format(
received_number_of_written_registers, number_of_registers, payload
)
"Too short Modbus ASCII response (minimum length {} bytes). Response: {!r}".format(
MINIMAL_RESPONSE_LENGTH_ASCII, response
)
)
elif len(response) < MINIMAL_RESPONSE_LENGTH_RTU:
raise InvalidResponseError(
"Too short Modbus RTU response (minimum length {} bytes). Response: {!r}".format(
MINIMAL_RESPONSE_LENGTH_RTU, response
)
)
if mode == MODE_ASCII:
# Validate the ASCII header and footer.
if response[_BYTEPOSITION_FOR_ASCII_HEADER] != _ASCII_HEADER:
raise InvalidResponseError(
"Did not find header "
+ "({!r}) as start of ASCII response. The plain response is: {!r}".format(
_ASCII_HEADER, response
)
)
elif response[-len(_ASCII_FOOTER):] != _ASCII_FOOTER:
raise InvalidResponseError(
"Did not find footer "
+ "({!r}) as end of ASCII response. The plain response is: {!r}".format(
_ASCII_FOOTER, response
)
)
# Strip ASCII header and footer
response = response[1:-2]
_check_response_writedata(
payload, _num_to_twobyte_string(value, number_of_decimals, signed=signed)
)
elif functioncode == 15:
# response number of bits
_check_response_number_of_registers(payload, number_of_bits)
elif functioncode == 16:
_check_response_number_of_registers(payload, number_of_registers)
# Response for read bits
if functioncode in [1, 2]:
registerdata = payload[_NUMBER_OF_BYTES_BEFORE_REGISTERDATA:]
expected_number_of_bytes = _calculate_number_of_bytes_for_bits(number_of_bits)
if len(registerdata) != expected_number_of_bytes:
raise InvalidResponseError(
"The data length is wrong for payloadformat BIT/BITS."
+ " Expected: {} Actual: {}.".format(
expected_number_of_bytes, len(registerdata)
)
)
# Response for read registers
if functioncode in [3, 4]:
registerdata = payload[_NUMBER_OF_BYTES_BEFORE_REGISTERDATA:]
number_of_register_bytes = number_of_registers * _NUMBER_OF_BYTES_PER_REGISTER
if len(registerdata) != number_of_register_bytes:
raise InvalidResponseError(
"The register data length is wrong. "
+ "Registerdata: {!r} bytes. Expected: {!r}.".format(
len(registerdata), number_of_register_bytes
)
_check_string(formatstring, description="formatstring", minlength=1)
_check_string(packed, description="packed string", minlength=1)
if sys.version_info[0] > 2:
packed = bytes(
packed, encoding="latin1"
) # Convert types to make it Python3 compatible
try:
value = struct.unpack(formatstring, packed)[0]
except Exception:
errortext = (
"The received bytestring is probably wrong, as the bytestring-to-num "
)
errortext += "conversion failed. Bytestring: {0!r} Struct format code is: {1}"
raise InvalidResponseError(errortext.format(packed, formatstring))
return value
Raises:
TypeError, ValueError, InvalidResponseError
"""
_check_string(
payload, minlength=4, description="payload", exception_type=InvalidResponseError
)
_check_string(writedata, minlength=2, maxlength=2, description="writedata")
BYTERANGE_FOR_WRITEDATA = slice(2, 4)
received_writedata = payload[BYTERANGE_FOR_WRITEDATA]
if received_writedata != writedata:
raise InvalidResponseError(
"Wrong write data in the response: "
+ "{0!r}, but commanded is {1!r}. The data payload is: {2!r}".format(
received_writedata, writedata, payload
)
given_number_of_databytes = ord(payload[POSITION_FOR_GIVEN_NUMBER])
counted_number_of_databytes = len(payload) - NUMBER_OF_BYTES_TO_SKIP
if given_number_of_databytes != counted_number_of_databytes:
errortemplate = (
"Wrong given number of bytes in the response: "
+ "{0}, but counted is {1} as data payload length is {2}."
+ " The data payload is: {3!r}"
)
errortext = errortemplate.format(
given_number_of_databytes,
counted_number_of_databytes,
len(payload),
payload,
)
raise InvalidResponseError(errortext)
Raises:
TypeError, ValueError, InvalidResponseError
"""
_check_string(
payload, minlength=2, description="payload", exception_type=InvalidResponseError
)
_check_registeraddress(registeraddress)
BYTERANGE_FOR_STARTADDRESS = slice(0, 2)
bytes_for_startaddress = payload[BYTERANGE_FOR_STARTADDRESS]
received_startaddress = _twobyte_string_to_num(bytes_for_startaddress)
if received_startaddress != registeraddress:
raise InvalidResponseError(
"Wrong given write start adress: "
+ "{0}, but commanded is {1}. The data payload is: {2!r}".format(
received_startaddress, registeraddress, payload
)
def _check_response_registeraddress(payload, registeraddress):
"""Check that the start adress as given in the response is correct.
The first two bytes in the payload holds the address value.
Args:
* payload (string): The payload
* registeraddress (int): What the register address actually shoud be
(use decimal numbers, not hex).
Raises:
TypeError, ValueError, InvalidResponseError
"""
_check_string(
payload, minlength=2, description="payload", exception_type=InvalidResponseError
)
_check_registeraddress(registeraddress)
BYTERANGE_FOR_STARTADDRESS = slice(0, 2)
bytes_for_startaddress = payload[BYTERANGE_FOR_STARTADDRESS]
received_startaddress = _twobyte_string_to_num(bytes_for_startaddress)
if received_startaddress != registeraddress:
raise InvalidResponseError(
"Wrong given write start adress: "
+ "{0}, but commanded is {1}. The data payload is: {2!r}".format(
received_startaddress, registeraddress, payload
)