Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'''
from yapic import json
import logging
from decimal import Decimal
from sortedcontainers import SortedDict as sd
from cryptofeed.feed import Feed
from cryptofeed.defines import TRADES, BUY, SELL, BID, ASK, TICKER, L2_BOOK, KRAKEN
from cryptofeed.standards import pair_exchange_to_std
LOG = logging.getLogger('feedhandler')
class Kraken(Feed):
id = KRAKEN
def __init__(self, pairs=None, channels=None, callbacks=None, depth=1000, **kwargs):
super().__init__('wss://ws.kraken.com', pairs=pairs, channels=channels, callbacks=callbacks, **kwargs)
self.book_depth = depth
def __reset(self):
self.l2_book = {}
self.channel_map = {}
async def subscribe(self, websocket):
self.__reset()
if self.config:
for chan in self.config:
sub = {"name": chan}
if 'book' in chan:
'''
from yapic import json
import logging
import requests
from decimal import Decimal
from sortedcontainers import SortedDict as sd
from cryptofeed.feed import Feed
from cryptofeed.defines import TRADES, BUY, SELL, BID, ASK, TICKER, FUNDING, L2_BOOK, KRAKEN_FUTURES, OPEN_INTEREST
from cryptofeed.standards import timestamp_normalize
LOG = logging.getLogger('feedhandler')
class KrakenFutures(Feed):
id = KRAKEN_FUTURES
def __init__(self, pairs=None, channels=None, callbacks=None, **kwargs):
super().__init__('wss://futures.kraken.com/ws/v1', pairs=pairs, channels=channels, callbacks=callbacks, **kwargs)
instruments = self.get_instruments()
if self.config:
config_instruments = list(self.config.values())
self.pairs = [
pair for inner in config_instruments for pair in inner]
for pair in self.pairs:
if pair not in instruments:
raise ValueError(f"{pair} is not active on {self.id}")
self.__reset()
DEC_S: Enable all decimal as strings.
TIME_S: Enable all times as date strings.
TIMESTAMP: Timestamp in milliseconds.
SEQ_ALL: Enable sequencing BETA FEATURE
CHECKSUM: Enable checksum for every book iteration.
Checks the top 25 entries for each side of book.
Checksum is a signed int.
"""
DEC_S = 8
TIME_S = 32
TIMESTAMP = 32768
SEQ_ALL = 65536
CHECKSUM = 131072
class Bitfinex(Feed):
id = BITFINEX
def __init__(self, pairs=None, channels=None, callbacks=None, **kwargs):
if channels is not None and FUNDING in channels:
if len(channels) > 1:
raise ValueError("Funding channel must be in a separate feedhanlder on Bitfinex or you must use config")
super().__init__('wss://api.bitfinex.com/ws/2', pairs=pairs, channels=channels, callbacks=callbacks, **kwargs)
self.__reset()
def __reset(self):
self.l2_book = {}
self.l3_book = {}
'''
channel map maps channel id (int) to a dict of
symbol: channel's currency
channel: channel name
'''
from yapic import json
import logging
from decimal import Decimal
from sortedcontainers import SortedDict as sd
from cryptofeed.feed import Feed
from cryptofeed.defines import TICKER, L2_BOOK, TRADES, BUY, SELL, BID, ASK, DSX as DSX_id
from cryptofeed.standards import pair_exchange_to_std, timestamp_normalize
LOG = logging.getLogger('feedhandler')
class DSX(Feed):
id = DSX_id
def __init__(self, pairs=None, channels=None, callbacks=None, **kwargs):
super().__init__('wss://api.dsxglobal.com/api/2/ws',
pairs=pairs,
channels=channels,
callbacks=callbacks,
**kwargs)
async def _ticker(self, msg: dict, timestamp: float):
await self.callback(TICKER, feed=self.id,
pair=pair_exchange_to_std(msg['symbol']),
bid=Decimal(msg['bid']),
ask=Decimal(msg['ask']),
timestamp=timestamp_normalize(self.id, msg['timestamp']),
receipt_timestamp=timestamp)
from yapic import json
import logging
from decimal import Decimal
import zlib
from sortedcontainers import SortedDict as sd
from cryptofeed.feed import Feed
from cryptofeed.defines import TRADES, BUY, SELL, BID, ASK, TICKER, L2_BOOK, OKCOIN, OPEN_INTEREST, FUNDING, TICKER_FUTURES, TICKER_SWAP, TRADES_FUTURES, TRADES_SWAP
from cryptofeed.standards import pair_exchange_to_std, timestamp_normalize
LOG = logging.getLogger('feedhandler')
class OKCoin(Feed):
id = OKCOIN
def __init__(self, pairs=None, channels=None, callbacks=None, **kwargs):
super().__init__('wss://real.okcoin.com:8443/ws/v3', pairs=pairs, channels=channels, callbacks=callbacks, **kwargs)
self.book_depth = 200
self.open_interest = {}
def __reset(self):
self.l2_book = {}
self.open_interest = {}
async def subscribe(self, websocket):
self.__reset()
if self.config:
for chan in self.config:
args = [f"{chan}:{pair}" for pair in self.config[chan]]
from decimal import Decimal
from time import time as time
from datetime import datetime
from sortedcontainers import SortedDict as sd
from cryptofeed.feed import Feed, RestFeed
from cryptofeed.defines import FTX as FTX_id
from cryptofeed.defines import TRADES, BUY, SELL, BID, ASK, TICKER, L2_BOOK, FUNDING, LIQUIDATIONS, OPEN_INTEREST
from cryptofeed.standards import pair_exchange_to_std, timestamp_normalize
LOG = logging.getLogger('feedhandler')
class FTX(Feed):
id = FTX_id
def __init__(self, pairs=None, channels=None, callbacks=None, **kwargs):
super().__init__('wss://ftexchange.com/ws/', pairs=pairs, channels=channels, callbacks=callbacks, **kwargs)
def __reset(self):
self.l2_book = {}
self.funding = {}
self.open_interest = {}
async def subscribe(self, websocket):
self.websocket = websocket
self.__reset()
for chan in self.channels if self.channels else self.config:
if chan == FUNDING:
asyncio.create_task(self._funding(self.pairs if self.pairs else self.config[chan]))
from decimal import Decimal
import requests
import zlib
import base64
from sortedcontainers import SortedDict as sd
from cryptofeed.feed import Feed
from cryptofeed.defines import BITTREX, BUY, SELL, TRADES, BID, ASK, L2_BOOK, TICKER
from cryptofeed.standards import timestamp_normalize, pair_exchange_to_std
LOG = logging.getLogger('feedhandler')
class Bittrex(Feed):
id = BITTREX
def __init__(self, pairs=None, channels=None, callbacks=None, **kwargs):
super().__init__('wss://socket.bittrex.com/signalr', pairs=pairs, channels=channels, callbacks=callbacks, **kwargs)
r = requests.get('https://socket.bittrex.com/signalr/negotiate', params={'connectionData': json.dumps([{'name': 'c2'}]), 'clientProtocol': 1.5})
token = r.json()['ConnectionToken']
url = requests.Request('GET', 'https://socket.bittrex.com/signalr/connect', params={'transport': 'webSockets', 'connectionToken': token, 'connectionData': json.dumps([{"name": "c2"}]), 'clientProtocol': 1.5}).prepare().url
url = url.replace('https://', 'wss://')
self.address = url
def __reset(self):
self.l2_book = {}
async def ticker(self, msg: dict, timestamp: float):
for t in msg['D']:
if (not self.config and t['M'] in self.pairs) or ('SubscribeToSummaryDeltas' in self.config and t['M'] in self.config['SubscribeToSummaryDeltas']):
'''
from yapic import json
import logging
from decimal import Decimal
from sortedcontainers import SortedDict as sd
from cryptofeed.feed import Feed
from cryptofeed.defines import L2_BOOK, BUY, SELL, BID, ASK, TRADES, GEMINI
from cryptofeed.standards import pair_exchange_to_std, timestamp_normalize
LOG = logging.getLogger('feedhandler')
class Gemini(Feed):
id = GEMINI
def __init__(self, pairs=None, channels=None, callbacks=None, **kwargs):
super().__init__('wss://api.gemini.com/v2/marketdata/',
pairs=pairs,
channels=channels,
callbacks=callbacks,
**kwargs)
def __reset(self, pairs):
for pair in pairs:
self.l2_book[pair_exchange_to_std(pair)] = {BID: sd(), ASK: sd()}
async def _book(self, msg: dict, timestamp: float):
pair = pair_exchange_to_std(msg['symbol'])
# Gemini sends ALL data for the symbol, so if we don't actually want
import logging
from decimal import Decimal
import time
import requests
from sortedcontainers import SortedDict as sd
from cryptofeed.feed import Feed
from cryptofeed.defines import L2_BOOK, L3_BOOK, BUY, SELL, BID, ASK, TRADES, TICKER, COINBASE
from cryptofeed.standards import timestamp_normalize, pair_exchange_to_std
LOG = logging.getLogger('feedhandler')
class Coinbase(Feed):
id = COINBASE
def __init__(self, pairs=None, channels=None, callbacks=None, **kwargs):
super().__init__('wss://ws-feed.pro.coinbase.com', pairs=pairs, channels=channels, callbacks=callbacks, **kwargs)
self.__reset()
def __reset(self):
self.order_map = {}
self.seq_no = {}
self.l3_book = {}
self.l2_book = {}
async def _ticker(self, msg: dict, timestamp: float):
'''
{
'type': 'ticker',
import logging
from decimal import Decimal
from sortedcontainers import SortedDict as sd
from cryptofeed.exceptions import MissingSequenceNumber
from cryptofeed.feed import Feed
from cryptofeed.defines import BITCOINCOM
from cryptofeed.defines import TRADES, BUY, SELL, BID, ASK, TICKER, L2_BOOK
from cryptofeed.standards import pair_exchange_to_std, timestamp_normalize
LOG = logging.getLogger('feedhandler')
class BitcoinCom(Feed):
id = BITCOINCOM
def __init__(self, pairs=None, channels=None, callbacks=None, **kwargs):
super().__init__('wss://api.exchange.bitcoin.com/api/2/ws', pairs=pairs, channels=channels, callbacks=callbacks, **kwargs)
self.__reset()
def __reset(self):
self.l2_book = {}
self.seq_no = {}
async def subscribe(self, websocket):
self.websocket = websocket
self.__reset()
for chan in self.channels if self.channels else self.config:
for pair in self.pairs if self.pairs else self.config[chan]:
await websocket.send(json.dumps(