Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self._initializeEventLogger()
self._isPrinterRunning = True
self._logger.info("Printing started. Detailed progress started." + str(payload))
self._updateDisplayCommandQueue.printJobStarted()
self._sentGCodeHookCommandQueue.printJobStarted()
self._sendingGCodeHookCommandQueue.printJobStarted()
self._resetCurrentValues()
# which M600 layers should be processed
self._m600LayerProcessingList = list(self._m600LayerList)
self._updateDisplay(UPDATE_DISPLAY_REASON_FRONTEND_CALL)
self._checkLayerExpressionValid()
elif event in (Events.PRINT_DONE, Events.PRINT_FAILED, Events.PRINT_CANCELLED):
self._logger.info("Printing stopped. Detailed progress stopped.")
self._updateDisplayCommandQueue.printJobStopped()
self._sentGCodeHookCommandQueue.printJobStopped()
self._sendingGCodeHookCommandQueue.printJobStopped()
# send to navbar
self._updateDisplay(UPDATE_DISPLAY_REASON_FRONTEND_CALL)
self._eventLogging("event print done!")
# Not needed; could be done via standard code-settings: self._sendCommandToPrinter("M117 Print Done")
self._isPrinterRunning = False
elif event == Events.CLIENT_OPENED or event == Events.SETTINGS_UPDATED:
self._initDesktopPinterDisplay()
self._updateDisplayCommandQueue.addToQueue(UPDATE_DISPLAY_REASON_FRONTEND_CALL)
selectedFile = path
with open(selectedFile, "r") as f:
i = 0
for line in f:
try:
obj = self.process_line(line)
if obj:
obj["id"] = i
self.object_list.append(obj)
i = i + 1
except (ValueError, RuntimeError):
print("Error")
# Send objects to server
self._updateobjects()
elif event in (Events.PRINT_DONE, Events.PRINT_FAILED, Events.PRINT_CANCELLED, Events.FILE_DESELECTED):
self.object_list = []
self.trackE = False
self.lastE = 0
self._plugin_manager.send_plugin_message(self._identifier, dict(objects=self.object_list))
self.active_object = 'None'
if self._settings.get(['shownav']):
self._plugin_manager.send_plugin_message(self._identifier, dict(navBarActive=self.active_object))
self.update_ui()
elif event == Events.PRINT_DONE:
self.stop_filament_detection()
self.print_complete = True
for rpi_output in self.rpi_outputs:
shutdown_time = rpi_output['shutdown_time']
if rpi_output['output_type'] == 'pwm' and rpi_output['pwm_temperature_linked']:
rpi_output['duty_cycle'] = rpi_output['default_duty_cycle']
if rpi_output['auto_shutdown'] and not self.is_hour(shutdown_time):
delay_seconds = self.to_float(shutdown_time)
self.schedule_auto_shutdown_outputs(rpi_output, delay_seconds)
self.run_tasks()
self.update_ui()
elif event in (Events.PRINT_CANCELLED, Events.PRINT_FAILED):
self.stop_filament_detection()
self.cancel_all_events_on_queue()
self.event_queue = []
for rpi_output in self.rpi_outputs:
if rpi_output['shutdown_on_failed']:
shutdown_time = rpi_output['shutdown_time']
if rpi_output['output_type'] == 'pwm' and rpi_output['pwm_temperature_linked']:
rpi_output['duty_cycle'] = rpi_output['default_duty_cycle']
if rpi_output['auto_shutdown'] and not self.is_hour(shutdown_time):
delay_seconds = self.to_float(shutdown_time)
self.schedule_auto_shutdown_outputs(rpi_output, delay_seconds)
if rpi_output['output_type'] == 'temp_hum_control':
rpi_output['temp_ctr_set_value'] = 0
self.run_tasks()
if event == Events.PRINT_DONE:
if event == Events.ERROR and self._settings.getBoolean(["event_on_error_monitoring"]) == True:
self._tplinksmartplug_logger.debug("powering off due to %s event." % event)
for plug in self._settings.get(['arrSmartplugs']):
if plug["event_on_error"] == True:
self._tplinksmartplug_logger.debug("powering off %s due to %s event." % (plug["ip"], event))
response = self.turn_off(plug["ip"])
if response["currentState"] == "off":
self._plugin_manager.send_plugin_message(self._identifier, response)
# Client Opened Event
if event == Events.CLIENT_OPENED:
if self._settings.get_boolean(["powerOffWhenIdle"]):
self._reset_idle_timer()
self._plugin_manager.send_plugin_message(self._identifier, dict(powerOffWhenIdle=self.powerOffWhenIdle, type="timeout", timeout_value=self._timeout_value))
return
# Cancelled Print Interpreted Event
if event == Events.PRINT_FAILED and not self._printer.is_closed_or_error():
self._tplinksmartplug_logger.debug("Print cancelled, reseting job_power to 0")
self.print_job_power = 0.0
self.print_job_started = False
return
# Print Started Event
if event == Events.PRINT_STARTED and self._settings.getFloat(["cost_rate"]) > 0:
self.print_job_started = True
self._tplinksmartplug_logger.debug(payload.get("path", None))
for plug in self._settings.get(["arrSmartplugs"]):
status = self.check_status(plug["ip"])
self.print_job_power -= float(self.deep_get(status,["emeter","get_realtime","total_wh"], default=0)) / 1000
self.print_job_power -= float(self.deep_get(status,["emeter","get_realtime","total"], default=0))
self._tplinksmartplug_logger.debug(self.print_job_power)
if event == Events.PRINT_STARTED and self.powerOffWhenIdle == True:
if self._abort_timer is not None:
def on_event(self, event, payload):
    """React to OctoPrint events by driving the progress timer and M117 messages.

    * ``PRINT_STARTED``: caches the ``etl_format``, ``eta_strftime`` and
      ``messages`` settings, then starts a repeating timer that calls
      ``self.do_work`` at the interval read from the ``time_to_change``
      setting.
    * ``PRINT_DONE`` / ``PRINT_FAILED`` / ``PRINT_CANCELLED``: cancels the
      timer (if any) and sends ``M117 Print Done`` to the printer.
    * ``CONNECTED``: sends ``M117 IP <address>`` when ``self._get_host_ip()``
      returns a truthy value; otherwise does nothing.

    Any other event is ignored.

    Args:
        event: Event identifier, compared against the ``Events`` constants.
        payload: Event payload; not used by this handler.
    """
    if event == Events.PRINT_STARTED:
        self._logger.info("Printing started. Detailed progress started.")
        self._etl_format = self._settings.get(["etl_format"])
        self._eta_strftime = self._settings.get(["eta_strftime"])
        self._messages = self._settings.get(["messages"])

        # A second PRINT_STARTED without an intervening stop event would
        # otherwise overwrite the reference and leave the old timer running.
        stale_timer = getattr(self, "_repeat_timer", None)
        if stale_timer is not None:
            stale_timer.cancel()

        self._repeat_timer = octoprint.util.RepeatedTimer(
            self._settings.get_int(["time_to_change"]), self.do_work
        )
        self._repeat_timer.start()
    elif event in (Events.PRINT_DONE, Events.PRINT_FAILED, Events.PRINT_CANCELLED):
        active_timer = getattr(self, "_repeat_timer", None)
        if active_timer is not None:
            active_timer.cancel()
            self._repeat_timer = None
        self._logger.info("Printing stopped. Detailed progress stopped.")
        # NOTE(review): the same "Print Done" text is sent for failed and
        # cancelled prints as well -- confirm this is intended.
        self._printer.commands("M117 Print Done")
    elif event == Events.CONNECTED:
        ip = self._get_host_ip()
        if not ip:
            # No address available, so there is nothing to show.
            return
        self._printer.commands("M117 IP {}".format(ip))
def on_event(self, event, payload):
    """
    Callback for general OctoPrint events.

    Selects the animation stored in ``self._currentAnimation`` from the
    received event. Events that are not listed below leave the current
    animation untouched. ``payload`` is not used.
    """
    with self._lock:
        event_types = octoprint.events.Events
        # Event -> animation to activate; DISCONNECTED clears the animation.
        animation_by_event = {
            event_types.CONNECTED: self._animatePrinterConnected,
            event_types.PRINT_STARTED: self._animatePrintStarted,
            event_types.PRINT_DONE: self._animatePrintDone,
            event_types.PRINT_FAILED: self._animatePrintFailed,
            event_types.PRINT_CANCELLED: self._animatePrintFailed,
            event_types.DISCONNECTED: None,
        }
        if event in animation_by_event:
            self._currentAnimation = animation_by_event[event]
]
if event in cameraSuccessEvents:
self.cameraManager.cameraConnected()
elif event in cameraFailEvents:
self.cameraManager.cameraError()
elif event == Events.FILE_REMOVED:
if payload['storage'] == 'local':
self.astroprintCloud.db.deletePrintFile(payload['path'])
elif event == Events.CONNECTED:
self.send_event("canPrint", True)
elif event == Events.PRINT_CANCELLED or event == Events.PRINT_FAILED:
self.send_event("canPrint", True)
if self.user and self.astroprintCloud.currentlyPrinting:
self.astroprintCloud.updatePrintJob("failed", self.materialCounter.totalConsumedFilament)
self.astroprintCloud.currentlyPrinting = None
self.cameraManager.stop_timelapse()
self._analyzed_job_layers = None
elif event == Events.PRINT_DONE:
if self.user and self.astroprintCloud.currentlyPrinting:
self.astroprintCloud.updatePrintJob("success", self.materialCounter.totalConsumedFilament)
self.astroprintCloud.currentlyPrinting = None
self.cameraManager.stop_timelapse()
self.send_event("canPrint", True)
elif event == Events.PRINT_STARTED:
self.send_event("canPrint", False)
"printJobItem": printJobItem
}
self._sendDataToClient(payload)
except DoesNotExist as e:
self._settings.remove([SettingsKeys.SETTINGS_KEY_SHOW_PRINTJOB_DIALOG_AFTER_PRINT_JOB_ID])
elif Events.PRINT_STARTED == event:
self.alreadyCanceled = False
self._createPrintJobModel(payload)
elif "DisplayLayerProgress_layerChanged" == event or event == "DisplayLayerProgress_heightChanged":
self._updatePrintJobModelWithLayerHeightInfos(payload)
elif Events.PRINT_DONE == event:
self._printJobFinished("success", payload)
elif Events.PRINT_FAILED == event:
if self.alreadyCanceled == False:
self._printJobFinished("failed", payload)
elif Events.PRINT_CANCELLED == event:
self.alreadyCanceled = True
self._printJobFinished("canceled", payload)
pass