Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
# Validate the parameters.
if since and until and since > until:
warnings.warn("CrashDAO.find() got the 'since' and 'until'"
" arguments reversed, corrected automatically.")
since, until = until, since
if limit is not None and not limit:
warnings.warn("CrashDAO.find() was set a limit of 0 results,"
" returning without executing a query.")
return []
# Build the SQL query.
query = self._session.query(CrashDTO)
if signature is not None:
sig_pickled = pickle.dumps(signature, protocol = 0)
query = query.filter(CrashDTO.signature == sig_pickled)
if since:
query = query.filter(CrashDTO.timestamp >= since)
if until:
query = query.filter(CrashDTO.timestamp < until)
if order:
if order > 0:
query = query.order_by(asc(CrashDTO.timestamp))
else:
query = query.order_by(desc(CrashDTO.timestamp))
else:
# Default ordering is by row ID, to get consistent results.
# Also some database engines require ordering when using offsets.
query = query.order_by(asc(CrashDTO.id))
if offset:
query = query.offset(offset)
def __init__(self, crash):
    """
    Fill in the fields of this object from a crash dump.

    @type  crash: Crash
    @param crash: L{Crash} object to store into the database.
    """

    # Time of the crash and its heuristic signature (pickled, protocol 0).
    self.timestamp = datetime.datetime.fromtimestamp(crash.timeStamp)
    self.signature = pickle.dumps(crash.signature, protocol=0)

    # Marshal the Crash object without its memory dump: the memory map is
    # detached while serializing and always put back afterwards.
    # This code is *not* thread safe!
    saved_memory_map = crash.memoryMap
    try:
        crash.memoryMap = None
        marshalled = Marshaller.dumps(crash)
        self.data = buffer(marshalled)
    finally:
        crash.memoryMap = saved_memory_map

    # Exploitability test.
    rating, rule, description = crash.isExploitable()
    self.exploitability_rating = rating
    self.exploitability_rule = rule
    self.exploitability_desc = description

    # Exploitability test as an integer result (for sorting).
"""
# Validate the parameters.
if since and until and since > until:
warnings.warn("CrashDAO.find() got the 'since' and 'until'"
" arguments reversed, corrected automatically.")
since, until = until, since
if limit is not None and not limit:
warnings.warn("CrashDAO.find() was set a limit of 0 results,"
" returning without executing a query.")
return []
# Build the SQL query.
query = self._session.query(CrashDTO)
if signature is not None:
sig_pickled = pickle.dumps(signature, protocol = 0)
query = query.filter(CrashDTO.signature == sig_pickled)
if since:
query = query.filter(CrashDTO.timestamp >= since)
if until:
query = query.filter(CrashDTO.timestamp < until)
if order:
if order > 0:
query = query.order_by(asc(CrashDTO.timestamp))
else:
query = query.order_by(desc(CrashDTO.timestamp))
else:
# Default ordering is by row ID, to get consistent results.
# Also some database engines require ordering when using offsets.
query = query.order_by(asc(CrashDTO.id))
if offset:
query = query.offset(offset)
@type allow_duplicates: bool
@param allow_duplicates: (Optional)
C{True} to always add the new crash dump.
C{False} to only add the crash dump if no other crash with the
same signature is found in the database.
Sometimes, your fuzzer turns out to be I{too} good. Then you find
yourself browsing through gigabytes of crash dumps, only to find
a handful of actual bugs in them. This simple heuristic filter
saves you the trouble by discarding crashes that seem to be similar
to another one you've already found.
"""
# Filter out duplicated crashes, if requested.
if not allow_duplicates:
signature = pickle.dumps(crash.signature, protocol = 0)
if self._session.query(CrashDTO.id) \
.filter_by(signature = signature) \
.count() > 0:
return
# Fill out a new row for the crashes table.
crash_id = self.__add_crash(crash)
# Fill out new rows for the memory dump.
self.__add_memory(crash_id, crash.memoryMap)
# On success set the row ID for the Crash object.
# WARNING: In nested calls, make sure to delete
# this property before a session rollback!
crash._rowid = crash_id
@type allow_duplicates: bool
@param allow_duplicates: (Optional)
C{True} to always add the new crash dump.
C{False} to only add the crash dump if no other crash with the
same signature is found in the database.
Sometimes, your fuzzer turns out to be I{too} good. Then you find
yourself browsing through gigabytes of crash dumps, only to find
a handful of actual bugs in them. This simple heuristic filter
saves you the trouble by discarding crashes that seem to be similar
to another one you've already found.
"""
# Filter out duplicated crashes, if requested.
if not allow_duplicates:
signature = pickle.dumps(crash.signature, protocol = 0)
if self._session.query(CrashDTO.id) \
.filter_by(signature = signature) \
.count() > 0:
return
# Fill out a new row for the crashes table.
crash_id = self.__add_crash(crash)
# Fill out new rows for the memory dump.
self.__add_memory(crash_id, crash.memoryMap)
# On success set the row ID for the Crash object.
# WARNING: In nested calls, make sure to delete
# this property before a session rollback!
crash._rowid = crash_id
def count(self, signature = None):
    """
    Counts how many crash dumps have been stored in this database.
    Optionally filters the count by heuristic signature.

    @type  signature: object
    @param signature: (Optional) Count only the crashes that match
        this signature. See L{Crash.signature} for more details.
        Only C{None} disables the filter; any other value, including
        falsy ones such as an empty tuple, is used for matching.

    @rtype:  int
    @return: Count of crash dumps stored in this database.
    """
    query = self._session.query(CrashDTO.id)

    # Test against None explicitly: a truthiness check would silently
    # ignore a falsy signature and return the total count instead.
    if signature is not None:
        # The comparison is made against the pickled form (protocol 0)
        # of the signature.
        sig_pickled = pickle.dumps(signature, protocol = 0)
        query = query.filter_by(signature = sig_pickled)

    return query.count()
def __init__(self, crash):
    """
    Fill in the fields of this object from a crash dump.

    @type  crash: Crash
    @param crash: L{Crash} object to store into the database.
    """

    # Time of the crash and its heuristic signature (pickled, protocol 0).
    self.timestamp = datetime.datetime.fromtimestamp(crash.timeStamp)
    self.signature = pickle.dumps(crash.signature, protocol=0)

    # Marshal the Crash object without its memory dump: the memory map is
    # detached while serializing and always put back afterwards.
    # This code is *not* thread safe!
    saved_memory_map = crash.memoryMap
    try:
        crash.memoryMap = None
        marshalled = Marshaller.dumps(crash)
        self.data = buffer(marshalled)
    finally:
        crash.memoryMap = saved_memory_map

    # Exploitability test.
    rating, rule, description = crash.isExploitable()
    self.exploitability_rating = rating
    self.exploitability_rule = rule
    self.exploitability_desc = description

    # Exploitability test as an integer result (for sorting).
def count(self, signature = None):
    """
    Counts how many crash dumps have been stored in this database.
    Optionally filters the count by heuristic signature.

    @type  signature: object
    @param signature: (Optional) Count only the crashes that match
        this signature. See L{Crash.signature} for more details.
        Only C{None} disables the filter; any other value, including
        falsy ones such as an empty tuple, is used for matching.

    @rtype:  int
    @return: Count of crash dumps stored in this database.
    """
    query = self._session.query(CrashDTO.id)

    # Test against None explicitly: a truthiness check would silently
    # ignore a falsy signature and return the total count instead.
    if signature is not None:
        # The comparison is made against the pickled form (protocol 0)
        # of the signature.
        sig_pickled = pickle.dumps(signature, protocol = 0)
        query = query.filter_by(signature = sig_pickled)

    return query.count()