Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_delay_remove_race(self):
    """Queue two named 6000 ms delays, then advance the test clock by 10.

    NOTE(review): this excerpt ends after the clock advance; any assertions
    on ``self.called`` are not visible here.
    """
    self.called = False
    self.delay = DelayManager(self.machine.delayRegistry)

    # Both delays share the same timeout so they come due together.
    scheduled = (
        ("first", self.delay_first),
        ("second", self.delay_second),
    )
    for delay_name, handler in scheduled:
        self.delay.add(ms=6000, name=delay_name, callback=handler)

    self.advance_time_and_run(10)
def __init__(self, machine: MachineController, name: str) -> None:
    """Create a digital output with no hardware bound yet.

    Args:
        machine: Machine controller handed on to the base initialiser.
        name: Device name handed on to the base initialiser.
    """
    # Hardware driver, platform and output type all start out unset.
    self.hw_driver = None   # type: Union[DriverPlatformInterface, LightPlatformInterface]
    self.platform = None    # type: Union[DriverPlatform, LightsPlatform]
    self.type = None        # type: str

    super().__init__(machine, name)

    # self.machine is not assigned in this method, so it is presumably
    # provided by the base initialiser called just above.
    self.delay = DelayManager(self.machine)
def __init__(self, machine: MachineController, name: str, player: Player, config: dict):
    """Create a counter logic block.

    Fills in default hit events when none are configured, derives the
    signed hit value from the configured interval and direction, and
    seeds the backing player variable.

    Args:
        machine: Machine controller handed on to the base initialiser.
        name: Name of this counter; used in event, logger and variable names.
        player: Player object whose variable stores the count.
        config: Configuration dict; may be given default hit events here.
    """
    if 'events_when_hit' not in config:
        # Two defaults: the deprecated counter event, still posted for
        # compatibility, followed by the logicblock event used going forward.
        config['events_when_hit'] = [
            'counter_' + name + '_hit',
            'logicblock_' + name + '_hit',
        ]

    super().__init__(machine, name, player, config)

    self.log = logging.getLogger('Counter.' + name)
    self.log.debug("Creating Counter LogicBlock")

    self.delay = DelayManager(self.machine.delayRegistry)
    self.ignore_hits = False
    self.hit_value = -1

    settings = self.config
    if not settings['player_variable']:
        settings['player_variable'] = self.name + '_count'

    self.hit_value = settings['count_interval']

    # Make the sign of the interval agree with the counting direction.
    direction = settings['direction']
    sign_mismatch = (
        (direction == 'down' and self.hit_value > 0)
        or (direction == 'up' and self.hit_value < 0)
    )
    if sign_mismatch:
        self.hit_value *= -1

    # Keep an existing value only when state is persisted and the player
    # variable already exists; otherwise write the starting count.
    variable = settings['player_variable']
    if not (settings['persist_state'] and self.player.is_player_var(variable)):
        self.player[variable] = settings['starting_count']
def __init__(self, machine, name):
    """Initialise shot.

    Args:
        machine: Machine controller handed on to the base initialiser.
        name: Device name handed on to the base initialiser.
    """
    # If this device is setup in a machine-wide config, make sure it has
    # a default enable event.
    # NOTE(review): nothing in this initialiser sets up such an event;
    # presumably it is handled elsewhere - confirm or drop the comment.
    super().__init__(machine, name)

    self.delay = mpf.core.delays.DelayManager(self.machine)

    # List of tuples: (id, current_position_index, next_switch)
    self.active_sequences = []

    self.active_delays = set()
    self.running_show = None
    self._handlers = []
async def _initialize(self):
    """Reset the timer state from its config and set up logging.

    Runs the base ``_initialize`` first, then copies the configured limits
    and direction, clears the runtime fields, and configures logging
    ('full' for both targets when the ``debug`` option is set).
    """
    await super()._initialize()

    cfg = self.config

    self.ticks_remaining = 0
    self.max_value = cfg['max_value']
    self.direction = cfg['direction']
    self.tick_secs = self.timer = None
    self.event_keys = []
    self.delay = DelayManager(self.machine)
    self.restart_on_complete = cfg['restart_on_complete']
    self.end_value = self.start_value = self.ticks = None

    logger_name = 'Timer.' + self.name
    if cfg['debug']:
        # Debug mode forces full logging on both targets.
        self.configure_logging(logger_name, 'full', 'full')
    else:
        self.configure_logging(logger_name,
                               cfg['console_log'],
                               cfg['file_log'])

    self.debug_log("----------- Initial Values -----------")
    # self.running is not assigned in this method; it is read as-is.
    self.debug_log("running: %s", self.running)
def __init__(self, machine, name):
    """Create a magnet that is neither active nor releasing.

    Args:
        machine: Machine controller; also used to build the delay manager.
        name: Device name handed on to the base initialiser.
    """
    super().__init__(machine, name)

    self.delay = DelayManager(machine)

    # Both state flags start cleared.
    self._active = self._release_in_progress = False
def __init__(self, machine, name):
    """Create a servo with no hardware, position or limits set.

    All attributes are assigned before the base initialiser is called.

    Args:
        machine: Machine controller; also used to build the delay manager.
        name: Device name handed on to the base initialiser.
    """
    self.hw_servo = None
    self.platform = None    # type: ServoPlatform

    # Position and motion limits are unknown until set elsewhere.
    self._position = self.speed_limit = self.acceleration_limit = None

    self._ball_search_started = False
    self.delay = DelayManager(machine)

    super().__init__(machine, name)
def __init__(self, number, config, platform):
    """Create an output bound to the given platform.

    Args:
        number: Output number, handed on to the base initialiser.
        config: Output configuration, handed on to the base initialiser.
        platform: Owning platform; its ``machine`` feeds the delay manager.
    """
    # Note the argument order expected by the base class: config first.
    super().__init__(config, number)

    self.platform = platform    # type: RaspberryPiHardwarePlatform

    # self.number is not assigned here (presumably set by the base
    # initialiser); it is converted to an integer for use as the GPIO id.
    self.gpio = int(self.number)

    self.delay = DelayManager(self.platform.machine)
def __init__(self, machine, name):
    """Create a multiball with zeroed ball counters.

    Args:
        machine: Machine controller; also used to build the delay manager.
        name: Device name handed on to the base initialiser.
    """
    # Assigned before the base initialiser runs.
    self.ball_locks = self.source_playfield = None

    super().__init__(machine, name)

    self.delay = DelayManager(machine)

    # No balls added and no live-ball target yet.
    self.balls_added_live = self.balls_live_target = 0
    self.shoot_again = False
# NOTE(review): the `def` line of this initialiser is not visible in this
# excerpt; the statements below are kept exactly as found.

# Register a boot hold named 'init' and mark the object as not done.
self.register_boot_hold('init')
self._done = False

# Empty containers and unset references, filled in elsewhere.
self.monitors = dict()
self.plugins = list()
self.scriptlets = list()
self.modes = DeviceCollection(self, 'modes', None)
self.game = None
self.active_debugger = dict()
self.machine_vars = CaseInsensitiveDict()
self.machine_var_monitor = False
self.machine_var_data_manager = None
self.thread_stopper = threading.Event()

# The delay manager is built from the registry created on the line before.
self.delayRegistry = DelayManagerRegistry(self)
self.delay = DelayManager(self.delayRegistry)

self.crash_queue = queue.Queue()
self.config = None
self.events = None
self.machine_config = None

# Helper calls, in this order: machine path, config validator, config, clock.
self._set_machine_path()
self.config_validator = ConfigValidator(self)
self._load_config()
self.clock = self._load_clock()

# Schedule _check_crash_queue on the clock with an interval argument of 1
# (unit not shown here - presumably seconds; confirm).
self._crash_queue_checker = self.clock.schedule_interval(self._check_crash_queue, 1)
self.hardware_platforms = dict()