How to use the nameko.testing.services.entrypoint_hook function in nameko

To help you get started, we’ve selected a few nameko examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github nameko / nameko-sqlalchemy / test / test_transaction_retry.py View on Github external
def test_retries(self, toxiproxy, disconnect, container):
        if not toxiproxy:
            pytest.skip('Toxiproxy not installed')

        with entrypoint_hook(container, 'create_record') as create_record:
            create_record()

        disconnect(reconnect=True)

        with entrypoint_hook(container, 'get_record_count') as hook:
            assert hook() == 1
github nameko / nameko / test / web / test_websocket.py View on Github external
def test_unicast_unknown(container):
    with entrypoint_hook(container, 'unicast') as unicast:
        assert not unicast(target_socket_id=0, value=42)
github nameko / nameko-sqlalchemy / test / test_database.py View on Github external
def test_successful_write_and_read(slf, container, db_uri):

        # write through the service
        with entrypoint_hook(container, 'write') as write:
            write(key='spam', value='ham')

        # verify changes written to disk
        entries = list(
            create_engine(db_uri).execute(
                'SELECT key, value FROM example LIMIT 1'))
        assert entries == [('spam', 'ham',)]

        # read through the service
        with entrypoint_hook(container, 'read') as read:
            assert read('spam') == 'ham'
github nameko / nameko / test / test_messaging.py View on Github external
# call 1 succeeds
        payload1 = "payload1"
        with entrypoint_waiter(consumer_container, 'recv'):
            with entrypoint_hook(publisher_container, 'send') as send:
                send(payload1)

        assert tracker.call_args_list == [
            call("send", payload1),
            call("recv", payload1),
        ]

        # call 2 succeeds
        payload2 = "payload2"
        with entrypoint_waiter(consumer_container, 'recv'):
            with entrypoint_hook(publisher_container, 'send') as send:
                send(payload2)

        assert tracker.call_args_list == [
            call("send", payload1),
            call("recv", payload1),
            call("send", payload2),
            call("recv", payload2),
        ]
github nameko / nameko-sqlalchemy / test / test_session_scope.py View on Github external
def test_successful_write_and_read(slf, container, db_uri):

        # write through the service
        with entrypoint_hook(container, 'write') as write:
            write(key='spam', value='ham')

        # verify changes written to disk
        entries = list(
            create_engine(db_uri).execute(
                'SELECT key, value FROM example LIMIT 1'))
        assert entries == [('spam', 'ham',)]

        # read through the service
        with entrypoint_hook(container, 'read') as read:
            assert read('spam') == 'ham'
github nameko / nameko-sqlalchemy / test / test_transaction_retry.py View on Github external
def test_raises_without_retry(self, toxiproxy, disconnect, container):
        if not toxiproxy:
            pytest.skip('Toxiproxy not installed')

        with entrypoint_hook(container, 'create_record') as create_record:
            create_record()

        disconnect(reconnect=True)

        with entrypoint_hook(container, 'get_record_count_no_retry') as hook:
            with pytest.raises(OperationalError):
                hook()
github nameko / nameko / test / web / test_websocket.py View on Github external
def test_close(container, websocket):
    ws = websocket()

    ws.rpc('subscribe')

    with entrypoint_hook(container, 'list_subscribers') as list_subscribers:
        subscribers1 = list_subscribers()
        assert subscribers1['test_channel']

    ws.app.close()

    with entrypoint_hook(container, 'list_subscribers') as list_subscribers:
        subscribers2 = list_subscribers()
        assert not subscribers2['test_channel']
github nameko / nameko / test / test_entrypoints.py View on Github external
logger = Logger()

            @dummy(sensitive_arguments=("a", "b.x[0]", "b.x[2]"))
            def method(self, a, b, c):
                return [a, b, c]

        container = container_factory(Service, {})
        entrypoint = get_extension(container, Entrypoint)

        assert entrypoint.sensitive_arguments == ("a", "b.x[0]", "b.x[2]")

        a = "A"
        b = {'x': [1, 2, 3], 'y': [4, 5, 6]}
        c = "C"

        with entrypoint_hook(container, "method") as method:
            assert method(a, b, c) == [a, b, c]

        assert redacted == {
            'a': REDACTED,
            'b': {
                'x': [REDACTED, 2, REDACTED],
                'y': [4, 5, 6]
            },
            'c': 'C'
        }
github nameko / nameko / test / test_serialization.py View on Github external
def publish(consumer, publisher):
        with entrypoint_waiter(consumer, "handle_event"):
            with entrypoint_hook(publisher, "dispatch_event") as dispatch:
                dispatch(payload)
github nameko / nameko / test / test_serialization.py View on Github external
        @event_handler('srcservice', 'eventtype')
        def handle_event(self, event_data):
            handler_called(event_data)

    rabbit_config[SERIALIZER_CONFIG_KEY] = serializer
    container = container_factory(Service, rabbit_config)
    container.start()

    get_messages = sniffer_queue_factory(
        "srcservice.events", routing_key="eventtype")

    serialized = serialized_info[serializer]

    # dispatch an event with a tuple payload
    with entrypoint_waiter(container, "handle_event"):
        with entrypoint_hook(container, "dispatch_event") as dispatch_event:
            dispatch_event(test_data)

    # verify data serialized to expected value
    assert handler_called.call_args == call(serialized['data'])

    # verify sniffed messages serialized as expected
    msg = get_messages()[0]
    assert msg['properties']['content_type'] == serialized['content_type']