Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
reader = csv.DictReader(csvfile)
if reader.fieldnames != expected_field_names:
raise RuntimeError(
'Actual field names %r != expected field names %r' %
(reader.fieldnames, expected_field_names))
for line_i, row in enumerate(reader):
transactions.append(
FundTransaction(
account=account,
date=datetime.datetime.strptime(row['Date'],
date_format).date(),
description=row['Category'],
memo=row['Description'],
price=parse_amount(row['Price']),
amount=parse_amount(row['Amount']),
units=Amount(parse_number(row['Shares']), row['Fund']),
balance=Amount(
parse_number(row['Total Shares']), row['Fund']),
filename=filename,
line=line_i + 1,
))
return transactions
Args:
amount1: An instance of Amount.
amount2: An instance of Amount.
Returns:
An instance of Amount, with the difference between the two amount's
numbers, in the same currency.
"""
assert isinstance(amount1.number, Decimal), (
"Amount1's number is not a Decimal instance: {}".format(amount1.number))
assert isinstance(amount2.number, Decimal), (
"Amount2's number is not a Decimal instance: {}".format(amount2.number))
if amount1.currency != amount2.currency:
raise ValueError(
"Unmatching currencies for operation on {} and {}".format(
amount1, amount2))
return Amount(amount1.number - amount2.number, amount1.currency)
"""
new_entries = []
for entry in entries:
if isinstance(entry, data.Transaction):
for posting in entry.postings:
if posting.meta and posting.meta.get('closing', False):
# Remove the metadata.
meta = posting.meta.copy()
del meta['closing']
entry = entry._replace(meta=meta)
# Insert a balance.
date = entry.date + datetime.timedelta(days=1)
balance = data.Balance(data.new_metadata("", 0),
date, posting.account,
amount.Amount(ZERO, posting.units.currency),
None, None)
new_entries.append(balance)
new_entries.append(entry)
return new_entries, []
errors.append(
BookConversionError(posting.meta,
"Could not match position {}".format(posting), None))
break
# Reduce the pending lots.
number = min(match_number, fnumber[0])
cost = fcost
match_number -= number
fnumber[0] -= number
if fnumber[0] == ZERO:
pending_lots.pop(0)
# Add a corresponding posting.
rposting = posting._replace(
units=amount.Amount(-number, posting.units.currency),
cost=copy.copy(cost))
new_postings.append(rposting)
# Update the P/L.
pnl += number * (posting.price.number - cost.number)
# Add to the list of matches.
matches.append(((findex, fposting),
(eindex, rposting)))
return new_postings, matches, pnl, errors
for position in positions))
new_entries = []
price_map = {}
for currency, cost_currency in instruments:
# print('------------------------------------------------------------------------------------------------------------------------')
# print(currency, cost_currency)
price_list = fetch_google_historical(currency, cost_currency)
if price_list is None:
continue
for date, price in price_list:
fileloc = FileLocation('', 0)
new_entries.append(
Price(fileloc, date, currency, Amount(price, cost_currency)))
for entry in new_entries:
print(printer.format_entry(entry), end='')
for position in positions:
curkey = (position['currency'], position['cost_currency'])
try:
date, close = price_map[curkey]
position['price_date'] = date
position['price_number'] = close
except KeyError:
pass
# If requested, dump the updated list of positions and prices.
if opts.dump:
write_positions_to_csv(opts.dump, positions)
def _serialise_posting(posting):
"""Serialise a posting."""
if isinstance(posting.units, Amount):
position_str = position.to_string(posting)
else:
position_str = ""
if posting.price is not None:
position_str += " @ {}".format(posting.price.to_string())
return {"account": posting.account, "amount": position_str}
if reader.fieldnames != expected_field_names:
raise RuntimeError(
'Actual field names %r != expected field names %r' %
(reader.fieldnames, expected_field_names))
for line_i, row in enumerate(reader):
transactions.append(
FundTransaction(
account=account,
date=datetime.datetime.strptime(row['Date'],
date_format).date(),
description=row['Category'],
memo=row['Description'],
price=parse_amount(row['Price']),
amount=parse_amount(row['Amount']),
units=Amount(parse_number(row['Shares']), row['Fund']),
balance=Amount(
parse_number(row['Total Shares']), row['Fund']),
filename=filename,
line=line_i + 1,
))
return transactions
funding_source_inventory -= units
transaction.postings.append(
Posting(
meta=meta,
account=account,
units=units,
cost=None,
price=None,
flag=None,
))
for currency in funding_source_inventory:
transaction.postings.append(
Posting(
account=funding_source_account,
units=Amount(currency=currency, number=funding_source_inventory[currency]),
cost=None,
price=None,
flag=None,
meta=collections.OrderedDict(funding_source_metadata),
))
return ImportResult(
date=transaction.date,
info=dict(
type='application/json',
filename=json_path,
),
entries=[transaction])
def amounts_map(entry):
"""Compute a mapping of (account, currency) -> Decimal balances.
Args:
entry: A Transaction instance.
Returns:
A dict of account -> Amount balance.
"""
amounts = collections.defaultdict(D)
for posting in entry.postings:
# Skip interpolated postings.
if posting.meta and interpolate.AUTOMATIC_META in posting.meta:
continue
currency = isinstance(posting.units, amount.Amount) and posting.units.currency
if isinstance(currency, str):
key = (posting.account, currency)
amounts[key] += posting.units.number
return amounts