Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@action()
def test_sync_action(self, pk=None, **kwargs):
    """Serialize the object selected by ``pk`` and reply with status 200.

    The object is fetched through ``self.get_object`` and passed to
    ``self.get_serializer`` along with the action kwargs; the serializer's
    ``data`` becomes the response payload.
    """
    instance = self.get_object(pk=pk)
    serializer = self.get_serializer(
        action_kwargs={'pk': pk},
        instance=instance,
    )
    payload = serializer.data
    return payload, 200
@action()
def test_sync_action(self, pk=None, **kwargs):
    """Store ``pk`` under ``results["test_sync_action"]`` and reply with status 200.

    The response payload echoes ``pk`` and carries ``"sync": True``.
    """
    results["test_sync_action"] = pk
    payload = {"pk": pk, "sync": True}
    return payload, 200
@action()
async def test_async_action(self, pk=None, **kwargs):
    """Store ``pk`` under ``results["test_action"]`` and reply with status 200.

    The response payload echoes ``pk`` back to the caller.
    """
    results["test_action"] = pk
    payload = {"pk": pk}
    return payload, 200
@action()
async def update_username(self, pk=None, name=None, **kwargs):
    """Set ``name`` on the object selected by ``pk``, save it, reply with status 200.

    Both the lookup (``self.get_object``) and the ``save`` call are wrapped
    with ``database_sync_to_async`` before being awaited.
    """
    fetch_object = database_sync_to_async(self.get_object)
    record = await fetch_object(pk=pk)

    record.name = name

    persist = database_sync_to_async(record.save)
    await persist()

    return {'pk': pk}, 200
async def test_model_observer_custom_groups_wrapper(settings):
    """Set up a consumer whose model observer yields custom group names.

    Points ``settings.CHANNEL_LAYERS`` at the in-memory channel layer,
    defines a consumer that subscribes to user changes for the username
    ``"test"`` when accepting, and creates a websocket communicator for it.
    """
    in_memory_backend = {
        "BACKEND": "channels.layers.InMemoryChannelLayer",
        "TEST_CONFIG": {"expiry": 100500},
    }
    settings.CHANNEL_LAYERS = {"default": in_memory_backend}

    layer = channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER)

    class TestConsumer(AsyncAPIConsumer):
        async def accept(self, **kwargs):
            # Subscribe first, then accept the connection.
            await self.user_change.subscribe(username="test")
            await super().accept()

        @model_observer(get_user_model())
        async def user_change(self, message, **kwargs):
            # Relay each observed message to the client as JSON.
            await self.send_json(message)

        @user_change.groups
        def user_change(self, instance=None, username=None, **kwargs):
            # An explicit username (slugified) takes precedence; otherwise
            # the group name is built from the instance's username.
            suffix = slugify(username) if username else instance.username
            yield "-instance-username-{}".format(suffix)

    communicator = WebsocketCommunicator(TestConsumer, "/testws/")
async def test_decorator():
    """Connect to a consumer exposing one async and one sync ``@action``.

    Each action records the ``pk`` it receives in the local ``results``
    dict and answers with status 200. The test asserts that the websocket
    connection is accepted.
    """
    results = {}

    class AConsumer(AsyncAPIConsumer):
        @action()
        async def test_async_action(self, pk=None, **kwargs):
            results["test_action"] = pk
            payload = {"pk": pk}
            return payload, 200

        @action()
        def test_sync_action(self, pk=None, **kwargs):
            results["test_sync_action"] = pk
            payload = {"pk": pk, "sync": True}
            return payload, 200

    # Test a normal connection
    communicator = WebsocketCommunicator(AConsumer, "/testws/")
    connected, _ = await communicator.connect()
    assert connected
"TEST_CONFIG": {"expiry": 100500,},
},
}
layer = channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER)
class TestConsumer(AsyncAPIConsumer):
async def accept(self, **kwargs):
await self.user_change.subscribe()
await super().accept()
@model_observer(get_user_model())
async def user_change(self, message, **kwargs):
await self.send_json(message)
class TestConsumer2(AsyncAPIConsumer):
async def accept(self, **kwargs):
await self.user_other.subscribe()
await super().accept()
@model_observer(get_user_model())
async def user_other(self, message, **kwargs):
await self.send_json(message)
communicator1 = WebsocketCommunicator(TestConsumer, "/testws/")
connected, _ = await communicator1.connect()
assert connected
communicator2 = WebsocketCommunicator(TestConsumer2, "/testws/")
async def test_unsubscribe_observer_model_instance_mixin(settings):
    """Set up an observer-mixin consumer and check that no users exist yet.

    Points ``settings.CHANNEL_LAYERS`` at the in-memory channel layer,
    defines a consumer combining ``ObserverModelInstanceMixin`` with
    ``GenericAsyncAPIConsumer`` plus an ``update_username`` action, and
    asserts that the user table is empty.
    """
    in_memory_backend = {
        "BACKEND": "channels.layers.InMemoryChannelLayer",
        "TEST_CONFIG": {"expiry": 100500},
    }
    settings.CHANNEL_LAYERS = {"default": in_memory_backend}

    layer = channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER)

    class TestConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):
        queryset = get_user_model().objects.all()
        serializer_class = UserSerializer

        async def accept(self, subprotocol=None):
            await super().accept()

        @action()
        async def update_username(self, pk=None, username=None, **kwargs):
            # Load, rename and save the user; the ORM calls are wrapped
            # with database_sync_to_async before being awaited.
            fetch_object = database_sync_to_async(self.get_object)
            user = await fetch_object(pk=pk)
            user.username = username
            persist = database_sync_to_async(user.save)
            await persist()
            return {'pk': pk}, 200

    any_user_exists = await database_sync_to_async(
        get_user_model().objects.all().exists
    )()
    assert not any_user_exists
async def test_delete_mixin_consumer():
class UserSerializer(serializers.ModelSerializer):
class Meta:
model = get_user_model()
fields = ('id', 'username', 'email',)
class AConsumer(DeleteModelMixin, GenericAsyncAPIConsumer):
queryset = get_user_model().objects.all()
serializer_class = UserSerializer
assert not get_user_model().objects.all().exists()
# Test a normal connection
communicator = WebsocketCommunicator(AConsumer, "/testws/")
connected, _ = await communicator.connect()
assert connected
await communicator.send_json_to(
{
"action": "delete",
"pk": 100,
"request_id": 1
}
async def test_two_observer_model_instance_mixins(settings):
settings.CHANNEL_LAYERS={
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
"TEST_CONFIG": {
"expiry": 100500,
},
},
}
layer = channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER)
class TestUserConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):
queryset = get_user_model().objects.all()
serializer_class = UserSerializer
async def accept(self, subprotocol=None):
await super().accept()
@action()
async def update_username(self, pk=None, username=None, **kwargs):
user = await database_sync_to_async(self.get_object)(pk=pk)
user.username = username
await database_sync_to_async(user.save)()
return {'pk': pk}, 200
class TestOtherConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):