Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def report(self, show_stats, show_records):
"""Return a string containing a text report for all records in the map"""
if len(self.__map) == 0:
return "No outstanding transactions found."
rstr = "%d outstanding transactions found" % len(self.__map)
if show_records:
rstr += ":"
for xid, tup in self.__map.iteritems():
rstr += "\n xid=%s:" % jrnl.Utils.format_xid(xid)
for i in tup:
rstr += "\n %s" % str(i[1])
else:
rstr += "."
return rstr
# Test hook
if self._csv_txn_cb != None and self._csv_txn_cb(self._csv_store_chk, hdr):
return True
if hdr.magic[-1] == "a":
self._abort_cnt += 1
else:
self._commit_cnt += 1
if self._tmap.contains(hdr.xid):
mismatched_rids = self._tmap.delete(hdr)
if mismatched_rids != None and len(mismatched_rids) > 0:
self._warning.append("WARNING: transactional dequeues not found in enqueue map; rids=%s" %
mismatched_rids)
else:
self._warning.append("WARNING: %s not found in transaction map" % jrnl.Utils.format_xid(hdr.xid))
if hdr.magic[-1] == "c": # commits only
self._txn_obj_list[hdr.xid] = hdr
return False