Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# File pre-processor hook. Only the opening part of the function is visible in
# this excerpt: the early-return guards and the layer-expression check.
# Returns file_object unchanged when the upload is not gcode or when the
# layer-indicator setting is disabled.
def createFilePreProcessor(self, path, file_object, blinks=None, printer_profile=None, allow_overwrite=True, *args,
**kwargs):
fileName = file_object.filename
# Only gcode uploads are candidates for processing.
if not octoprint.filemanager.valid_file_type(fileName, type="gcode"):
return file_object
# Processing is opt-in through the SETTINGS_KEY_ADD_LAYER_INDICATORS setting.
addLayerIndicators = self._settings.get_boolean([SETTINGS_KEY_ADD_LAYER_INDICATORS])
if addLayerIndicators == False:
return file_object
# check filesize
# filePath = file_object.path
# if (filePath != None):
# fileSize = os.path.getsize(filePath)
# if fileSize > (50 *1024 *1024):
# # send notification to the user, file is to big
# # start extra thread for processing
# return file_object
self._logger.info("FilePreProcessor. Checking LayerExpressions.")
# NOTE(review): how `result` is consumed is not visible in this excerpt.
result = self._checkLayerExpressionValid()
# String tags naming why a display update happened.
# NOTE(review): presumably passed along when a refresh is triggered so the
# receiver knows what changed -- confirm against the call sites, which are
# not visible here.
UPDATE_DISPLAY_REASON_FRONTEND_CALL = "frontEndCall"
UPDATE_DISPLAY_REASON_HEIGHT_CHANGED = "heightChanged"
UPDATE_DISPLAY_REASON_PROGRESS_CHANGED = "progressChanged"
UPDATE_DISPLAY_REASON_LAYER_CHANGED = "layerChanged"
UPDATE_DISPLAY_REASON_FEEDRATE_CHANGED = "feedrateChanged"
UPDATE_DISPLAY_REASON_FANSPEED_CHANGED = "fanspeedChanged"
# Same as setup.py 'plugin_identifier'
PLUGIN_KEY_PREFIX = "DisplayLayerProgress_"
# Tags for the two positioning modes; the "g90"/"g91" prefixes correspond to
# the G-code commands for absolute and relative positioning.
MOVEMENT_ABSOLUTE = "g90_abs"
MOVEMENT_RELATIVE = "g91_real"
class LayerDetectorFileProcessor(octoprint.filemanager.util.LineProcessorStream):
def __init__(self, fileBufferedReader, allLayerExpressions, logger):
    """Wrap ``fileBufferedReader`` and remember the layer-detection inputs.

    The reader is handed to the parent stream class. The layer
    expressions and the logger are stored on the instance, and the
    layer counter starts at zero.
    """
    super(LayerDetectorFileProcessor, self).__init__(fileBufferedReader)
    self._logger = logger
    self._currentLayerCount = 0
    self._allLayerExpressions = allLayerExpressions
# Per-line callback of the stream processor. Only the opening of the method
# is visible in this excerpt.
def process_line(self, origLine):
# An empty line yields None.
if not len(origLine):
return None
# line = origLine.decode('utf-8') # convert byte -> str
# line = stringUtils.to_native_str(origLine)
# print (origLine)
# True only when the line is bytes AND bytes is a distinct type from str
# (on Python 2 bytes is str, so the flag is False there).
isBytesLineForPy3 = type(origLine) is bytes and not (type(origLine) is str)
# if (isBytesLineForPy3):
if self._settings.get_boolean(["use_fallback_when_no_file_selected"]):
temperatures = self.get_fallback_temperatures()
else:
raise PreheatError("No gcode file loaded.")
elif self._printer.get_current_job()["file"]["origin"] == octoprint.filemanager.FileDestinations.SDCARD:
temperatures = self.get_fallback_temperatures()
if len(temperatures) == 0:
raise PreheatError("Can't read the temperature from a gcode file stored on the SD card.")
else:
self._logger.info("Can't read the temperatures from the SD card, using fallback temperatures.")
else:
file_name = self._printer.get_current_job()["file"]["path"]
path_on_disk = octoprint.server.fileManager.path_on_disk(octoprint.filemanager.FileDestinations.LOCAL, file_name)
temperatures = self.read_temperatures_from_file(path_on_disk)
if len(temperatures) == 0:
temperatures = self.get_fallback_temperatures()
if len(temperatures) == 0:
raise PreheatError("Could not find any preheat commands in the gcode file. You can configure fallback temperatures for this case.")
else:
self._logger.info("Could not find any preheat commands in the gcode file, using fallback temperatures.")
offsets = self._printer.get_current_data()["offsets"]
for tool in temperatures:
if tool in offsets:
temperatures[tool] += offsets[tool]
return temperatures
import logging
import logging.handlers
import octoprint.plugin
import octoprint.filemanager
import octoprint.filemanager.util
import octoprint.printer
import octoprint.util
import re, os, sys
import flask
import time
from flask.ext.login import current_user
from octoprint.events import Events
from octoprint.filemanager import FileDestinations
class ModifyComments(octoprint.filemanager.util.LineProcessorStream):
def __init__(self, fileBufferedReader, object_regex, reptag):
    """Set up the stream with compiled object patterns and the tag string.

    ``object_regex`` is iterated as a sequence of mappings; each one's
    ``"objreg"`` value is compiled into ``self.patterns`` unless it is
    empty/falsy. ``reptag`` is stored with an ``@`` prefix.
    """
    super(ModifyComments, self).__init__(fileBufferedReader)
    # Entries with an empty "objreg" are skipped rather than compiled.
    self.patterns = [
        re.compile(entry["objreg"])
        for entry in object_regex
        if entry["objreg"]
    ]
    self._reptag = "@{0}".format(reptag)
# Per-line callback of the stream processor. Only the decoding preamble is
# visible in this excerpt.
def process_line(self, line):
try:
# if line is of type bytes then convert to string
line = line.decode("utf-8", "strict")
except (UnicodeDecodeError, AttributeError):
# Undecodable input, or an object without .decode, is left as it was.
pass
# Determine the preheat temperatures for the current job. Only the first part
# of the method is visible in this excerpt.
# Raises PreheatError when bed, tool and chamber preheating are all disabled,
# when no file is selected and the no-file fallback is off, or when neither
# the source nor the fallback yields any temperatures.
def get_temperatures(self):
if not self._settings.get_boolean(["enable_bed"]) and \
not self._settings.get_boolean(["enable_tool"]) and \
not self._settings.get_boolean(["enable_chamber"]):
raise PreheatError("Preheating is disabled in the plugin settings.")
# Case 1: no file selected -- fall back only if the setting allows it.
if (self._printer.get_current_job()["file"]["path"] == None):
if self._settings.get_boolean(["use_fallback_when_no_file_selected"]):
temperatures = self.get_fallback_temperatures()
else:
raise PreheatError("No gcode file loaded.")
# Case 2: file is on the SD card -- it is not read; fallback values are used.
elif self._printer.get_current_job()["file"]["origin"] == octoprint.filemanager.FileDestinations.SDCARD:
temperatures = self.get_fallback_temperatures()
if len(temperatures) == 0:
raise PreheatError("Can't read the temperature from a gcode file stored on the SD card.")
else:
self._logger.info("Can't read the temperatures from the SD card, using fallback temperatures.")
# Case 3: local file -- read from disk, then fall back if nothing was found.
else:
file_name = self._printer.get_current_job()["file"]["path"]
path_on_disk = octoprint.server.fileManager.path_on_disk(octoprint.filemanager.FileDestinations.LOCAL, file_name)
temperatures = self.read_temperatures_from_file(path_on_disk)
if len(temperatures) == 0:
temperatures = self.get_fallback_temperatures()
if len(temperatures) == 0:
raise PreheatError("Could not find any preheat commands in the gcode file. You can configure fallback temperatures for this case.")
name = file if fileType == "design" else file.printFileName
filepath = ("%s/%s" %(self.plugin._basefolder, name))
fileObject = octoprint.filemanager.util.DiskFileWrapper(name, filepath)
try:
self._file_manager.add_file(FileDestinations.LOCAL, name, fileObject, allow_overwrite=True)
if fileType == "printFile":
self.db.savePrintFile(file)
if printNow:
self.printFileIsDownloaded(file)
return None
except octoprint.filemanager.storage.StorageError as e:
if os.path.exists(filepath):
os.remove(filepath)
if e.code == octoprint.filemanager.storage.StorageError.INVALID_FILE:
payload = {
"id" : file.printFileId,
"type" : "error",
"reason" : e.code
}
self.bm.triggerEvent('onDownload', payload)
return None
elif e.code == octoprint.filemanager.storage.StorageError.ALREADY_EXISTS:
payload = {
"id" : file.printFileId,
"type" : "error",
"reason" : e.code
}
self.bm.triggerEvent('onDownload', payload)
return None
else:
# Add a file from the plugin's base folder to the LOCAL storage through the
# file manager. The excerpt is cut off inside the error handler.
def wrapAndSave(self, fileType, file, printNow=False):
# For "design" the `file` argument itself is the name; otherwise `file` is an
# object carrying printFileName.
name = file if fileType == "design" else file.printFileName
filepath = ("%s/%s" %(self.plugin._basefolder, name))
fileObject = octoprint.filemanager.util.DiskFileWrapper(name, filepath)
try:
self._file_manager.add_file(FileDestinations.LOCAL, name, fileObject, allow_overwrite=True)
# Print files are additionally recorded in the plugin database.
if fileType == "printFile":
self.db.savePrintFile(file)
if printNow:
self.printFileIsDownloaded(file)
return None
except octoprint.filemanager.storage.StorageError as e:
# On a storage failure, remove the source file if it still exists.
if os.path.exists(filepath):
os.remove(filepath)
if e.code == octoprint.filemanager.storage.StorageError.INVALID_FILE:
payload = {
"id" : file.printFileId,
"type" : "error",
def modify_file(self, path, file_object, blinks=None, printer_profile=None, allow_overwrite=True, *args, **kwargs):
    """Route an uploaded gcode file through the ModifyComments line processor.

    Args:
        path: Path of the uploaded file; used only for the file-type check.
        file_object: Uploaded file wrapper providing ``filename`` and
            ``stream()``.
        blinks, printer_profile, allow_overwrite, *args, **kwargs: Accepted
            for signature compatibility and not used here.

    Returns:
        ``file_object`` unchanged when ``path`` is not a gcode file;
        otherwise a ``StreamWrapper`` with the same filename whose stream is
        processed line by line by ``ModifyComments``, configured with
        ``self.object_regex`` and ``self.reptag``.
    """
    if not octoprint.filemanager.valid_file_type(path, type="gcode"):
        return file_object

    # The former local ``import os`` and the split of the filename into
    # name/extension were dead code (the result was never read), so both
    # were dropped.
    line_stream = ModifyComments(file_object.stream(), self.object_regex, self.reptag)
    return octoprint.filemanager.util.StreamWrapper(file_object.filename, line_stream)
# Return error
return flask.jsonify(dict(value = "Error"))
# Check if model location is invalid
if modelModified and (flask.request.values["Model Location"] != "local" and flask.request.values["Model Location"] != "sdcard") :
# Return error
return flask.jsonify(dict(value = "Error"))
# Set model location
if modelModified :
if flask.request.values["Model Location"] == "local" :
modelLocation = self._file_manager.path_on_disk(octoprint.filemanager.destinations.FileDestinations.LOCAL, flask.request.values["Model Path"] + flask.request.values["Model Name"]).replace('\\', '/')
elif flask.request.values["Model Location"] == "sdcard" :
modelLocation = self._file_manager.path_on_disk(octoprint.filemanager.destinations.FileDestinations.SDCARD, flask.request.values["Model Path"] + flask.request.values["Model Name"]).replace('\\', '/')
# Check if slicer profile, model, or printer profile doesn't exist
if (modelModified and not os.path.isfile(modelLocation)) or not self._printer_profile_manager.exists(flask.request.values["Printer Profile Name"]) :
# Return error
return flask.jsonify(dict(value = "Error"))
# Move original model to temporary location
if modelModified :
fd, modelTemp = tempfile.mkstemp()
os.close(fd)
shutil.copy(modelLocation, modelTemp)
fd, temp = tempfile.mkstemp()
os.close(fd)
def _get_metadata(self):
    """Refresh ``self._metadata`` and ``self._progress`` from the file manager.

    Metadata is looked up for ``self._origin`` / ``self._path``. A
    ``NoSuchStorage`` error is treated as "no metadata". ``self._progress``
    is taken from ``metadata["analysis"]["progress"]`` when that path
    exists, and is ``None`` otherwise.
    """
    try:
        metadata = self._file_manager.get_metadata(self._origin, self._path)
    except octoprint.filemanager.NoSuchStorage:
        # The metadata is not found or maybe not yet written.
        metadata = None
    self._metadata = metadata

    has_progress = (
        bool(metadata)
        and "analysis" in metadata
        and "progress" in metadata["analysis"]
    )
    self._progress = metadata["analysis"]["progress"] if has_progress else None