Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class CbfsListing(Command):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._header_re = re.compile(r'^.*: ([^,]+, bootblocksize [0-9]+, romsize [0-9]+, offset 0x[0-9A-Fa-f]+)$')
@tool_required('cbfstool')
def cmdline(self):
return ['cbfstool', self.path, 'print']
def filter(self, line):
return self._header_re.sub('\\1', line.decode('utf-8')).encode('utf-8')
class CbfsContainer(Archive):
@tool_required('cbfstool')
def entries(self, path):
cmd = ['cbfstool', path, 'print']
output = subprocess.check_output(cmd, shell=False).decode('utf-8')
header = True
for line in output.rstrip('\n').split('\n'):
if header:
if line.startswith('Name'):
header = False
continue
name = line.split()[0]
if name == '(empty)':
continue
yield name
def open_archive(self):
return (self._mode, self._major, self._minor)
def is_device(self):
return True
SQUASHFS_LS_MAPPING = {
'd': SquashfsDirectory,
'l': SquashfsSymlink,
'c': SquashfsDevice,
'b': SquashfsDevice,
'-': SquashfsRegularFile
}
class SquashfsContainer(Archive):
@tool_required('unsquashfs')
def entries(self, path):
# We pass `-d ''` in order to get a listing with the names we actually
# need to use when extracting files
cmd = ['unsquashfs', '-d', '', '-lls', path]
output = subprocess.check_output(cmd, shell=False).decode('utf-8')
header = True
for line in output.rstrip('\n').split('\n'):
if header:
if line == '':
header = False
continue
if len(line) > 0 and line[0] in SQUASHFS_LS_MAPPING:
try:
cls = SQUASHFS_LS_MAPPING[line[0]]
yield cls, cls.parse(line)
import re
import os.path
import logging
import subprocess
import collections
from diffoscope.tools import tool_required
from .utils.file import File
from .utils.archive import Archive
from .utils.filenames import get_compressed_content_name
logger = logging.getLogger(__name__)
class Bzip2Container(Archive):
def open_archive(self):
return self
def close_archive(self):
pass
def get_members(self):
return collections.OrderedDict({'bzip2-content': self.get_member(self.get_member_names()[0])})
def get_member_names(self):
return [get_compressed_content_name(self.source.path, '.bz2')]
@tool_required('bzip2')
def extract(self, member_name, dest_dir):
dest_path = os.path.join(dest_dir, member_name)
logger.debug('bzip2 extracting to %s', dest_path)
import re
import os.path
import logging
import subprocess
import collections
from diffoscope.tools import tool_required
from .utils.file import File
from .utils.archive import Archive
from .utils.filenames import get_compressed_content_name
logger = logging.getLogger(__name__)
class DexContainer(Archive):
@property
def path(self):
return self._path
def open_archive(self):
return self
def close_archive(self):
pass
def get_members(self):
return collections.OrderedDict({'dex-content': self.get_member(self.get_member_names()[0])})
def get_member_names(self):
return [get_compressed_content_name(self.source.path, '.dex') + '.jar']
import re
import os.path
import logging
import subprocess
import collections
from diffoscope.tools import tool_required
from .utils.file import File
from .utils.archive import Archive
from .utils.filenames import get_compressed_content_name
logger = logging.getLogger(__name__)
class XzContainer(Archive):
def open_archive(self):
return self
def close_archive(self):
pass
def get_members(self):
return collections.OrderedDict({'xz-content': self.get_member(self.get_member_names()[0])})
def get_member_names(self):
return [get_compressed_content_name(self.source.path, '.xz')]
@tool_required('xz')
def extract(self, member_name, dest_dir):
dest_path = os.path.join(dest_dir, member_name)
logger.debug('xz extracting to %s', dest_path)
import os.path
import logging
import subprocess
from diffoscope.tools import tool_required
from diffoscope.tempfiles import get_temporary_directory
from diffoscope.difference import Difference
from .utils.file import File
from .utils.archive import Archive
from .zip import Zipinfo, ZipinfoVerbose
logger = logging.getLogger(__name__)
class ApkContainer(Archive):
@property
def path(self):
return self._path
@tool_required('apktool')
def open_archive(self):
self._members = []
self._unpacked = os.path.join(
get_temporary_directory().name,
os.path.basename(self.source.name),
)
logger.debug("Extracting %s to %s", self.source.name, self._unpacked)
subprocess.check_call((
'apktool', 'd', '-k', '-m', '-o', self._unpacked, self.source.path,
class LibarchiveDevice(Device, LibarchiveMember):
def __init__(self, container, entry):
LibarchiveMember.__init__(self, container, entry)
self._mode = entry.mode
self._major = entry.rdevmajor
self._minor = entry.rdevminor
def get_device(self):
return (self._mode, self._major, self._minor)
def is_device(self):
return True
class LibarchiveContainer(Archive):
def open_archive(self):
# libarchive is very very stream oriented an not for random access
# so we are going to reopen the archive everytime
# not nice, but it'll work
return True
def close_archive(self):
pass
def get_member_names(self):
self.ensure_unpacked()
return self._members.keys()
def extract(self, member_name, dest_dir):
self.ensure_unpacked()
return self._members[member_name]
s.write(u"\n")
return s.getvalue()
def compare_rpm_headers(path1, path2):
# compare headers
with get_temporary_directory() as rpmdb_dir:
rpm.addMacro("_dbpath", rpmdb_dir)
ts = rpm.TransactionSet()
ts.setVSFlags(-1)
header1 = get_rpm_header(path1, ts)
header2 = get_rpm_header(path2, ts)
return Difference.from_text(header1, header2, path1, path2, source="header")
class RpmContainer(Archive):
def open_archive(self):
return self
def close_archive(self):
pass
def get_member_names(self):
return ['content']
@tool_required('rpm2cpio')
def extract(self, member_name, dest_dir):
assert member_name == 'content'
dest_path = os.path.join(dest_dir, 'content')
cmd = ['rpm2cpio', self.source.path]
with open(dest_path, 'wb') as dest:
subprocess.check_call(cmd, shell=False, stdout=dest, stderr=subprocess.PIPE)
return None
def has_same_content_as(self, other):
return False
def is_directory(self):
return True
def get_member_names(self):
raise ValueError("Zip archives are compared as a whole.") # noqa
def get_member(self, member_name):
raise ValueError("Zip archives are compared as a whole.") # noqa
class ZipContainer(Archive):
def open_archive(self):
return zipfile.ZipFile(self.source.path, 'r')
def close_archive(self):
self.archive.close()
def get_member_names(self):
return self.archive.namelist()
def extract(self, member_name, dest_dir):
# We don't really want to crash if the filename in the zip archive
# can't be encoded using the filesystem encoding. So let's replace
# any weird character so we can get to the bytes.
targetpath = os.path.join(dest_dir, os.path.basename(member_name)).encode(sys.getfilesystemencoding(), errors='replace')
with self.archive.open(member_name) as source, open(targetpath, 'wb') as target:
shutil.copyfileobj(source, target)
import logging
import subprocess
import collections
from diffoscope.tools import tool_required
from diffoscope.difference import Difference
from .utils.file import File
from .utils.archive import Archive
from .utils.filenames import get_compressed_content_name
logger = logging.getLogger(__name__)
class GzipContainer(Archive):
def open_archive(self):
return self
def close_archive(self):
pass
def get_members(self):
return collections.OrderedDict({'gzip-content': self.get_member(self.get_member_names()[0])})
def get_member_names(self):
return [get_compressed_content_name(self.source.path, '.gz')]
@tool_required('gzip')
def extract(self, member_name, dest_dir):
dest_path = os.path.join(dest_dir, member_name)
logger.debug('gzip extracting to %s', dest_path)