Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def client_aus():
return APIClient('username', 'password', 'app_key', 'AUS')
def setUp(self):
client = APIClient('username', 'password', 'app_key', 'UK')
self.keep_alive = KeepAlive(client)
response_result_list = {'result': [{}, {}]}
response = self.base_endpoint.process_response(response_result_list, mock_resource, None, False)
assert type(response) == list
assert response[0] == mock_resource()
response_result = {'result': {}}
response = self.base_endpoint.process_response(response_result, mock_resource, None, False)
assert response == mock_resource()
# lightweight tests
response_list = [{}, {}]
response = self.base_endpoint.process_response(response_list, mock_resource, None, True)
assert response == response_list
client = APIClient('username', 'password', 'app_key', lightweight=True)
base_endpoint = BaseEndpoint(client)
response_list = [{}, {}]
response = base_endpoint.process_response(response_list, mock_resource, None, False)
assert type(response) == list
assert response[0] == mock_resource()
def setUp(self):
self.client = APIClient('bf_username', 'password', 'app_key', os.path.normpath('fail/'))
def setUp(self):
self.client = APIClient("username", "password", "app_key", "UK")
self.race_card = RaceCard(self.client)
def client():
return APIClient('username', 'password', 'app_key', 'UK')
import logging
import betfairlightweight
from betfairlightweight import StreamListener
from betfairlightweight.streaming.stream import MarketStream
"""
Data needs to be downloaded from:
https://historicdata.betfair.com
"""
# setup logging
logging.basicConfig(level=logging.INFO)
# create trading instance (no need to put in correct details)
trading = betfairlightweight.APIClient('username', 'password')
class HistoricalStream(MarketStream):
# create custom listener and stream
def __init__(self, listener):
super(HistoricalStream, self).__init__(listener)
with open('output.txt', 'w') as output:
output.write('Time,MarketId,Status,Inplay,SelectionId,LastPriceTraded\n')
def on_process(self, market_books):
with open('output.txt', 'a') as output:
for market_book in market_books:
for runner in market_book.runners:
# how to get runner details from the market definition
from flumine import Flumine, clients
from flumine.streams.datastream import DataStream
from strategies.marketrecorder import MarketRecorder
logger = logging.getLogger()
custom_format = "%(asctime) %(levelname) %(message)"
log_handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(custom_format)
formatter.converter = time.gmtime
log_handler.setFormatter(formatter)
logger.addHandler(log_handler)
logger.setLevel(logging.INFO)
trading = betfairlightweight.APIClient("username")
client = clients.BetfairClient(trading)
framework = Flumine(client=client)
strategy = MarketRecorder(
name="WIN",
market_filter=betfairlightweight.filters.streaming_market_filter(
event_type_ids=["7"], country_codes=["GB", "IE"], market_types=["WIN"],
),
stream_class=DataStream,
context={"local_dir": "/tmp", "force_update": False, "remove_file": True,},
)
framework.add_strategy(strategy)
framework.run()
import betfairlightweight
from betfairlightweight import filters
# create trading instance
trading = betfairlightweight.APIClient("username", "password", app_key="appKey")
# login
trading.login()
# update for test
market_id = "1.131347484"
selection_id = 12029579
def place_order():
# placing an order
limit_order = filters.limit_order(size=2.00, price=1.01, persistence_type="LAPSE")
instruction = filters.place_instruction(
order_type="LIMIT",
selection_id=selection_id,
side="LAY",
import betfairlightweight
"""
Historic is the API endpoint that can be used to
download data betfair provide.
https://historicdata.betfair.com/#/apidocs
"""
# setup logging
logging.basicConfig(level=logging.INFO) # change to DEBUG to see log all updates
# create trading instance
username = os.environ.get('username')
trading = betfairlightweight.APIClient(username)
trading.login()
# get my data
my_data = trading.historic.get_my_data()
for i in my_data:
print(i)
# get collection options (allows filtering)
collection_options = trading.historic.get_collection_options("Horse Racing", "Basic Plan", 1, 3, 2017, 1, 3, 2017)
print(collection_options)
# get advance basket data size
basket_size = trading.historic.get_data_size("Horse Racing", "Basic Plan", 1, 3, 2017, 1, 3, 2017)
print(basket_size)
# get file list