Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from octobot.channels.octobot_channel import OctoBotChannelProducer
from octobot.constants import PROJECT_NAME, LONG_VERSION, CONFIG_KEY
from octobot_backtesting.api.backtesting import is_backtesting_enabled
from octobot_commons.enums import OctoBotChannelSubjects
from octobot_services.api.interfaces import create_interface_factory, initialize_global_project_data, is_enabled, \
start_interfaces, is_enabled_in_backtesting
from octobot_services.api.notification import create_notifier_factory, is_enabled_in_config, \
process_pending_notifications
from octobot_services.interfaces.util.bot import get_bot_api
from octobot_services.managers.interface_manager import stop_interfaces
from octobot_services.consumers.octobot_channel_consumer import OctoBotChannelServiceActions as ServiceActions, \
OctoBotChannelServiceDataKeys as ServiceKeys
from octobot_tentacles_manager.api.configurator import is_tentacle_activated_in_tentacles_setup_config
class InterfaceProducer(OctoBotChannelProducer):
    """InterfaceProducer class:
    - On start, trigger the creation of interfaces and then of notifiers
    """

    def __init__(self, channel, octobot):
        """Attach the producer to its channel and to the owning bot.

        :param channel: channel forwarded to the parent producer constructor
        :param octobot: bot instance; its ``config`` is read in ``start``
        """
        super().__init__(channel)
        self.octobot = octobot

        # Both collections start empty.
        self.interfaces = []
        self.notifiers = []

        # Counter initialised to zero; presumably the number of notifiers
        # still waiting to be created -- confirm against the code using it.
        self.to_create_notifiers_count = 0

    async def start(self):
        """Create interfaces, then notifiers.

        The backtesting flag is computed once from the bot configuration and
        handed to both creation steps.
        """
        in_backtesting = is_backtesting_enabled(self.octobot.config)
        await self._create_interfaces(in_backtesting)
        await self._create_notifiers(in_backtesting)
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
from octobot_commons.constants import CONFIG_ENABLED_OPTION
from octobot_trading.constants import CONFIG_EXCHANGES
from octobot.channels.octobot_channel import OctoBotChannelProducer
from octobot_commons.enums import OctoBotChannelSubjects
from octobot_trading.consumers.octobot_channel_consumer import OctoBotChannelTradingActions as TradingActions, \
OctoBotChannelTradingDataKeys as TradingKeys
class ExchangeProducer(OctoBotChannelProducer):
def __init__(self, channel, octobot, backtesting, ignore_config=False):
    """Store the bot, the backtesting handle and the configuration flag.

    :param channel: channel forwarded to the parent producer constructor
    :param octobot: bot instance whose config lists the exchanges
    :param backtesting: value later passed along to ``create_exchange``
    :param ignore_config: kept on the instance as-is (defaults to False)
    """
    super().__init__(channel)

    self.octobot = octobot
    self.backtesting = backtesting
    self.ignore_config = ignore_config

    # No exchange manager id is known at construction time.
    self.exchange_manager_ids = []
async def start(self):
    """Call ``create_exchange`` for every exchange not disabled in the config.

    An exchange entry without the enabled option is treated as enabled.
    """
    exchanges_config = self.octobot.config[CONFIG_EXCHANGES]
    for name in exchanges_config:
        # Skip entries explicitly switched off in the configuration.
        if not exchanges_config[name].get(CONFIG_ENABLED_OPTION, True):
            continue
        await self.create_exchange(name, self.backtesting)
async def create_exchange(self, exchange_name, backtesting):
await self.send(bot_id=self.octobot.bot_id,
subject=OctoBotChannelSubjects.CREATION.value,
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
from octobot.channels.octobot_channel import OctoBotChannelProducer
from octobot.constants import CONFIG_KEY
from octobot_backtesting.api.backtesting import is_backtesting_enabled
from octobot_commons.enums import OctoBotChannelSubjects
from octobot_services.api.service_feeds import create_service_feed_factory, stop_service_feed
from octobot_services.consumers.octobot_channel_consumer import OctoBotChannelServiceActions as ServiceActions, \
OctoBotChannelServiceDataKeys as ServiceKeys
from octobot_tentacles_manager.api.configurator import is_tentacle_activated_in_tentacles_setup_config
class ServiceFeedProducer(OctoBotChannelProducer):
"""ServiceFeedProducer class:
- Create service feeds
"""
def __init__(self, channel, octobot):
    """Attach the producer to its channel and to the owning bot.

    :param channel: channel forwarded to the parent producer constructor
    :param octobot: bot instance kept for later use by the producer
    """
    super().__init__(channel)
    self.octobot = octobot

    # Initial state: no service feed registered, producer not started.
    self.service_feeds = []
    self.started = False
async def start(self):
in_backtesting = is_backtesting_enabled(self.octobot.config)
service_feed_factory = create_service_feed_factory(self.octobot.config,
self.octobot.async_loop,
self.octobot.bot_id)