Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class ChannelAccessRefused(ChannelClosed):
    """Channel closed: access to a server entity was refused by security settings."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client attempted to work with a server entity to "
        "which it has no access due to security settings: %r"
    )
class ChannelNotFoundEntity(ChannelClosed):
    """Channel closed: the client referred to a server entity that does not exist."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client attempted to work with a server "
        "entity that does not exist: %r"
    )
class ChannelLockedResource(ChannelClosed):
    """Channel closed: the server entity is in use by another client."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client attempted to work with a server entity to "
        "which it has no access because another client is working "
        "with it: %r"
    )
class ChannelPreconditionFailed(ChannelClosed):
    """Channel closed: a requested method was rejected because a precondition failed."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client requested a method that was not allowed because "
        "some precondition failed: %r"
    )
class DuplicateConsumerTag(ChannelClosed):
    """Channel error: the given consumer tag is already in use on the channel."""

    # Message template with one %s placeholder (presumably the tag -- confirm).
    reason = "The consumer tag specified already exists for this channel: %s"
class AMQPChannelError(AMQPError):
    """Generic AMQP channel error; parent of the channel exceptions in this file."""

    # Fixed message: no placeholders to fill.
    reason = "An unspecified AMQP channel error has occurred"
class ChannelClosed(AMQPChannelError):
    """The channel was closed; parent of the more specific close reasons."""

    # Two %s placeholders; elsewhere in this file the class is built with
    # (reply_code, reply_text), which matches this template.
    reason = "The channel was closed (%s) %s"
class ChannelAccessRefused(ChannelClosed):
    """Channel closed: access to a server entity was refused by security settings."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client attempted to work with a server entity to "
        "which it has no access due to security settings: %r"
    )
class ChannelNotFoundEntity(ChannelClosed):
    """Channel closed: the client referred to a server entity that does not exist."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client attempted to work with a server "
        "entity that does not exist: %r"
    )
class ChannelLockedResource(ChannelClosed):
    """Channel closed: the server entity is in use by another client."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client attempted to work with a server entity to "
        "which it has no access because another client is working "
        "with it: %r"
    )
class ChannelPreconditionFailed(ChannelClosed):
    """Channel closed: a requested method was rejected because a precondition failed."""

    # The original text stopped after the opening parenthesis; the message is
    # restored from the identical definitions of this class elsewhere in the file.
    reason = (
        "The client requested a method that was not allowed because "
        "some precondition failed: %r"
    )
def __exception_by_code(frame: spec.Channel.Close):  # pragma: nocover
    """Build the exception that corresponds to a channel-close frame.

    The frame's ``reply_code`` selects the exception class:

    * 403 -> ``exc.ChannelAccessRefused``
    * 404 -> ``exc.ChannelNotFoundEntity``
    * 405 -> ``exc.ChannelLockedResource``
    * 406 -> ``exc.ChannelPreconditionFailed``
    * anything else -> ``exc.ChannelClosed``

    :param frame: close frame exposing ``reply_code`` and ``reply_text``.
    :returns: an exception instance; it is returned, not raised.
    """
    code = frame.reply_code
    text = frame.reply_text

    # The specific exceptions receive only the reply text.
    if code == 403:
        return exc.ChannelAccessRefused(text)
    if code == 404:
        return exc.ChannelNotFoundEntity(text)
    if code == 405:
        return exc.ChannelLockedResource(text)
    if code == 406:
        return exc.ChannelPreconditionFailed(text)

    # Unrecognised code: the generic exception receives code and text.
    return exc.ChannelClosed(code, text)
class ChannelNotFoundEntity(ChannelClosed):
    """Channel closed: the client referred to a server entity that does not exist."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client attempted to work with a server "
        "entity that does not exist: %r"
    )
class ChannelLockedResource(ChannelClosed):
    """Channel closed: the server entity is in use by another client."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client attempted to work with a server entity to "
        "which it has no access because another client is working "
        "with it: %r"
    )
class ChannelPreconditionFailed(ChannelClosed):
    """Channel closed: a requested method was rejected because a precondition failed."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client requested a method that was not allowed because "
        "some precondition failed: %r"
    )
class DuplicateConsumerTag(ChannelClosed):
    """Channel error: the given consumer tag is already in use on the channel."""

    # Message template with one %s placeholder (presumably the tag -- confirm).
    reason = "The consumer tag specified already exists for this channel: %s"
class ProtocolSyntaxError(AMQPError):
    """Generic protocol syntax error; parent of ``InvalidFrameError``."""

    # Fixed message: no placeholders to fill.
    reason = "An unspecified protocol syntax error occurred"
class InvalidFrameError(ProtocolSyntaxError):
    """Protocol syntax error for an invalid received frame."""

    # Message template with one %r placeholder (presumably the frame -- confirm).
    reason = "Invalid frame received: %r"
class ChannelLockedResource(ChannelClosed):
    """Channel closed: the server entity is in use by another client."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client attempted to work with a server entity to "
        "which it has no access because another client is working "
        "with it: %r"
    )
class ChannelPreconditionFailed(ChannelClosed):
    """Channel closed: a requested method was rejected because a precondition failed."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client requested a method that was not allowed because "
        "some precondition failed: %r"
    )
class DuplicateConsumerTag(ChannelClosed):
    """Channel error: the given consumer tag is already in use on the channel."""

    # Message template with one %s placeholder (presumably the tag -- confirm).
    reason = "The consumer tag specified already exists for this channel: %s"
class ProtocolSyntaxError(AMQPError):
    """Generic protocol syntax error; parent of ``InvalidFrameError``."""

    # Fixed message: no placeholders to fill.
    reason = "An unspecified protocol syntax error occurred"
class InvalidFrameError(ProtocolSyntaxError):
    """Protocol syntax error for an invalid received frame."""

    # Message template with one %r placeholder (presumably the frame -- confirm).
    reason = "Invalid frame received: %r"
class MethodNotImplemented(AMQPError):
    """AMQP error for a method that is not implemented.

    Defines no ``reason`` of its own, so whatever ``AMQPError`` provides applies.
    """
class DeliveryError(AMQPError):
    """AMQP error: the server could not complete a method due to an internal error."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    # The stray leading space the message used to start with has been removed.
    reason = (
        "The server could not complete the method because of an "
        "internal error. The server may require intervention by an "
        "operator in order to resume normal operations: %r"
    )
class AMQPChannelError(AMQPError):
    """Generic AMQP channel error; parent of the channel exceptions in this file."""

    # Fixed message: no placeholders to fill.
    reason = "An unspecified AMQP channel error has occurred"
class ChannelClosed(AMQPChannelError):
    """The channel was closed; parent of the more specific close reasons."""

    # Two %s placeholders; elsewhere in this file the class is built with
    # (reply_code, reply_text), which matches this template.
    reason = "The channel was closed (%s) %s"
class ChannelAccessRefused(ChannelClosed):
    """Channel closed: access to a server entity was refused by security settings."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client attempted to work with a server entity to "
        "which it has no access due to security settings: %r"
    )
class ChannelNotFoundEntity(ChannelClosed):
    """Channel closed: the client referred to a server entity that does not exist."""

    # Message template with one %r placeholder; presumably filled from the
    # exception arguments by a base class not shown here -- confirm.
    reason = (
        "The client attempted to work with a server "
        "entity that does not exist: %r"
    )
class ChannelLockedResource(ChannelClosed):
reason = (
"The client attempted to work with a server entity to "