Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init(default_conf, mocker, caplog) -> None:
start_polling = MagicMock()
mocker.patch('freqtrade.rpc.telegram.Updater', MagicMock(return_value=start_polling))
Telegram(get_patched_freqtradebot(mocker, default_conf))
assert start_polling.call_count == 0
# number of handles registered
assert start_polling.dispatcher.add_handler.call_count > 0
assert start_polling.start_polling.call_count == 1
message_str = "rpc.telegram is listening for following commands: [['status'], ['profit'], " \
"['balance'], ['start'], ['stop'], ['forcesell'], ['forcebuy'], " \
"['performance'], ['daily'], ['count'], ['reload_conf'], ['show_config'], " \
"['stopbuy'], ['whitelist'], ['blacklist'], ['edge'], ['help'], ['version']]"
assert log_has(message_str, caplog)
def test_send_msg_sell_notification(default_conf, mocker) -> None:
msg_mock = MagicMock()
mocker.patch.multiple(
'freqtrade.rpc.telegram.Telegram',
_init=MagicMock(),
_send_msg=msg_mock
)
freqtradebot = get_patched_freqtradebot(mocker, default_conf)
telegram = Telegram(freqtradebot)
old_convamount = telegram._fiat_converter.convert_amount
telegram._fiat_converter.convert_amount = lambda a, b, c: -24.812
telegram.send_msg({
'type': RPCMessageType.SELL_NOTIFICATION,
'exchange': 'Binance',
'pair': 'KEY/ETH',
'gain': 'loss',
'limit': 3.201e-05,
'amount': 1333.3333333333335,
'order_type': 'market',
'open_rate': 7.5e-05,
'current_rate': 3.201e-05,
'profit_amount': -0.05746268,
'profit_percent': -0.57405275,
'stake_currency': 'ETH',
'fiat_currency': 'USD',
def test_reload_conf_handle(default_conf, update, mocker) -> None:
msg_mock = MagicMock()
mocker.patch.multiple(
'freqtrade.rpc.telegram.Telegram',
_init=MagicMock(),
_send_msg=msg_mock
)
freqtradebot = get_patched_freqtradebot(mocker, default_conf)
telegram = Telegram(freqtradebot)
freqtradebot.state = State.RUNNING
assert freqtradebot.state == State.RUNNING
telegram._reload_conf(update=update, context=MagicMock())
assert freqtradebot.state == State.RELOAD_CONF
assert msg_mock.call_count == 1
assert 'reloading config' in msg_mock.call_args_list[0][0][0]
def test_whitelist_dynamic(default_conf, update, mocker) -> None:
msg_mock = MagicMock()
mocker.patch.multiple(
'freqtrade.rpc.telegram.Telegram',
_init=MagicMock(),
_send_msg=msg_mock
)
mocker.patch('freqtrade.exchange.Exchange.exchange_has', MagicMock(return_value=True))
default_conf['pairlists'] = [{'method': 'VolumePairList',
'number_assets': 4
}]
freqtradebot = get_patched_freqtradebot(mocker, default_conf)
telegram = Telegram(freqtradebot)
telegram._whitelist(update=update, context=MagicMock())
assert msg_mock.call_count == 1
assert ("Using whitelist `['VolumePairList']` with 4 pairs\n"
"`ETH/BTC, LTC/BTC, XRP/BTC, NEO/BTC`" in msg_mock.call_args_list[0][0][0])
def test_edge_disabled(default_conf, update, mocker) -> None:
msg_mock = MagicMock()
mocker.patch.multiple(
'freqtrade.rpc.telegram.Telegram',
_init=MagicMock(),
_send_msg=msg_mock
)
freqtradebot = get_patched_freqtradebot(mocker, default_conf)
telegram = Telegram(freqtradebot)
telegram._edge(update=update, context=MagicMock())
assert msg_mock.call_count == 1
assert "Edge is not enabled." in msg_mock.call_args_list[0][0][0]
def test_cleanup(default_conf, mocker) -> None:
updater_mock = MagicMock()
updater_mock.stop = MagicMock()
mocker.patch('freqtrade.rpc.telegram.Updater', updater_mock)
telegram = Telegram(get_patched_freqtradebot(mocker, default_conf))
telegram.cleanup()
assert telegram._updater.stop.call_count == 1
def test_send_msg_buy_notification_no_fiat(default_conf, mocker) -> None:
del default_conf['fiat_display_currency']
msg_mock = MagicMock()
mocker.patch.multiple(
'freqtrade.rpc.telegram.Telegram',
_init=MagicMock(),
_send_msg=msg_mock
)
freqtradebot = get_patched_freqtradebot(mocker, default_conf)
telegram = Telegram(freqtradebot)
telegram.send_msg({
'type': RPCMessageType.BUY_NOTIFICATION,
'exchange': 'Bittrex',
'pair': 'ETH/BTC',
'limit': 1.099e-05,
'order_type': 'limit',
'stake_amount': 0.001,
'stake_amount_fiat': 0.0,
'stake_currency': 'BTC',
'fiat_currency': None
})
assert msg_mock.call_args[0][0] \
== '*Bittrex:* Buying ETH/BTC\n' \
'at rate `0.00001099\n' \
def test_performance_handle(default_conf, update, ticker, fee,
limit_buy_order, limit_sell_order, mocker) -> None:
msg_mock = MagicMock()
mocker.patch.multiple(
'freqtrade.rpc.telegram.Telegram',
_init=MagicMock(),
_send_msg=msg_mock
)
mocker.patch.multiple(
'freqtrade.exchange.Exchange',
get_ticker=ticker,
get_fee=fee,
)
freqtradebot = get_patched_freqtradebot(mocker, default_conf)
patch_get_signal(freqtradebot, (True, False))
telegram = Telegram(freqtradebot)
# Create some test data
freqtradebot.create_trades()
trade = Trade.query.first()
assert trade
# Simulate fulfilled LIMIT_BUY order for trade
trade.update(limit_buy_order)
# Simulate fulfilled LIMIT_SELL order for trade
trade.update(limit_sell_order)
trade.close_date = datetime.utcnow()
trade.is_open = False
telegram._performance(update=update, context=MagicMock())
assert msg_mock.call_count == 1
get_ticker=ticker,
buy=MagicMock(return_value={'id': 'mocked_order_id'}),
get_fee=fee,
)
msg_mock = MagicMock()
mocker.patch.multiple(
'freqtrade.rpc.telegram.Telegram',
_init=MagicMock(),
_send_msg=msg_mock
)
default_conf['stake_amount'] = 15.0
freqtradebot = get_patched_freqtradebot(mocker, default_conf)
patch_get_signal(freqtradebot, (True, False))
telegram = Telegram(freqtradebot)
freqtradebot.state = State.STOPPED
# Status table is also enabled when stopped
telegram._status_table(update=update, context=MagicMock())
assert msg_mock.call_count == 1
assert 'no active order' in msg_mock.call_args_list[0][0][0]
msg_mock.reset_mock()
freqtradebot.state = State.RUNNING
telegram._status_table(update=update, context=MagicMock())
assert msg_mock.call_count == 1
assert 'no active order' in msg_mock.call_args_list[0][0][0]
msg_mock.reset_mock()
# Create some test data
freqtradebot.create_trades()
def __init__(self, freqtrade) -> None:
""" Initializes all enabled rpc modules """
self.registered_modules: List[RPC] = []
# Enable telegram
if freqtrade.config.get('telegram', {}).get('enabled', False):
logger.info('Enabling rpc.telegram ...')
from freqtrade.rpc.telegram import Telegram
self.registered_modules.append(Telegram(freqtrade))
# Enable Webhook
if freqtrade.config.get('webhook', {}).get('enabled', False):
logger.info('Enabling rpc.webhook ...')
from freqtrade.rpc.webhook import Webhook
self.registered_modules.append(Webhook(freqtrade))
# Enable local rest api server for cmd line control
if freqtrade.config.get('api_server', {}).get('enabled', False):
logger.info('Enabling rpc.api_server')
from freqtrade.rpc.api_server import ApiServer
self.registered_modules.append(ApiServer(freqtrade))