Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_duplicateprice_different_source(self, book_basic):
EUR = book_basic.commodities(namespace="CURRENCY")
USD = book_basic.currencies(mnemonic="USD")
p = Price(commodity=USD, currency=EUR, date=date(2014, 2, 22), value=Decimal('0.54321'), source="user:price")
p1 = Price(commodity=USD, currency=EUR, date=date(2014, 2, 22), value=Decimal('0.12345'), source="other:price")
book_basic.flush()
assert USD.prices.filter_by(value=Decimal('0')).all() == []
assert USD.prices.filter_by(value=Decimal('0.12345')).one() == p1
# validation should work as different sources
book_basic.validate()
def test_create_duplicateprice(self, book_basic):
EUR = book_basic.commodities(namespace="CURRENCY")
USD = book_basic.currencies(mnemonic="USD")
p = Price(commodity=USD, currency=EUR, date=date(2014, 2, 22), value=Decimal('0.54321'))
p1 = Price(commodity=USD, currency=EUR, date=date(2014, 2, 22), value=Decimal('0.12345'))
book_basic.flush()
assert USD.prices.filter_by(value=Decimal('0')).all() == []
assert USD.prices.filter_by(value=Decimal('0.12345')).one() == p1
with pytest.raises(ValueError):
book_basic.validate()
def test_create_duplicateprice_different_source(self, book_basic):
EUR = book_basic.commodities(namespace="CURRENCY")
USD = book_basic.currencies(mnemonic="USD")
p = Price(commodity=USD, currency=EUR, date=date(2014, 2, 22), value=Decimal('0.54321'), source="user:price")
p1 = Price(commodity=USD, currency=EUR, date=date(2014, 2, 22), value=Decimal('0.12345'), source="other:price")
book_basic.flush()
assert USD.prices.filter_by(value=Decimal('0')).all() == []
assert USD.prices.filter_by(value=Decimal('0.12345')).one() == p1
# validation should work as different sources
book_basic.validate()
def import_price(self, price: PriceModel):
""" Import individual price """
# Handle yahoo-style symbols with extension.
symbol = price.symbol
if "." in symbol:
symbol = price.symbol.split(".")[0]
stock = SecuritiesAggregate(self.book).get_by_symbol(symbol)
# get_aggregate_for_symbol
if stock is None:
logging.warning("security %s not found in book.", price.symbol)
return False
# check if there is already a price for the date
existing_prices = stock.prices.filter(Price.date == price.datetime.date()).all()
if not existing_prices:
# Create new price for the commodity (symbol).
self.__create_price_for(stock, price)
else:
logging.warning("price already exists for %s on %s",
stock.mnemonic, price.datetime.strftime("%Y-%m-%d"))
existing_price = existing_prices[0]
# update price
existing_price.value = price.value
return True
def __init__(self):
self.filename: str = None
self.prices: List[Price] = []
def get_latest_price(self) -> Price:
""" Returns the latest rate compared to default currency """
default_currency = self.book.default_currency
# Ensure that the rate is against the default currency only.
query = (
self.currency.prices
.filter(Price.currency == default_currency)
.order_by(desc(Price.date))
)
latest_price = query.first()
return latest_price
if self.transaction.currency != self.account.commodity:
# let us also add a Price
from piecash import Price
value = (self.value / self.quantity).quantize(Decimal("0.000001"))
try:
# find existing price if any and if so, do nothing
pr = self.book.prices(commodity=self.account.commodity,
currency=self.transaction.currency,
date=self.transaction.post_date,
)
except KeyError:
# otherwise, add a price in the database
pr = Price(commodity=self.account.commodity,
currency=self.transaction.currency,
date=self.transaction.post_date,
value=value,
type="transaction",
source="user:split-register")
# and an action if not yet defined
if self.action == "":
self.action = "Sell" if self.quantity.is_signed() else "Buy"
logging.info("Adding a new price for %s, %s, %s",
commodity.mnemonic, price.datetime.strftime("%Y-%m-%d"), price.value)
# safety check. Compare currencies.
sec_svc = SecurityAggregate(self.book, commodity)
currency = sec_svc.get_currency()
if currency != price.currency:
raise ValueError(
"Requested currency does not match the currency previously used",
currency, price.currency)
# Description of the source field values:
# https://www.gnucash.org/docs/v2.6/C/gnucash-help/tool-price.html
new_price = Price(commodity, currency, price.datetime.date(), price.value,
source="Finance::Quote")
commodity.prices.append(new_price)
def import_fx_rates(self, rates: List[PriceModel]):
""" Imports the given prices into database. Write operation! """
have_new_rates = False
base_currency = self.get_default_currency()
for rate in rates:
assert isinstance(rate, PriceModel)
currency = self.get_by_symbol(rate.symbol)
amount = rate.value
# Do not import duplicate prices.
# todo: if the price differs, update it!
# exists_query = exists(rates_query)
has_rate = currency.prices.filter(Price.date == rate.datetime.date()).first()
# has_rate = (
# self.book.session.query(Price)
# .filter(Price.date == rate.date.date())
# .filter(Price.currency == currency)
# )
if not has_rate:
log(INFO, "Creating entry for %s, %s, %s, %s",
base_currency.mnemonic, currency.mnemonic, rate.datetime.date(), amount)
# Save the price in the exchange currency, not the default.
# Invert the rate in that case.
inverted_rate = 1 / amount
inverted_rate = inverted_rate.quantize(Decimal('.00000000'))
price = Price(commodity=currency,
currency=base_currency,
date=rate.datetime.date(),
def get_latest_rate(self, other: Commodity) -> Price:
""" Fetches the latest available rate for the currency pair """
query = (
self.currency.prices
.filter(Price.commodity == self.currency,
Price.currency == other)
)
return query.first()