Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_insert_entry_align(tmpdir):
file_content = dedent(
"""
2016-02-26 * "Uncle Boons" "Eating out alone"
Liabilities:US:Chase:Slate -24.84 USD
Expenses:Food:Restaurant 24.84 USD
"""
)
samplefile = tmpdir.mkdir("fava_util_file3").join("example.beancount")
samplefile.write(file_content)
postings = [
data.Posting(
"Liabilities:US:Chase:Slate",
amount.Amount(D("-10.00"), "USD"),
None,
None,
None,
None,
),
data.Posting(
"Expenses:Food",
amount.Amount(D("10.00"), "USD"),
None,
None,
None,
None,
),
]
irows = iter(section)
prev_balance = Amount(Decimal(), cash_currency)
prev_date = datetime.date(1970, 1, 1)
Tuple = csv_utils.csv_parse_header(next(irows))
matcher = Matcher()
for index, row in enumerate(itertools.starmap(Tuple, irows)):
# Skip the empty balances; these aren't interesting.
if re.search('balance at the start of business day', row.description):
continue
# Skip end lines that cannot be parsed.
if not row.date:
continue
# Get the row's date and fileloc.
fileloc = data.FileLocation(filename, index)
date = datetime.datetime.strptime(row.date, '%d/%m/%y').date()
# Insert some Balance entries every time the day changed.
if ((debug and date != prev_date) or
(not debug and date.month != prev_date.month)):
prev_date = date
fileloc = data.FileLocation(filename, index)
new_entries.append(Balance(fileloc, date, config['asset_cash'], prev_balance, None))
# Create a new transaction.
narration = "({0.type}) {0.description}".format(row)
links = set([row.ref])
entry = Transaction(fileloc, date, flags.FLAG_IMPORT, None, narration, None, links, [])
amount = convert_number(row.amount)
txn = data.Transaction(meta, date, self.FLAG, payee, narration,
tags, data.EMPTY_SET, [])
# Attach one posting to the transaction
amount_debit, amount_credit = get_amounts(iconfig, row)
# Skip empty transactions
if amount_debit is None and amount_credit is None:
continue
for amount in [amount_debit, amount_credit]:
if amount is None:
continue
units = Amount(amount, self.currency)
txn.postings.append(
data.Posting(account, units, None, None, None, None))
# Attach the other posting(s) to the transaction.
if isinstance(self.categorizer, collections.abc.Callable):
txn = self.categorizer(txn)
# Add the transaction to the output list
entries.append(txn)
# Figure out if the file is in ascending or descending order.
first_date = parse_date_liberally(get(first_row, Col.DATE),
self.dateutil_kwds)
last_date = parse_date_liberally(get(last_row, Col.DATE),
self.dateutil_kwds)
is_ascending = first_date < last_date
# Reverse the list if the file is in descending order
def generate_table(self, entries, _, __):
    """Render a table with the number of postings found under each account.

    Args:
      entries: A list of directives; only data.Transaction instances
        contribute postings, all other directives are ignored.
      _: Unused.
      __: Unused.
    Returns:
      The result of table.create_table() over (account, count) rows ordered by
      decreasing count, followed by a final '~Total~' row holding the sum of
      all the counts.
    """
    # Flatten the postings of every transaction into a single list.
    postings = []
    for entry in entries:
        if isinstance(entry, data.Transaction):
            postings.extend(entry.postings)

    # Bucket the postings by account name and count each bucket.
    grouped = misc_utils.groupby(lambda posting: posting.account, postings)
    counts = {account: len(group) for account, group in grouped.items()}

    # Accounts with the most postings come first.
    ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
    rows = [(account, str(count)) for account, count in ranked]
    rows.append(('~Total~', str(sum(counts.values()))))

    columns = [(0, 'Account'), (1, 'Num Postings', '{:>}'.format)]
    return table.create_table(rows, columns)
is_file = False
else:
# If we're parsing a string, the CWD is the current process
# working directory.
cwd = os.getcwd()
source_filename = None
if is_file:
# All filenames here must be absolute.
assert path.isabs(source)
filename = path.normpath(source)
# Check for file previously parsed... detect duplicates.
if filename in filenames_seen:
parse_errors.append(
LoadError(data.new_metadata("", 0),
'Duplicate filename parsed: "{}"'.format(filename),
None))
continue
# Check for a file that does not exist.
if not path.exists(filename):
parse_errors.append(
LoadError(data.new_metadata("", 0),
'File "{}" does not exist'.format(filename), None))
continue
# Parse a file from disk directly.
filenames_seen.add(filename)
with misc_utils.log_time('beancount.parser.parser.parse_file',
log_timings, indent=2):
(src_entries,
include in the output. A reasonable example would be
('Assets', 'Liabilities'). If not specified, include all account types.
price_map: A dict of prices, as built by prices.build_price_map().
date: A datetime.date instance, the date at which to price the
holdings. If left unspecified, we use the latest price information.
Returns:
A list of dicts, with the following fields:
"""
# Remove the entries inserted by unrealized gains/losses. Those entries do
# affect asset accounts, and we don't want them to appear in holdings.
#
# Note: Perhaps it would make sense to generalize this concept of "inserted
# unrealized gains."
simple_entries = [entry
for entry in entries
if (not isinstance(entry, data.Transaction) or
entry.flag != flags.FLAG_UNREALIZED)]
# Realize the accounts into a tree (because we want the positions by-account).
root_account = realization.realize(simple_entries)
# For each account, look at the list of positions and build a list.
holdings = []
for real_account in sorted(list(realization.iter_children(root_account)),
key=lambda ra: ra.account):
if included_account_types:
# Skip accounts of invalid types, we only want to reflect the requested
# account types, typically assets and liabilities.
account_type = account_types.get_account_type(real_account.account)
if account_type not in included_account_types:
continue
number DECIMAL(16, 6),
currency CHARACTER(10),
cost_number DECIMAL(16, 6),
cost_currency CHARACTER(10),
cost_date DATE,
cost_label VARCHAR,
price_number DECIMAL(16, 6),
price_currency CHARACTER(10),
FOREIGN KEY(id) REFERENCES entries(id)
);
""")
postings_count = iter(itertools.count())
with connection:
for eid, entry in enumerate(entries):
if not isinstance(entry, data.Transaction):
continue
connection.execute("""
insert into entry values (?, ?, ?, ?, ?);
""", (eid, entry.date, 'txn', entry.meta["filename"], entry.meta["lineno"]))
connection.execute("""
insert into transactions_detail values (?, ?, ?, ?, ?, ?);
""", (eid, entry.flag, entry.payee, entry.narration,
','.join(entry.tags or ()), ','.join(entry.links or ())))
for posting in entry.postings:
pid = next(postings_count)
units = posting.units
cost = posting.cost
price = posting.price
connection.execute("""
def apply_filter(self, entries, options_map):
    """Select the entries accepted for this filter's component.

    An entry is kept when data.has_entry_account_component() returns a true
    value for it and self.component.

    Args:
      entries: A list of directives.
      options_map: Not used by this filter.
    Returns:
      A pair of (list of kept entries, None).
    """
    component = self.component
    selected = []
    for entry in entries:
        if data.has_entry_account_component(entry, component):
            selected.append(entry)
    return selected, None
def get_entry_text_description(entry):
"""Return the text of a description.
The text is chosen according to the type of the directive; directive types
not handled explicitly get a '-' placeholder.
Args:
entry: A directive, of any type.
Returns:
A string to use for filling the description field in text reports.
"""
# Transactions: payee and narration joined with ' | ', leaving out whichever
# of the two is None.
if isinstance(entry, data.Transaction):
description = ' | '.join([field
for field in [entry.payee, entry.narration]
if field is not None])
# Balance assertions: a diff_amount of None is reported as PASS; otherwise
# report FAIL along with the expected amount and the difference.
elif isinstance(entry, data.Balance):
if entry.diff_amount is None:
description = 'PASS - In {}'.format(entry.account)
else:
description = ('FAIL - In {}; '
'expected = {}, difference = {}').format(
entry.account,
entry.amount,
entry.diff_amount)
# Open and Close directives: the account name itself.
elif isinstance(entry, (data.Open, data.Close)):
description = entry.account
# Notes: the comment attached to the note.
elif isinstance(entry, data.Note):
description = entry.comment
# Documents: the filename of the document.
elif isinstance(entry, data.Document):
description = entry.filename
# Any other directive type: a placeholder.
else:
description = '-'