Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
AsyncIOEventEmitter,
EventEmitter
])
@pytest.mark.asyncio
async def test_asyncio_emit(cls, event_loop):
"""Test that asyncio-supporting event emitters can handle wrapping
coroutines
"""
ee = cls(loop=event_loop)
should_call = Future(loop=event_loop)
@ee.on('event')
async def event_handler():
should_call.set_result(True)
from ._compat import asynccontextmanager
@asynccontextmanager
async def get_client(app: typing.Callable) -> typing.AsyncIterator:
async with LifespanManager(app):
async with httpx.AsyncClient(app=app, base_url="http://testserver/") as client:
yield client
def omit_none(dct: dict) -> dict:
return {key: value for key, value in dct.items() if value is not None}
PubSub = AsyncIOEventEmitter
pubsub = PubSub() # pylint: disable=invalid-name
class Dog(typing.NamedTuple):
id: int
name: str
nickname: typing.Optional[str] = None
"""
Retrieve the ICE parameters of the ICE gatherer.
:rtype: RTCIceParameters
"""
return RTCIceParameters(
usernameFragment=self._connection.local_username,
password=self._connection.local_password,
)
def __setState(self, state: str) -> None:
self.__state = state
self.emit("statechange")
class RTCIceTransport(AsyncIOEventEmitter):
"""
The :class:`RTCIceTransport` interface allows an application access to
information about the Interactive Connectivity Establishment (ICE)
transport over which packets are sent and received.
:param gatherer: An :class:`RTCIceGatherer`.
"""
def __init__(self, gatherer: RTCIceGatherer) -> None:
super().__init__()
self.__start = None # type: Optional[asyncio.Event]
self.__iceGatherer = gatherer
self.__state = "new"
self._connection = gatherer._connection
@property
parsed["port"] = int(parsed["port"])
elif parsed["scheme"] in ["stuns", "turns"]:
parsed["port"] = 5349
else:
parsed["port"] = 3478
# set transport
if parsed["scheme"] == "turn" and not parsed["transport"]:
parsed["transport"] = "udp"
elif parsed["scheme"] == "turns" and not parsed["transport"]:
parsed["transport"] = "tcp"
return parsed
class RTCIceGatherer(AsyncIOEventEmitter):
"""
The :class:`RTCIceGatherer` interface gathers local host, server reflexive
and relay candidates, as well as enabling the retrieval of local
Interactive Connectivity Establishment (ICE) parameters which can be
exchanged in signaling.
"""
def __init__(self, iceServers: Optional[List[RTCIceServer]] = None) -> None:
super().__init__()
if iceServers is None:
iceServers = self.getDefaultIceServers()
ice_kwargs = connection_kwargs(iceServers)
self._connection = Connection(ice_controlling=False, **ice_kwargs)
self.__state = "new"
"The name of the subprotocol in use."
negotiated = attr.ib(default=False) # type: bool
"""
Whether data channel will be negotiated out of-band, where both sides
create data channel with an agreed-upon ID."""
id = attr.ib(default=None) # type: Optional[int]
"""
An numeric ID for the channel; permitted values are 0-65534.
If you don't include this option, the user agent will select an ID for you.
Must be set when negotiating out-of-band.
"""
class RTCDataChannel(AsyncIOEventEmitter):
"""
The :class:`RTCDataChannel` interface represents a network channel which
can be used for bidirectional peer-to-peer transfers of arbitrary data.
:param transport: An :class:`RTCSctpTransport`.
:param parameters: An :class:`RTCDataChannelParameters`.
"""
def __init__(
self, transport, parameters: RTCDataChannelParameters, send_open: bool = True
) -> None:
super().__init__()
self.__bufferedAmount = 0
self.__bufferedAmountLowThreshold = 0
self.__id = parameters.id
self.__parameters = parameters
@attr.s
class RTCSctpCapabilities:
"""
The :class:`RTCSctpCapabilities` dictionary provides information about the
capabilities of the :class:`RTCSctpTransport`.
"""
maxMessageSize = attr.ib()
"""
The maximum size of data that the implementation can send or
0 if the implementation can handle messages of any size.
"""
class RTCSctpTransport(AsyncIOEventEmitter):
"""
The :class:`RTCSctpTransport` interface includes information relating to
Stream Control Transmission Protocol (SCTP) transport.
:param transport: An :class:`RTCDtlsTransport`.
"""
def __init__(self, transport: RTCDtlsTransport, port: int = 5000) -> None:
if transport.state == "closed":
raise InvalidStateError
super().__init__()
self._association_state = self.State.CLOSED
self.__log_debug = lambda *args: None # type: Callable[..., None]
self.__started = False
self.__state = "new"
VIDEO_TIME_BASE = fractions.Fraction(1, VIDEO_CLOCK_RATE)
def convert_timebase(
pts: int, from_base: fractions.Fraction, to_base: fractions.Fraction
) -> int:
if from_base != to_base:
pts = int(pts * from_base / to_base)
return pts
class MediaStreamError(Exception):
pass
class MediaStreamTrack(AsyncIOEventEmitter, metaclass=ABCMeta):
"""
A single media track within a stream.
"""
kind = "unknown"
def __init__(self) -> None:
super().__init__()
self.__ended = False
self.__id = str(uuid.uuid4())
@property
def id(self) -> str:
"""
An automatically generated globally unique ID.
"""
elif direction == "recvonly":
return "sendonly"
return direction
def wrap_session_description(
session_description: Optional[sdp.SessionDescription],
) -> Optional[RTCSessionDescription]:
if session_description is not None:
return RTCSessionDescription(
sdp=str(session_description), type=session_description.type
)
return None
class RTCPeerConnection(AsyncIOEventEmitter):
"""
The :class:`RTCPeerConnection` interface represents a WebRTC connection
between the local computer and a remote peer.
:param configuration: An optional :class:`RTCConfiguration`.
"""
def __init__(self, configuration: Optional[RTCConfiguration] = None) -> None:
super().__init__()
self.__certificates = [RTCCertificate.generateCertificate()]
self.__cname = "{%s}" % uuid.uuid4()
self.__configuration = configuration or RTCConfiguration()
self.__iceTransports = set() # type: Set[RTCIceTransport]
self.__initialOfferer = None # type: Optional[bool]
self.__remoteDtls = (
{}
def unregister_receiver(self, receiver) -> None:
self.receivers.discard(receiver)
self.__discard(self.mid_table, receiver)
self.__discard(self.ssrc_table, receiver)
self.__discard(self.payload_type_table, receiver)
def unregister_sender(self, sender) -> None:
self.__discard(self.senders, sender)
def __discard(self, d: Dict, value: Any) -> None:
for k, v in list(d.items()):
if v == value:
d.pop(k)
class RTCDtlsTransport(AsyncIOEventEmitter):
"""
The :class:`RTCDtlsTransport` object includes information relating to
Datagram Transport Layer Security (DTLS) transport.
:param transport: An :class:`RTCIceTransport`.
:param certificates: A list of :class:`RTCCertificate` (only one is allowed currently).
"""
def __init__(
self, transport: RTCIceTransport, certificates: List[RTCCertificate]
) -> None:
assert len(certificates) == 1
certificate = certificates[0]
super().__init__()
self.encrypted = False