Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_run_with_consume_later_error(self):
    """A message whose consumer raises ConsumeLater is re-sent and retried."""
    def _consumer(message, **kwargs):
        # Track invocations on the function object itself; fail only
        # on the very first delivery.
        _consumer._call_count = getattr(_consumer, '_call_count', 0) + 1
        if _consumer._call_count == 1:
            raise ConsumeLater()

    Channel('test').send({'test': 'test'}, immediately=True)
    layer = channel_layers[DEFAULT_CHANNEL_LAYER]
    layer.router.add_route(route('test', _consumer))
    original_send = layer.send
    # Proxy 'send' through a Mock so re-sends can be counted.
    layer.send = mock.Mock(side_effect=original_send)
    worker = PatchedWorker(layer)
    worker.termed = 2  # one loop hits the error, the next delivers the requeue
    worker.run()
    self.assertEqual(getattr(_consumer, '_call_count', None), 2)
    self.assertEqual(layer.send.call_count, 1)
def test_channel_full(self):
    """
    Tests that when channel capacity is hit when processing due messages,
    message is requeued instead of dropped
    """
    delayed_message = {
        'channel': 'test',
        'delay': 1000,
        'content': {'test': 'value'},
    }
    # Queue enough delayed messages to saturate the target channel.
    for _ in range(10):
        Channel('asgi.delay').send(dict(delayed_message), immediately=True)
    worker = PatchedWorker(channel_layers[DEFAULT_CHANNEL_LAYER])
    worker.termed = 10
    worker.run()
    # One more message; delivering it hits the full channel, so it must
    # be requeued rather than dropped.
    Channel('asgi.delay').send(dict(delayed_message), immediately=True)
    worker = PatchedWorker(channel_layers[DEFAULT_CHANNEL_LAYER])
    worker.termed = 1
    worker.run()
    self.assertEqual(DelayedMessage.objects.count(), 11)
async def test_model_observer_custom_groups_wrapper_with_split_function_api(settings):
settings.CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
"TEST_CONFIG": {"expiry": 100500,},
},
}
layer = channel_layers.make_test_backend(DEFAULT_CHANNEL_LAYER)
class TestConsumer(AsyncAPIConsumer):
async def accept(self, **kwargs):
await self.user_change.subscribe(username="test")
await super().accept()
@model_observer(get_user_model())
async def user_change(self, message, **kwargs):
await self.send_json(message)
@user_change.groups_for_signal
def user_change(self, instance=None, **kwargs):
yield "-instance-username-{}".format(instance.username)
@user_change.groups_for_consumer
def user_change(self, username=None, **kwargs):
def send(self):
    """Queue this task on the configured CQ channel layer.

    When the layer reports it is full, mark the task for retry
    instead of propagating the error.
    """
    layer = getattr(settings, 'CQ_CHANNEL_LAYER', DEFAULT_CHANNEL_LAYER)
    logger.debug('Sending CQ message on "{}" layer.'.format(layer))
    message = {'task_id': str(self.id)}
    try:
        Channel('cq-tasks', alias=layer).send(message, immediately=True)
    except RedisChannelLayer.ChannelFull:
        # Channel at capacity: flag the task for a later retry. The
        # short-lived lock keeps concurrent senders from racing on status.
        with cache.lock(str(self.id), timeout=2):
            self.status = self.STATUS_RETRY
            self.save(update_fields=('status',))
def get_context(self):
    """
    Build the template context for the panel: the routed consumers
    (with human-readable filters) plus the profiling setting.
    """
    consumers = []
    for channels, consumer, filters, prefix in get_routes(DEFAULT_CHANNEL_LAYER):
        # Only include routes that have at least one debug-enabled channel.
        if any(in_debug(channel) for channel in channels):
            name = name_that_thing(consumer)
            # BUG FIX: the original used `name in '<path string>'`, which is
            # a *substring* test — any fragment such as "channels" or
            # "routing" would wrongly be skipped. Compare for equality.
            if name == 'channels.routing.null_consumer' or is_no_debug(consumer):
                continue
            consumers.append({
                'name': name,
                'channels': channels,
                'prefix': filters_to_string(prefix),
                'filters': filters_to_string(filters),
                'group': get_consumer_group(name),
            })
    return {
        'consumers': consumers,
        'profile': get_setting_value('PROFILE_CONSUMERS'),
    }
def add_arguments(self, parser):
    """Register the delay worker's command-line options."""
    super(Command, self).add_arguments(parser)
    # Optional channel-layer override; falls back to the project default.
    parser.add_argument(
        '--layer',
        action='store',
        dest='layer',
        default=DEFAULT_CHANNEL_LAYER,
        help='Channel layer alias to use, if not the default.',
    )
    # Polling interval between due-message checks.
    parser.add_argument(
        '--sleep',
        action='store',
        dest='sleep',
        type=float,
        default=1,
        help='Amount of time to sleep between checks, in seconds.',
    )
def handle(self, *args, **options):
    """Entry point: validate the chosen layer, then run the delay worker."""
    self.verbosity = options.get("verbosity", 1)
    self.logger = setup_logger('django.channels', self.verbosity)
    self.channel_layer = channel_layers[options.get("layer", DEFAULT_CHANNEL_LAYER)]
    # An in-memory layer cannot be shared across processes, so this
    # separate worker process would never see the server's messages.
    if self.channel_layer.local_only():
        raise CommandError(
            "You cannot span multiple processes with the in-memory layer. " +
            "Change your settings to use a cross-process channel layer."
        )
    self.options = options
    self.logger.info("Running delay against channel layer %s", self.channel_layer)
    worker = Worker(
        channel_layer=self.channel_layer,
        database_sleep_duration=options['sleep'],
    )
    try:
        worker.run()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the worker; exit quietly.
        pass
def send(self, channel_layer=None, requeue_delay=1000):
    """
    Sends the message on the configured channel with the stored content.
    Deletes the DelayedMessage record if successfully sent.
    Args:
        channel_layer: optional channel_layer to use
        requeue_delay: if the channel is full, milliseconds to wait before requeue
    """
    layer = channel_layer or channel_layers[DEFAULT_CHANNEL_LAYER]
    payload = json.loads(self.content)
    try:
        Channel(self.channel_name, channel_layer=layer).send(payload, immediately=True)
    except layer.ChannelFull:
        # Target channel is at capacity: keep this record and push its
        # next delivery attempt back by requeue_delay milliseconds.
        self.delay = requeue_delay
        self.save()
    else:
        # Delivered successfully — the record has served its purpose.
        self.delete()
def ready(self):
    """App hook: install debug-toolbar channel routes, wrap each
    configured layer in a monitoring backend, and build a match wrapper
    that decorates debug-enabled consumers.
    """
    # Debug toolbar not installed: leave channels completely untouched.
    if not apps.is_installed('debug_toolbar'):
        return
    # patch routes: adding debug routes to default layer
    for route in routes.debug_channel_routes:
        channel_layers[DEFAULT_CHANNEL_LAYER].router.add_route(route)
    # patch layers: substitution by debug layer with events monitoring
    for alias in getattr(settings, "CHANNEL_LAYERS", {}).keys():
        new_backend = layer_factory(channel_layers[alias], alias)
        channel_layers.set(alias, new_backend)
    # patch routes: wrap routes debug decorator
    for alias in getattr(settings, "CHANNEL_LAYERS", {}).keys():
        _match = channel_layers[alias].router.root.match
        # NOTE(review): new_match closes over the loop variables `_match`
        # and `alias` with late binding — after the loop, every closure
        # sees the values from the LAST iteration. With more than one
        # configured layer this is almost certainly wrong; binding them
        # as defaults (def new_match(message, _match=_match, alias=alias))
        # would freeze per-iteration values — confirm intent.
        # NOTE(review): new_match is never assigned back to
        # channel_layers[alias].router.root.match in the code visible
        # here — verify the installation step is not missing.
        def new_match(message):
            # Only decorate when the channel is debug-enabled and the
            # matched consumer is not explicitly opted out.
            if in_debug(message.channel.name):
                m = _match(message)
                if m and not is_no_debug(m[0]):
                    return debug_decorator(m[0], alias), m[1]
            return _match(message)
def add_arguments(self, parser):
    """Register command-line options for the worker command."""
    super(Command, self).add_arguments(parser)
    # Optional channel-layer override; falls back to the project default.
    parser.add_argument(
        "--layer",
        action="store",
        dest="layer",
        default=DEFAULT_CHANNEL_LAYER,
        help="Channel layer alias to use, if not the default.",
    )
    # At least one channel name is required.
    parser.add_argument("channels", nargs="+", help="Channels to listen on.")