Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import itertools
import logging
import requests
from cryptostore.data.store import Store
LOG = logging.getLogger('cryptostore')
def chunk(iterable, length):
return (iterable[i : i + length] for i in range(0, len(iterable), length))
class ElasticSearch(Store):
def __init__(self, config: dict):
self.data = None
self.host = config.host
self.user = config.user
self.token = config.token
self.settings = {'settings': {
"index" : {
"number_of_shards" : config.shards,
"number_of_replicas" : config.replicas,
"refresh_interval": config.refresh_interval
}
}
}
def aggregate(self, data):
self.data = data
'''
Copyright (C) 2018-2020 Bryant Moscon - bmoscon@gmail.com
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
import pandas as pd
from cryptofeed.defines import TRADES, L2_BOOK, L3_BOOK, TICKER, FUNDING, OPEN_INTEREST
from cryptostore.data.store import Store
from cryptostore.engines import StorageEngines
class Arctic(Store):
def __init__(self, connection: str):
self.data = []
self.con = StorageEngines.arctic.Arctic(connection)
def aggregate(self, data):
self.data = data
def write(self, exchange, data_type, pair, timestamp):
chunk_size = None
if not self.data:
return
df = pd.DataFrame(self.data)
self.data = []
df['date'] = pd.to_datetime(df['timestamp'], unit='s')
df['receipt_timestamp'] = pd.to_datetime(df['receipt_timestamp'], unit='s')
associated with this software.
'''
import os
import glob
import pyarrow as pa
import pyarrow.parquet as pq
from cryptostore.data.store import Store
from cryptostore.data.gc import google_cloud_write, google_cloud_read, google_cloud_list
from cryptostore.data.s3 import aws_write, aws_read, aws_list
from cryptostore.data.gd import google_drive_write
from cryptostore.exceptions import InconsistentStorage
class Parquet(Store):
def __init__(self, config=None):
self._write = []
self._read = []
self._list = []
self.bucket = []
self.kwargs = []
self.prefix = []
self.data = None
self.del_file = True
self.file_name = config.get('file_format') if config else None
self.path = config.get('path') if config else None
if config:
self.del_file = config.get('del_file', True)
if 'GCS' in config:
'''
Copyright (C) 2018-2020 Bryant Moscon - bmoscon@gmail.com
Please see the LICENSE file for the terms and conditions
associated with this software.
'''
from cryptostore.data.store import Store
from cryptostore.data.parquet import Parquet
from cryptostore.data.arctic import Arctic
from cryptostore.data.influx import InfluxDB
from cryptostore.data.elastic import ElasticSearch
class Storage(Store):
def __init__(self, config):
self.config = config
if isinstance(config.storage, list):
self.s = [Storage.__init_helper(s, config) for s in config.storage]
else:
self.s = [Storage.__init_helper(config.storage, config)]
@staticmethod
def __init_helper(store, config):
if store == 'parquet':
return Parquet(config.parquet if 'parquet' in config else None)
elif store == 'arctic':
return Arctic(config.arctic)
elif store == 'influx':
return InfluxDB(config.influx)
elif store == 'elastic':
associated with this software.
'''
from decimal import Decimal
from collections import defaultdict
from cryptofeed.defines import TRADES, L2_BOOK, L3_BOOK, TICKER, FUNDING, OPEN_INTEREST
import requests
from cryptostore.data.store import Store
def chunk(iterable, length):
return (iterable[i : i + length] for i in range(0, len(iterable), length))
class InfluxDB(Store):
def __init__(self, config: dict):
self.data = None
self.host = config.host
self.db = config.db
self.addr = f"{config.host}/write?db={config.db}"
if 'create' in config and config.create:
r = requests.post(f'{config.host}/query', data={'q': f'CREATE DATABASE {config.db}'})
r.raise_for_status()
def aggregate(self, data):
self.data = data
def write(self, exchange, data_type, pair, timestamp):
if not self.data:
return
agg = []