Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Use it to get the serial number.
page = OggPage(fileobj)
while not page.packets[0].startswith(b"Speex "):
page = OggPage(fileobj)
# Look for the next page with that serial number, it'll start
# the comment packet.
serial = page.serial
page = OggPage(fileobj)
while page.serial != serial:
page = OggPage(fileobj)
# Then find all the pages with the comment packet.
old_pages = [page]
while not (old_pages[-1].complete or len(old_pages[-1].packets) > 1):
page = OggPage(fileobj)
if page.serial == old_pages[0].serial:
old_pages.append(page)
packets = OggPage.to_packets(old_pages, strict=False)
content_size = get_size(fileobj) - len(packets[0]) # approx
vcomment_data = self.write(framing=False)
padding_left = len(packets[0]) - len(vcomment_data)
info = PaddingInfo(padding_left, content_size)
new_padding = info._get_padding(padding_func)
# Set the new comment packet.
packets[0] = vcomment_data + b"\x00" * new_padding
new_pages = OggPage._from_packets_try_preserve(packets, old_pages)
def __init__(self, fileobj):
page = OggPage(fileobj)
while not page.packets[0].startswith(b"OpusHead"):
page = OggPage(fileobj)
self.serial = page.serial
if not page.first:
raise OggOpusHeaderError(
"page has ID header, but doesn't start a stream")
(version, self.channels, pre_skip, orig_sample_rate, output_gain,
channel_map) = struct.unpack("> 4, version & 0xF
if page.size < default_size and len(page.packets) < 255:
page.packets[-1] += data
else:
# If we've put any packet data into this page yet,
# we need to mark it incomplete. However, we can
# also have just started this packet on an already
# full page, in which case, just start the new
# page with this packet.
if page.packets[-1]:
page.complete = False
if len(page.packets) == 1:
page.position = -1
else:
page.packets.pop(-1)
pages.append(page)
page = OggPage()
page.continued = not pages[-1].complete
page.sequence = pages[-1].sequence + 1
page.packets.append(data)
if len(packet) < wiggle_room:
page.packets[-1] += packet
packet = b""
if page.packets:
pages.append(page)
return pages
def __init__(self, fileobj):
page = OggPage(fileobj)
while not page.packets[0].startswith(b"OpusHead"):
page = OggPage(fileobj)
self.serial = page.serial
if not page.first:
raise OggOpusHeaderError(
"page has ID header, but doesn't start a stream")
(version, self.channels, pre_skip, orig_sample_rate, output_gain,
channel_map) = struct.unpack("> 4
def __get_comment_pages(self, fileobj, info):
    """Collect the Ogg pages that carry the OpusTags comment packet.

    Pages are read one after another from ``fileobj``, starting at its
    current position.

    Args:
        fileobj: file-like object handed to ``OggPage`` for each read.
        info: stream info object; only its ``serial`` attribute is read.

    Returns:
        A list of ``OggPage`` objects. The first entry is the first page
        whose serial equals ``info.serial`` and whose first packet starts
        with ``b"OpusTags"``; the remaining entries are later pages with
        the same serial, collected until the stop condition below holds.

    NOTE(review): neither loop has an explicit end-of-file check, so this
    presumably relies on ``OggPage(fileobj)`` raising once the input is
    exhausted -- confirm against the ``OggPage`` constructor.
    """
    # Find the first tags page with the right serial. A page whose packet
    # list is empty is skipped rather than indexed, which would otherwise
    # raise IndexError.
    page = OggPage(fileobj)
    while (info.serial != page.serial
           or not page.packets
           or not page.packets[0].startswith(b"OpusTags")):
        page = OggPage(fileobj)

    # Get all comment pages: keep reading until the most recently
    # collected page is flagged complete or holds more than one packet.
    # Pages belonging to other serials are read but not collected.
    pages = [page]
    while not (pages[-1].complete or len(pages[-1].packets) > 1):
        page = OggPage(fileobj)
        if page.serial == pages[0].serial:
            pages.append(page)
    return pages
"""Write tag data into the Vorbis comment packet/page."""
# Find the old pages in the file; we'll need to remove them,
# plus grab any stray setup packet data out of them.
fileobj.seek(0)
page = OggPage(fileobj)
while not page.packets[0].startswith(b"\x03vorbis"):
page = OggPage(fileobj)
old_pages = [page]
while not (old_pages[-1].complete or len(old_pages[-1].packets) > 1):
page = OggPage(fileobj)
if page.serial == old_pages[0].serial:
old_pages.append(page)
packets = OggPage.to_packets(old_pages, strict=False)
content_size = get_size(fileobj) - len(packets[0]) # approx
vcomment_data = b"\x03vorbis" + self.write()
padding_left = len(packets[0]) - len(vcomment_data)
info = PaddingInfo(padding_left, content_size)
new_padding = info._get_padding(padding_func)
# Set the new comment packet.
packets[0] = vcomment_data + b"\x00" * new_padding
new_pages = OggPage._from_packets_try_preserve(packets, old_pages)
OggPage.replace(fileobj, old_pages, new_pages)
packets = OggPage.to_packets(old_pages)
vcomment_data = b"OpusTags" + self.write(framing=False)
if self._pad_data:
# if we have padding data to preserve we can't add more padding
# as long as we don't know the structure of what follows
packets[0] = vcomment_data + self._pad_data
else:
content_size = get_size(fileobj) - len(packets[0]) # approx
padding_left = len(packets[0]) - len(vcomment_data)
info = PaddingInfo(padding_left, content_size)
new_padding = info._get_padding(padding_func)
packets[0] = vcomment_data + b"\x00" * new_padding
new_pages = OggPage._from_packets_try_preserve(packets, old_pages)
OggPage.replace(fileobj, old_pages, new_pages)
def _inject(self, fileobj):
"""Write tag data into the Speex comment packet/page."""
fileobj.seek(0)
# Find the first header page, with the stream info.
# Use it to get the serial number.
page = OggPage(fileobj)
while not page.packets[0].startswith("Speex "):
page = OggPage(fileobj)
# Look for the next page with that serial number, it'll start
# the comment packet.
serial = page.serial
page = OggPage(fileobj)
while page.serial != serial:
page = OggPage(fileobj)
# Then find all the pages with the comment packet.
old_pages = [page]
while not (old_pages[-1].complete or len(old_pages[-1].packets) > 1):
page = OggPage(fileobj)
if page.serial == old_pages[0].serial:
old_pages.append(page)
page = OggPage(fileobj)
old_pages = [page]
while not (old_pages[-1].complete or len(old_pages[-1].packets) > 1):
page = OggPage(fileobj)
if page.serial == first_page.serial:
old_pages.append(page)
packets = OggPage.to_packets(old_pages, strict=False)
# Set the new comment block.
data = self.write(framing=False)
data = packets[0][:1] + struct.pack(">I", len(data))[-3:] + data
packets[0] = data
new_pages = OggPage.from_packets(packets, old_pages[0].sequence)
OggPage.replace(fileobj, old_pages, new_pages)
fileobj.seek(0)
page = OggPage(fileobj)
while not page.packets[0].startswith("\x81theora"):
page = OggPage(fileobj)
old_pages = [page]
while not (old_pages[-1].complete or len(old_pages[-1].packets) > 1):
page = OggPage(fileobj)
if page.serial == old_pages[0].serial:
old_pages.append(page)
packets = OggPage.to_packets(old_pages, strict=False)
packets[0] = "\x81theora" + self.write(framing=False)
new_pages = OggPage.from_packets(packets, old_pages[0].sequence)
OggPage.replace(fileobj, old_pages, new_pages)