Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_on_event_connected(self, mock_send_status, mock_printer_callback, mock_time):
    """Check that a CONNECTED event starts the firmware version query.

    With ``check_after_connect`` enabled and ``force_check_updates`` set,
    handling ``Events.CONNECTED`` is expected to clear the force flag, leave
    ``printer_info`` and ``update_info`` as ``None``, report a "progress"
    status, send ``M115`` to the printer and mark the plugin as checking.

    The three ``mock_*`` parameters are presumably injected by patch
    decorators that are not visible here; only ``mock_send_status`` is
    inspected.
    """
    # Arrange: a plugin whose collaborators are all replaced by test doubles.
    plugin = octoprint_firmwareupdater.FirmwareupdaterPlugin()
    self.plugin = plugin
    plugin.force_check_updates = True
    plugin._settings = settings_mock()
    plugin._settings._settings["check_after_connect"] = True
    plugin._printer = printer_mock()
    plugin._printer.commands = mock.MagicMock(name='commands')
    plugin._logger = logger_mock()

    # Act: deliver the event under test.
    from octoprint.events import Events
    plugin.on_event(Events.CONNECTED, None)

    # Assert: plugin state after the event.
    self.assertFalse(plugin.force_check_updates)
    self.assertIsNone(plugin.printer_info)
    self.assertIsNone(plugin.update_info)

    # Assert: exactly one status message and one printer command went out.
    expected_status = {
        "status_type": "check_update_status",
        "status_value": "progress",
        "status_description": "Retrieving current firmware version from printer...",
    }
    mock_send_status.assert_called_once_with(**expected_status)
    plugin._printer.commands.assert_called_once_with("M115\n")
    self.assertTrue(plugin._checking)
def on_event(self, event, payload):
    """
    Pick the animation that corresponds to a general OctoPrint event.

    While holding ``self._lock``, the current animation is switched for
    connect, print-started, print-done, print-failed/cancelled and
    disconnect events. Any other event leaves it unchanged. ``payload``
    is not used.
    """
    with self._lock:
        events = octoprint.events.Events
        if event == events.CONNECTED:
            animation = self._animatePrinterConnected
        elif event == events.PRINT_STARTED:
            animation = self._animatePrintStarted
        elif event == events.PRINT_DONE:
            animation = self._animatePrintDone
        elif event in (events.PRINT_FAILED, events.PRINT_CANCELLED):
            # Failed and cancelled prints share one animation.
            animation = self._animatePrintFailed
        elif event == events.DISCONNECTED:
            # Disconnecting clears the animation entirely.
            animation = None
        else:
            # Unrelated events must not touch the current animation.
            return
        self._currentAnimation = animation
# NOTE(review): this is a fragment of an event handler; its `def` line, the
# enclosing condition for the cost calculation and the original indentation
# are not visible here, so the nesting of the statements below is unknown.

# --- Print-job power cost bookkeeping ---
# Log the power figure accumulated for the print job.
self._tplinksmartplug_logger.debug(self.print_job_power)
# Divide the payload's "time" value (default 0) by 60 twice; presumably this
# converts seconds to hours - confirm the unit against the event payload.
hours = (payload.get("time", 0)/60)/60
self._tplinksmartplug_logger.debug("hours: %s" % hours)
power_used = self.print_job_power * hours
self._tplinksmartplug_logger.debug("power used: %s" % power_used)
# Cost is the power used multiplied by the configured "cost_rate" setting.
power_cost = power_used * self._settings.getFloat(["cost_rate"])
self._tplinksmartplug_logger.debug("power total cost: %s" % power_cost)
# Look up the storage for the payload's origin (default "local") and merge the
# cost, formatted to four decimal places, into the file's "statistics"
# metadata as lastPowerCost._default.
self._storage_interface = self._file_manager._storage(payload.get("origin", "local"))
self._storage_interface.set_additional_metadata(payload.get("path"), "statistics", dict(lastPowerCost=dict(_default=float('{:.4f}'.format(power_cost)))), merge=True)
# Reset the per-job tracking state.
self.print_job_power = 0.0
self.print_job_started = False
# --- Timelapse tracking ---
# Mark a timelapse as active when rendering starts and powerOffWhenIdle is on.
if self.powerOffWhenIdle == True and event == Events.MOVIE_RENDERING:
self._tplinksmartplug_logger.debug("Timelapse generation started: %s" % payload.get("movie_basename", ""))
self._timelapse_active = True
# NOTE(review): `and` binds tighter than `or`, so this condition evaluates as
# (self._timelapse_active and event == MOVIE_DONE) or event == MOVIE_FAILED.
# A MOVIE_FAILED event therefore matches even when no timelapse was active.
# If the flag was meant to guard both events, parentheses are needed - confirm.
if self._timelapse_active and event == Events.MOVIE_DONE or event == Events.MOVIE_FAILED:
self._tplinksmartplug_logger.debug("Timelapse generation finished: %s. Return Code: %s" % (payload.get("movie_basename", ""), payload.get("returncode", "completed")))
self._timelapse_active = False
# File Uploaded Event
# Only handled when the "event_on_upload_monitoring" setting is enabled.
if event == Events.UPLOAD and self._settings.getBoolean(["event_on_upload_monitoring"]):
if payload.get("print", False) != False: # implemented in OctoPrint version 1.4.1
self._tplinksmartplug_logger.debug("File uploaded: %s. Turning enabled plugs on." % payload.get("name", ""))
self._tplinksmartplug_logger.debug(payload)
# Walk every configured plug and power on those flagged for upload events,
# but only while the printer reports that it is not ready.
for plug in self._settings.get(['arrSmartplugs']):
self._tplinksmartplug_logger.debug(plug)
if plug["event_on_upload"] == True and not self._printer.is_ready():
self._tplinksmartplug_logger.debug("powering on %s due to %s event." % (plug["ip"], event))
# The result is stored in `response`; it is not used in the visible lines.
response = self.turn_on(plug["ip"])
# NOTE(review): this is a fragment of a token-registration routine; its `def`
# line, the loop that the `break` below belongs to, and the original
# indentation are not visible here.

# Refresh the language code and timestamp on the token entry being updated,
# flag that the settings need saving, and leave the enclosing loop.
token["languageCode"] = language_code
token["date"] = datetime.datetime.now()
updated = True
break
if not found:
self._logger.debug("Adding token for %s." % device_name)
# Token was not found so we need to add it
existing_tokens.append({'apnsToken': new_token, 'deviceName': device_name, 'date': datetime.datetime.now(),
'printerID': printer_id, 'printerName': printer_name, 'languageCode': language_code})
updated = True
if updated:
# Save new settings
# Write the token list back under the "tokens" key, save the settings, and
# fire SETTINGS_UPDATED through the event manager.
self._settings.set(["tokens"], existing_tokens)
self._settings.save()
eventManager().fire(Events.SETTINGS_UPDATED)
self._logger.debug("Tokens saved")
def on_event(self, event, payload):
    """Track whether a slice is in progress and restore files when it ends.

    ``processingSlice`` is set when slicing starts. When slicing is done,
    cancelled or failed, it is cleared and ``restoreFiles()`` is called.
    Any other event is ignored. ``payload`` is not used.
    """
    events = octoprint.events.Events

    # Slicing has begun: remember that a slice is being processed.
    if event == events.SLICING_STARTED:
        self.processingSlice = True
        return

    # Slicing ended in any way: clear the flag and put the files back.
    if event in (events.SLICING_DONE, events.SLICING_CANCELLED, events.SLICING_FAILED):
        self.processingSlice = False
        self.restoreFiles()
def on_event(self, event, payload):
subscribed_events = Events.FILE_SELECTED + Events.PRINT_STARTED + Events.PRINT_CANCELLED + Events.PRINT_DONE + Events.PRINT_FAILED
if subscribed_events.find(event) == -1:
return
# 'PrintStarted'
if event == Events.PRINT_STARTED:
self.grblState = "Run"
return
# Print ended (finished / failed / cancelled)
if event == Events.PRINT_CANCELLED or event == Events.PRINT_DONE or event == Events.PRINT_FAILED:
self.grblState = "Idle"
return
# 'FileSelected'
if event == Events.FILE_SELECTED: