Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _run_action(self, action, stream_id):
try:
action(stream_id)
except priority.MissingStreamError:
assert stream_id not in self.stream_ids
except priority.PseudoStreamError:
assert stream_id == 0
else:
assert stream_id in self.stream_ids
def test_priority_raises_good_errors_for_missing_streams(self):
    """
    Every operation aimed at a stream that was never inserted must raise
    MissingStreamError.
    """
    tree = priority.PriorityTree()
    tree.insert_stream(1)

    # Stream 3 was never inserted; only stream 1 is present in the tree.
    absent_stream = 3
    operations = (
        tree.reprioritize,
        tree.block,
        tree.unblock,
        tree.remove_stream,
    )
    for operation in operations:
        with pytest.raises(priority.MissingStreamError):
            operation(absent_stream)
async def _priority_updated(self, event: h2.events.PriorityUpdated) -> None:
    """
    Apply a priority update event to ``self.priority``.

    The stream is reprioritized using the event's values. If the priority
    tree reports the stream as missing, it is inserted with those same
    values and blocked. Afterwards ``self.has_data`` is set.
    """
    stream_id = event.stream_id
    placement = dict(
        stream_id=stream_id,
        # A falsy depends_on (e.g. 0) is passed on as None.
        depends_on=event.depends_on or None,
        weight=event.weight,
        exclusive=event.exclusive,
    )

    try:
        self.priority.reprioritize(**placement)
    except priority.MissingStreamError:
        # The PRIORITY frame was received before the HEADERS frame, so the
        # stream is not in the tree yet: add it now and keep it blocked.
        self.priority.insert_stream(**placement)
        self.priority.block(stream_id)

    # Runs whichever branch handled the update.
    await self.has_data.set()
def _handlePriorityUpdate(self, event):
"""
Internal handler for when a stream priority is updated.
@param event: The Hyper-h2 event that encodes information about the
stream reprioritization.
@type event: L{h2.events.PriorityUpdated}
"""
try:
self.priority.reprioritize(
stream_id=event.stream_id,
depends_on=event.depends_on or None,
weight=event.weight,
exclusive=event.exclusive,
)
except priority.MissingStreamError:
# A PRIORITY frame arrived before the HEADERS frame that would
# trigger us to insert the stream into the tree. That's fine: we
# can create the stream here and mark it as blocked.
self.priority.insert_stream(
stream_id=event.stream_id,
depends_on=event.depends_on or None,
weight=event.weight,
exclusive=event.exclusive,
)
self.priority.block(event.stream_id)
def _handlePriorityUpdate(self, event):
"""
Internal handler for when a stream priority is updated.
@param event: The Hyper-h2 event that encodes information about the
stream reprioritization.
@type event: L{h2.events.PriorityUpdated}
"""
try:
self.priority.reprioritize(
stream_id=event.stream_id,
depends_on=event.depends_on or None,
weight=event.weight,
exclusive=event.exclusive,
)
except priority.MissingStreamError:
# A PRIORITY frame arrived before the HEADERS frame that would
# trigger us to insert the stream into the tree. That's fine: we
# can create the stream here and mark it as blocked.
self.priority.insert_stream(
stream_id=event.stream_id,
depends_on=event.depends_on or None,
weight=event.weight,
exclusive=event.exclusive,
)
self.priority.block(event.stream_id)
def _handlePriorityUpdate(self, event):
"""
Internal handler for when a stream priority is updated.
@param event: The Hyper-h2 event that encodes information about the
stream reprioritization.
@type event: L{h2.events.PriorityUpdated}
"""
try:
self.priority.reprioritize(
stream_id=event.stream_id,
depends_on=event.depends_on or None,
weight=event.weight,
exclusive=event.exclusive,
)
except priority.MissingStreamError:
# A PRIORITY frame arrived before the HEADERS frame that would
# trigger us to insert the stream into the tree. That's fine: we
# can create the stream here and mark it as blocked.
self.priority.insert_stream(
stream_id=event.stream_id,
depends_on=event.depends_on or None,
weight=event.weight,
exclusive=event.exclusive,
)
self.priority.block(event.stream_id)