Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_request(self, mock_post, mock_headers, mock_check_status_code):
    """
    Verify that ``self.historic.request`` issues exactly one POST with the
    expected url, headers, JSON body and timeout, and that the status-code
    check runs exactly once.

    :param mock_post: Stand-in for the HTTP post call (patched outside this view).
    :param mock_headers: Stand-in for the request headers (patched outside this view).
    :param mock_check_status_code: Stand-in for ``check_status_code`` (patched outside this view).
    """
    payload = {'test': 'me'}
    operation = 'test'
    expected_url = 'https://historicdata.betfair.com/api/test'

    # session=None exercises the fallback to the client's own session.
    self.historic.request(method=operation, params=payload, session=None)

    expected_timeout = (
        self.historic.connect_timeout,
        self.historic.read_timeout,
    )
    mock_post.assert_called_with(
        expected_url,
        headers=mock_headers,
        data=json.dumps(payload),
        timeout=expected_timeout,
    )
    assert mock_post.call_count == 1
    assert mock_check_status_code.call_count == 1
def request(self, method, params, session):
    """
    POST ``params`` as a JSON body to ``self.url + method`` and decode the
    JSON reply.

    :param str method: Betfair api-ng method to be used; appended to ``self.url``.
    :param dict params: Params to be used in request; serialised with ``json.dumps``.
    :param Session session: Requests session to be used, reduces latency.
        When falsy, ``self.client.session`` is used instead.
    :return: Tuple of (decoded JSON response, seconds spent in the POST call).
    :raises APIError: If building or sending the request raises any exception.
    :raises InvalidResponse: If the response body cannot be decoded as JSON.
    """
    http = session if session else self.client.session
    started = datetime.datetime.utcnow()
    try:
        # Everything needed to build the request is evaluated inside the
        # try so that any failure (e.g. unserialisable params) is reported
        # as an APIError, same as a transport failure.
        endpoint = '%s%s' % (self.url, method)
        body = json.dumps(params)
        response = http.post(
            endpoint,
            data=body,
            headers=self.headers,
            timeout=(self.connect_timeout, self.read_timeout),
        )
    except ConnectionError:
        raise APIError(None, method, params, 'ConnectionError')
    except Exception as exc:
        raise APIError(None, method, params, exc)
    finished = datetime.datetime.utcnow()
    elapsed_time = (finished - started).total_seconds()

    # Status validation happens before the body is parsed.
    check_status_code(response)

    try:
        return response.json(), elapsed_time
    except ValueError:
        raise InvalidResponse(response.text)
def _send(self, message):
    """If not running connects socket and
    authenticates. Adds CRLF and sends message
    to Betfair.

    :param message: Data to be sent to Betfair; must be JSON serialisable.
    :raises SocketError: If the socket times out or errors while sending;
        ``self.stop()`` is called before the error is raised.
    """
    if not self._running:
        self._connect()
        self.authenticate()
    message_dumped = json.dumps(message) + self.__CRLF
    try:
        # sendall() rather than send(): send() may transmit only part of
        # the buffer and returns the byte count, which was being ignored,
        # so a message could be silently truncated on the stream.
        self._socket.sendall(message_dumped.encode())
    except (socket.timeout, socket.error) as e:
        self.stop()
        raise SocketError('[Connect: %s]: Socket %s' % (self._unique_id, e))
def json(self):
    """
    Serialise this object's ``_data`` attribute.

    :return: ``self._data`` encoded as a JSON string via ``json.dumps``.
    """
    serialised = json.dumps(self._data)
    return serialised
def create_req(method, params):
"""
:param method: Betfair api-ng method to be used.
:param params: Params to be used in request.
:return: Json payload.
"""
return json.dumps(
{
'jsonrpc': '2.0',
'method': method,
'params': params,
'id': 1
}