Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def parse_can_frame(frame):
if frame.extended:
can_id = frame.id
source_node_id = can_id & 0x7F
service_not_message = bool((can_id >> 7) & 1)
if service_not_message:
destination_node_id = (can_id >> 8) & 0x7F
request_not_response = bool((can_id >> 15) & 1)
service_type_id = (can_id >> 16) & 0xFF
try:
data_type_name = uavcan.DATATYPES[(service_type_id, uavcan.dsdl.CompoundType.KIND_SERVICE)].full_name
except KeyError:
data_type_name = '' % service_type_id
else:
message_type_id = (can_id >> 8) & 0xFFFF
if source_node_id == 0:
source_node_id = 'Anon'
message_type_id &= 0b11
destination_node_id = ''
try:
data_type_name = uavcan.DATATYPES[(message_type_id, uavcan.dsdl.CompoundType.KIND_MESSAGE)].full_name
except KeyError:
data_type_name = '' % message_type_id
else:
data_type_name = 'N/A'
source_node_id = 'N/A'
destination_node_id = 'N/A'
def run_parser(source_dirs, search_dirs):
try:
types = dsdl.parse_namespaces(source_dirs, search_dirs)
except dsdl.DsdlException as ex:
logger.info('Parser failure', exc_info=True)
die(ex)
return types
def on_request(self):
logging.debug("[#{0:03d}:uavcan.protocol.file.GetInfo] {1!r}".format(
self.transfer.source_node_id,
self.request.path.path.decode()))
try:
vpath = self.request.path.path.decode()
with open(os.path.join(self.base_path, vpath), "rb") as fw:
data = fw.read()
self.response.error.value = self.response.error.OK
self.response.size = len(data)
self.response.crc64 = \
uavcan.dsdl.signature.compute_signature(data)
self.response.entry_type.flags = \
(self.response.entry_type.FLAG_FILE |
self.response.entry_type.FLAG_READABLE)
except Exception:
logging.exception("[#{0:03d}:uavcan.protocol.file.GetInfo] error")
self.response.error.value = self.response.error.UNKNOWN_ERROR
self.response.size = 0
self.response.crc64 = 0
self.response.entry_type.flags = 0
parser.add_argument('build_dir', nargs=1)
parser.add_argument('--build', action='append')
args = parser.parse_args()
buildlist = None
if args.build:
buildlist = set(args.build)
namespace_paths = [os.path.abspath(path) for path in args.namespace_dir]
build_dir = os.path.abspath(args.build_dir[0])
os.chdir(os.path.dirname(__file__))
templates_dir = 'templates'
messages = uavcan.dsdl.parse_namespaces(namespace_paths)
message_dict = {}
for msg in messages:
message_dict[msg.full_name] = msg
for template in templates:
with open(os.path.join(templates_dir, template['source_file']), 'rb') as f:
template['source'] = f.read()
def build_message(msg_name):
print 'building %s' % (msg_name,)
msg = message_dict[msg_name]
for template in templates:
output = em.expand(template['source'], msg=msg)
if not output.strip():
continue
def run_parser(source_dirs, search_dirs):
try:
types = dsdl.parse_namespaces(source_dirs, search_dirs)
except dsdl.DsdlException as ex:
logger.info('Parser failure', exc_info=True)
die(ex)
return types
def list_message_data_type_names_with_dtid():
# Custom data type mappings must be configured in the library
message_types = []
for (dtid, kind), dtype in uavcan.DATATYPES.items():
if dtid is not None and kind == uavcan.dsdl.CompoundType.KIND_MESSAGE:
message_types.append(str(dtype))
return list(sorted(message_types))
def _extract_fields(self, x):
if isinstance(x, (int, float, bool, str, bytes)):
return x
if isinstance(x, uavcan.transport.Void) or isinstance(x.type, uavcan.dsdl.parser.VoidType): # HACK
return None
if isinstance(x.type, uavcan.dsdl.parser.PrimitiveType):
return x.value
if isinstance(x.type, uavcan.dsdl.parser.CompoundType):
return {name: self._extract_fields(value) for name, value in x.fields.items()}
if isinstance(x.type, uavcan.dsdl.parser.ArrayType):
val = [self._extract_fields(y) for y in x]
try:
val = bytes(val) # String handling heuristic
except Exception:
pass
return val
raise Exception('Invalid field type: %s' % x.type)
# Try to prepend the built-in DSDL files
# TODO: why do we need try/except here?
# noinspection PyBroadException
try:
if not args.get("exclude_dist", None):
dsdl_path = pkg_resources.resource_filename(__name__, "dsdl_files") # @UndefinedVariable
paths = [os.path.join(dsdl_path, "uavcan")] + paths
custom_path = os.path.join(os.path.expanduser("~"), "uavcan_vendor_specific_types")
if os.path.isdir(custom_path):
paths += [f for f in [os.path.join(custom_path, f) for f in os.listdir(custom_path)]
if os.path.isdir(f)]
except Exception:
pass
root_namespace = Namespace()
dtypes = dsdl.parse_namespaces(paths)
for dtype in dtypes:
namespace, _, typename = dtype.full_name.rpartition(".")
root_namespace._path(namespace).__dict__[typename] = dtype
TYPENAMES[dtype.full_name] = dtype
if dtype.default_dtid:
DATATYPES[(dtype.default_dtid, dtype.kind)] = dtype
# Add the base CRC to each data type capable of being transmitted
dtype.base_crc = dsdl.crc16_from_bytes(struct.pack("30} DTID: {: >4} base_crc:{: >8}"
.format(typename, dtype.default_dtid, hex(dtype.base_crc)))
def create_instance_closure(closure_type, _mode=None):
# noinspection PyShadowingNames
def create_instance(*args, **kwargs):
if _mode:
def value(self):
if not self._bits:
return None
int_value = int(self._bits, 2)
if self._type.kind == dsdl.PrimitiveType.KIND_BOOLEAN:
return bool(int_value)
elif self._type.kind == dsdl.PrimitiveType.KIND_UNSIGNED_INT:
return int_value
elif self._type.kind == dsdl.PrimitiveType.KIND_SIGNED_INT:
if int_value >= (1 << (self._type.bitlen - 1)):
int_value = -((1 << self._type.bitlen) - int_value)
return int_value
elif self._type.kind == dsdl.PrimitiveType.KIND_FLOAT:
if self._type.bitlen == 16:
return f32_from_f16(int_value)
elif self._type.bitlen == 32:
return struct.unpack("