Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def paint_webp(self, img_data, x, y, width, height, options, callbacks):
    """Paint a webp update, choosing the decoding backend at runtime.

    When ``get_codec("dec_webp")`` returns a truthy value the cwebp based
    painter is used, otherwise the webm based one.  All the arguments are
    forwarded unchanged and the selected painter's return value is returned.
    """
    if get_codec("dec_webp"):
        painter = self.paint_webp_using_cwebp
    else:
        painter = self.paint_webp_using_webm
    return painter(img_data, x, y, width, height, options, callbacks)
def init_encodings(self):
    """Build the encoding lists from the codecs that can be loaded.

    Sets four attributes on ``self``:
    ``core_encodings`` (every specific encoding format),
    ``encodings`` (the format families, "rgb" standing for both rgb formats),
    ``lossless_encodings`` (the core encodings starting with "png" or "rgb")
    and ``lossless_mode_encodings`` (left empty here).
    """
    #core encodings: each specific encoding format we are able to encode:
    self.core_encodings = ["rgb24", "rgb32"]
    #encodings: the format families (identical to core, except for rgb):
    self.encodings = ["rgb"]

    def add_encodings(encodings):
        #register every name in both lists, skipping the ones already there
        for name in encodings:
            for known in (self.encodings, self.core_encodings):
                if name not in known:
                    known.append(name)

    #video encoders (the encodings actually supported are queried):
    for codec_name in ("enc_vpx", "enc_x264", "enc_nvenc"):
        codec = get_codec(codec_name)
        if not codec:
            continue
        #codec.get_type() is ie: "vpx", "x264" or "nvenc"
        log("init_encodings() codec %s found, adding: %s", codec.get_type(), codec.get_encodings())
        #ie: ["vp8"] or ["h264"]
        add_encodings(codec.get_encodings())

    #picture encodings, keyed by the codec module that provides them:
    optional_modules = {
        "enc_webp" : ["webp"],
        "PIL" : ["png", "png/L", "png/P", "jpeg"],
        }
    for module, encodings in optional_modules.items():
        if has_codec(module):
            add_encodings(encodings)
        else:
            log("init_encodings() codec module %s is missing, not adding: %s", module, encodings)

    self.lossless_encodings = [enc for enc in self.core_encodings if enc.startswith(("png", "rgb"))]
    self.lossless_mode_encodings = []
# Encode `image` as webp using the "enc_webp" codec and its bitmap handlers.
# NOTE(review): this snippet is cut off right after the quality lookup, the
# code that actually invokes the encoder is not visible here.
def webp_encode(self, coding, image, options):
enc_webp = get_codec("enc_webp")
webp_handlers = get_codec("webp_bitmap_handlers")
# both components are required: fail early if either lookup came back empty
assert enc_webp and webp_handlers, "webp components are missing"
BitmapHandler = webp_handlers.BitmapHandler
# pixel format -> (BitmapHandler constant, lossy encoder name, lossless encoder name, has_alpha)
# the "RGBX" / "BGRX" entries reuse the RGBA / BGRA handler and encoder names,
# but are flagged as not having an alpha channel
handler_encs = {
"RGB" : (BitmapHandler.RGB, "EncodeRGB", "EncodeLosslessRGB", False),
"BGR" : (BitmapHandler.BGR, "EncodeBGR", "EncodeLosslessBGR", False),
"RGBA": (BitmapHandler.RGBA, "EncodeRGBA", "EncodeLosslessRGBA", True),
"RGBX": (BitmapHandler.RGBA, "EncodeRGBA", "EncodeLosslessRGBA", False),
"BGRA": (BitmapHandler.BGRA, "EncodeBGRA", "EncodeLosslessBGRA", True),
"BGRX": (BitmapHandler.BGRA, "EncodeBGRA", "EncodeLosslessBGRA", False),
}
pixel_format = image.get_pixel_format()
h_e = handler_encs.get(pixel_format)
# any pixel format that is missing from the table above is rejected:
assert h_e is not None, "cannot handle rgb format %s with webp!" % pixel_format
bh, lossy_enc, lossless_enc, has_alpha = h_e
q = self.get_current_quality()
# Paint a video frame using the decoder found in the codec module `decoder_name`.
# NOTE(review): this snippet ends inside the `try` block: the decoding itself and
# the release of `self._decoder_lock` are not visible here.
def paint_with_video_decoder(self, decoder_name, coding, img_data, x, y, width, height, options, callbacks):
# only updates positioned at the origin are accepted:
assert x==0 and y==0
decoder_module = get_codec(decoder_name)
# the module must exist and expose both `Decoder` and `get_colorspaces`:
assert decoder_module, "decoder module not found for %s" % decoder_name
assert hasattr(decoder_module, "Decoder"), "decoder module %s does not have 'Decoder' factory function!" % decoder_module
assert hasattr(decoder_module, "get_colorspaces"), "decoder module %s does not have 'get_colorspaces' function!" % decoder_module
factory = getattr(decoder_module, "Decoder")
get_colorspaces = getattr(decoder_module, "get_colorspaces")
try:
self._decoder_lock.acquire()
# no backing left to paint on: tell the callbacks it failed and bail out
if self._backing is None:
log("window %s is already gone!", self.wid)
fire_paint_callbacks(callbacks, False)
return False
# the encoded size is the paint size unless the "scaled_size" option is present:
enc_width, enc_height = options.get("scaled_size", (width, height))
input_colorspace = options.get("csc")
if not input_colorspace:
# Backwards compatibility with pre 0.10.x clients
# We used to specify the colorspace as an avutil PixelFormat constant
# Initialize the colorspace conversion module `csc_name` and walk every
# (input colorspace, output colorspace) pair it supports.
# NOTE(review): this snippet is cut off right after the spec lookup; presumably
# `spec` is then added to `csc_specs`, but that is not visible here.
def init_csc_option(self, csc_name):
csc_module = get_codec(csc_name)
debug("init_csc_option(%s) module=%s", csc_name, csc_module)
# nothing to do when the module is not available:
if csc_module is None:
return
csc_type = csc_module.get_type()
# a module that fails to initialize is logged as a warning and then ignored
# (`except Exception, e` is Python 2 only syntax)
try:
csc_module.init_module()
except Exception, e:
log.warn("cannot use %s module %s: %s", csc_type, csc_module, e, exc_info=True)
return
in_cscs = csc_module.get_input_colorspaces()
for in_csc in in_cscs:
# one list per input colorspace, created on first use:
csc_specs = self._csc_encoder_specs.setdefault(in_csc, [])
out_cscs = csc_module.get_output_colorspaces(in_csc)
debug("init_csc_option(..) %s.get_output_colorspaces(%s)=%s", csc_module.get_type(), in_csc, out_cscs)
for out_csc in out_cscs:
spec = csc_module.get_spec(in_csc, out_csc)
def paint_webp(self, img_data, x, y, width, height, options, callbacks):
    """Decode a webp picture and schedule it for painting.

    Marked by its author as callable from any thread: the decoding is done
    by the caller, the painting is handed over to ``self.idle_add``.
    The "has_alpha" option selects RGBA decoding with the rgb32 painter
    (4 bytes per pixel), otherwise RGB decoding with the rgb24 painter
    (3 bytes per pixel).  Always returns False.
    """
    dec_webp = get_codec("dec_webp")
    assert dec_webp is not None, "webp decoder not found"
    #choose the decoder function, the rowstride and the matching painter together:
    if options.get("has_alpha", False):
        decode, rowstride, paint_rgb = dec_webp.DecodeRGBA, width*4, self.do_paint_rgb32
    else:
        decode, rowstride, paint_rgb = dec_webp.DecodeRGB, width*3, self.do_paint_rgb24
    if DRAW_DEBUG:
        call_info = ("%s bytes" % len(img_data), x, y, width, height, options, callbacks)
        log.info("paint_webp(%s) using decode=%s, paint=%s", call_info, decode, paint_rgb)
    decoded = decode(img_data)
    pixels = str(decoded.bitmap)
    self.idle_add(paint_rgb, pixels, x, y, width, height, rowstride, options, callbacks)
    return False
# Initialize the video encoder module `encoder_name` and walk every
# (encoding, input colorspace) pair it supports.
# NOTE(review): this snippet is cut off at the inner loop header, what gets
# stored in `encoder_specs` is not visible here.
def init_video_encoder_option(self, encoder_name):
encoder_module = get_codec(encoder_name)
debug("init_video_encoder_option(%s) module=%s", encoder_name, encoder_module)
# nothing to do when the module is not available:
if not encoder_module:
return
encoder_type = encoder_module.get_type()
# a module that fails to initialize is logged as a warning and then ignored
# (`except Exception, e` is Python 2 only syntax)
try:
encoder_module.init_module()
except Exception, e:
log.warn("cannot use %s module %s: %s", encoder_type, encoder_module, e, exc_info=True)
return
colorspaces = encoder_module.get_colorspaces()
debug("init_video_encoder_option(%s) %s input colorspaces=%s", encoder_module, encoder_type, colorspaces)
encodings = encoder_module.get_encodings()
debug("init_video_encoder_option(%s) %s encodings=%s", encoder_module, encoder_type, encodings)
for encoding in encodings:
# one dictionary per encoding, created on first use:
encoder_specs = self._video_encoder_specs.setdefault(encoding, {})
for colorspace in colorspaces:
def init_encodings(self):
    """Fill in the encoding lists and choose the default encoding.

    Appends to ``self.encodings`` and ``self.core_encodings``, which must
    already be lists when this is called (they are not created here), then
    sets ``self.lossless_encodings``, ``self.lossless_mode_encodings`` and
    ``self.default_encoding``.

    Raises IndexError if no entry of PREFERED_ENCODING_ORDER is found in
    ``self.encodings``.
    """
    #both rgb core encodings belong to the single "rgb" family:
    family_names = {"rgb32" : "rgb", "rgb24" : "rgb"}

    def add_encodings(encodings):
        for core in encodings:
            family = family_names.get(core, core)
            if family not in self.encodings:
                self.encodings.append(family)
            if core not in self.core_encodings:
                self.core_encodings.append(core)

    add_encodings(["rgb24", "rgb32"])
    #video encoders (empty when first called - see threaded_init)
    #ie: ["vp8", "h264"]
    add_encodings(getVideoHelper().get_encodings())
    #Python Imaging Library:
    PIL = get_codec("PIL")
    if PIL:
        add_encodings(get_PIL_encodings(PIL))
    #webp: only check for "enc_webm" because "enc_webp" needs a fallback (either "PIL" or "enc_webm")
    if has_codec("enc_webm"):
        add_encodings(["webp"])

    self.lossless_encodings = [enc for enc in self.core_encodings if enc.startswith(("png", "rgb"))]
    self.lossless_mode_encodings = []
    if has_codec("enc_webm_lossless"):
        for lossless_list in (self.lossless_mode_encodings, self.lossless_encodings):
            lossless_list.append("webp")

    #the default is the first preferred encoding that we actually support:
    candidates = [enc for enc in PREFERED_ENCODING_ORDER if enc in self.encodings]
    self.default_encoding = candidates[0]
def paint_webp(self, img_data, x, y, width, height, options, callbacks):
    """Decode webp pixel data, then queue the paint through ``self.idle_add``.

    Marked by its author as callable from any thread.  With the "has_alpha"
    option the data is decoded as RGBA and painted as rgb32 (rowstride of
    4 bytes per pixel), otherwise as RGB and painted as rgb24 (3 bytes per
    pixel).  Always returns False.
    """
    dec_webp = get_codec("dec_webp")
    assert dec_webp is not None, "webp decoder not found"
    has_alpha = options.get("has_alpha", False)
    if not has_alpha:
        decode = dec_webp.DecodeRGB
        rowstride = width*3
        paint_rgb = self.do_paint_rgb24
    else:
        decode = dec_webp.DecodeRGBA
        rowstride = width*4
        paint_rgb = self.do_paint_rgb32
    log("paint_webp(%s) using decode=%s, paint=%s",
        ("%s bytes" % len(img_data), x, y, width, height, options, callbacks),
        decode, paint_rgb)
    #the decoder returns an object exposing the pixels as "bitmap":
    pixels = str(decode(img_data).bitmap)
    self.idle_add(paint_rgb, pixels, x, y, width, height, rowstride, options, callbacks)
    return False
def init_encoders(self):
    """Register the encoder method to use for each encoding.

    Fills ``self._encoders`` (encoding name -> bound method): both rgb
    formats always, the PIL encodings and "webp" only when found in
    ``self.server_core_encodings``, and "mmap" only when an mmap area with
    a positive size is set.
    """
    for rgb_format in ("rgb24", "rgb32"):
        self._encoders[rgb_format] = self.rgb_encode
    pil_encodings = get_PIL_encodings(get_codec("PIL"))
    for encoding in pil_encodings:
        if encoding not in self.server_core_encodings:
            continue
        self._encoders[encoding] = self.PIL_encode
    #prefer this one over PIL supplied version:
    if "webp" in self.server_core_encodings:
        self._encoders["webp"] = self.webp_encode
    mmap_enabled = self._mmap and self._mmap_size>0
    if mmap_enabled:
        self._encoders["mmap"] = self.mmap_encode