Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# coding=utf-8
from __future__ import absolute_import
import octoprint.plugin
import re
class bedlevelvisualizer(octoprint.plugin.StartupPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.SettingsPlugin):
def __init__(self):
    """Initialise plugin state: an empty mesh and a cleared processing flag."""
    # No mesh data has been collected yet.
    self.mesh = list()
    # Nothing is being processed at construction time.
    self.processing = False
##~~ SettingsPlugin
def get_settings_defaults(self):
    """Return a newly built dictionary holding the plugin's default settings.

    A fresh dictionary (with fresh list values) is created on every call.
    """
    # NOTE(review): these look like the report header strings the plugin
    # recognises in printer output -- confirm against the parsing code.
    default_report_types = [
        "Bed Topography Report:",
        "Bed Topography Report for CSV:",
        "Bilinear Leveling Grid:",
        "Subdivided with CATMULL ROM Leveling Grid:",
        "Measured points:",
    ]
    defaults = {
        "command": "G28\nG29 T",
        "flipX": False,
        "flipY": False,
        "stripFirst": False,
        "stored_mesh": [],
        "save_mesh": True,
        "prusa_mode": False,
        "mesh_timestamp": "",
        "report_flag": "",
        "report_types": default_report_types,
    }
    return defaults
##~~ StartupPlugin
def on_after_startup(self):
    """Write an info-level log line announcing that the plugin has loaded."""
    loaded_message = "OctoPrint-BedLevelVisualizer loaded!"
    self._logger.info(loaded_message)
# coding=utf-8
from __future__ import absolute_import
import octoprint.plugin
from octoprint.server import user_permission
import docker
class youtubelive(octoprint.plugin.StartupPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.SettingsPlugin,
octoprint.plugin.SimpleApiPlugin,
octoprint.plugin.EventHandlerPlugin):
def __init__(self):
    """Obtain a Docker client via ``docker.from_env()`` and track no container yet."""
    docker_client = docker.from_env()
    self.client = docker_client
    # No container has been looked up at construction time.
    self.container = None
##~~ StartupPlugin
def on_after_startup(self):
self._logger.info("OctoPrint-YouTubeLive loaded! Checking stream status.")
try:
self.container = self.client.containers.get('YouTubeLive')
# coding=utf-8
from __future__ import absolute_import
### (Don't forget to remove me)
# This is a basic skeleton for your plugin's __init__.py. You probably want to adjust the class name of your plugin
# as well as the plugin mixins it's subclassing from. This is really just a basic skeleton to get you started,
# defining your plugin as a template plugin, settings and asset plugin. Feel free to add or remove mixins
# as necessary.
#
# Take a look at the documentation on what other plugin mixins are available.
import octoprint.plugin
from octoprint.server import user_permission
import docker
class WebcamStreamerPlugin(octoprint.plugin.StartupPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.SettingsPlugin,
octoprint.plugin.SimpleApiPlugin,
octoprint.plugin.EventHandlerPlugin):
def __init__(self):
# Docker connection and container object
self.client = None
self.image = None
self.container = None
self.frame_rate_default = 5
self.ffmpeg_cmd_default = (
"ffmpeg -re -f mjpeg -framerate 5 -i {webcam_url} " # Video input
"-ar 44100 -ac 2 -acodec pcm_s16le -f s16le -ac 2 -i /dev/zero " # Audio input
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# coding=utf-8
from __future__ import absolute_import
import octoprint.plugin
from octoprint.util import RepeatedTimer
from easyprocess import EasyProcess
class TemperatureFailsafe(octoprint.plugin.AssetPlugin,
octoprint.plugin.SettingsPlugin,
octoprint.plugin.ShutdownPlugin,
octoprint.plugin.StartupPlugin,
octoprint.plugin.TemplatePlugin):
def __init__(self):
    """Start with no temperature-check timer assigned."""
    # Holds the timer object once one is created; None means "no timer".
    self._checkTempTimer = None
def _restartTimer(self):
# stop the timer
if self._checkTempTimer:
self._logger.debug(u"Stopping Timer...")
self._checkTempTimer.cancel()
self._checkTempTimer = None
# start a new timer
interval = self._settings.get_int(['interval'])
if self._settings.get_boolean(['enabled']) and interval:
self._logger.debug(u"Starting Timer...")
m = self.Z_COORD_RE.match(line)
if m:
z = float(m.groups()[0])
m = self.E_COORD_RE.match(line)
if m:
e = float(m.groups()[0])
m = self.SPEED_VAL_RE.match(line)
if m:
speed = float(m.groups()[0])
return x, y, z, e, speed
class CancelobjectPlugin(octoprint.plugin.StartupPlugin,
octoprint.plugin.SettingsPlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.SimpleApiPlugin,
octoprint.plugin.EventHandlerPlugin):
def __init__(self):
    """Initialise the plugin's object-tracking and skip-state attributes."""
    # Skip-related flags all start cleared.
    self.skipping = False
    self.startskip = False
    self.endskip = False

    # Collections start empty.
    self.object_list = []
    self.object_regex = []
    self.ignored = []

    # Nothing is selected or configured yet.
    self.active_object = None
    self.reptag = None
import octoprint.plugin
import httplib2
import octoprint.server.util.flask
from octoprint.server import admin_permission
from octoprint.events import Events
from subprocess import call, Popen, PIPE
import threading
import time
import re
class BigBoxFirmwarePlugin(octoprint.plugin.BlueprintPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.SettingsPlugin,
octoprint.plugin.EventHandlerPlugin,
octoprint.plugin.StartupPlugin):
def __init__(self):
    """Record the template file names, the dependency names and the install flag."""
    # Dependencies are treated as not installed until checked.
    self.depInstalled = False
    # Names of the dependencies tracked by the plugin.
    self.depList = ['avr-libc', 'avrdude', 'make']
    # Names of the template files handled by the plugin.
    self.templates = ('Configuration.h', 'Configuration_adv.h', 'pins_RUMBA.h')
def on_startup(self, host, port):
    """Store the result of ``check_dep()`` and then call ``getDefLib()``.

    ``host`` and ``port`` are accepted but not used by this method.
    """
    dependencies_ok = self.check_dep()
    self.depInstalled = dependencies_ok
    self.getDefLib()
def on_after_startup(self):
dataFolder = self.get_plugin_data_folder()
profileFolder = dataFolder + '/profiles'
defaultProfileFolder = self._basefolder + '/default_profiles'
if not os.path.isdir(profileFolder):
for parameter in valid_commands[command]:
if not parameter in data:
return None, None, make_response("Mandatory parameter %s missing for command %s" % (parameter, command), 400)
return command, data, None
def create_ws_token(public_key = None, api_key = None):
    """Serialise ``public_key`` into a token using ``api_key`` as the secret.

    The payload ``{'public_key': public_key}`` is dumped with itsdangerous'
    ``URLSafeTimedSerializer`` constructed from ``api_key``; the serializer's
    output is returned unchanged.
    """
    # Imported locally, as in the original, so the dependency is only
    # required when a token is actually created.
    from itsdangerous import URLSafeTimedSerializer

    payload = { 'public_key': public_key }
    serializer = URLSafeTimedSerializer(api_key)
    return serializer.dumps(payload)
class AstroprintPlugin(octoprint.plugin.SettingsPlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.StartupPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.BlueprintPlugin,
octoprint.plugin.EventHandlerPlugin,
octoprint.printer.PrinterCallback):
##~~ SettingsPlugin mixin
def initialize(self):
    """Set the plugin's attributes to their empty starting values."""
    # User data starts as an empty dictionary.
    self.user = {}
    # Everything else is unset (None) at this point.
    self.designs = self.db = self.astroprintCloud = None
    self.cameraManager = self.materialCounter = self._printerListener = None
to the decorated function such that each call is less than 5 seconds apart,
only the last one will happen."""
def new_decorator(f, *args, **kwargs):
def to_do_later(*args, **kwargs):
if to_do_later.__timer is not None:
to_do_later.__timer.cancel()
to_do_later.__timer = Timer(seconds, f, args, kwargs)
to_do_later.__timer.start()
to_do_later.__timer = None
return to_do_later
return new_decorator
class PrintTimeGeniusPlugin(octoprint.plugin.SettingsPlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.StartupPlugin,
octoprint.plugin.ShutdownPlugin,
octoprint.plugin.EventHandlerPlugin,
octoprint.plugin.BlueprintPlugin):
def __init__(self):
    """Set up the logger, the history dictionary and the printer-config caches.

    The original body also bound an unused local, ``dd = lambda:
    defaultdict(dd)``; it was never called or stored, so it has been removed.
    """
    self._logger = logging.getLogger(__name__)
    self._current_history = {}
    # Timing-relevant config commands.
    self._current_config = PrinterConfig()
    # Cache of the config that is on disk.
    self._old_printer_config = self._current_config.as_list()
##~~ SettingsPlugin mixin
def get_settings_defaults(self):
current_path = os.path.dirname(os.path.realpath(__file__))
built_in_analyzers = [
(gettext("All gcode analyzers (usually not as good as marlin-calc)"),
'"{{python}}" "{analyzer}" "{{{{gcode}}}}"'.format(
# coding=utf-8
from __future__ import absolute_import
import octoprint.plugin
import octoprint.events
import time
import threading
from .piglow import PiGlow
class OctoGlowPlugin(octoprint.plugin.EventHandlerPlugin,
octoprint.plugin.ProgressPlugin,
octoprint.plugin.StartupPlugin):
"""
Plugin for animating the LEDs on a PiGlow board based on OctoPrint events.
"""
def __init__(self):
    """Create the plugin's lock and reset its animation and progress state."""
    # Progress starts at zero and no animation is selected.
    self._printProgress = 0
    self._currentAnimation = None
    self._lock = threading.Lock()
def on_after_startup(self):
"""
Callback for just after launch of OctoPrint.
"""
self._piglow = PiGlow()
self._piglow.all(0)
self._animator = threading.Thread(target=self._animate)
# coding=utf-8
from __future__ import absolute_import
from subprocess import check_output
from subprocess import call, Popen, PIPE
from octoprint.settings import settings, valid_boolean_trues
import flask
import octoprint.plugin
import os
import re
class UsbcontrolPlugin(octoprint.plugin.SettingsPlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.SimpleApiPlugin,
octoprint.plugin.StartupPlugin,
octoprint.plugin.TemplatePlugin):
def get_settings_defaults(self):
s = settings()
return dict(
usb2 = True,
usb3 = True if s.get(["plugins", "usbcontrol", "isRaspi3Bplus"]) else False,
usb4 = True if s.get(["plugins", "usbcontrol", "isRaspi3Bplus"]) else False,
usb5 = True if s.get(["plugins", "usbcontrol", "isRaspi3Bplus"]) else False,
all = True,
init = False,
isRaspi2B = False,
isRaspi3B = False,
isRaspi3Bplus = False,
cpuRevision = 'unknown',
piModel = 'unknown'