Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_encode_decode_signal_value_choice_str(self):
    """Round-trip frame 0x123 with 'Signal' given as the plain string 'two'.

    Every matrix loaded from test.dbc encodes the signal dictionary, decodes
    the resulting bytes again (third decode argument True) and the string
    form of each decoded value must equal the value that was encoded.
    """
    frame_id = 0x123
    here = os.path.dirname(__file__)
    dbc_file = os.path.join(here, "..", "test", "test.dbc")
    matrices = formats.loadp(dbc_file)
    for matrix in matrices.values():
        # Built per matrix, exactly as often as the loop runs.
        signal_values = {'Signal': 'two'}
        encoded = matrix.encode(frame_id, signal_values)
        payload = tuple(bytearray(encoded))
        result = matrix.decode(frame_id, payload, True)
        for signal_name, expected in signal_values.items():
            assert str(result[signal_name]) == expected
def test_encode_decode_signal_value_choice_unicode(self):
    """Round-trip frame 0x123 with 'Signal' given as the unicode literal u'two'.

    Every matrix loaded from test.dbc encodes the signal dictionary, decodes
    the resulting bytes again (third decode argument True) and the string
    form of each decoded value must equal the value that was encoded.
    """
    frame_id = 0x123
    here = os.path.dirname(__file__)
    dbc_file = os.path.join(here, "..", "test", "test.dbc")
    matrices = formats.loadp(dbc_file)
    for matrix in matrices.values():
        # Built per matrix, exactly as often as the loop runs.
        signal_values = {'Signal': u'two'}
        encoded = matrix.encode(frame_id, signal_values)
        payload = tuple(bytearray(encoded))
        result = matrix.decode(frame_id, payload, True)
        for signal_name, expected in signal_values.items():
            assert str(result[signal_name]) == expected
def test_encode_decode_signal_value(self):
    """Round-trip frame 0x123 with numeric values for two signals.

    Every matrix loaded from test.dbc encodes 'Signal' = 2 and
    'someTestSignal' = 101, decodes the resulting bytes again (third decode
    argument False) and each decoded value must equal the encoded one.
    """
    frame_id = 0x123
    here = os.path.dirname(__file__)
    dbc_file = os.path.join(here, "..", "test", "test.dbc")
    matrices = formats.loadp(dbc_file)
    for matrix in matrices.values():
        # Built per matrix, exactly as often as the loop runs.
        signal_values = {
            'Signal': 2,
            'someTestSignal': 101,
        }
        encoded = matrix.encode(frame_id, signal_values)
        payload = tuple(bytearray(encoded))
        result = matrix.decode(frame_id, payload, False)
        for signal_name, expected in signal_values.items():
            assert result[signal_name] == expected
# Opening section of the createccl script: check the command line, start the
# generated C header/source text, load the CAN database and index the frames
# that the selected ECU receives and transmits.
if len(sys.argv) <= 2:
    # print() with a single argument behaves the same on Python 2 and 3;
    # the former print statements were a syntax error on Python 3.
    print("not yet working script for generating a communication layer for dedicated ECU out of can database")
    print("! missing support for sending cyclic messages")
    print("! missing any TEST ! ")
    print(" this code is just proofe of concept \n")
    print("Usage: createccl.py CanDatabaseFile ECU_Name ")
    # sys.exit is always available, whereas the bare exit() helper is only
    # installed by the site module. The exit status is unchanged.
    sys.exit()

# Text accumulated for the generated header (ccl.h) ...
ccl_h = "#ifndef __CCL_H__\n#define __CCL_H__\n\n"
ccl_h += "typedef unsigned char uint8;\ntypedef unsigned int uint32;\n\n"

# ... and for the generated C source.
# NOTE(review): the first two '#include' directives carry no header name;
# this looks like lost text - confirm the intended headers.
ccl_c = "#include \n#include \n#include \"ccl.h\"\n\n"

infile = sys.argv[1]
ecu = sys.argv[2]

# Only the first matrix found in the input file is used.
dbs = canmatrix.formats.loadp(infile)
db = next(iter(dbs.values()))

receiveArray = []   # arbitration ids of the frames received by the ECU
receiveDict = {}    # received frame -> its position in receiveArray
receiveIndex = 0
sendIndex = 0
txDict = {}         # transmitted frame -> running transmit index

for frame in db.frames:
    if ecu in frame.receivers:
        receiveArray.append(frame.arbitration_id.id)
        receiveDict[frame] = receiveIndex
        receiveIndex += 1

    if ecu in frame.transmitters:
        txDict[frame] = sendIndex
        sendIndex += 1
# Takes a list of database file names. The first entry is removed from the
# caller's list (files.pop(0)) and becomes the target database; every remaining
# file is loaded as a source database. In both cases only the first matrix
# returned by loadp is used.
# NOTE(review): the visible body stops after collecting signal_to_add; the list
# is not used and nothing is returned within these lines, so the function
# appears to be truncated here.
def join_frame_by_signal_start_bit(files): # type: (typing.List[str]) -> canmatrix.CanMatrix
target_db = next(iter(canmatrix.formats.loadp(files.pop(0)).values()))
pgn_x, id_x = list_pgn(db=target_db)
for f in files:
source_db = next(iter(canmatrix.formats.loadp(f).values()))
pgn_y, id_y = list_pgn(db=source_db)
# Pairs of (target id, source id) as produced by ids_sharing_same_pgn.
same_pgn = ids_sharing_same_pgn(id_x, pgn_x, id_y, pgn_y)
for id_a, id_b in same_pgn:
# print("{0:#x} {1:#x}".format(id_x, id_y))
target_fr = target_db.frame_by_id(id_a)
source_fr = source_db.frame_by_id(id_b)
# Source signals whose start bit equals the start bit of a target signal.
signal_to_add = []
for sig_t in target_fr.signals:
for sig_s in source_fr.signals:
# print(sig.name)
if sig_t.start_bit == sig_s.start_bit:
# print("\t{0} {1}".format(sig_t.name, sig_s.name))
signal_to_add.append(sig_s)
# Command-line handling for the macro generator script. 'parser' is created
# before this fragment; the fragment also ends inside the --frame loop.
# --frame: comma separated list of frames, stored as options.exportframe.
parser.add_option("", "--frame",
dest="exportframe", default=None,
help="create macros for Frame(s); Comma seperated list of Names ")
# --ecu: comma separated list of ECUs, stored as options.exportecu.
parser.add_option("", "--ecu",
dest="exportecu", default=None,
help="create macros for Ecu(s) Comma seperated ")
(cmdlineOptions, args) = parser.parse_args()
# Two positional arguments are required: input database and output file.
if len(args) < 2:
parser.print_help()
sys.exit(1)
infile = args[0]
outfile = args[1]
# Only the first matrix found in the input file is used.
dbs = canmatrix.formats.loadp(infile)
db = next(iter(dbs.values()))
# Generated macro text is accumulated in this string.
sourceCode = ""
# Without --frame and --ecu, decode and store macros are created for every frame,
# each with the prefix "_<frame name>_".
if cmdlineOptions.exportframe is None and cmdlineOptions.exportecu is None:
for frame in db.frames:
sourceCode += createDecodeMacrosForFrame(
frame, "_" + frame.name + "_")
sourceCode += createStoreMacrosForFrame(
frame, "_" + frame.name + "_")
if cmdlineOptions.exportframe is not None:
for frameId in cmdlineOptions.exportframe.split(','):
# Each entry is first tried as an integer id; if int() raises ValueError
# the entry is looked up as a frame name instead.
try:
frame = db.frameById(int(frameId))
except ValueError:
frame = db.frameByName(frameId)
def main():
    """Command-line entry point: load a CAN database and hand it to ticker_ecus.

    Expects two command-line arguments, the import file and the export file.
    With fewer arguments a usage message is written to stderr and the process
    exits with status 1.
    """
    if len(sys.argv) < 3:
        # Insert the real program name; previously the literal text
        # "sys.argv[0]" was printed in the usage line.
        sys.stderr.write(
            'Usage: {0} import-file export-file\n'.format(sys.argv[0]))
        sys.stderr.write('import-file: *.dbc|*.dbf|*.kcd\n')
        sys.stderr.write('export-file: somefile.zip\n')
        sys.exit(1)

    infile = sys.argv[1]
    # The export name is passed on without its file extension.
    outfile = os.path.splitext(sys.argv[2])[0]
    # Only the first matrix found in the input file is used.
    db = next(iter(canmatrix.formats.loadp(infile).values()))
    ticker_ecus(db, outfile)
def join_frame_by_signal_start_bit(files): # type: (typing.List[str]) -> canmatrix.CanMatrix
target_db = next(iter(canmatrix.formats.loadp(files.pop(0)).values()))
pgn_x, id_x = list_pgn(db=target_db)
for f in files:
source_db = next(iter(canmatrix.formats.loadp(f).values()))
pgn_y, id_y = list_pgn(db=source_db)
same_pgn = ids_sharing_same_pgn(id_x, pgn_x, id_y, pgn_y)
for id_a, id_b in same_pgn:
# print("{0:#x} {1:#x}".format(id_x, id_y))
target_fr = target_db.frame_by_id(id_a)
source_fr = source_db.frame_by_id(id_b)
signal_to_add = []
for sig_t in target_fr.signals: