Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if event == Events.PRINT_FAILED and not self._printer.is_closed_or_error():
self._tplinksmartplug_logger.debug("Print cancelled, reseting job_power to 0")
self.print_job_power = 0.0
self.print_job_started = False
return
# Print Started Event
if event == Events.PRINT_STARTED and self._settings.getFloat(["cost_rate"]) > 0:
self.print_job_started = True
self._tplinksmartplug_logger.debug(payload.get("path", None))
for plug in self._settings.get(["arrSmartplugs"]):
status = self.check_status(plug["ip"])
self.print_job_power -= float(self.deep_get(status,["emeter","get_realtime","total_wh"], default=0)) / 1000
self.print_job_power -= float(self.deep_get(status,["emeter","get_realtime","total"], default=0))
self._tplinksmartplug_logger.debug(self.print_job_power)
if event == Events.PRINT_STARTED and self.powerOffWhenIdle == True:
if self._abort_timer is not None:
self._abort_timer.cancel()
self._abort_timer = None
self._tplinksmartplug_logger.debug("Power off aborted because starting new print.")
if self._idleTimer is not None:
self._reset_idle_timer()
self._timeout_value = None
self._plugin_manager.send_plugin_message(self._identifier, dict(powerOffWhenIdle=self.powerOffWhenIdle, type="timeout", timeout_value=self._timeout_value))
if event == Events.PRINT_STARTED and self._countdown_active:
for plug in self._settings.get(["arrSmartplugs"]):
if plug["useCountdownRules"] and int(plug["countdownOffDelay"]) > 0:
if "/" in plug["ip"]:
plug_ip, plug_num = plug["ip"].split("/")
else:
plug_ip = plug["ip"]
def on_event(self, event, payload):
    """
    Callback for general OctoPrint events.

    Picks the animation matching the state signalled by ``event`` and stores
    it in ``self._currentAnimation`` while holding ``self._lock``. Events
    that are not listed below leave the current animation unchanged.
    ``payload`` is part of the callback signature but is not read here.
    """
    with self._lock:
        events = octoprint.events.Events
        if event == events.DISCONNECTED:
            # Disconnecting clears the animation entirely.
            self._currentAnimation = None
        elif event in (events.PRINT_FAILED, events.PRINT_CANCELLED):
            # Failed and cancelled jobs share one animation.
            self._currentAnimation = self._animatePrintFailed
        elif event == events.PRINT_DONE:
            self._currentAnimation = self._animatePrintDone
        elif event == events.PRINT_STARTED:
            self._currentAnimation = self._animatePrintStarted
        elif event == events.CONNECTED:
            self._currentAnimation = self._animatePrinterConnected
def on_event(self, event, payload):
    """Handle connection, client and print-lifecycle events.

    * ``CONNECTED`` / ``CLIENT_OPENED``: refresh the UI.
    * ``PRINT_RESUMED``: restart filament detection.
    * ``PRINT_STARTED``: reset the per-print state, restart filament
      detection, schedule/toggle every configured output, then run the
      pending tasks and refresh the UI.

    ``payload`` is not read by this handler.
    """
    # NOTE(review): block nesting was reconstructed from a listing that had
    # lost its indentation -- confirm the loop body boundaries against the
    # real file before merging.
    if event in (Events.CONNECTED, Events.CLIENT_OPENED):
        self.update_ui()
    elif event == Events.PRINT_RESUMED:
        self.start_filament_detection()
    elif event == Events.PRINT_STARTED:
        # Start the new job from a clean slate.
        self.print_complete = False
        self.cancel_all_events_on_queue()
        self.event_queue = []
        self.start_filament_detection()

        for output in self.rpi_outputs:
            if output['auto_startup']:
                startup_delay = self.get_startup_delay_from_output(output)
                self.schedule_auto_startup_outputs(output, startup_delay)

            # Only 'regular' and 'pwm' outputs are switched on for the
            # toggle timer.
            if output['toggle_timer'] and output['output_type'] in ('regular', 'pwm'):
                self.toggle_output(output['index_id'], True)

            if self.is_hour(output['shutdown_time']):
                shutdown_delay = self.get_shutdown_delay_from_output(output)
                self.schedule_auto_shutdown_outputs(output, shutdown_delay)

        self.run_tasks()
        self.update_ui()
def on_event(self, event, payload):
if event in (Events.FILE_SELECTED, Events.PRINT_STARTED):
self.object_list = []
self.lastE = 0
selectedFile = payload.get("file", "")
if not selectedFile:
path = payload.get("path", "")
if payload.get("origin") == "local":
# Get full path to local file
path = self._file_manager.path_on_disk(FileDestinations.LOCAL, path)
selectedFile = path
with open(selectedFile, "r") as f:
i = 0
for line in f:
try:
obj = self.process_line(line)
if obj:
obj["id"] = i
elif self._settings.get([SETTINGS_KEY_TOTAL_HEIGHT_METHODE]) == HEIGHT_METHODE_Z_EXPRESSION:
if not self._totalHeightFromExpression == NOT_PRESENT:
self._totalHeight = str("%.2f" % float(self._totalHeightFromExpression))
else:
self._totalHeight = NOT_PRESENT
self._updateDisplay(UPDATE_DISPLAY_REASON_FRONTEND_CALL)
self._logger.info("File select-event processing done!'")
elif event == Events.FILE_DESELECTED or \
event == Events.DISCONNECTING:
self._resetCurrentValues()
self._resetTotalValues()
self._updateDisplay(UPDATE_DISPLAY_REASON_FRONTEND_CALL)
elif event == Events.PRINT_STARTED:
self._initializeEventLogger()
self._isPrinterRunning = True
self._logger.info("Printing started. Detailed progress started." + str(payload))
self._updateDisplayCommandQueue.printJobStarted()
self._sentGCodeHookCommandQueue.printJobStarted()
self._sendingGCodeHookCommandQueue.printJobStarted()
self._resetCurrentValues()
# which M600 layers should be processed
self._m600LayerProcessingList = list(self._m600LayerList)
self._updateDisplay(UPDATE_DISPLAY_REASON_FRONTEND_CALL)
self._checkLayerExpressionValid()
elif event in (Events.PRINT_DONE, Events.PRINT_FAILED, Events.PRINT_CANCELLED):
def on_event(self, event, payload):
    """Report the start and the successful end of a print job.

    Calls ``self._send_message`` with the job's ``origin`` and ``path``
    taken from ``payload`` and a third argument of ``0`` when a print
    starts or ``100`` when it is done. Other events are ignored and their
    payload is never inspected.
    """
    events = octoprint.events.Events

    def notify(value):
        # Payload keys are only looked up for the two handled events.
        self._send_message(payload["origin"], payload["path"], value)

    if event == events.PRINT_STARTED:
        notify(0)
    elif event == events.PRINT_DONE:
        notify(100)
elif event == Events.PRINT_CANCELLED or event == Events.PRINT_FAILED:
self.send_event("canPrint", True)
if self.user and self.astroprintCloud.currentlyPrinting:
self.astroprintCloud.updatePrintJob("failed", self.materialCounter.totalConsumedFilament)
self.astroprintCloud.currentlyPrinting = None
self.cameraManager.stop_timelapse()
self._analyzed_job_layers = None
elif event == Events.PRINT_DONE:
if self.user and self.astroprintCloud.currentlyPrinting:
self.astroprintCloud.updatePrintJob("success", self.materialCounter.totalConsumedFilament)
self.astroprintCloud.currentlyPrinting = None
self.cameraManager.stop_timelapse()
self.send_event("canPrint", True)
elif event == Events.PRINT_STARTED:
self.send_event("canPrint", False)
if self.user:
self.astroprintCloud.printStarted(payload['name'], payload['path'])
self.materialCounter.startPrint()
self._printerListener.startPrint(payload['file'])
if event in printEvents:
self.sendSocketInfo()
if self.user and self.astroprintCloud:
self.astroprintCloud.sendCurrentData()
return
def on_event(self, event, payload):
    """Start or stop the repeating timer in response to printer events.

    * ``PRINT_STARTED``: re-reads the ``etl_format``, ``eta_strftime`` and
      ``messages`` settings, then creates and starts a
      ``octoprint.util.RepeatedTimer`` built from the ``time_to_change``
      setting and ``self.do_work``.
    * ``PRINT_DONE`` / ``PRINT_FAILED`` / ``PRINT_CANCELLED``: cancels and
      clears the timer if one exists, logs the stop and sends
      ``M117 Print Done`` to the printer.
    * ``CONNECTED``: looks up the host IP and returns early when there is
      none.

    ``payload`` is not read by this handler.
    """
    # NOTE(review): block nesting was reconstructed from a listing that had
    # lost its indentation -- confirm against the real file.
    if event == Events.PRINT_STARTED:
        self._logger.info("Printing started. Detailed progress started.")
        self._etl_format = self._settings.get(["etl_format"])
        self._eta_strftime = self._settings.get(["eta_strftime"])
        self._messages = self._settings.get(["messages"])
        self._repeat_timer = octoprint.util.RepeatedTimer(self._settings.get_int(["time_to_change"]), self.do_work)
        self._repeat_timer.start()
    elif event in (Events.PRINT_DONE, Events.PRINT_FAILED, Events.PRINT_CANCELLED):
        # Identity check against the None singleton instead of `!= None`.
        if self._repeat_timer is not None:
            self._repeat_timer.cancel()
            self._repeat_timer = None
        self._logger.info("Printing stopped. Detailed progress stopped.")
        self._printer.commands("M117 Print Done")
    elif event == Events.CONNECTED:
        # NOTE(review): nothing uses `ip` after this guard in the visible
        # source -- confirm whether a statement is missing from this branch.
        ip = self._get_host_ip()
        if not ip:
            return
if self._settings.get_boolean([SettingsKeys.SETTINGS_KEY_SHOW_PRINTJOB_DIALOG_AFTER_PRINT]):
printJobId = self._settings.get_int([SettingsKeys.SETTINGS_KEY_SHOW_PRINTJOB_DIALOG_AFTER_PRINT_JOB_ID])
if (not printJobId == None):
try:
printJobModel = self._databaseManager.loadPrintJob(printJobId)
printJobItem = TransformPrintJob2JSON.transformPrintJobModel(printJobModel)
payload = {
"action": "showPrintJobDialogAfterClientConnection",
"printJobItem": printJobItem
}
self._sendDataToClient(payload)
except DoesNotExist as e:
self._settings.remove([SettingsKeys.SETTINGS_KEY_SHOW_PRINTJOB_DIALOG_AFTER_PRINT_JOB_ID])
elif Events.PRINT_STARTED == event:
self.alreadyCanceled = False
self._createPrintJobModel(payload)
elif "DisplayLayerProgress_layerChanged" == event or event == "DisplayLayerProgress_heightChanged":
self._updatePrintJobModelWithLayerHeightInfos(payload)
elif Events.PRINT_DONE == event:
self._printJobFinished("success", payload)
elif Events.PRINT_FAILED == event:
if self.alreadyCanceled == False:
self._printJobFinished("failed", payload)
elif Events.PRINT_CANCELLED == event:
self.alreadyCanceled = True
self._printJobFinished("canceled", payload)
pass
def on_event(self, event, payload):
subscribed_events = Events.FILE_SELECTED + Events.PRINT_STARTED + Events.PRINT_CANCELLED + Events.PRINT_DONE + Events.PRINT_FAILED
if subscribed_events.find(event) == -1:
return
# 'PrintStarted'
if event == Events.PRINT_STARTED:
self.grblState = "Run"
return
# Print ended (finished / failed / cancelled)
if event == Events.PRINT_CANCELLED or event == Events.PRINT_DONE or event == Events.PRINT_FAILED:
self.grblState = "Idle"
return
# 'FileSelected'
if event == Events.FILE_SELECTED:
selected_file = self._settings.global_get_basefolder("uploads") + '/' + payload['path']
f = open(selected_file, 'r')
minX = float("inf")
minY = float("inf")
maxX = float("-inf")