Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def on_startup(self, host, port):
    """Set up the HX711 sensor and begin polling the weight.

    ``host`` and ``port`` are accepted but not used by this method.
    """
    # HX711 is built with the arguments 20 and 21 -- presumably the GPIO
    # pins the sensor is wired to; confirm against the HX711 driver.
    sensor = HX711(20, 21)
    self.hx = sensor
    sensor.set_reading_format("LSB", "MSB")
    sensor.reset()
    sensor.power_up()

    # Repeatedly invoke self.check_weight with an interval of 3.0.
    poller = octoprint.util.RepeatedTimer(3.0, self.check_weight)
    self.t = poller
    poller.start()
def on_event(self, event, payload):
    """Start or stop periodic progress reporting in response to print events.

    On "PrintStarted" a repeating timer that calls ``self.send_progress``
    with an interval of 60 is created and started. On "PrintFailed" or
    "PrintDone" the timer, if any, is cancelled and cleared. Every other
    event is ignored.

    Args:
        event: Name of the event being delivered.
        payload: Event payload; not used by this method.
    """
    # getattr guards against a stop event arriving before the attribute
    # has ever been assigned.
    active_timer = getattr(self, "_repeat_timer", None)

    if event == "PrintStarted":
        # Cancel a timer left over from an earlier print so that two
        # reporters never run side by side.
        if active_timer is not None:
            active_timer.cancel()
        self._repeat_timer = octoprint.util.RepeatedTimer(60, self.send_progress)
        self._repeat_timer.start()
        self._logger.info("Octoscreen Plugin progress reporting started.")
    elif event in ("PrintFailed", "PrintDone"):
        if active_timer is not None:
            active_timer.cancel()
        self._repeat_timer = None
def _restartTimer(self):
    """Cancel the current temperature-check timer and start a fresh one.

    A new timer is only created when the 'enabled' setting is true and the
    'interval' setting is non-zero; otherwise the plugin is left without a
    running timer.
    """
    # Tear down whatever timer is currently active.
    previous = self._checkTempTimer
    if previous:
        self._logger.debug(u"Stopping Timer...")
        previous.cancel()
        self._checkTempTimer = None

    # Both settings are always read, interval first.
    period = self._settings.get_int(['interval'])
    enabled = self._settings.get_boolean(['enabled'])
    if not enabled or not period:
        return

    self._logger.debug(u"Starting Timer...")
    # The trailing positional arguments (None, None, True) are passed
    # through unchanged; see the RepeatedTimer signature for their meaning.
    replacement = RepeatedTimer(period, self.CheckTemps, None, None, True)
    self._checkTempTimer = replacement
    replacement.start()
def _timer_start(self):
    """Begin the abort power-off countdown unless one is already active.

    When no abort timer exists, the timeout value is reset from
    ``self.abortTimeout`` and a repeating timer calling ``self._timer_task``
    with an interval of 1 is created and started.
    """
    if self._abort_timer is None:
        self._tplinksmartplug_logger.debug("Starting abort power off timer.")
        self._timeout_value = self.abortTimeout
        countdown = RepeatedTimer(1, self._timer_task)
        self._abort_timer = countdown
        countdown.start()
def start_heartbeat_timer(self):
    """Start the repeating heartbeat timer when the Heartbeat event is enabled.

    Reads the "Heartbeat" entry of the ``supported_events`` setting. No
    timer is started when that entry is missing, when its "Enabled" value
    is falsy, or when its "IntervalTime" is absent, not convertible to an
    integer, or not greater than zero. Otherwise a timer calling
    ``self.heartbeat_timer_tick`` is stored on ``self.heartbeat_timer`` and
    started, with the configured interval multiplied by 60.
    """
    # Fall back to empty mappings so a missing setting or a missing
    # "Heartbeat" entry is treated the same as a disabled heartbeat.
    supported_events = self._settings.get(["supported_events"], merged=True) or {}
    heartbeat_event = supported_events.get("Heartbeat") or {}
    if not heartbeat_event.get("Enabled"):
        return

    raw_interval = heartbeat_event.get("IntervalTime")
    try:
        interval_minutes = int(raw_interval)
    except (TypeError, ValueError):
        # A broken configuration value should not abort the caller.
        self._logger.warning(
            "Invalid heartbeat IntervalTime %r; heartbeat timer not started",
            raw_interval,
        )
        return

    if interval_minutes <= 0:
        return

    self._logger.debug("Starting heartbeat timer: %smin(s)", interval_minutes)
    self.heartbeat_timer = RepeatedTimer(
        interval_minutes * 60,
        self.heartbeat_timer_tick,
        run_first=False,
    )
    self.heartbeat_timer.start()
def _restart_timer(self):
    """Stop the running temperature timer, then start a new one if configured.

    The new timer is created only when the 'temp_interval' setting is
    non-zero; it calls ``self.run_timer_job``.
    """
    # Shut down the timer that is currently active, if there is one.
    current = self._checkTempTimer
    if current:
        self._logger.debug(u"Stopping Timer...")
        current.cancel()
        self._checkTempTimer = None

    # A zero interval leaves the plugin without a timer.
    period = self._settings.get_int(['temp_interval'])
    if not period:
        return

    self._logger.debug(u"Starting Timer...")
    # The trailing positional arguments (None, None, True) are passed
    # through unchanged; see the RepeatedTimer signature for their meaning.
    fresh_timer = RepeatedTimer(period, self.run_timer_job, None, None, True)
    self._checkTempTimer = fresh_timer
    fresh_timer.start()