How to use the canmatrix.ArbitrationId.from_pgn function in canmatrix

To help you get started, we’ve selected a few canmatrix examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github ebroecker / canmatrix / src / canmatrix / j1939_decoder.py View on Github external
elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xECFF).pgn and can_data[0] == 255:
            # Connection Abort
            self.abort_reason = can_data[1]
            self.transfered_pgn = (int(can_data[7]) << 16) + (int(can_data[6]) << 8) + int(can_data[5])
            return ("ERROR - decoding Connection Abbort not yet implemented")


        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xEEFF).pgn:
            #Address Claimed
            #arbitration_id.j1939_source
            #name in can_data[0:8]
            return ("ERROR - address claim detected not yet implemented")
            pass

        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xEBFF).pgn:
            # transfer data

            self._data = self._data + can_data[1:min(8, self.bytes_left + 1)]
            self.bytes_left = max(self.bytes_left - 7, 0)

            if self.count_succesive_frames == 0:
                #print(self._data)
                frame = matrix.frame_by_pgn(self.transfered_pgn)
                if frame is not None:
                    signals = frame.decode(self._data)
                    return ("BAM last data", signals)
                return ("BAM last data", {})
            else:
                self.count_succesive_frames -= 1
                return ("BAM data     ", {})
        return ("",{})
github ebroecker / canmatrix / src / canmatrix / j1939_decoder.py View on Github external
def decode(self, arbitration_id, can_data, matrix = None):
        if matrix is not None:
            frame = matrix.frame_by_pgn(arbitration_id.pgn)
        else:
            frame = None
        if frame is not None:
            return ("regular " + frame.name, frame.decode(can_data))
        elif self.j1939_db.frame_by_pgn(arbitration_id.pgn) is not None:
            signals = self.j1939_db.decode(arbitration_id,can_data)
            frame_name = self.j1939_db.frame_by_pgn(arbitration_id.pgn).name
            return ("J1939 known: " + frame_name, signals)

        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xECFF).pgn and can_data[0] == 32:
            # BAM detected
            self.length = (int(can_data[2]) << 8) + int(can_data[1])
            self.count_succesive_frames = int(can_data[3])
            self.transfered_pgn = (int(can_data[7]) << 16) + (int(can_data[6]) << 8) + int(can_data[5])
            self.bytes_left = self.length
            self._data = bytearray()
            return ("BAM          ", {})

        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xECFF).pgn and can_data[0] == 16:
            # RTS detected
            self.length = (int(can_data[2]) << 8) + int(can_data[1])
            self.count_of_packets = int(can_data[3])
            self.total_count_of_packet_sent = int(can_data[4])
            self.transfered_pgn = (int(can_data[7]) << 16) + (int(can_data[6]) << 8) + int(can_data[5])
            return ("ERROR - decoding RTS not yet implemented")
github ebroecker / canmatrix / src / canmatrix / j1939_decoder.py View on Github external
elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xECFF).pgn and can_data[0] == 16:
            # RTS detected
            self.length = (int(can_data[2]) << 8) + int(can_data[1])
            self.count_of_packets = int(can_data[3])
            self.total_count_of_packet_sent = int(can_data[4])
            self.transfered_pgn = (int(can_data[7]) << 16) + (int(can_data[6]) << 8) + int(can_data[5])
            return ("ERROR - decoding RTS not yet implemented")

        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xECFF).pgn and can_data[0] == 17:
            # CTS detected
            self.max_packets_at_once = can_data[1]
            self.sequence_number_to_start = can_data[2]
            self.transfered_pgn = (int(can_data[7]) << 16) + (int(can_data[6]) << 8) + int(can_data[5])
            return ("ERROR - decoding CTS not yet implemented")

        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xECFF).pgn and can_data[0] == 19:
            # ACK detected
            self.message_size = (int(can_data[2]) << 8) + int(can_data[1])
            self.count_of_packets = int(can_data[3])
            self.transfered_pgn = (int(can_data[7]) << 16) + (int(can_data[6]) << 8) + int(can_data[5])
            return ("ERROR - decoding ACK not yet implemented")

        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xECFF).pgn and can_data[0] == 255:
            # Connection Abort
            self.abort_reason = can_data[1]
            self.transfered_pgn = (int(can_data[7]) << 16) + (int(can_data[6]) << 8) + int(can_data[5])
            return ("ERROR - decoding Connection Abbort not yet implemented")


        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xEEFF).pgn:
            #Address Claimed
            #arbitration_id.j1939_source
github ebroecker / canmatrix / src / canmatrix / j1939_decoder.py View on Github external
elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xECFF).pgn and can_data[0] == 17:
            # CTS detected
            self.max_packets_at_once = can_data[1]
            self.sequence_number_to_start = can_data[2]
            self.transfered_pgn = (int(can_data[7]) << 16) + (int(can_data[6]) << 8) + int(can_data[5])
            return ("ERROR - decoding CTS not yet implemented")

        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xECFF).pgn and can_data[0] == 19:
            # ACK detected
            self.message_size = (int(can_data[2]) << 8) + int(can_data[1])
            self.count_of_packets = int(can_data[3])
            self.transfered_pgn = (int(can_data[7]) << 16) + (int(can_data[6]) << 8) + int(can_data[5])
            return ("ERROR - decoding ACK not yet implemented")

        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xECFF).pgn and can_data[0] == 255:
            # Connection Abort
            self.abort_reason = can_data[1]
            self.transfered_pgn = (int(can_data[7]) << 16) + (int(can_data[6]) << 8) + int(can_data[5])
            return ("ERROR - decoding Connection Abbort not yet implemented")


        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xEEFF).pgn:
            #Address Claimed
            #arbitration_id.j1939_source
            #name in can_data[0:8]
            return ("ERROR - address claim detected not yet implemented")
            pass

        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xEBFF).pgn:
            # transfer data
github ebroecker / canmatrix / src / canmatrix / j1939_decoder.py View on Github external
elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xECFF).pgn and can_data[0] == 19:
            # ACK detected
            self.message_size = (int(can_data[2]) << 8) + int(can_data[1])
            self.count_of_packets = int(can_data[3])
            self.transfered_pgn = (int(can_data[7]) << 16) + (int(can_data[6]) << 8) + int(can_data[5])
            return ("ERROR - decoding ACK not yet implemented")

        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xECFF).pgn and can_data[0] == 255:
            # Connection Abort
            self.abort_reason = can_data[1]
            self.transfered_pgn = (int(can_data[7]) << 16) + (int(can_data[6]) << 8) + int(can_data[5])
            return ("ERROR - decoding Connection Abbort not yet implemented")


        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xEEFF).pgn:
            #Address Claimed
            #arbitration_id.j1939_source
            #name in can_data[0:8]
            return ("ERROR - address claim detected not yet implemented")
            pass

        elif arbitration_id.pgn == canmatrix.ArbitrationId.from_pgn(0xEBFF).pgn:
            # transfer data

            self._data = self._data + can_data[1:min(8, self.bytes_left + 1)]
            self.bytes_left = max(self.bytes_left - 7, 0)

            if self.count_succesive_frames == 0:
                #print(self._data)
                frame = matrix.frame_by_pgn(self.transfered_pgn)
                if frame is not None: