Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_api_method_init_order(logged_in_client):
    """Check that APIMethod counts the instructions of a placeOrders call.

    Builds a ``placeOrders`` request carrying three instructions and asserts
    that ``instructions_length`` is 3.

    :param logged_in_client: client fixture passed straight to ``APIMethod``.
    """
    params = {'instructions': [1, 2, 3]}
    request = apimethod.APIMethod(
        logged_in_client, 'SportsAPING/v1.0/placeOrders', params, 'exchange'
    )
    assert request.instructions_length == 3
def test_api_method_create_resp(logged_in_client):
    """Check that ``create_resp`` raises APIError for a bad response object.

    The stand-in response is a throwaway class whose ``status_code`` is the
    string ``'error'``.

    :param logged_in_client: client fixture passed straight to ``APIMethod``.
    """
    # Minimal fake response: only the ``status_code`` attribute is provided.
    request_object = type('obj', (object,), {'status_code': 'error'})
    request = apimethod.APIMethod(logged_in_client, 'method', 'params', 'exchange')
    with pytest.raises(APIError):
        request.create_resp(request_object, None)
def test_api_method_errors(logged_in_client):
    """Check that calling each request type with dummy arguments fails.

    Every listed class is instantiated with placeholder method/params values
    and invoked; the call is expected to raise ``BetfairError``.

    :param logged_in_client: client fixture passed to each request class.
    """
    # NOTE(review): the original listed BettingRequest twice and its last
    # ``with`` block had no body; the body is assumed to be ``request()``
    # like every other stanza - confirm against the upstream test.
    request_classes = (
        apimethod.APIMethod,
        apimethod.BettingRequest,
        apimethod.OrderRequest,
        apimethod.AccountRequest,
        apimethod.ScoresRequest,
    )
    for request_class in request_classes:
        request = request_class(logged_in_client, 'method', 'params', 'exchange')
        with pytest.raises(BetfairError):
            request()
class KeepAlive(APIMethod):
    """KeepAlive method.

    Sends a POST to the URL the client resolves for this method, using the
    client's keep-alive headers and certificate.
    """

    _error_handler = staticmethod(apierrorhandling.api_keep_alive_error_handling)
    _error = KeepAliveError

    def __call__(self, session=None):
        """Send the keep-alive request.

        :param session: optional object exposing ``post``; when falsy the
            client's ``request`` attribute is used instead.
        :return: the result of ``create_resp`` for the received response.
        """
        url = self._api_client.get_url(str(self), self.exchange)
        # Captured before the request goes out and handed to create_resp.
        date_time_sent = datetime.datetime.now()
        if not session:
            session = self._api_client.request
        response = session.post(
            url,
            headers=self._api_client.keep_alive_headers,
            cert=self._api_client.cert
        )
        return self.create_resp(response, date_time_sent)
class Logout(APIMethod):
    """Logout method.

    Sends a GET to the URL the client resolves for this method, using the
    client's keep-alive headers and certificate.
    """

    _error_handler = staticmethod(apierrorhandling.api_logout_error_handling)
    _error = LogoutError

    def __call__(self, session=None):
        """Send the logout request.

        :param session: optional object exposing ``get``; when falsy the
            client's ``request`` attribute is used instead.
        :return: the result of ``create_resp`` for the received response.
        """
        url = self._api_client.get_url(str(self), self.exchange)
        # Captured before the request goes out and handed to create_resp.
        date_time_sent = datetime.datetime.now()
        if not session:
            session = self._api_client.request
        response = session.get(
            url,
            headers=self._api_client.keep_alive_headers,
            cert=self._api_client.cert
        )
        return self.create_resp(response, date_time_sent)
class BettingRequest(APIMethod):
    """Betting method.

    Adds nothing of its own; all behaviour is inherited from ``APIMethod``.
    """
class NavigationRequest(APIMethod):
    """Navigation method.

    Sends a GET with the client's request headers to the URL the client
    resolves for this method.
    """

    def __call__(self, session=None):
        """Send the navigation request.

        :param session: optional object exposing ``get``; when falsy the
            client's ``request`` attribute is used instead.
        :return: the result of ``create_resp`` for the received response.
        :raises APIError: if sending the request raises any exception; the
            original exception is passed along as the third argument.
        """
        url = self._api_client.get_url(str(self), self.exchange)
        # Captured before the request goes out and handed to create_resp.
        date_time_sent = datetime.datetime.now()
        headers = self._api_client.request_headers
        # Honour a caller-supplied session, as KeepAlive and Logout do;
        # previously the argument was accepted but ignored.
        if not session:
            session = self._api_client.request
        try:
            # presumably (connect, read) timeouts in seconds - confirm
            response = session.get(url, headers=headers, timeout=(3.05, 12))
        except Exception as e:
            raise APIError(None, self.params, e)
        return self.create_resp(response, date_time_sent)
class ScoresBroadcastRequest(APIMethod):
    """Scores broadcast method.

    Sends a GET whose query string carries the event ids from ``params``
    together with fixed format, product, region and locale values.
    """

    # No error handler is configured for this request type.
    _error_handler = None

    def __call__(self, session=None):
        """Send the scores broadcast request.

        :param session: optional object exposing ``get``; when falsy the
            client's ``request`` attribute is used instead.
        :return: the result of ``create_resp`` for the received response.
        """
        url = self._api_client.get_url(str(self), self.exchange)
        # Captured before the request goes out and handed to create_resp.
        date_time_sent = datetime.datetime.now()
        headers = self._api_client.request_headers
        # ``params`` is iterated and rendered as a bracketed, comma-separated
        # string, e.g. "[1, 2, 3]".
        event_ids = '[%s]' % ', '.join(map(str, self.params))
        data = {
            'eventIds': event_ids,
            'alt': 'json',
            'productType': 'EXCHANGE',
            'regionCode': 'UK',
            'locale': 'en_GB',
        }
        # Honour a caller-supplied session, as KeepAlive and Logout do;
        # previously the argument was accepted but ignored.
        if not session:
            session = self._api_client.request
        response = session.get(url, params=data, headers=headers)
        return self.create_resp(response, date_time_sent)
class OrderRequest(APIMethod):
    """Order method.

    Differs from ``APIMethod`` only in the error handler it installs.
    """

    _error_handler = staticmethod(apierrorhandling.api_order_error_handling)
class AccountRequest(APIMethod):
    """Account method.

    Differs from ``APIMethod`` only in its ``_timeout`` value.
    """

    # presumably seconds, consumed by APIMethod when sending - confirm
    _timeout = 6.05  # todo account error handling
class ScoresRequest(APIMethod):
    """Scores method.

    Adds nothing of its own; all behaviour is inherited from ``APIMethod``.
    """
class NavigationRequest(APIMethod):
    """Navigation method.

    Sends a GET with the client's request headers to the URL the client
    resolves for this method.
    """

    def __call__(self, session=None):
        """Send the navigation request.

        :param session: optional object exposing ``get``; when falsy the
            client's ``request`` attribute is used instead.
        :return: the result of ``create_resp`` for the received response.
        :raises APIError: if sending the request raises any exception; the
            original exception is passed along as the third argument.
        """
        url = self._api_client.get_url(str(self), self.exchange)
        # Captured before the request goes out and handed to create_resp.
        date_time_sent = datetime.datetime.now()
        headers = self._api_client.request_headers
        # Honour a caller-supplied session, as KeepAlive and Logout do;
        # previously the argument was accepted but ignored.
        if not session:
            session = self._api_client.request
        try:
            # presumably (connect, read) timeouts in seconds - confirm
            response = session.get(url, headers=headers, timeout=(3.05, 12))
        except Exception as e:
            raise APIError(None, self.params, e)
        return self.create_resp(response, date_time_sent)
class ScoresBroadcastRequest(APIMethod):
    """Scores broadcast method."""

    # No error handler is configured for this request type.
    _error_handler = None
class Logout(APIMethod):
    """Logout method.

    Sends a GET to the URL the client resolves for this method, using the
    client's keep-alive headers and certificate.
    """

    _error_handler = staticmethod(apierrorhandling.api_logout_error_handling)
    _error = LogoutError

    def __call__(self, session=None):
        """Send the logout request.

        :param session: optional object exposing ``get``; when falsy the
            client's ``request`` attribute is used instead.
        :return: the result of ``create_resp`` for the received response.
        """
        url = self._api_client.get_url(str(self), self.exchange)
        # Captured before the request goes out and handed to create_resp.
        date_time_sent = datetime.datetime.now()
        if not session:
            session = self._api_client.request
        response = session.get(
            url,
            headers=self._api_client.keep_alive_headers,
            cert=self._api_client.cert
        )
        return self.create_resp(response, date_time_sent)
class BettingRequest(APIMethod):
    """Betting method.

    Adds nothing of its own; all behaviour is inherited from ``APIMethod``.
    """
class OrderRequest(APIMethod):
    """Order method.

    Differs from ``APIMethod`` only in the error handler it installs.
    """

    _error_handler = staticmethod(apierrorhandling.api_order_error_handling)
class AccountRequest(APIMethod):
    """Account method.

    Differs from ``APIMethod`` only in its ``_timeout`` value.
    """

    # presumably seconds, consumed by APIMethod when sending - confirm
    _timeout = 6.05  # todo account error handling
class ScoresRequest(APIMethod):
    """Scores method.

    Sends a GET to the URL the client resolves for this method, using the
    client's keep-alive headers and certificate.
    """

    # NOTE(review): this __call__ is identical to Logout.__call__ and other
    # ScoresRequest definitions in this file have no __call__ at all -
    # confirm this override is intended.
    def __call__(self, session=None):
        """Send the request.

        :param session: optional object exposing ``get``; when falsy the
            client's ``request`` attribute is used instead.
        :return: the result of ``create_resp`` for the received response.
        """
        url = self._api_client.get_url(str(self), self.exchange)
        # Captured before the request goes out and handed to create_resp.
        date_time_sent = datetime.datetime.now()
        if not session:
            session = self._api_client.request
        response = session.get(
            url,
            headers=self._api_client.keep_alive_headers,
            cert=self._api_client.cert
        )
        return self.create_resp(response, date_time_sent)
class BettingRequest(APIMethod):
    """Betting method.

    Adds nothing of its own; all behaviour is inherited from ``APIMethod``.
    """
class OrderRequest(APIMethod):
    """Order method.

    Differs from ``APIMethod`` only in the error handler it installs.
    """

    _error_handler = staticmethod(apierrorhandling.api_order_error_handling)
class AccountRequest(APIMethod):
    """Account method.

    Differs from ``APIMethod`` only in its ``_timeout`` value.
    """

    # presumably seconds, consumed by APIMethod when sending - confirm
    _timeout = 6.05  # todo account error handling
class ScoresRequest(APIMethod):
    """Scores method.

    Adds nothing of its own; all behaviour is inherited from ``APIMethod``.
    """
class NavigationRequest(APIMethod):
    """Navigation method"""
class BettingRequest(APIMethod):
    """Betting method.

    Adds nothing of its own; all behaviour is inherited from ``APIMethod``.
    """
class OrderRequest(APIMethod):
    """Order method.

    Differs from ``APIMethod`` only in the error handler it installs.
    """

    _error_handler = staticmethod(apierrorhandling.api_order_error_handling)
class AccountRequest(APIMethod):
    """Account method.

    Differs from ``APIMethod`` only in its ``_timeout`` value.
    """

    # presumably seconds, consumed by APIMethod when sending - confirm
    _timeout = 6.05  # todo account error handling
class ScoresRequest(APIMethod):
    """Scores method.

    Adds nothing of its own; all behaviour is inherited from ``APIMethod``.
    """
class NavigationRequest(APIMethod):
    """Navigation method.

    Sends a GET with the client's request headers to the URL the client
    resolves for this method.
    """

    def __call__(self, session=None):
        """Send the navigation request.

        :param session: optional object exposing ``get``; when falsy the
            client's ``request`` attribute is used instead.
        :return: the result of ``create_resp`` for the received response.
        :raises APIError: if sending the request raises any exception; the
            original exception is passed along as the third argument.
        """
        url = self._api_client.get_url(str(self), self.exchange)
        # Captured before the request goes out and handed to create_resp.
        date_time_sent = datetime.datetime.now()
        headers = self._api_client.request_headers
        # Honour a caller-supplied session, as KeepAlive and Logout do;
        # previously the argument was accepted but ignored.
        if not session:
            session = self._api_client.request
        try:
            # presumably (connect, read) timeouts in seconds - confirm
            response = session.get(url, headers=headers, timeout=(3.05, 12))
        except Exception as e:
            raise APIError(None, self.params, e)
        return self.create_resp(response, date_time_sent)