Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self._reset_idle_timer()
self._timeout_value = None
self._plugin_manager.send_plugin_message(self._identifier, dict(powerOffWhenIdle=self.powerOffWhenIdle, type="timeout", timeout_value=self._timeout_value))
if event == Events.PRINT_STARTED and self._countdown_active:
for plug in self._settings.get(["arrSmartplugs"]):
if plug["useCountdownRules"] and int(plug["countdownOffDelay"]) > 0:
if "/" in plug["ip"]:
plug_ip, plug_num = plug["ip"].split("/")
else:
plug_ip = plug["ip"]
plug_num = -1
self.sendCommand(json.loads('{"count_down":{"delete_all_rules":null}}'),plug_ip,plug_num)
self._tplinksmartplug_logger.debug("Cleared countdown rules for %s" % plug["ip"])
# Print Done Event
if event == Events.PRINT_DONE and self.print_job_started:
self._tplinksmartplug_logger.debug(payload)
for plug in self._settings.get(["arrSmartplugs"]):
status = self.check_status(plug["ip"])
self.print_job_power += float(self.deep_get(status,["emeter","get_realtime","total_wh"], default=0)) / 1000
self.print_job_power += float(self.deep_get(status,["emeter","get_realtime","total"], default=0))
self._tplinksmartplug_logger.debug(self.print_job_power)
hours = (payload.get("time", 0)/60)/60
self._tplinksmartplug_logger.debug("hours: %s" % hours)
power_used = self.print_job_power * hours
self._tplinksmartplug_logger.debug("power used: %s" % power_used)
power_cost = power_used * self._settings.getFloat(["cost_rate"])
self._tplinksmartplug_logger.debug("power total cost: %s" % power_cost)
self._storage_interface = self._file_manager._storage(payload.get("origin", "local"))
def on_event(self, event, payload):
    """
    Select the current animation in response to an OctoPrint event.

    The update happens while ``self._lock`` is held. Connection, print
    start, print done and print failed/cancelled events each select their
    matching ``self._animate*`` attribute; a disconnect clears the
    animation. Any other event leaves ``self._currentAnimation`` untouched.

    :param event: the OctoPrint event identifier being dispatched
    :param payload: event payload; not used by this handler
    """
    events = octoprint.events.Events
    with self._lock:
        if event == events.CONNECTED:
            animation = self._animatePrinterConnected
        elif event == events.PRINT_STARTED:
            animation = self._animatePrintStarted
        elif event == events.PRINT_DONE:
            animation = self._animatePrintDone
        elif event in (events.PRINT_FAILED, events.PRINT_CANCELLED):
            # Failure and cancellation share the same animation.
            animation = self._animatePrintFailed
        elif event == events.DISCONNECTED:
            animation = None
        else:
            # Unrelated event: keep whatever animation is already set.
            return
        self._currentAnimation = animation
def on_event(self, event, payload):
    """
    Forward a progress value when a print starts or finishes.

    A print-started event sends progress 0 and a print-done event sends
    progress 100, both via ``self._send_message`` together with the
    payload's ``"origin"`` and ``"path"`` entries. Every other event is
    ignored and the payload is not read.

    :param event: the OctoPrint event identifier being dispatched
    :param payload: event payload; must provide ``"origin"`` and ``"path"``
        for the two handled events
    """
    events = octoprint.events.Events
    if event == events.PRINT_STARTED:
        progress = 0
    elif event == events.PRINT_DONE:
        progress = 100
    else:
        return
    self._send_message(payload["origin"], payload["path"], progress)
self.stop_filament_detection()
self.cancel_all_events_on_queue()
self.event_queue = []
for rpi_output in self.rpi_outputs:
if rpi_output['shutdown_on_failed']:
shutdown_time = rpi_output['shutdown_time']
if rpi_output['output_type'] == 'pwm' and rpi_output['pwm_temperature_linked']:
rpi_output['duty_cycle'] = rpi_output['default_duty_cycle']
if rpi_output['auto_shutdown'] and not self.is_hour(shutdown_time):
delay_seconds = self.to_float(shutdown_time)
self.schedule_auto_shutdown_outputs(rpi_output, delay_seconds)
if rpi_output['output_type'] == 'temp_hum_control':
rpi_output['temp_ctr_set_value'] = 0
self.run_tasks()
# Print finished: send a completion message once for every notification
# entry whose 'printFinish' flag is set.
if event == Events.PRINT_DONE:
    for notification in self.notifications:
        if not notification['printFinish']:
            continue
        file_name = os.path.basename(payload["file"])
        elapsed_time = octoprint.util.get_formatted_timedelta(
            timedelta(seconds=payload["time"]))
        # Format into ONE string: a trailing ", elapsed_time" after the
        # concatenation would build a (str, elapsed_time) tuple instead of
        # a message, and the duration (not the file name again) belongs
        # after "finished printing in".
        msg = "Print job finished: %s finished printing in %s" % (
            file_name, elapsed_time)
        self.send_notification(msg)
payload = {
"action": "showPrintJobDialogAfterClientConnection",
"printJobItem": printJobItem
}
self._sendDataToClient(payload)
except DoesNotExist as e:
self._settings.remove([SettingsKeys.SETTINGS_KEY_SHOW_PRINTJOB_DIALOG_AFTER_PRINT_JOB_ID])
elif Events.PRINT_STARTED == event:
self.alreadyCanceled = False
self._createPrintJobModel(payload)
elif "DisplayLayerProgress_layerChanged" == event or event == "DisplayLayerProgress_heightChanged":
self._updatePrintJobModelWithLayerHeightInfos(payload)
elif Events.PRINT_DONE == event:
self._printJobFinished("success", payload)
elif Events.PRINT_FAILED == event:
if self.alreadyCanceled == False:
self._printJobFinished("failed", payload)
elif Events.PRINT_CANCELLED == event:
self.alreadyCanceled = True
self._printJobFinished("canceled", payload)
pass