Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self._position_for_aspirate(location)
mm_position = self._aspirate_plunger_position(
self.current_volume + volume)
speed = self.speeds['aspirate'] * rate
self.instrument_actuator.push_speed()
self.instrument_actuator.set_speed(speed)
self.instrument_actuator.set_active_current(self._plunger_current)
self.robot.poses = self.instrument_actuator.move(
self.robot.poses,
x=mm_position
)
self.instrument_actuator.pop_speed()
self.current_volume += volume # update after actual aspirate
do_publish(self.broker, commands.aspirate, self.aspirate, 'after',
self, None, self, volume, display_location, rate)
return self
well_edges = [
location.from_center(x=radius, y=0, z=1), # right edge
location.from_center(x=radius * -1, y=0, z=1), # left edge
location.from_center(x=0, y=radius, z=1), # back edge
location.from_center(x=0, y=radius * -1, z=1) # front edge
]
# Apply vertical offset to well edges
well_edges = map(lambda x: x + v_offset, well_edges)
self.robot.gantry.push_speed()
self.robot.gantry.set_speed(speed)
[self.move_to((location, e), strategy='direct') for e in well_edges]
self.robot.gantry.pop_speed()
do_publish(self.broker, commands.touch_tip, self.touch_tip, 'after',
self, None, self, location, radius, v_offset, speed)
return self
mm_position = self._dispense_plunger_position(
self.current_volume - volume)
speed = self.speeds['dispense'] * rate
self.instrument_actuator.push_speed()
self.instrument_actuator.set_speed(speed)
self.instrument_actuator.set_active_current(self._plunger_current)
self.robot.poses = self.instrument_actuator.move(
self.robot.poses,
x=mm_position
)
self.instrument_actuator.pop_speed()
self.current_volume -= volume # update after actual dispense
do_publish(self.broker, commands.dispense, self.dispense, 'after',
self, None, self, volume, display_location, rate)
return self
def home(self) -> 'InstrumentContext':
    """Home the Z axis and the plunger for this instrument's mount.

    The ``home`` command is published on ``self.broker`` with the
    ``'before'`` phase ahead of the hardware calls and with the
    ``'after'`` phase once both hardware calls have returned.

    :returns: This instance.
    """
    def home_dummy(mount):
        # Placeholder callable handed to do_publish; this method never
        # calls it directly.
        pass

    # Lower-cased mount name, used as the published command argument.
    mount_name = self._mount.name.lower()

    cmds.do_publish(
        self.broker, cmds.home, home_dummy,
        'before', None, None, mount_name)

    hardware = self._hw_manager.hardware
    hardware.home_z(self._mount)
    hardware.home_plunger(self._mount)

    cmds.do_publish(
        self.broker, cmds.home, home_dummy,
        'after', self, None, mount_name)
    return self
if not isinstance(volume, Number):
if isinstance(volume, (LegacyWell, LegacyLocation)) \
and not location:
location = volume
volume = self._working_volume - self.current_volume
display_location = location if location else self.previous_placeable
if volume != 0:
self._position_for_aspirate(location)
cmds.do_publish(
self._instr_ctx.broker, cmds.aspirate, self.aspirate,
'before', None, None, self, volume,
display_location, rate)
self._hw_manager.hardware.aspirate(self._mount, volume, rate)
cmds.do_publish(
self._instr_ctx.broker, cmds.aspirate, self.aspirate,
'after', self, None, self, volume,
display_location, rate)
return self
self._log.debug("aspirate {} from {} at {}"
.format(volume,
location if location else 'current position',
rate))
if not isinstance(volume, Number):
if isinstance(volume, (LegacyWell, LegacyLocation)) \
and not location:
location = volume
volume = self._working_volume - self.current_volume
display_location = location if location else self.previous_placeable
if volume != 0:
self._position_for_aspirate(location)
cmds.do_publish(
self._instr_ctx.broker, cmds.aspirate, self.aspirate,
'before', None, None, self, volume,
display_location, rate)
self._hw_manager.hardware.aspirate(self._mount, volume, rate)
cmds.do_publish(
self._instr_ctx.broker, cmds.aspirate, self.aspirate,
'after', self, None, self, volume,
display_location, rate)
return self
elif location and isinstance(location, Well):
if 'fixedTrash' in quirks_from_any_parent(location):
target = location.top()
else:
bot = location.bottom()
target = bot._replace(
point=bot.point._replace(z=bot.point.z + 10))
elif not location:
target = self.trash_container.wells()[0].top()
else:
raise TypeError(
"If specified, location should be an instance of "
"types.Location (e.g. the return value from "
"tiprack.wells()[0].top()) or a Well (e.g. tiprack.wells()[0]."
" However, it is a {}".format(location))
cmds.do_publish(self.broker, cmds.drop_tip, self.drop_tip,
'before', None, None, self, location=target)
self.move_to(target)
self._hw_manager.hardware.drop_tip(self._mount, home_after=home_after)
cmds.do_publish(self.broker, cmds.drop_tip, self.drop_tip,
'after', self, None, self, location=target)
if isinstance(target.labware, Well)\
and target.labware.parent.is_tiprack:
# If this is a tiprack we can try and add the tip back to the
# tracker
try:
target.labware.parent.return_tips(
target.labware, self.channels)
except AssertionError:
# Similarly to :py:meth:`return_tips`, the failure case here
# just means the tip can't be reused, so don't actually stop
# the protocol
well_edges = [
location.from_center(x=radius, y=0, z=1), # right edge
location.from_center(x=radius * -1, y=0, z=1), # left edge
location.from_center(x=0, y=radius, z=1), # back edge
location.from_center(x=0, y=radius * -1, z=1) # front edge
]
# Apply vertical offset to well edges
v_well_edges = [
LegacyLocation(ll.labware, ll.offset + new_v_offset)
for ll in well_edges]
self.set_speed(speed)
[self.move_to(loc, strategy='direct')
for loc in v_well_edges]
cmds.do_publish(
self._instr_ctx.broker, cmds.touch_tip, self.touch_tip, 'after',
self, None, self, location, radius, v_offset, speed)
return self