Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def ExtractDigitalSignature(SignedFile, SignatureFile=None):
    """Extract the digital signature blob from the PE file SignedFile.

    The security data-directory entry of the optional header gives the
    file offset of the certificate table. The signature is taken as every
    byte from 8 bytes past that offset (skipping the fixed-size
    WIN_CERTIFICATE header described by the PE format) up to the end of
    the image returned by ``pe.write()``.

    Args:
        SignedFile: path of the signed PE file to read.
        SignatureFile: optional path; when truthy, the signature bytes are
            also written to this file.

    Returns:
        The raw signature bytes.

    Exits:
        Prints an error and calls ``sys.exit(1)`` when the security
        directory address is 0, i.e. the file carries no signature.
    """
    pe = pefile.PE(SignedFile)
    security = pe.OPTIONAL_HEADER.DATA_DIRECTORY[
        pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']]
    address = security.VirtualAddress

    if address == 0:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('Error: source file not signed')
        sys.exit(1)

    signature = pe.write()[address + 8:]

    if SignatureFile:
        # The context manager closes the handle even if write() raises.
        with open(SignatureFile, 'wb+') as f:
            f.write(signature)

    return signature
# NOTE(review): this is a header-less fragment whose indentation was lost in
# the listing; the nesting of the statements below must be confirmed against
# the original method before relying on these comments.
#
# Record every exported symbol. Exports without a name are labelled by
# their ordinal as 'ord<ordinal>'.
for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
if not exp.name:
name = f'ord{exp.ordinal}'
else:
name = exp.name
self.event['symbols']['exported'].append(name)
self.event['symbols']['table'].append({
'address': exp.address,
'symbol': name,
'type': 'export',
})
# Totals are the lengths of the lists accumulated under self.event['symbols'].
self.event['total']['libraries'] = len(self.event['symbols']['libraries'])
self.event['total']['symbols'] = len(self.event['symbols']['table'])
# Security data-directory entry of the optional header.
security = pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']]
sec_addr = security.VirtualAddress
if security.Size > 0:
# Take everything from 8 bytes past the entry's address to the end of the
# bytes returned by pe.write(); security.Size is only used as a presence
# check, not as an upper bound.
data = pe.write()[sec_addr + 8:]
if len(data) > 0:
self.flags.append('signed')
extract_file = strelka.File(
name='signature',
source=f'{self.name}::Security',
)
# Upload the extracted bytes piece by piece, as yielded by
# strelka.chunk_string. NOTE(review): expire_at is not defined in the
# visible lines — presumably a parameter of the enclosing method; confirm.
for c in strelka.chunk_string(data):
self.upload_to_coordinator(
extract_file.pointer,
c,
expire_at,
)
# it into python data structures for ease of processing.
# To write the file again, only the mmap'd data is written back,
# so modifying the parsed python objects generally has no effect.
# However, parsed raw data ends up in pe.Structure instances,
# and these all get serialized back when the file gets written.
# So things that are in a Structure must have their data set
# through the Structure, while other data must be set through
# the set_bytes_*() methods.
pe = pefile.PE(input_image, fast_load=True)
if architecture == 'x64' or architecture == 'arm64':
assert pe.PE_TYPE == pefile.OPTIONAL_HEADER_MAGIC_PE_PLUS
else:
assert pe.PE_TYPE == pefile.OPTIONAL_HEADER_MAGIC_PE
pe.parse_data_directories(directories=[
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT']])
found_elf = False
for i, peimport in enumerate(pe.DIRECTORY_ENTRY_IMPORT):
if peimport.dll.lower() == 'chrome_elf.dll':
assert not found_elf, 'only one chrome_elf.dll import expected'
found_elf = True
if i > 0:
swap = pe.DIRECTORY_ENTRY_IMPORT[0]
# Morally we want to swap peimport.struct and swap.struct here,
# but the pe module doesn't expose a public method on Structure
# to get all data of a Structure without explicitly listing all
# field names.
# NB: OriginalFirstThunk and Characteristics are an union both at
# offset 0, handling just one of them is enough.
peimport.struct.OriginalFirstThunk, swap.struct.OriginalFirstThunk = \
self.imphash = None
self.pe_type = None
self.section_names = None
class PEParser(interface.FileObjectParser):
"""Parser for Portable Executable (PE) files."""
NAME = 'pe'
DESCRIPTION = 'Parser for Portable Executable (PE) files.'
_PE_DIRECTORIES = [
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT']]
@classmethod
def GetFormatSpecification(cls):
    """Builds the format specification used to recognise this parser's input.

    Returns:
        The specification object created for cls.NAME, with a single b'MZ'
        signature registered at offset 0.
    """
    spec = specification.FormatSpecification(cls.NAME)
    spec.AddNewSignature(b'MZ', offset=0)
    return spec
def _GetSectionNames(self, pefile_object):
"""Retrieves all PE section names.
Args:
pefile_object (pefile.PE): pefile object.
Returns:
list[str]: names of the sections.
def obtain_image(self, image_address):
    """Assemble a partial PE image: its first 0x600 bytes plus the debug directory.

    The first 0x600 bytes at image_address are read and parsed with pefile
    (fast_load) to locate the debug data-directory entry. The result is
    that header, zero padding up to the directory's VirtualAddress, and
    then the directory contents read from memory.

    NOTE(review): the meaning of the second argument (1) passed to
    self.read_memory is not visible here — confirm against its definition.

    Returns:
        The concatenation header + zero padding + debug directory bytes.
    """
    header_len = 0x600
    header = self.read_memory(image_address, 1, header_len)

    parsed = pefile.PE(data=header, fast_load=True)
    debug_dir = parsed.OPTIONAL_HEADER.DATA_DIRECTORY[
        pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG']]
    debug_rva = debug_dir.VirtualAddress

    # Zero-fill the gap so the debug directory lands at its own offset.
    padding = b'\0' * (debug_rva - header_len)
    debug_bytes = self.read_memory(image_address + debug_rva, 1, debug_dir.Size)

    return header + padding + debug_bytes
def get_debug_data(pe, type=DEBUG_TYPE[u'IMAGE_DEBUG_TYPE_CODEVIEW']):
    """Return the raw data of the first debug entry whose Type equals `type`.

    If `pe` has no DIRECTORY_ENTRY_DEBUG attribute yet (e.g. it was
    fast-loaded), the debug data directory is parsed on demand first.

    Args:
        pe: a parsed PE object.
        type: debug entry type to look for; defaults to the CodeView type.

    Returns:
        The slice of pe.__data__ covering the matching entry
        (PointerToRawData .. PointerToRawData + SizeOfData), or None when
        no entry has the requested type.

    Raises:
        PENoDebugDirectoryEntriesError: the image has no debug directory
            even after parsing it explicitly.
    """
    if not hasattr(pe, u'DIRECTORY_ENTRY_DEBUG'):
        # Fast-loaded image: the directory has not been parsed yet.
        pe.parse_data_directories(DIRECTORY_ENTRY[u'IMAGE_DIRECTORY_ENTRY_DEBUG'])

    if not hasattr(pe, u'DIRECTORY_ENTRY_DEBUG'):
        raise PENoDebugDirectoryEntriesError()

    for debug_entry in pe.DIRECTORY_ENTRY_DEBUG:
        start = debug_entry.struct.PointerToRawData
        length = debug_entry.struct.SizeOfData
        if debug_entry.struct.Type == type:
            return pe.__data__[start:start + length]

    return None
# NOTE(review): header-less fragment with lost indentation; the opening
# `try:` and the enclosing loop are outside the visible lines.
#
# Read the whole file; any failure while reading skips to the next item.
data = FILE.read()
FILE.close()
except:
continue
# Nothing was read: report it in `out` (followed by a blank entry) and move on.
if data == None or len(data) == 0:
out.append("Cannot read %s (maybe empty?)" % file)
out.append("")
continue
try:
# fast_load skips the data directories, so the four that are needed
# (import, export, TLS, resource) are parsed explicitly afterwards.
pe = pefile.PE(data=data, fast_load=True)
pe.parse_data_directories( directories=[
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_TLS'],
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']])
# Any parsing failure is reported in `out` and the item is skipped.
# NOTE(review): the bare except also catches KeyboardInterrupt/SystemExit.
except:
out.append("Cannot parse %s (maybe not PE?)" % file)
out.append("")
continue
# source: https://code.google.com/p/pyew/
def get_filearch(data):
    """Describe the word size of the PE image held in the outer `pe` object.

    The `data` argument is accepted for interface compatibility but is not
    consulted; the machine type is read from `pe.FILE_HEADER.Machine`,
    where `pe` comes from the surrounding scope.

    Returns:
        "32 Bits binary" for IMAGE_FILE_MACHINE_I386 (0x14C),
        "64 Bits binary" for IMAGE_FILE_MACHINE_AMD64 (0x8664),
        and None for any other machine type.
    """
    machine = pe.FILE_HEADER.Machine
    if machine == 0x14C:  # IMAGE_FILE_MACHINE_I386
        return "32 Bits binary"
    if machine == 0x8664:  # IMAGE_FILE_MACHINE_AMD64
        return "64 Bits binary"
    # Unrecognised machine types were previously an implicit None.
    return None
def load_dyn_sym(self):
# Parse the import directory of self.pe and walk every named import.
# NOTE(review): the listing stops right after `name` is normalised, so
# whatever is done with each name is not visible here; indentation was
# also lost, so nesting should be confirmed against the original.
try:
self.pe.parse_data_directories(
pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'])
except Exception as e:
# Any parsing failure is re-raised as ExcPEFail wrapping the original error.
raise ExcPEFail(e)
# The attribute only exists when the image actually has an import directory.
if hasattr(self.pe, 'DIRECTORY_ENTRY_IMPORT'):
for entry in self.pe.DIRECTORY_ENTRY_IMPORT:
for imp in entry.imports:
# Imports without a name are skipped.
if imp.name is None:
continue
name = imp.name
# erocarrera/pefile returns a bytes but mlaferrera/prefile
# returns a string.
if isinstance(name, bytes):
name = name.decode()
def __init__(self, data, bv):
    """Set up the analysis state and try to parse `data` as a PE image.

    Args:
        data: raw bytes of the file to analyse.
        bv: stored as-is on self.bv.

    After construction, self.status is True only when pefile parsed the
    image and its import, export, TLS and resource directories. On any
    parsing error a message is printed and self.status stays False; in
    that case self.pe may be unset or only partially parsed.
    """
    self.bv = bv
    self.data = data
    self.status = False
    # Section names in which an entry point is considered normal.
    self.good_ep_sections = ['.text', '.code', 'CODE', 'INIT', 'PAGE']
    # API names that are reported as alerts.
    self.alerts_api = ['accept', 'AddCredentials', 'bind', 'CertDeleteCertificateFromStore', 'CheckRemoteDebuggerPresent', 'closesocket', 'connect', 'ConnectNamedPipe', 'CopyFile', 'CreateFile', 'CreateProcess', 'CreateToolhelp32Snapshot', 'CreateFileMapping', 'CreateRemoteThread', 'CreateDirectory', 'CreateService', 'CreateThread', 'CryptEncrypt', 'DeleteFile', 'DeviceIoControl', 'DisconnectNamedPipe', 'DNSQuery', 'EnumProcesses', 'ExitThread', 'FindWindow', 'FindResource', 'FindFirstFile', 'FindNextFile', 'FltRegisterFilter', 'FtpGetFile', 'FtpOpenFile', 'GetCommandLine', 'GetThreadContext', 'GetDriveType', 'GetFileSize', 'GetFileAttributes', 'GetHostByAddr', 'GetHostByName', 'GetHostName', 'GetModuleHandle', 'GetProcAddress', 'GetTempFileName', 'GetTempPath', 'GetTickCount', 'GetUpdateRect', 'GetUpdateRgn', 'GetUserNameA', 'GetUrlCacheEntryInfo', 'GetComputerName', 'GetVersionEx', 'GetModuleFileName', 'GetStartupInfo', 'GetWindowThreadProcessId', 'HttpSendRequest', 'HttpQueryInfo', 'IcmpSendEcho', 'IsDebuggerPresent', 'InternetCloseHandle', 'InternetConnect', 'InternetCrackUrl', 'InternetQueryDataAvailable', 'InternetGetConnectedState', 'InternetOpen', 'InternetQueryDataAvailable', 'InternetQueryOption', 'InternetReadFile', 'InternetWriteFile', 'LdrLoadDll', 'LoadLibrary', 'LoadLibraryA', 'LockResource', 'listen', 'MapViewOfFile', 'OutputDebugString', 'OpenFileMapping', 'OpenProcess', 'Process32First', 'Process32Next', 'recv', 'ReadProcessMemory', 'RegCloseKey', 'RegCreateKey', 'RegDeleteKey', 'RegDeleteValue', 'RegEnumKey', 'RegOpenKey', 'send', 'sendto', 'SetKeyboardState', 'SetWindowsHook', 'ShellExecute', 'Sleep', 'socket', 'StartService', 'TerminateProcess', 'UnhandledExceptionFilter', 'URLDownload', 'VirtualAlloc', 'VirtualProtect', 'VirtualAllocEx', 'WinExec', 'WriteProcessMemory', 'WriteFile', 'WSASend', 'WSASocket', 'WSAStartup', 'ZwQueryInformation']
    # Imported module names that are reported as alerts.
    self.alerts_imp = ['ntoskrnl.exe', 'hal.dll', 'ndis.sys']
    self.mgc_path="c:\\Windows\\System32\\magic.mgc"
    try:
        # fast_load skips the data directories; parse only the four needed.
        self.pe = pefile.PE(data=data, fast_load=True)
        self.pe.parse_data_directories( directories=[
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_TLS'],
            pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']])
        self.status = True
    except Exception:
        # Still best-effort for any parsing error, but no longer swallows
        # KeyboardInterrupt / SystemExit the way a bare `except:` did.
        print("Cannot parse the file (maybe not PE?)")
def get_certificate(pe):
    """Return the raw certificate data of `pe`, or None when there is none.

    The security data-directory entry supplies the offset of the
    certificate table. The result is every byte from 8 bytes past that
    offset to the end of the image returned by ``pe.write()``; the
    directory's Size field is not consulted.

    TODO: this only extracts the raw list of certificate data. It still
    needs to be parsed into single certificates, perhaps returning only
    the PEM data of the first certificate.
    """
    security_index = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']
    offset = pe.OPTIONAL_HEADER.DATA_DIRECTORY[security_index].VirtualAddress

    # An address of 0 means the image has no certificate table.
    if not offset:
        return None

    return pe.write()[offset + 8:]