Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Represents a registry hive
:param hive_path: Path to the registry hive
:param hive_type: The hive type can be specified if this is a partial hive,
or for some other reason regipy cannot identify the hive type
:param partial_hive_path: The path from which the partial hive actually starts, for example:
hive_type=ntuser partial_hive_path="/Software" would mean
this is actually a HKCU hive, starting from HKCU/Software
"""
self.partial_hive_path = None
self.hive_type = None
with open(hive_path, 'rb') as f:
self._stream = BytesIO(f.read())
with boomerang_stream(self._stream) as s:
self.header = REGF_HEADER.parse_stream(s)
# Get the first cell in root HBin, which is the root NKRecord:
root_hbin = self.get_hbin_at_offset()
root_hbin_cell = next(root_hbin.iter_cells(s))
self.root = NKRecord(root_hbin_cell, s)
self.name = self.header.file_name
if hive_type:
if hive_type.lower() in SUPPORTED_HIVE_TYPES:
self.hive_type = hive_type
else:
raise UnidentifiedHiveException(f'{hive_type} is not a supported hive type: '
f'only the following are supported: {SUPPORTED_HIVE_TYPES}')
else:
try:
Get the values of a subkey. Will raise if no values exist
:param as_json: Whether to normalize the data as JSON or not
:param max_len: Max length of value to return
:return: List of values for the subkey
"""
if not self.values_count:
return
# Get the offset of the values key. We skip 4 because of Cell Header
target_offset = REGF_HEADER_SIZE + 4 + self.header.values_list_offset
self._stream.seek(target_offset)
for _ in range(self.values_count):
is_corrupted = False
vk_offset = Int32ul.parse_stream(self._stream)
with boomerang_stream(self._stream) as substream:
actual_vk_offset = REGF_HEADER_SIZE + 4 + vk_offset
substream.seek(actual_vk_offset)
try:
vk = VALUE_KEY.parse_stream(substream)
except ConstError:
logger.error(f'Could not parse VK at {substream.tell()}, registry hive is probably corrupted.')
return
value = self.read_value(vk, substream)
if vk.name_size == 0:
value_name = '(default)'
else:
value_name = vk.name.decode(errors='replace')
# If the value is bigger than this value, it means this is a DEVPROP structure
def _parse_indirect_block(stream, value):
    """
    Reassemble the data of a value that is stored as an indirect ("big data") block.

    The bytes in ``value.value`` are not the data itself: they are parsed as a BIG_DATA_BLOCK
    header, which points at a list of segment offsets. The segments are read in list order
    and concatenated until ``value.size`` bytes have been collected.
    :param stream: Seekable binary stream of the hive; it is repositioned by this call
    :param value: Object exposing ``value`` (raw BIG_DATA_BLOCK bytes) and ``size`` (total data length)
    :return: The concatenated segment data as bytes
    """
    # Upper bound on the number of bytes taken from a single segment
    max_segment_read = 0x3fd8

    header = BIG_DATA_BLOCK.parse(value.value)

    # Position the stream on the first entry of the segment offset list
    segment_list_position = REGF_HEADER_SIZE + header.offset_to_list_of_segments
    stream.seek(segment_list_position)

    assembled = BytesIO()
    remaining = value.size
    while remaining > 0:
        # Every list entry is parsed as an Int32ul offset to one data segment
        segment_offset = Int32ul.parse_stream(stream)
        # NOTE(review): boomerang_stream presumably restores the list position on exit - confirm
        with boomerang_stream(stream) as segment_stream:
            # NOTE(review): the extra 4 bytes presumably skip the segment's cell header - confirm
            segment_stream.seek(REGF_HEADER_SIZE + 4 + segment_offset)
            chunk = segment_stream.read(min(max_segment_read, remaining))
            assembled.write(chunk)
            remaining -= len(chunk)
    return assembled.getvalue()