How to use djangochannelsrestframework - 10 common examples

To help you get started, we’ve selected a few djangochannelsrestframework examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github hishnash / djangochannelsrestframework / tests / test_generic_consumer.py View on Github external
        @action()
        def test_sync_action(self, pk=None, **kwargs):
            user = self.get_object(pk=pk)

            s = self.get_serializer(
                action_kwargs={'pk': pk},
                instance=user
            )
            return s.data, 200
github hishnash / djangochannelsrestframework / tests / test_consumer.py View on Github external
        @action()
        def test_sync_action(self, pk=None, **kwargs):
            results["test_sync_action"] = pk
            return {"pk": pk, "sync": True}, 200
github hishnash / djangochannelsrestframework / tests / test_consumer.py View on Github external
        @action()
        async def test_async_action(self, pk=None, **kwargs):
            results["test_action"] = pk
            return {"pk": pk}, 200
github hishnash / djangochannelsrestframework / tests / test_model_observer.py View on Github external
        @action()
        async def update_username(self, pk=None, name=None, **kwargs):
            tm = await database_sync_to_async(self.get_object)(pk=pk)
            tm.name = name
            await database_sync_to_async(tm.save)()
            return {'pk': pk}, 200
github hishnash / djangochannelsrestframework / tests / test_observer.py View on Github external
async def test_model_observer_custom_groups_wrapper(settings):
    settings.CHANNEL_LAYERS = {
        "default": {
            "BACKEND": "channels.layers.InMemoryChannelLayer",
            "TEST_CONFIG": {"expiry": 100500,},
        },
    }

    layer = channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER)

    class TestConsumer(AsyncAPIConsumer):
        async def accept(self, **kwargs):
            await self.user_change.subscribe(username="test")
            await super().accept()

        @model_observer(get_user_model())
        async def user_change(self, message, **kwargs):
            await self.send_json(message)

        @user_change.groups
        def user_change(self, instance=None, username=None, **kwargs):
            if username:
                yield "-instance-username-{}".format(slugify(username))
            else:
                yield "-instance-username-{}".format(instance.username)

    communicator = WebsocketCommunicator(TestConsumer, "/testws/")
github hishnash / djangochannelsrestframework / tests / test_consumer.py View on Github external
async def test_decorator():

    results = {}

    class AConsumer(AsyncAPIConsumer):
        @action()
        async def test_async_action(self, pk=None, **kwargs):
            results["test_action"] = pk
            return {"pk": pk}, 200

        @action()
        def test_sync_action(self, pk=None, **kwargs):
            results["test_sync_action"] = pk
            return {"pk": pk, "sync": True}, 200

    # Test a normal connection
    communicator = WebsocketCommunicator(AConsumer, "/testws/")

    connected, _ = await communicator.connect()

    assert connected
github hishnash / djangochannelsrestframework / tests / test_observer.py View on Github external
"TEST_CONFIG": {"expiry": 100500,},
        },
    }

    layer = channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER)

    class TestConsumer(AsyncAPIConsumer):
        async def accept(self, **kwargs):
            await self.user_change.subscribe()
            await super().accept()

        @model_observer(get_user_model())
        async def user_change(self, message, **kwargs):
            await self.send_json(message)

    class TestConsumer2(AsyncAPIConsumer):
        async def accept(self, **kwargs):
            await self.user_other.subscribe()
            await super().accept()

        @model_observer(get_user_model())
        async def user_other(self, message, **kwargs):
            await self.send_json(message)

    communicator1 = WebsocketCommunicator(TestConsumer, "/testws/")

    connected, _ = await communicator1.connect()

    assert connected

    communicator2 = WebsocketCommunicator(TestConsumer2, "/testws/")
github hishnash / djangochannelsrestframework / tests / test_model_observer.py View on Github external
async def test_unsubscribe_observer_model_instance_mixin(settings):
    settings.CHANNEL_LAYERS={
        "default": {
            "BACKEND": "channels.layers.InMemoryChannelLayer",
            "TEST_CONFIG": {
                "expiry": 100500,
            },
        },
    }

    layer = channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER)

    class TestConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):

        queryset = get_user_model().objects.all()
        serializer_class = UserSerializer

        async def accept(self, subprotocol=None):
            await super().accept()

        @action()
        async def update_username(self, pk=None, username=None, **kwargs):
            user = await database_sync_to_async(self.get_object)(pk=pk)
            user.username = username
            await database_sync_to_async(user.save)()
            return {'pk': pk}, 200

    assert not await database_sync_to_async(get_user_model().objects.all().exists)()
github hishnash / djangochannelsrestframework / tests / test_generic_consumer.py View on Github external
async def test_delete_mixin_consumer():

    class UserSerializer(serializers.ModelSerializer):
        class Meta:
            model = get_user_model()
            fields = ('id', 'username', 'email',)

    class AConsumer(DeleteModelMixin, GenericAsyncAPIConsumer):
        queryset = get_user_model().objects.all()
        serializer_class = UserSerializer

    assert not get_user_model().objects.all().exists()

    # Test a normal connection
    communicator = WebsocketCommunicator(AConsumer, "/testws/")
    connected, _ = await communicator.connect()
    assert connected

    await communicator.send_json_to(
        {
            "action": "delete",
            "pk": 100,
            "request_id": 1
        }
github hishnash / djangochannelsrestframework / tests / test_model_observer.py View on Github external
async def test_two_observer_model_instance_mixins(settings):
    settings.CHANNEL_LAYERS={
        "default": {
            "BACKEND": "channels.layers.InMemoryChannelLayer",
            "TEST_CONFIG": {
                "expiry": 100500,
            },
        },
    }

    layer = channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER)

    class TestUserConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):

        queryset = get_user_model().objects.all()
        serializer_class = UserSerializer

        async def accept(self, subprotocol=None):
            await super().accept()

        @action()
        async def update_username(self, pk=None, username=None, **kwargs):
            user = await database_sync_to_async(self.get_object)(pk=pk)
            user.username = username
            await database_sync_to_async(user.save)()
            return {'pk': pk}, 200

    class TestOtherConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):