Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_order(self, action: int, exchange: 'Exchange', portfolio: 'Portfolio') -> Order:
if action == 0:
return None
(order, size) = self._actions[action]
instrument = order.pair.base if order.side == TradeSide.BUY else order.pair.quote
wallet = portfolio.get_wallet(exchange.id, instrument=instrument)
price = exchange.quote_price(instrument)
size = min(wallet.balance.size, (wallet.balance.size * size))
if size < 10 ** -instrument.precision:
return None
quantity = size * instrument
wallet -= quantity
order = Order(side=order.side,
trade_type=order.trade_type,
pair=order.pair,
price=price,
quantity=quantity,
def adjust_trade(self, trade: Trade) -> Trade:
size_slippage = np.random.uniform(0, self.max_size_slippage_percent / 100)
price_slippage = np.random.uniform(0, self.max_price_slippage_percent / 100)
initial_price = trade.price
trade.size = trade.size * (1 - size_slippage)
if trade.type == TradeType.MARKET:
if trade.side == TradeSide.BUY:
trade.price = max(initial_price * (1 + price_slippage), 1e-3)
else:
trade.price = max(initial_price * (1 - price_slippage), 1e-3)
else:
if trade.side == TradeSide.BUY:
trade.price = max(initial_price * (1 + price_slippage), 1e-3)
if trade.price > initial_price:
trade.size *= min(initial_price / trade.price, 1)
else:
trade.price = max(initial_price * (1 - price_slippage), 1e-3)
if trade.price < initial_price:
trade.size *= min(trade.price / initial_price, 1)
return trade
def execute(self, exchange: 'Exchange'):
self.status = OrderStatus.OPEN
instrument = self.pair.base if self.side == TradeSide.BUY else self.pair.quote
wallet = self.portfolio.get_wallet(exchange.id, instrument=instrument)
if self.path_id not in wallet.locked.keys():
wallet -= self.size * instrument
wallet += self.quantity
if self.portfolio.order_listener:
self.attach(self.portfolio.order_listener)
for listener in self._listeners or []:
listener.on_execute(self, exchange)
return exchange.execute_order(self, self.portfolio)
def reset(self):
self._actions = [None]
for trading_pair, criteria, size in product(self._pairs, self._criteria, self._trade_sizes):
self._actions += [(TradeSide.BUY, trading_pair, criteria, size)]
self._actions += [(TradeSide.SELL, trading_pair, criteria, size)]
def __init__(self,
pairs: Union[List['TradingPair'], 'TradingPair'],
stop_loss_percentages: Union[List[float], float] = [0.02, 0.04, 0.06],
take_profit_percentages: Union[List[float], float] = [0.01, 0.02, 0.03],
trade_sizes: Union[List[float], int] = 10,
trade_side: TradeType = TradeSide.BUY,
trade_type: TradeType = TradeType.MARKET,
order_listener: OrderListener = None):
"""
Arguments:
pairs: A list of trading pairs to select from when submitting an order.
(e.g. TradingPair(BTC, USD), TradingPair(ETH, BTC), etc.)
stop_loss_percentages: A list of possible stop loss percentages for each order.
take_profit_percentages: A list of possible take profit percentages for each order.
trade_sizes: A list of trade sizes to select from when submitting an order.
(e.g. '[1, 1/3]' = 100% or 33% of balance is tradeable. '4' = 25%, 50%, 75%, or 100% of balance is tradeable.)
order_listener (optional): An optional listener for order events executed by this action scheme.
"""
self.pairs = self.default('pairs', pairs)
self.stop_loss_percentages = self.default('stop_loss_percentages', stop_loss_percentages)
self.take_profit_percentages = self.default(
'take_profit_percentages', take_profit_percentages)
def execute_order(self, order: 'Order', portfolio: 'Portfolio'):
if order.type == TradeType.LIMIT and order.side == TradeSide.BUY:
executed_order = self._exchange.create_limit_buy_order(
order.symbol, order.size, order.price)
elif order.type == TradeType.MARKET and order.side == TradeSide.BUY:
executed_order = self._exchange.create_market_buy_order(order.symbol, order.size)
elif order.type == TradeType.LIMIT and order.side == TradeSide.SELL:
executed_order = self._exchange.create_limit_sell_order(
order.symbol, order.size, order.price)
elif order.type == TradeType.MARKET and order.side == TradeSide.SELL:
executed_order = self._exchange.create_market_sell_order(order.symbol, order.size)
else:
return order.copy()
max_wait_time = time.time() + self._max_trade_wait_in_sec
while order['status'] == 'open' and time.time() < max_wait_time:
executed_order = self._exchange.fetch_order(order.id)
if order['status'] == 'open':
self._exchange.cancel_order(order.id)
return None
buy_quantity = size * base_instrument
order = Order(side=self._trade_side,
trade_type=self._trade_type,
pair=pair,
price=price,
quantity=buy_quantity,
portfolio=portfolio)
risk_criteria = StopLoss(direction='either',
up_percent=take_profit,
down_percent=stop_loss)
risk_management = Recipe(side=TradeSide.SELL if self._trade_side == TradeSide.BUY else TradeSide.BUY,
trade_type=TradeType.MARKET,
pair=pair,
criteria=risk_criteria)
order.add_recipe(risk_management)
if self._order_listener is not None:
order.attach(self._order_listener)
return order
if order.type == TradeType.LIMIT and order.price < current_price:
return None
commission = Quantity(order.pair.base, order.size * self._commission, order.path_id)
size = self._contain_size(order.size - commission.size)
if order.type == TradeType.MARKET:
size = self._contain_size(order.price / price * order.size - commission.size)
quantity = Quantity(order.pair.base, size, order.path_id)
trade = Trade(order_id=order.id,
exchange_id=self.id,
step=self.clock.step,
pair=order.pair,
side=TradeSide.BUY,
trade_type=order.type,
quantity=quantity,
price=price,
commission=commission)
# self._slippage_model.adjust_trade(trade)
quote_size = trade.size / trade.price * (order.price / trade.price)
base_wallet -= quantity
base_wallet -= commission
quote_wallet += Quantity(order.pair.quote, quote_size, order.path_id)
return trade
def __call__(self, order: 'Order', exchange: 'Exchange') -> bool:
if not exchange.is_pair_tradeable(order.pair):
return False
price = exchange.quote_price(order.pair)
buy_satisfied = (order.side == TradeSide.BUY and price <= self.limit_price)
sell_satisfied = (order.side == TradeSide.SELL and price >= self.limit_price)
return buy_satisfied or sell_satisfied
def reset(self):
self._actions = [None]
for trading_pair, criteria, size in product(self._pairs, self._criteria, self._trade_sizes):
self._actions += [(TradeSide.BUY, trading_pair, criteria, size)]
self._actions += [(TradeSide.SELL, trading_pair, criteria, size)]