Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
This returns a line like qute without --json-logging would produce.
Args:
colorized: If True, ANSI color codes will be embedded.
"""
r = logging.LogRecord(self.category, self.loglevel, '', self.line,
self.message, (), None)
# Patch some attributes of the LogRecord
if self.line is None:
r.line = 0
r.created = self.timestamp.timestamp()
r.msecs = self.msecs
r.module = self.module
r.funcName = self.function
format_str = log.EXTENDED_FMT
format_str = format_str.replace('{asctime:8}',
'{asctime:8}.{msecs:03.0f}')
# Mark expected errors with (expected) so it's less confusing for tests
# which expect errors but fail due to other errors.
if self.expected and self.loglevel > logging.INFO:
new_color = '{' + log.LOG_COLORS['DEBUG'] + '}'
format_str = format_str.replace('{log_color}', new_color)
format_str = re.sub(r'{levelname:(\d*)}',
# Leave away the padding because (expected) is
# longer anyway.
r'{levelname} (expected)', format_str)
formatter = log.ColoredFormatter(format_str, log.DATEFMT, '{',
use_colors=colorized)
result = formatter.format(r)
# Manually append the stringified traceback if one is present
def init():
    """Disable insecure SSL ciphers on old Qt versions.

    Splits Qt's default cipher list with ``_is_secure_cipher`` and installs
    only the ciphers that passed the check as the new defaults. Both the
    original list and the rejected ciphers are logged at debug level.
    """
    qt_defaults = QSslSocket.defaultCiphers()
    all_names = ', '.join(c.name() for c in qt_defaults)
    log.init.debug("Default Qt ciphers: {}".format(all_names))

    secure = []
    insecure = []
    for candidate in qt_defaults:
        # Evaluate the predicate exactly once per cipher, then file the
        # cipher into the matching bucket.
        bucket = secure if _is_secure_cipher(candidate) else insecure
        bucket.append(candidate)

    rejected_names = ', '.join(c.name() for c in insecure)
    log.init.debug("Disabling bad ciphers: {}".format(rejected_names))
    QSslSocket.setDefaultCiphers(secure)
log.prompt.debug("Asking question {}, blocking {}, loops {}, queue "
"{}".format(question, blocking, self._loops,
self._queue))
if self._shutting_down:
# If we're currently shutting down we have to ignore this question
# to avoid segfaults - see
# https://github.com/qutebrowser/qutebrowser/issues/95
log.prompt.debug("Ignoring question because we're shutting down.")
question.abort()
return None
if self._question is not None and not blocking:
# We got an async question, but we're already busy with one, so we
# just queue it up for later.
log.prompt.debug("Adding {} to queue.".format(question))
self._queue.append(question)
return None
if blocking:
# If we're blocking we save the old question on the stack, so we
# can restore it after exec, if exec gets called multiple times.
log.prompt.debug("New question is blocking, saving {}".format(
self._question))
old_question = self._question
if old_question is not None:
old_question.interrupted = True
self._question = question
self.show_prompts.emit(question)
if blocking:
def _on_authentication_required(self, url, authenticator):
    """Handle an authentication request for `url`.

    netrc credentials are tried first, but only if they have not been tried
    before (tracked via ``self.data.netrc_used``). If that does not succeed,
    ``shared.authentication_required`` is asked for credentials. When it
    yields no answer, the authenticator is reset to a fresh
    ``QAuthenticator``.

    Args:
        url: The URL authentication was requested for.
        authenticator: The authenticator object to fill in or reset.
    """
    log.network.debug("Authentication requested for {}, netrc_used {}"
                      .format(url.toDisplayString(), self.data.netrc_used))

    if not self.data.netrc_used:
        # Only ever attempt netrc once, no matter whether it works.
        self.data.netrc_used = True
        if shared.netrc_authentication(url, authenticator):
            return

    log.network.debug("Asking for credentials")
    answer = shared.authentication_required(
        url, authenticator, abort_on=[self.abort_questions])
    if answer is not None:
        return

    log.network.debug("Aborting auth")
    try:
        sip.assign(authenticator, QAuthenticator())  # type: ignore
    except AttributeError:
        # WORKAROUND for
        # https://www.riverbankcomputing.com/pipermail/pyqt/2016-December/038400.html
        self._show_error_page(url, "Authentication required")
return True # so on_reply_finished aborts
log.downloads.debug("{}: Handling redirect".format(self))
self._redirects += 1
new_request.setUrl(new_url)
old_reply = self._reply
assert old_reply is not None
old_reply.finished.disconnect(self._on_reply_finished)
self._read_timer.stop()
self._reply = None
if self.fileobj is not None:
self.fileobj.seek(0)
log.downloads.debug("redirected: {} -> {}".format(
old_reply.url(), new_request.url()))
new_reply = old_reply.manager().get(new_request)
self._init_reply(new_reply)
old_reply.deleteLater()
return True
world = int(script.jsworld)
if not 0 <= world <= qtutils.MAX_WORLD_ID:
log.greasemonkey.error(
"script {} has invalid value for '@qute-js-world'"
": {}, should be between 0 and {}"
.format(
script.name,
script.jsworld,
qtutils.MAX_WORLD_ID))
continue
except ValueError:
try:
world = _JS_WORLD_MAP[usertypes.JsWorld[
script.jsworld.lower()]]
except KeyError:
log.greasemonkey.error(
"script {} has invalid value for '@qute-js-world'"
": {}".format(script.name, script.jsworld))
continue
new_script.setWorldId(world)
new_script.setSourceCode(script.code())
new_script.setName("GM-{}".format(script.name))
new_script.setRunsOnSubFrames(script.runs_on_sub_frames)
# Override the @run-at value parsed by QWebEngineScript if desired.
if injection_point:
new_script.setInjectionPoint(injection_point)
elif script.needs_document_end_workaround():
log.greasemonkey.debug("Forcing @run-at document-end for {}"
.format(script.name))
new_script.setInjectionPoint(QWebEngineScript.DocumentReady)
def _connect_userjs_signals(self, frame):
    """Hook up the userjs injection trigger for `frame`.

    The frame's ``loadFinished`` signal is connected to
    ``self._inject_userjs``, with the frame itself bound as the first
    argument.

    Args:
        frame: The frame whose signals should trigger user JavaScript
               injection.
    """
    display_url = frame.url().toDisplayString()
    log.greasemonkey.debug("Connecting to frame {} ({})"
                           .format(frame, display_url))
    inject_into_frame = functools.partial(self._inject_userjs, frame)
    frame.loadFinished.connect(inject_into_frame)
def _load_state_geometry(self):
    """Restore the geometry stored in the state file, if there is any.

    The value is read from ``configfiles.state['geometry']['inspector']``
    and base64-decoded before being passed to ``restoreGeometry``. A missing
    entry is silently ignored; undecodable data and a failed restore are
    only logged.
    """
    try:
        encoded = configfiles.state['geometry']['inspector']
        geometry = base64.b64decode(encoded, validate=True)
    except KeyError:
        # First start
        return
    except binascii.Error:
        log.misc.exception("Error while reading geometry")
        return

    log.init.debug("Loading geometry from {}".format(geometry))
    restored = self.restoreGeometry(geometry)
    if not restored:
        log.init.warning("Error while loading geometry.")
return
if self._socket is not None:
log.ipc.debug("Got new connection but ignoring it because we're "
"still handling another one (0x{:x}).".format(
id(self._socket)))
return
socket = self._server.nextPendingConnection()
if socket is None:
log.ipc.debug("No new connection to handle.")
return
log.ipc.debug("Client connected (socket 0x{:x}).".format(id(socket)))
self._timer.start()
self._socket = socket
socket.readyRead.connect(self.on_ready_read)
if socket.canReadLine():
log.ipc.debug("We can read a line immediately.")
self.on_ready_read()
socket.error.connect(self.on_error)
if socket.error() not in [QLocalSocket.UnknownSocketError,
QLocalSocket.PeerClosedError]:
log.ipc.debug("We got an error immediately.")
self.on_error(socket.error())
socket.disconnected.connect(self.on_disconnected)
if socket.state() == QLocalSocket.UnconnectedState:
log.ipc.debug("Socket was disconnected immediately.")
self.on_disconnected()
def on_destroyed(self, name):
    """Schedule the removal of a destroyed QObject.

    The removal is deferred through a zero-delay single-shot timer instead
    of happening right away: the destroyed object might still be destroying
    its children, and those might still use the object registry.

    Args:
        name: The name passed on to ``self._on_destroyed``.
    """
    log.destroy.debug("schedule removal: {}".format(name))
    deferred_removal = functools.partial(self._on_destroyed, name)
    QTimer.singleShot(0, deferred_removal)