Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Test: create a single Lot on the "broker" account, then build three
# transactions (one per amount in [45, -35, -20], dated 2014-01-01 to
# 2014-01-03) whose broker-side split is attached to that lot, and flush.
# NOTE(review): indentation was lost in this copy, so the nesting of the
# statements below (in particular whether flush() runs inside or after the
# loop) cannot be read from the text -- confirm against the original test file.
def test_create_simpletlot_addsplits(self, book_basic):
# Commodity looked up by namespace "CURRENCY"; used as the currency of every transaction.
EUR = book_basic.commodities(namespace="CURRENCY")
# NOTE(review): racc is assigned but never used in the lines shown.
racc = book_basic.root_account
# a: the "asset" account, s: the "broker" account (looked up by name).
a = book_basic.accounts(name="asset")
s = book_basic.accounts(name="broker")
# The lot is created once and shared by all broker splits created below.
l = Lot(title=u"test mé", account=s, notes=u"ïlya")
for i, am in enumerate([45, -35, -20]):
# Each transaction has an asset split of value am * 10 and an opposite
# broker split (value -am * 10, quantity -am) linked to the lot.
# NOTE(review): tr is rebound on each iteration and not read afterwards here.
tr = Transaction(currency=EUR, description="trade stock", notes=u"àçö",
post_date=date(2014, 1, 1 + i),
enter_date=datetime(2014, 1, 1 + i),
splits=[
Split(account=a, value=am * 10, memo=u"mémo asset"),
Split(account=s, value=-am * 10, quantity=-am, memo=u"mémo brok", lot=l),
])
book_basic.flush()
# Script fragment: declares one commodity, five accounts and two transactions.
# NOTE(review): `curr`, `other_curr` and `b` are not defined in the lines shown;
# presumably they are created earlier in the original script -- verify.
cdty = Commodity(namespace=u"BEL20", mnemonic=u"GnuCash Inc.", fullname=u"GnuCash Inc. stock")
# Top-level accounts are parented to b.root_account.
asset = Account(name="asset", type="ASSET", commodity=curr, parent=b.root_account)
foreign_asset = Account(name="foreign asset", type="ASSET", commodity=other_curr, parent=b.root_account)
# The "broker" STOCK account holds `cdty` and is a child of the "asset" account.
stock = Account(name="broker", type="STOCK", commodity=cdty, parent=asset)
expense = Account(name="exp", type="EXPENSE", commodity=curr, parent=b.root_account)
income = Account(name="inc", type="INCOME", commodity=curr, parent=b.root_account)
# Two-split transaction between asset and income.
# NOTE(review): values are passed as 2-tuples, presumably (numerator, denominator)
# -- confirm against the library's Split documentation.
tr1 = Transaction(post_date=datetime(2015, 10, 21),
description="my revenue",
currency=curr,
splits=[
Split(account=asset, value=(1000, 1)),
Split(account=income, value=(-1000, 1)),
]
)
# Three-split transaction: one asset split against two expense splits with memos.
tr2 = Transaction(post_date=datetime(2015, 10, 25),
description="my expense",
currency=curr,
splits=[
Split(account=asset, value=(-100, 1)),
Split(account=expense, value=(20, 1), memo="cost of X"),
Split(account=expense, value=(80, 1), memo="cost of Y"),
]
)
tr_stock = Transaction(post_date=datetime(2015, 10, 29),
description="my purchase of stock",
currency=curr,
splits=[
Split(account=asset, value=(-200, 1)),
Split(account=expense, value=(15, 1), memo="transaction costs"),
Split(account=stock, value=(185, 1), quantity=(6, 1), memo="purchase of stock"),
]
def get_available_splits_for_account(self, account: Account) -> List[Split]:
""" Returns all unused splits in the account. Used for the calculation of avg.price.
The split that has been partially used will have its quantity reduced to available
quantity only. """
available_splits = []
# get all purchase splits in the account
query = (
self.get_splits_query()
.filter(Split.account == account)
)
buy_splits = (
query.filter(Split.quantity > 0)
.join(Transaction)
.order_by(desc(Transaction.post_date))
).all()
buy_q = sum(split.quantity for split in buy_splits)
sell_splits = query.filter(Split.quantity < 0).all()
sell_q = sum(split.quantity for split in sell_splits)
balance = buy_q + sell_q
if balance == 0:
return available_splits
for real_split in buy_splits:
split = splitmapper.map_split(real_split, SplitModel())
if split.quantity < balance:
# take this split and reduce the balance.
balance -= split.quantity
else:
# Fragment: walks the rows returned by self.mint.get_detailed_transactions()
# and creates a two-split piecash transaction for each row that is not yet in
# the book, then saves and closes the book.
# NOTE(review): indentation was lost in this copy; the nesting below (e.g.
# whether book.save() runs per row or once after the loop) cannot be read from
# the text -- confirm against the original file. `book` and `USD` are defined
# outside the lines shown.
# cnt counts rows that were found to exist in the book already.
cnt = 0
for index, tran in self.mint.get_detailed_transactions().iterrows():
# Stop once more than 10 already-imported rows have been seen.
if cnt > 10:
break
# A row counts as already imported when some book transaction's `num`
# equals the row's id as a string; this scans book.transactions for every row.
if [tr for tr in book.transactions if tr.num==str(tran['id'])]:
#print 'already exists', tran['odate'], tran['merchant'], tran['amount']
cnt = cnt + 1
continue
# Rows flagged as duplicates are skipped without being counted.
if tran['isDuplicate']:
continue
# a1 is looked up by account code (the row's categoryId), a2 by account name.
a1 = book.accounts(code=str(tran['categoryId']))
a2 = book.accounts(name=tran['account'])
# Format the amount with two decimals before building the Decimal.
amount = Decimal("%.2f" % tran['amount'])
# The two splits carry opposite values; the row id is stored in `num`,
# which is the field the existence check above compares against.
piecash.Transaction(currency=USD,
description=tran['merchant'],
splits=[
piecash.Split(account=a1, value=amount),
piecash.Split(account=a2, value=-1 * amount)
],
post_date=tran['odate'].to_pydatetime().date(),
num=str(tran['id']))
book.save()
book.close()
# NOTE(review): `s`, `c1`, `a1` and `a2` are not defined in the lines shown;
# presumably an open book/session, a commodity and two accounts -- verify.
# create a transaction from a1 to a2
tr = Transaction(currency=c1,
description="transfer",
splits=[
Split(account=a1, value=-100),
Split(account=a2, value=100, quantity=30)
])
s.flush()
# ledger_str() returns a representation of the transaction in the ledger-cli format
# (the returned string is discarded here)
tr.ledger_str()
# change the book to use the "trading accounts" option
s.book.use_trading_accounts = True
# add a new transaction with the same splits as the previous one
tr2 = Transaction(currency=c1,
description="transfer 2",
splits=[
Split(account=a1, value=-100),
Split(account=a2, value=100, quantity=30)
])
# ledger_str() is called once before and once after the flush
tr2.ledger_str()
# when flushing, the trading accounts are created
s.flush()
tr2.ledger_str()
# trying to create an unbalanced transaction triggers an exception
# (there is no automatic creation of an imbalance split)
tr3 = Transaction(currency=c1,
description="transfer imb",
splits=[
Split(account=a1, value=-100),
# NOTE(review): `s`, `c1`, `a1` and `a2` are not defined in the lines shown;
# presumably an open book/session, a commodity and two accounts -- verify.
# switch the book to the "trading accounts" option
s.book.use_trading_accounts = True
# add a new transaction with the same splits as the previous one
tr2 = Transaction(currency=c1,
description="transfer 2",
splits=[
Split(account=a1, value=-100),
Split(account=a2, value=100, quantity=30)
])
# ledger_str() is called once before and once after the flush
tr2.ledger_str()
# when flushing, the trading accounts are created
s.flush()
tr2.ledger_str()
# trying to create an unbalanced transaction triggers an exception
# (there is no automatic creation of an imbalance split)
# NOTE(review): as written, the two split values below are -100 and 100, which
# sum to zero, so this transaction does not look unbalanced in value -- confirm
# the intended numbers against the original example.
tr3 = Transaction(currency=c1,
description="transfer imb",
splits=[
Split(account=a1, value=-100),
Split(account=a2, value=100, quantity=30)
])
print(tr3.ledger_str())
s.flush()
# NOTE(review): bare reference to a name that is not defined anywhere in the
# lines shown; evaluating it raises NameError. Presumably a leftover marker
# used to stop the script at this point -- confirm before keeping.
fdsfdsfds
# Demo script: creates a book with EUR as currency, two BANK accounts and one
# two-split transaction, then prints split denominators, the account
# commodities and the transaction slots before/after changing post_date.
# NOTE(review): indentation was lost in this copy, so the nesting of the
# statements below (which lines sit inside which `with`) cannot be read from
# the text -- confirm against the original script.
# NOTE(review): only create_book and Account are imported in the lines shown;
# Transaction, Split, Decimal and datetime must come from elsewhere.
if True:
from piecash import create_book, Account
with create_book(currency="EUR") as s:
# retrieve the default currency
EUR = s.commodities.get(mnemonic="EUR")
# with create_book("test_simple_transaction.gnucash",overwrite=True) as s:
# A second book is created here and rebinds both `s` and `EUR`.
with create_book(currency="EUR") as s:
EUR = s.commodities.get(mnemonic="EUR")
acc1 = Account(name="acc1",
commodity=EUR,
parent=s.book.root_account,
account_type="BANK")
# acc2 differs from acc1 only by its name and the explicit commodity_scu=10.
acc2 = Account(name="acc2",
commodity=EUR,
parent=s.book.root_account,
account_type="BANK",
commodity_scu=10)
s.session.flush()
# Opposite values with four decimal places, one split per account.
tr = Transaction(currency=EUR,
description="foo",
splits=[
Split(value=Decimal("1.2345"),
account=acc1),
Split(value=Decimal("-1.2345"),
account=acc2),
])
s.session.flush()
# Show the (quantity, value) denominators stored on each split after the flush.
print([(sp._quantity_denom, sp._value_denom) for sp in tr.splits])
print(s.session.query(Account.commodity).all())
# Print the slots three times: as flushed, after setting post_date to the
# current time, and after resetting post_date to None.
print(tr.slots)
tr.post_date = datetime.datetime.now()
print(tr.slots)
tr.post_date = None
print(tr.slots)
# Demo script: creates (overwriting) the book file "test.gnucash", declares
# three ISO currencies and one account per currency, then builds a transaction
# to which three splits are attached one by one, and prints the splits.
# s = create_book(postgres_conn="postgres://user:passwd@localhost/gnucash_book1", overwrite=True)
# s = create_book("test.gnucash",overwrite=True)
s = create_book("test.gnucash", overwrite=True)
# s = create_book()
EUR = Commodity.create_from_ISO("EUR")
CAD = Commodity.create_from_ISO("CAD")
USD = Commodity.create_from_ISO("USD")
# EUR.fraction = 100000
# One ASSET account in EUR and two STOCK accounts in CAD and USD, all under the root account.
acc1 = Account(name="foo EUR", parent=s.book.root_account, account_type="ASSET", placeholder=False, commodity=EUR)
acc2 = Account(name="baz CAD", parent=s.book.root_account, account_type="STOCK", placeholder=False, commodity=CAD)
acc3 = Account(name="baz USD", parent=s.book.root_account, account_type="STOCK", placeholder=False, commodity=USD)
# acc1.commodity_scu = 1000
# post_date and enter_date each come from their own datetime.datetime.now() call.
t = Transaction(description="foo",
post_date=datetime.datetime.now(),
enter_date=datetime.datetime.now(),
currency=EUR)
# Splits are attached by passing transaction=t instead of a splits=[...] list;
# the Split objects themselves are not bound to names.
# NOTE(review): the three split values below sum to 1 (-25 + 15 + 11), and
# nothing in the lines shown flushes or saves the book -- confirm intent.
Split(transaction=t,
value=-25,
quantity=-25,
account=acc1)
Split(transaction=t,
value=15,
quantity=10,
account=acc2)
Split(transaction=t,
value=11,
quantity=12,
account=acc3)
print(t.splits)
# parse periods
period = datetimeutils.parse_period(input_model.period)
date_from = period[0]
date_to = period[1]
logging.debug(f"got range: {input_model.period}. Parsed to {date_from} - {date_to}")
account = svc.accounts.get_by_id(input_model.account_id)
model.start_balance = svc.accounts.get_account_aggregate(
account).get_start_balance(date_from)
model.end_balance = svc.accounts.get_account_aggregate(
account).get_end_balance(date_to)
query = (
svc.book.session.query(Split)
.join(Transaction)
.filter(Split.account_guid == input_model.account_id)
.filter(Transaction.post_date >= date_from.date())
.filter(Transaction.post_date <= date_to.date())
.order_by(Transaction.post_date)
)
model.splits = query.all()
return model
def get_splits_in_period(self, start: Datum, end: Datum) -> List[Split]:
    """Fetch this account's splits posted within a date range.

    Both bounds are inclusive. Only the calendar date of each bound
    (``start.value.date()`` / ``end.value.date()``) is compared with the
    post date of the transaction that owns the split.
    """
    # Splits joined to their owning transaction so post_date can be filtered on.
    splits_of_transactions = self.book.session.query(Split).join(Transaction)
    # All three criteria go into a single filter() call, as in the original.
    criteria = (
        Split.account == self.account,
        Transaction.post_date >= start.value.date(),
        Transaction.post_date <= end.value.date(),
    )
    return splits_of_transactions.filter(*criteria).all()