Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _parse_transaction_log(registry_hive, hive_path, transaction_log_path):
    """
    Open a transaction log file, detect the type of its log vector from the
    4-byte magic that follows the REGF header, and hand the open stream to the
    matching block parser.

    :param registry_hive: Hive object; its ``header`` supplies
        ``secondary_sequence_num`` (used for HvLE logs) and
        ``hive_bins_data_size`` (used for DIRT logs)
    :param hive_path: Path of the hive file, forwarded to the block parser
    :param transaction_log_path: Path of the transaction log file to read
    :return: The two values produced by the block parser:
        ``(restored_hive_buffer, recovered_dirty_pages_count)``
    :raises RegistryRecoveryException: When the magic is neither the HvLE nor
        the DIRT transaction log magic
    """
    log_size = os.path.getsize(transaction_log_path)
    logger.info(f'Log Size: {log_size}')

    expected_sequence_number = registry_hive.header.secondary_sequence_num

    with open(transaction_log_path, 'rb') as log_stream:
        # The first 512 bytes are the REGF header - jump straight past them
        log_stream.seek(512, os.SEEK_SET)

        # Peek at the vector magic. NOTE(review): boomerang_stream is expected to
        # put the stream back where it was on exit, so the parsers below start
        # at the vector header rather than after the magic - confirm.
        with boomerang_stream(log_stream) as peek:
            magic = peek.read(4)

        if magic == HVLE_TRANSACTION_LOG_MAGIC:
            # New style log: HvLE blocks, validated by sequence number
            parsed = _parse_hvle_block(hive_path, log_stream, log_size, expected_sequence_number)
        elif magic == DIRT_TRANSACTION_LOG_MAGIC:
            # Old style log: DIRT vector, sized by the hive bins data size
            parsed = _parse_dirt_block(hive_path, log_stream, registry_hive.header.hive_bins_data_size)
        else:
            raise RegistryRecoveryException(f'The transaction log vector magic was not expected: {magic}')

        restored_hive_buffer, recovered_dirty_pages_count = parsed

    return restored_hive_buffer, recovered_dirty_pages_count
"""
:param hive_path:
:param transaction_log_stream:
:param log_size:
:param expected_sequence_number:
:return:
"""
recovered_dirty_pages_count = 0
restored_hive_buffer = BytesIO(open(hive_path, 'rb').read())
hvle_block_start_offset = transaction_log_stream.tell()
while hvle_block_start_offset < log_size:
logger.info(f'Parsing hvle block at {hvle_block_start_offset}')
with boomerang_stream(transaction_log_stream) as x:
if x.read(4) != b'HvLE':
logger.info('Reached a non HvLE object. stopping')
break
parsed_hvle_block = TRANSACTION_LOG.parse_stream(transaction_log_stream)
logger.info(f'Currently at start of dirty pages: {transaction_log_stream.tell()}')
logger.info(f'seq number: {parsed_hvle_block.sequence_number}')
logger.info(f'dirty pages: {parsed_hvle_block.dirty_pages_count}')
if parsed_hvle_block.sequence_number == expected_sequence_number:
logger.info(f'This hvle block holds valid dirty blocks')
expected_sequence_number += 1
for dirty_page_entry in parsed_hvle_block.dirty_pages_references:
# Write the actual dirty page to the original hive
target_offset = REGF_HEADER_SIZE + dirty_page_entry.offset