Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __copy__(self):
    """Make a shallow copy of this inventory.

    Returns:
      A new Inventory instance constructed from this one, equal to it.
    """
    duplicate = Inventory(self)
    return duplicate
Args:
incomplete_entries: A list of directives, with some postings possibly left
with incomplete amounts as produced by the parser.
options_map: An options dict as produced by the parser.
methods: A mapping of account name to their corresponding booking
method.
Returns:
A triple of
entries: A list of interpolated entries with all their postings completed.
errors: New errors produced during interpolation.
balances: A dict of account name and resulting balances.
"""
new_entries = []
errors = []
balances = collections.defaultdict(inventory.Inventory)
for entry in entries:
if isinstance(entry, Transaction):
# Group postings by currency.
refer_groups, cat_errors = categorize_by_currency(entry, balances)
if cat_errors:
errors.extend(cat_errors)
continue
posting_groups = replace_currencies(entry.postings, refer_groups)
# Get the list of tolerances.
tolerances = interpolate.infer_tolerances(entry.postings, options_map)
# Resolve reductions to a particular lot in their inventory balance.
repl_postings = []
for currency, group_postings in posting_groups:
# Important note: the group of 'postings' here is a subset of
def value_inventory(price_map, date, inv):
    """Value each position of an inventory at a particular date.

    A position without a cost contributes its units unchanged. A position
    held at cost has its units converted to the cost currency through
    prices.convert_amount(); when that conversion yields None, a warning is
    logged and the units are added unconverted instead.

    Args:
      price_map: A mapping of prices as per build_price_map().
      date: A datetime.date instance, the date at which to value the inventory.
      inv: The inventory to convert.
    Returns:
      A new inventory accumulating the resulting amounts.
    """
    valued = inventory.Inventory()
    for held in inv:
        held_units = held.units
        market_value = None
        if held.cost is not None:
            market_value = prices.convert_amount(
                price_map, held.cost.currency, held_units, date)
            if market_value is None:
                logging.warning('Could not convert Position "{}" to {}'.format(
                    held_units, held.cost.currency))
        # Fall back on the raw units when there is no cost or no price.
        valued.add_amount(held_units if market_value is None else market_value)
    return valued
def compute_residual(postings):
    """Sum the weights of a set of complete postings into a residual.

    This is used to cross-check a balanced transaction. Postings whose
    metadata carries a true AUTOMATIC_RESIDUAL flag are left out of the sum:
    those are the auto-postings inserted to absorb the rounding error.

    Args:
      postings: A list of Posting instances.
    Returns:
      An instance of Inventory, holding the accumulated weight (as computed by
      convert.get_weight()) of the postings that were not skipped.
    """
    residual = Inventory()
    for item in postings:
        meta = item.meta
        absorbs_rounding = meta and meta.get(AUTOMATIC_RESIDUAL, False)
        if not absorbs_rounding:
            # Only regular postings contribute to the total residual balance.
            residual.add_amount(convert.get_weight(item))
    return residual
def compute_balance_for_accounts(entries, match_account, boundary_entry):
    """Compute the balance for a subset of accounts before and after an entry.

    Args:
      entries: A list of directives.
      match_account: A predicate on an account name that decides whether the
        postings of that account are included in the balance.
      boundary_entry: A directive which is present in the list of entries. If
        this is set to None, the balance is computed over all entries.
    Returns:
      Two Inventory instances: one accumulated from the transactions that come
      before the boundary entry, and a copy of it to which the boundary
      entry's own matching postings have been added.
    """
    def accumulate(target, postings):
        # Add the position of every posting on a matching account.
        for posting in postings:
            if match_account(posting.account):
                target.add_position(posting.position)

    before = inventory.Inventory()
    for directive in entries:
        # Stop at the boundary itself (identity, not equality).
        if directive is boundary_entry:
            break
        if isinstance(directive, data.Transaction):
            accumulate(before, directive.postings)

    after = copy.copy(before)
    if boundary_entry is not None:
        accumulate(after, boundary_entry.postings)
    return before, after
if not (date_begin < end_of_quarter < date_end):
continue
dividend_dates.append(end_of_quarter)
# Iterate over all the dates, but merging in the postings for the cash
# account.
min_amount = D('1000.00')
round_amount = D('100.00')
commission = D('8.95')
round_units = D('1')
frac_invest = D('1.00')
frac_dividend = D('0.004')
p_daily_buy = 1./15 # days
p_daily_sell = 1./90 # days
stocks_inventory = inventory.Inventory()
new_entries = []
dividend_date_iter = iter(dividend_dates)
next_dividend_date = next(dividend_date_iter)
for date, balances in iter_dates_with_balance(date_begin, date_end,
entries, [account_cash]):
# Check if we should insert a dividend. Note that we could not factor
# this out because we want to explicitly reinvest the cash dividends and
# we also want the dividends to be proportional to the amount of
# invested stock, so one feeds on the other and vice-versa.
if next_dividend_date and date > next_dividend_date:
# Compute the total balances for the stock accounts in order to
# create a realistic dividend.
total = inventory.Inventory()
for account_stock in accounts_stocks:
total.add_inventory(balances[account_stock])
def is_hashable_type(node):
    """Tell whether the data type of a node is considered hashable.

    Every dtype is treated as hashable except inventory.Inventory and its
    subclasses.

    Args:
      node: An instance of EvalNode.
    Returns:
      A boolean, false only if the node's dtype is an Inventory type.
    """
    holds_inventory = issubclass(node.dtype, inventory.Inventory)
    return not holds_inventory
accounts_related = accounts_value | accounts_internal
is_external_flow_entry = lambda entry: (isinstance(entry, data.Transaction) and
any(posting.account not in accounts_related
for posting in entry.postings))
# Create an iterator over the entries we care about.
portfolio_entries = [entry
for entry in entries
if (isinstance(entry, data.Transaction) and
getters.get_entry_accounts(entry) & accounts_value)]
iter_entries = iter(portfolio_entries)
entry = next(iter_entries)
# If a beginning cut-off has been specified, skip the entries before then
# (and make sure to accumulate the initial balance correctly).
balance = inventory.Inventory()
period_begin = entry.date
# Main loop over the entries.
timeline = []
done = False
while True:
balance_begin = copy.copy(balance)
# Consume all internal flow entries, simply accumulating the total balance.
segment_entries = []
while True:
period_end = entry.date
if is_external_flow_entry(entry):
break
if entry:
segment_entries.append(entry)
if args.seed:
random.seed(args.seed)
start = args.start.date()
end = args.end.date()
rows = []
PENNY = D('0.01')
initial_balance = D(random.uniform(30000, 50000)).quantize(PENNY)
date = start
oneday = datetime.timedelta(days=1)
balance = initial_balance
inv = inventory.Inventory()
while date < end:
date += oneday
r = random.random()
s = 0
txn = None
for ttype, p in PROBAB:
if s < random.random() < s + p:
break
s += p
else:
continue
row = None
xid = random.randint(10000000, 100000000-1)
fees = ZERO
currency_map = collections.defaultdict(int)
for drow in drows:
inv = drow[index]
for currency in inv.currencies():
currency_map[currency] += 1
return [InventoryConverter('{} ({})'.format(name, currency), index, currency)
for currency, _ in sorted(currency_map.items(),
key=lambda item: (item[1], item[0]),
reverse=True)]
# Lookup table from a data type to the converter factory registered for it.
CONVERTING_TYPES = {
    amount.Amount: convert_col_Amount,
    position.Position: convert_col_Position,
    inventory.Inventory: convert_col_Inventory,
}