Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# -*- coding: utf-8 -*-
import unittest
import mock
import octoprint.server.util.flask
def _restricted_access(func):
return func
# Replace the real decorator with the pass-through defined above.
# NOTE(review): this runs before the remaining imports, presumably so that
# modules imported afterwards pick up the stub when they apply the decorator
# -- confirm the ordering requirement.
octoprint.server.util.flask.restricted_access = _restricted_access
import octoprint.server
def _require(v):
def __require(func):
return func
return __require
# Replace admin_permission.require with the pass-through factory above.
# NOTE(review): presumably done before the plugin module is imported so the
# permission check is bypassed in tests -- confirm.
octoprint.server.admin_permission.require = _require
import sarge
import octoprint_firmwareupdater
# Test case that drives octoprint_firmwareupdater.FirmwareupdaterPlugin.on_event
# with the CONNECTED event and checks the resulting plugin state.
class TestFlashWithPath(unittest.TestCase):
# NOTE(review): nine patch decorators are stacked on the method below, but its
# signature accepts only three mock parameters, and 'shutil.copyfileobj' is
# patched twice -- confirm this decorator list really belongs to this test.
# NOTE(review): '__builtin__' is the Python 2 module name ('builtins' on
# Python 3) -- confirm the target interpreter.
@mock.patch('shutil.copyfileobj')
@mock.patch('__builtin__.open')
@mock.patch('tempfile.NamedTemporaryFile')
@mock.patch.object(octoprint_firmwareupdater.FirmwareupdaterPlugin, '_check_avrdude')
@mock.patch('shutil.copyfileobj')
@mock.patch('threading.Thread.start')
@mock.patch('octoprint.plugin.BlueprintPlugin.route')
@mock.patch('octoprint_firmwareupdater.flask')
@mock.patch.object(octoprint_firmwareupdater.FirmwareupdaterPlugin, '_send_status')
def test_on_event_connected(self, mock_send_status, mock_printer_callback, mock_time):
# Set Up
# settings_mock, printer_mock and logger_mock are not defined in the visible
# part of this file.
self.plugin = octoprint_firmwareupdater.FirmwareupdaterPlugin()
self.plugin.force_check_updates = True
self.plugin._settings = settings_mock()
self.plugin._settings._settings["check_after_connect"] = True
self.plugin._printer = printer_mock()
# Replace the printer's `commands` with a MagicMock so the call can be asserted.
self.plugin._printer.commands = mock.MagicMock(name='commands')
self.plugin._logger = logger_mock()
# Call test subject
from octoprint.events import Events
self.plugin.on_event(Events.CONNECTED, None)
# Assert
# Expected after the event: the force flag is cleared, no printer/update info
# is cached, one progress status was sent, "M115\n" was sent to the printer
# once, and the plugin is flagged as checking.
self.assertFalse(self.plugin.force_check_updates)
self.assertIsNone(self.plugin.printer_info)
self.assertIsNone(self.plugin.update_info)
mock_send_status.assert_called_once_with(status_type="check_update_status", status_value="progress", status_description="Retrieving current firmware version from printer...")
self.plugin._printer.commands.assert_called_once_with("M115\n")
self.assertTrue(self.plugin._checking)
# Settings-save hook: snapshot the previous values, delegate the save to
# octoprint.plugin.SettingsPlugin, then refresh the attributes cached on the
# instance and send a plugin message if powerOffWhenIdle changed.
def on_settings_save(self, data):
# Snapshot of the settings before the save is applied.
# NOTE(review): only old_powerOffWhenIdle is read in the visible lines; the
# other old_* values are presumably compared further down -- confirm.
old_debug_logging = self._settings.get_boolean(["debug_logging"])
old_polling_value = self._settings.get_boolean(["pollingEnabled"])
old_polling_timer = self._settings.get(["pollingInterval"])
old_powerOffWhenIdle = self._settings.get_boolean(["powerOffWhenIdle"])
old_idleTimeout = self._settings.get_int(["idleTimeout"])
old_idleIgnoreCommands = self._settings.get(["idleIgnoreCommands"])
old_idleTimeoutWaitTemp = self._settings.get_int(["idleTimeoutWaitTemp"])
# Delegate the actual save to the SettingsPlugin implementation.
octoprint.plugin.SettingsPlugin.on_settings_save(self, data)
# Re-read the saved values into instance attributes.
self.abortTimeout = self._settings.get_int(["abortTimeout"])
self.powerOffWhenIdle = self._settings.get_boolean(["powerOffWhenIdle"])
self.idleTimeout = self._settings.get_int(["idleTimeout"])
self.idleIgnoreCommands = self._settings.get(["idleIgnoreCommands"])
# The ignore list is stored as one comma-separated string; keep a split copy.
self._idleIgnoreCommandsArray = self.idleIgnoreCommands.split(',')
self.idleTimeoutWaitTemp = self._settings.get_int(["idleTimeoutWaitTemp"])
# Send a plugin message only when the power-off-when-idle flag actually changed.
if self.powerOffWhenIdle != old_powerOffWhenIdle:
self._plugin_manager.send_plugin_message(self._identifier, dict(powerOffWhenIdle=self.powerOffWhenIdle, type="timeout", timeout_value=self._timeout_value))
# With automatic power-off enabled, log it and restart the idle timer.
if self.powerOffWhenIdle == True:
self._tplinksmartplug_logger.debug("Settings saved, Automatic Power Off Enabled, starting idle timer...")
self._reset_idle_timer()
# NOTE(review): fragment -- the enclosing function header is not visible;
# `event` and `payload` are presumably its parameters -- confirm.
# Restart the idle timer, clear the cached timeout value and send a plugin
# message carrying the new state.
self._reset_idle_timer()
self._timeout_value = None
self._plugin_manager.send_plugin_message(self._identifier, dict(powerOffWhenIdle=self.powerOffWhenIdle, type="timeout", timeout_value=self._timeout_value))
# On print start with countdown active: send a delete_all_rules command to
# every configured plug that uses countdown rules with a positive off delay.
if event == Events.PRINT_STARTED and self._countdown_active:
for plug in self._settings.get(["arrSmartplugs"]):
if plug["useCountdownRules"] and int(plug["countdownOffDelay"]) > 0:
# An "ip" value containing "/" is split into address and plug number;
# otherwise the plug number is -1.
if "/" in plug["ip"]:
plug_ip, plug_num = plug["ip"].split("/")
else:
plug_ip = plug["ip"]
plug_num = -1
self.sendCommand(json.loads('{"count_down":{"delete_all_rules":null}}'),plug_ip,plug_num)
self._tplinksmartplug_logger.debug("Cleared countdown rules for %s" % plug["ip"])
# Print Done Event
if event == Events.PRINT_DONE and self.print_job_started:
self._tplinksmartplug_logger.debug(payload)
# Add each plug's current energy reading to print_job_power. Two keys are
# read: "total_wh" (divided by 1000) and "total", each defaulting to 0.
# NOTE(review): presumably a given plug reports only one of them -- confirm.
for plug in self._settings.get(["arrSmartplugs"]):
status = self.check_status(plug["ip"])
self.print_job_power += float(self.deep_get(status,["emeter","get_realtime","total_wh"], default=0)) / 1000
self.print_job_power += float(self.deep_get(status,["emeter","get_realtime","total"], default=0))
self._tplinksmartplug_logger.debug(self.print_job_power)
# payload "time" is divided by 60 twice to obtain hours.
# NOTE(review): assumes "time" is in seconds; under Python 2 an int value
# would use floor division here -- confirm.
hours = (payload.get("time", 0)/60)/60
self._tplinksmartplug_logger.debug("hours: %s" % hours)
power_used = self.print_job_power * hours
self._tplinksmartplug_logger.debug("power used: %s" % power_used)
# Cost = power used multiplied by the configured cost_rate setting.
power_cost = power_used * self._settings.getFloat(["cost_rate"])
self._tplinksmartplug_logger.debug("power total cost: %s" % power_cost)
# Select the storage for the payload's "origin", defaulting to "local".
self._storage_interface = self._file_manager._storage(payload.get("origin", "local"))
# NOTE(review): fragment -- the enclosing function header is not visible;
# `event` and `payload` are presumably its parameters -- confirm.
# Failed print while the printer is not closed or in error: reset the job
# power bookkeeping and stop handling this event.
if event == Events.PRINT_FAILED and not self._printer.is_closed_or_error():
self._tplinksmartplug_logger.debug("Print cancelled, reseting job_power to 0")
self.print_job_power = 0.0
self.print_job_started = False
return
# Print Started Event
# Only runs when the configured cost_rate is greater than zero.
if event == Events.PRINT_STARTED and self._settings.getFloat(["cost_rate"]) > 0:
self.print_job_started = True
self._tplinksmartplug_logger.debug(payload.get("path", None))
# Subtract each plug's current energy reading from print_job_power. Two keys
# are read: "total_wh" (divided by 1000) and "total", each defaulting to 0.
for plug in self._settings.get(["arrSmartplugs"]):
status = self.check_status(plug["ip"])
self.print_job_power -= float(self.deep_get(status,["emeter","get_realtime","total_wh"], default=0)) / 1000
self.print_job_power -= float(self.deep_get(status,["emeter","get_realtime","total"], default=0))
self._tplinksmartplug_logger.debug(self.print_job_power)
# A new print cancels any pending abort timer and restarts the idle timer.
if event == Events.PRINT_STARTED and self.powerOffWhenIdle == True:
if self._abort_timer is not None:
self._abort_timer.cancel()
self._abort_timer = None
self._tplinksmartplug_logger.debug("Power off aborted because starting new print.")
if self._idleTimer is not None:
self._reset_idle_timer()
self._timeout_value = None
self._plugin_manager.send_plugin_message(self._identifier, dict(powerOffWhenIdle=self.powerOffWhenIdle, type="timeout", timeout_value=self._timeout_value))
# On print start with countdown active, visit plugs that use countdown rules
# with a positive off delay.
if event == Events.PRINT_STARTED and self._countdown_active:
for plug in self._settings.get(["arrSmartplugs"]):
if plug["useCountdownRules"] and int(plug["countdownOffDelay"]) > 0:
if "/" in plug["ip"]:
plug_ip, plug_num = plug["ip"].split("/")
else:
plug_ip = plug["ip"]
# NOTE(review): the fragment stops here; plug_num is not assigned in this
# branch within the visible lines.
# NOTE(review): fragment -- the enclosing function header is not visible;
# `pattern`, `line`, `layerExpression` and LAYER_MESSAGE_PREFIX are defined
# outside the visible lines.
matched = pattern.match(line)
if matched:
groupIndex = layerExpression.groupIndex
# Countable expressions advance a running counter; otherwise the layer value
# is taken from the regex group at groupIndex.
if layerExpression.type_countable:
self._currentLayerCount = self._currentLayerCount + 1
currentLayer = str(self._currentLayerCount)
else:
currentLayer = str(matched.group(groupIndex))
# Append the prefix, the layer value and a CRLF to the line.
line = line + LAYER_MESSAGE_PREFIX + currentLayer + "\r\n"
return line
class DisplaylayerprogressPlugin(
    octoprint.plugin.StartupPlugin,
    octoprint.plugin.SettingsPlugin,
    octoprint.plugin.AssetPlugin,
    octoprint.plugin.TemplatePlugin,
    # Additional mixins beyond the four above.
    octoprint.plugin.EventHandlerPlugin,
    octoprint.plugin.ProgressPlugin,
    octoprint.plugin.SimpleApiPlugin,
    octoprint.plugin.BlueprintPlugin
):
    """Plugin class combining eight ``octoprint.plugin`` mixins.

    The attributes below are class-level defaults. ``NOT_PRESENT`` is a
    module-level constant defined outside this class.
    """

    # Height values start at zero.
    _tempCurrentHeightFromFile = 0.0
    _tempCurrentTotalHeight = 0.0

    # Layer bookkeeping: the running count starts at 0, while the total and
    # the current layer start as NOT_PRESENT.
    _currentLayerCount = 0
    _layerTotalCount = NOT_PRESENT
    _currentLayer = NOT_PRESENT

    # Progress is kept as a string, initially "0".
    _progress = str(0)

    # Heights start as NOT_PRESENT.
    _currentHeight = NOT_PRESENT
    _totalHeight = NOT_PRESENT
# coding=utf-8
from __future__ import absolute_import
import octoprint.plugin
import serial
import binascii
from threading import Thread
import time
from time import sleep
# Plugin class mixing in the Settings, Asset, Template, Startup and Shutdown
# plugin types from octoprint.plugin.
class OctoremotePlugin(octoprint.plugin.SettingsPlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.StartupPlugin,
octoprint.plugin.ShutdownPlugin):
##~~ SettingsPlugin mixin
# Returns a dict of default values for this plugin's settings.
def get_settings_defaults(self):
return dict(
# Serial port defaults; baudrateOpt lists the selectable baud rates.
comport="COM3",
baudrate=115200,
baudrateOpt=[9600, 19200, 115200],
extrusionAmount=5,
retractionAmount=5,
numberOfTools=3,
# movementStep1..4 repeat the four entries of movementSteps individually.
movementSteps=[0.1, 1, 10, 100],
movementStep1=0.1,
movementStep2=1,
movementStep3=10,
movementStep4=100,
userCommand1 = "",
# NOTE(review): the dict(...) call is not closed within the visible lines;
# further defaults presumably follow -- confirm.
from __future__ import absolute_import
from octoprint.events import Events
# import sys
import time
import math
import os
import subprocess
import octoprint.plugin
import re
import logging
import json
import flask
class BetterGrblSupportPlugin(octoprint.plugin.SettingsPlugin,
                              octoprint.plugin.SimpleApiPlugin,
                              octoprint.plugin.AssetPlugin,
                              octoprint.plugin.TemplatePlugin,
                              octoprint.plugin.StartupPlugin,
                              octoprint.plugin.EventHandlerPlugin):
    """Plugin class combining six ``octoprint.plugin`` mixins."""

    def __init__(self):
        """Set the instance defaults: UI flags and command strings."""
        # UI flags: the three hide* flags default to True, customControls
        # to False.
        self.hideTempTab = True
        self.hideControlTab = True
        self.hideGCodeTab = True
        self.customControls = False

        # Default command strings.
        self.helloCommand = "M5"
        self.statusCommand = "?$G"
        self.dwellCommand = "G4 P0"
        self.positionCommand = "?"

        self.suppressM114 = True
import octoprint.plugin
# Plugin class combining the Startup, EventHandler and SimpleApi plugin types
# from octoprint.plugin.
class OctoscreenPlugin(octoprint.plugin.StartupPlugin,octoprint.plugin.EventHandlerPlugin,octoprint.plugin.SimpleApiPlugin):
# Class-level default; no timer exists initially.
_repeat_timer = None
def __init__(self):
# Default the three MQTT hooks to callables that accept any arguments and
# return None.
# NOTE(review): presumably replaced with real helpers when an MQTT plugin is
# available -- confirm.
self.mqtt_publish = lambda *args, **kwargs: None
self.mqtt_subscribe = lambda *args, **kwargs: None
self.mqtt_unsubscribe = lambda *args, **kwargs: None
def get_api_commands(self):
# Declares a single API command, "dummy", mapped to an empty list.
return dict(dummy=[])
def on_api_get(self, request):
# Imports are done inside the handler rather than at module level.
# NOTE(review): the rest of this handler lies beyond the visible lines.
from flask import jsonify
from octoprint.server import fileManager
import octoprint.filemanager
import octoprint.filemanager.util
import octoprint.filemanager.storage