Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Handler registered for the `Subscription.dogAdded` field.
# NOTE(review): indentation was lost in this snippet and it looks truncated:
# the dequeued `dog` is never yielded or otherwise used in the visible lines.
# Confirm against the original source before relying on it.
@Subscription("Subscription.dogAdded")
async def on_dog_added(
parent: typing.Any, args: dict, ctx: dict, info: dict
) -> typing.AsyncIterator[dict]:
# The pubsub object is read from `ctx` under the "pubsub" key.
pubsub: PubSub = ctx["pubsub"]
# Queue that hands dogs from the pubsub callback over to the polling loop.
# Presumably `queue.Queue`, given `get_nowait` / `Empty` below; verify imports.
queue: Queue = Queue()
# Callback attached to the "dog_added" event: it only enqueues the dog.
@pubsub.on("dog_added")
def on_dog(dog: Dog) -> None:
queue.put(dog)
# Poll the queue without blocking; when it is empty, await a 10 ms sleep and
# try again.
while True:
try:
dog = queue.get_nowait()
except Empty:
await asyncio.sleep(0.01)
continue
@Subscription("Subscription.newSearch", schema_name="test_issue139")
async def subscription_new_search(*_, **__):
    """
    Emit two placeholder payloads, `Human #0` then `Human #1`.

    All positional and keyword arguments are ignored. After each payload the
    generator awaits a one-second sleep.
    """
    i = 0
    while i < 2:
        yield {"name": f"Human #{i}"}
        await asyncio.sleep(1)
        i += 1
@Subscription("Subscription.counter", schema_name="subscribe_counter")
async def _subscription_counter_subscription(
    parent_result, args, *_args, **_kwargs
):
    """
    Count down from `args["startAt"]`, yielding one payload per step.

    :param parent_result: unused
    :param args: subscription arguments; only the "startAt" key is read
    :return: async generator of `{"counter": value}` dicts, where `value`
        runs from `startAt - 1` down to `0`
    """
    remaining = args["startAt"]
    while remaining > 0:
        # Pause 10 ms before each emission.
        await asyncio.sleep(0.01)
        remaining -= 1
        yield {"counter": remaining}
async def test_subscribe_non_subscription_field():
    """
    Attaching a subscription to `Query.search` and building the engine must
    raise `NotSubscriptionField`.
    """
    schema_name = "test_subscribe_non_subscription_field"

    # A well-formed async generator: the error under test concerns the target
    # field, not the shape of the implementation.
    async def subscription_query_search(*_, **__):
        yield {}

    with pytest.raises(
        NotSubscriptionField,
        match="Field < Query.search > isn't a subscription field.",
    ):
        register = Subscription("Query.search", schema_name=schema_name)
        register(subscription_query_search)

        await create_engine(_SDL, schema_name=schema_name)
async def test_subscribe_non_async_generator_implementation():
    """
    Registering an implementation that is not an async generator must raise
    `NonAsyncGeneratorSubscription`.
    """
    # Use a schema name unique to this test so that registrations made by
    # other tests cannot influence the outcome (presumably registrations are
    # keyed by schema name; verify against the tartiflette registry).
    schema_name = "test_subscribe_non_async_generator_implementation"

    # Returns instead of yielding, so this is a plain coroutine function and
    # not an async generator.
    async def subscription_search(*_, **__):
        return 1

    with pytest.raises(
        NonAsyncGeneratorSubscription,
        match=r"The subscription < .* > given is not an awaitable generator.",
    ):
        Subscription("Query.search", schema_name=schema_name)(
            subscription_search
        )

        await create_engine(_SDL, schema_name=schema_name)
@Subscription("Subscription.counter", schema_name="subscribe_counter")
async def _subscription_counter_subscription(
    parent_result, args, *_args, **_kwargs
):
    """
    Count down from `args["startAt"]`, yielding the bare counter value.

    :param parent_result: unused
    :param args: subscription arguments; only the "startAt" key is read
    :return: async generator of values running from `startAt - 1` down to `0`
    """
    current = args["startAt"]
    while current > 0:
        # Pause 10 ms, decrement, then emit the new value.
        await asyncio.sleep(0.01)
        current -= 1
        yield current
async def ttftt_engine():
    """
    Register the subscriptions and resolver of the "test_subscribe" schema,
    then build and return its engine.

    Two subscriptions are registered: `MySubscription.newSearch` as a
    decorated async generator function, and `MySubscription.customSearch` as
    an instance of a callable class. Both emit one `{"newSearch": ...}`
    payload per entry of `_SEARCHS`.
    """

    @Subscription("MySubscription.newSearch", schema_name="test_subscribe")
    async def subscription_new_search(*_, **__):
        for entry in _SEARCHS:
            yield {"newSearch": entry}
            await asyncio.sleep(0.01)

    class MySubscriptionCustomSearchSubscriber:
        """Callable object whose `__call__` is an async generator."""

        async def __call__(self, *_, **__):
            for entry in _SEARCHS:
                yield {"newSearch": entry}
                await asyncio.sleep(0.01)

    register_custom_search = Subscription(
        "MySubscription.customSearch", schema_name="test_subscribe"
    )
    register_custom_search(MySubscriptionCustomSearchSubscriber())

    @Resolver("MySubscription.customSearch", schema_name="test_subscribe")
    async def resolver_subscription_custom_search(parent, args, ctx, info):
        # Append " #c" to every element of the emitted "newSearch" value.
        suffixed = []
        for search in parent["newSearch"]:
            suffixed.append(f"{search} #c")
        return suffixed

    engine = await create_engine(sdl=_SDL, schema_name="test_subscribe")
    return engine
# NOTE(review): this snippet is truncated. The docstring below is never closed
# and no function body is visible, so the behaviour of this subscription
# cannot be determined from here. Confirm against the original source.
@Subscription("Subscription.launchAndWaitCookingTimer")
async def subscribe_subscription_launch_and_wait_cooking_timer(
parent: Optional[Any],
args: Dict[str, Any],
ctx: Dict[str, Any],
info: "ResolveInfo",
) -> AsyncGenerator[Dict[str, Any], None]:
"""
Subscription in charge of generating an event stream related to the cooking
of a recipe.
:param parent: initial value filled in to the engine `subscribe` method
:param args: computed arguments related to the subscription
:param ctx: context filled in at engine initialization
:param info: information related to the execution and field resolution
:type parent: Optional[Any]
:type args: Dict[str, Any]
:type ctx: Dict[str, Any]
@Subscription("Subscription.dogAdded")
async def subscription_dog_added_subscription(parent_result, args, ctx, info):
    """
    Stream every payload published on `DOG_ADDED_TOPIC`.

    :param parent_result: unused
    :param args: unused
    :param ctx: unused
    :param info: unused
    :return: async generator yielding the JSON-decoded "data" of each message
    """
    dog_added_pubsub = _REDIS.pubsub(ignore_subscribe_messages=True)
    try:
        dog_added_pubsub.subscribe(DOG_ADDED_TOPIC)
        while True:
            message = dog_added_pubsub.get_message()
            if message:
                yield json.loads(message["data"])
            # Short pause between polls so the loop is not a busy spin.
            await asyncio.sleep(0.001)
    finally:
        # Reached when the consumer closes the generator or the task is
        # cancelled: release the pubsub instead of leaving it subscribed.
        # Assumes `_REDIS` is a redis-py client whose PubSub exposes `close()`.
        dog_added_pubsub.close()
@Subscription("Subscription.dogUpdated")
async def subscription_dog_updated_subscription(
    parent_result, args, ctx, info
):
    """
    Stream payloads published on `DOG_UPDATED_TOPIC` for one specific dog.

    :param parent_result: unused
    :param args: subscription arguments; only the "id" key is read
    :param ctx: unused
    :param info: unused
    :return: async generator yielding the JSON-decoded "data" of each message
        whose "id" equals `args["id"]`
    """
    dog_updated_pubsub = _REDIS.pubsub(ignore_subscribe_messages=True)
    try:
        dog_updated_pubsub.subscribe(DOG_UPDATED_TOPIC)
        while True:
            message = dog_updated_pubsub.get_message()
            if message:
                payload = json.loads(message["data"])
                # Forward the message only if it relates to the wanted dog.
                if payload["id"] == args["id"]:
                    yield payload
            # Short pause between polls so the loop is not a busy spin.
            await asyncio.sleep(0.001)
    finally:
        # Reached when the consumer closes the generator or the task is
        # cancelled: release the pubsub instead of leaving it subscribed.
        # Assumes `_REDIS` is a redis-py client whose PubSub exposes `close()`.
        dog_updated_pubsub.close()