Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
message_expected = {
"type": "heartbeat",
"last_trade_id": 17393422,
"product_id": "ETH-USD",
"sequence": 2,
"time": "2017-06-25T11:23:14.838000Z"
}
mock_connect.return_value.aenter.receive_str.side_effect = [
json.dumps(message_expected),
]
product_ids = ['ETH-USD']
async with gdax.orderbook.OrderBook(product_ids,
use_heartbeat=True) as orderbook:
subscribe_msg = {'type': 'subscribe', 'product_ids': product_ids}
heartbeat_msg = {'type': 'heartbeat', 'on': True}
calls = [call(subscribe_msg), call(heartbeat_msg)]
mock_connect.return_value.aenter.send_json.assert_has_calls(calls)
message = await orderbook.handle_message()
assert message == message_expected
def test_assert_has_awaits(self):
calls = [asynctest.call('bar'), asynctest.call('baz')]
with self.subTest('any_order=False'):
mock = asynctest.mock.CoroutineFunctionMock()
with self.assertRaises(AssertionError):
mock.assert_has_awaits(calls)
yield from mock('foo')
with self.assertRaises(AssertionError):
mock.assert_has_awaits(calls)
yield from mock('bar')
with self.assertRaises(AssertionError):
mock.assert_has_awaits(calls)
yield from mock('baz')
pr_labels_mock.assert_called_once_with(pr_number='test_pr_number', repository='test_repo')
expected_hook_1 = HookDetails(event_type=EventTypes.labeled,
repository='test_repo',
branch='test_branch',
sha='test_sha',
labels='label_one')
expected_hook_2 = HookDetails(event_type=EventTypes.labeled,
repository='test_repo',
branch='test_branch',
sha='test_sha',
labels='label_two')
assert mock_trigger_registered_jobs.call_count == 2
assert mock_trigger_registered_jobs.call_args_list[0] == call(expected_hook_1)
assert mock_trigger_registered_jobs.call_args_list[1] == call(expected_hook_2)
async def test_limit_concurrency(self):
mock = asynctest.CoroutineMock()
v = 0
async def cor():
nonlocal v
new_v = v + 1
await sleep(1)
v = new_v
await mock(v)
await ss.await_all(ss.limit_concurrency((cor() for i in range(10)), 1))
mock.assert_has_awaits([asynctest.call(i) for i in range(1, 11)])
async def test_cannot_trigger_registered_job_by_branch(gh_sut: GithubController,
mock_two_elem_collection: MagicMock,
mock_trigger_registered_job: MagicMock,
mock_cannot_trigger_job_by_branch: MagicMock):
# noinspection PyUnresolvedReferences
with patch.object(gh_sut, '_GithubController__get_collection_for_hook_type',
return_value=mock_two_elem_collection) as get_collection_method:
hook_details = HookDetails(event_type=EventTypes.push,
repository='test_repo',
branch='test_branch',
sha='test_sha')
await gh_sut.trigger_registered_jobs(hook_details)
assert mock_cannot_trigger_job_by_branch.call_count == 2
calls = [call('test_job_2', 'test_branch'),
call('test_job_1', 'test_branch')]
mock_cannot_trigger_job_by_branch.assert_has_calls(calls)
mock_trigger_registered_job.assert_not_called()
get_collection_method.assert_called_once_with(EventTypes.push)
await hass.services.async_call(
DOMAIN,
"get",
{"file_path": "/tmp/some_file", "key": "some_key", "bucket": "some_bucket"},
blocking=True,
)
assert minio_client.fget_object.call_args == call(
"some_bucket", "some_key", "/tmp/some_file"
)
minio_client.reset_mock()
await hass.services.async_call(
DOMAIN, "remove", {"key": "some_key", "bucket": "some_bucket"}, blocking=True
)
assert minio_client.remove_object.call_args == call("some_bucket", "some_key")
minio_client.reset_mock()
return_value='sha')
hook_data = {'comment': {'body': Labels.label_sync},
'repository': {'full_name': 'test_repo'},
'issue': {'number': 1, 'labels': [{'name': 'label1'}, {'name': 'label2'}]}}
await gh_sut.handle_comment(hook_data)
pr_branch_mock.assert_called_once_with(1, 'test_repo')
pr_sha_mock.assert_called_once_with(1, 'test_repo')
assert mock_trigger_registered_jobs.call_count == 2
hook_details_1 = call(HookDetails(event_type=EventTypes.labeled,
branch='test_branch',
repository='test_repo',
sha='sha',
labels='label1'))
hook_details_2 = call(HookDetails(event_type=EventTypes.labeled,
branch='test_branch',
repository='test_repo',
sha='sha',
labels='label2'))
assert mock_trigger_registered_jobs.call_args_list[0] == hook_details_1
assert mock_trigger_registered_jobs.call_args_list[1] == hook_details_2
mocker: MockFixture,
mock_trigger_registered_jobs: MagicMock):
pr_branch_mock = mocker.patch.object(gh_sut, 'get_pr_branch',
return_value='test_branch')
pr_sha_mock = mocker.patch.object(gh_sut, 'get_latest_commit_sha',
return_value='sha')
hook_data = {'comment': {'body': Labels.label_sync},
'repository': {'full_name': 'test_repo'},
'issue': {'number': 1, 'labels': [{'name': 'label1'}, {'name': 'label2'}]}}
await gh_sut.handle_comment(hook_data)
pr_branch_mock.assert_called_once_with(1, 'test_repo')
pr_sha_mock.assert_called_once_with(1, 'test_repo')
assert mock_trigger_registered_jobs.call_count == 2
hook_details_1 = call(HookDetails(event_type=EventTypes.labeled,
branch='test_branch',
repository='test_repo',
sha='sha',
labels='label1'))
hook_details_2 = call(HookDetails(event_type=EventTypes.labeled,
branch='test_branch',
repository='test_repo',
sha='sha',
labels='label2'))
assert mock_trigger_registered_jobs.call_args_list[0] == hook_details_1
assert mock_trigger_registered_jobs.call_args_list[1] == hook_details_2
def test_await_args(self):
with self.subTest('in order'):
mock = asynctest.mock.CoroutineFunctionMock()
t1 = mock('foo')
t2 = mock('bar')
yield from t1
yield from t2
self.assertEqual(mock.await_args, asynctest.call('bar'))
with self.subTest('out of order'):
mock = asynctest.mock.CoroutineFunctionMock()
t1 = mock('foo')
t2 = mock('bar')
yield from t2
yield from t1
self.assertEqual(mock.await_args, asynctest.call('foo'))