Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_retrieve_mixin_consumer():
class UserSerializer(serializers.ModelSerializer):
    """Serializer used by this test, exposing three fields of the user model."""

    class Meta:
        # The model is whatever get_user_model() returns at class-creation time.
        model = get_user_model()
        fields = ("id", "username", "email")
class AConsumer(
    RetrieveModelMixin,
    GenericAsyncAPIConsumer,
):
    """Consumer under test: RetrieveModelMixin on top of GenericAsyncAPIConsumer.

    Uses every object of the user model as its queryset and UserSerializer
    as its serializer class.
    """

    queryset = get_user_model().objects.all()
    serializer_class = UserSerializer
assert not get_user_model().objects.all().exists()
# Test a normal connection
communicator = WebsocketCommunicator(AConsumer, "/testws/")
connected, _ = await communicator.connect()
assert connected
await communicator.send_json_to(
{
"action": "retrieve",
"pk": 100,
"request_id": 1
}
for base in bases:
for attr_name in dir(base):
attr = getattr(base, attr_name)
if isinstance(attr, _GenericModelObserver):
body[attr_name] = attr.bind_to_model(model_cls=queryset.model)
return super().__new__(mcs, name, bases, body)
class ObserverConsumerMixin(metaclass=ObserverAPIConsumerMetaclass):
    """Mixin created through ObserverAPIConsumerMetaclass.

    Every instance starts with an empty ``subscribed_requests`` mapping.
    """

    def __init__(self, *args, **kwargs):
        """Pass all arguments up the MRO, then create the empty mapping."""
        super().__init__(*args, **kwargs)
        # String annotation: it is never evaluated, so the non-importable
        # name ``function`` carried over from the original type comment is
        # harmless at runtime.
        self.subscribed_requests: "Dict[function, str]" = {}
class ObserverModelInstanceMixin(ObserverConsumerMixin, RetrieveModelMixin):
@action()
async def subscribe_instance(self, request_id=None, **kwargs):
    """Subscribe to one object and remember which request asked for it.

    The object is looked up with ``self.get_object(**kwargs)`` (wrapped in
    ``database_sync_to_async``), passed to
    ``self.handle_instance_change.subscribe``, and ``request_id`` is then
    stored in ``self.subscribed_requests`` under the class-level
    ``handle_instance_change`` attribute.

    Returns:
        The tuple ``(None, status.HTTP_201_CREATED)``.

    Raises:
        ValueError: if ``request_id`` is ``None``.
    """
    if request_id is None:
        raise ValueError("request_id must have a value set")

    # get_object is wrapped before being awaited from this coroutine.
    fetch_object = database_sync_to_async(self.get_object)
    target = await fetch_object(**kwargs)

    await self.handle_instance_change.subscribe(instance=target)

    # Keyed by the attribute found on the class, not on the instance.
    observer_key = self.__class__.handle_instance_change
    self.subscribed_requests[observer_key] = request_id

    return (None, status.HTTP_201_CREATED)
@action()
async def unsubscribe_instance(self, request_id=None, **kwargs):
if request_id is None:
raise ValueError("request_id must have a value set")
# subscribe!