Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _client_from_JSON(self, datum):
l = self._location_from_JSON
cs = [SourceCondition(c["condition"], location = l(c["location"]))
for c in datum["conditions"]]
return ServiceClientCall(datum["name"], datum["namespace"],
datum["type"], control_depth = datum["depth"],
repeats = datum["repeats"],
conditions = cs,
location = l(datum["location"]))
def _srv_from_JSON(self, datum):
l = self._location_from_JSON
cs = [SourceCondition(c["condition"], location = l(c["location"]))
for c in datum["conditions"]]
return ServiceServerCall(datum["name"], datum["namespace"],
datum["type"], control_depth = datum["depth"],
repeats = datum["repeats"],
conditions = cs,
location = l(datum["location"]))
def _on_subscription(self, node, ns, call, topic_pos=0, queue_pos=1,
msg_type=None):
if len(call.arguments) <= 1:
return
name = self._extract_topic(call, topic_pos=topic_pos)
msg_type = msg_type or self._extract_message_type(call)
queue_size = self._extract_queue_size(call, queue_pos=queue_pos)
depth = get_control_depth(call, recursive=True)
location = self._call_location(call)
conditions = [SourceCondition(pretty_str(c), location=location)
for c in get_conditions(call, recursive=True)]
sub = Subscription(name, ns, msg_type, queue_size, location=location,
control_depth=depth, conditions=conditions,
repeats=is_under_loop(call, recursive=True))
node.subscribe.append(sub)
self.log.debug("Found Subscription on %s/%s (%s)", ns, name, msg_type)
def _condition(self, condition, sub, line, col):
assert isinstance(condition, tuple)
value = sub.resolve(condition[1], conversion=bool)
if value is None:
stmt = "if" if condition[0] else "unless"
loc = self.configuration.roslaunch[-1].location
loc.line = line
loc.column = col
# not sure if tag is part of the configuration
return SourceCondition(condition[1], # UnresolvedValue
location=loc, statement=stmt)
if value is condition[0]:
# tag is part of the configuration
return True
# tag is not part of the configuration
return False
def _write_from_JSON(self, datum):
l = self._location_from_JSON
cs = [SourceCondition(c["condition"], location = l(c["location"]))
for c in datum["conditions"]]
return WriteParameterCall(datum["name"], datum["namespace"],
datum["type"], control_depth = datum["depth"],
repeats = datum["repeats"],
conditions = cs,
location = l(datum["location"]))
def _on_read_param(self, node, ns, call):
if len(call.arguments) < 1:
return
name = self._extract_topic(call)
depth = get_control_depth(call, recursive = True)
location = self._call_location(call)
conditions = [SourceCondition(pretty_str(c), location = location)
for c in get_conditions(call, recursive = True)]
read = ReadParameterCall(name, ns, None, location = location,
control_depth = depth, conditions = conditions,
repeats = is_under_loop(call, recursive = True))
node.read_param.append(read)
self.log.debug("Found Read on %s/%s (%s)", ns, name, "string")
def _read_from_JSON(self, datum):
l = self._location_from_JSON
cs = [SourceCondition(c["condition"], location = l(c["location"]))
for c in datum["conditions"]]
return ReadParameterCall(datum["name"], datum["namespace"],
datum["type"], control_depth = datum["depth"],
repeats = datum["repeats"],
conditions = cs,
location = l(datum["location"]))