Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param username: The name of the shared access policy.
:type username: str
:param password: The shared access key.
:type password: str
"""
http_proxy = self.config.http_proxy
transport_type = self.config.transport_type
auth_timeout = self.config.auth_timeout
# TODO: the following code can be refactored to create auth from classes directly instead of using if-else
if isinstance(self.credential, EventHubSharedKeyCredential):
username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
return authentication.SASLPlain(
self.host, username, password, http_proxy=http_proxy, transport_type=transport_type)
return authentication.SASTokenAuth.from_shared_access_key(
self.auth_uri, username, password, timeout=auth_timeout, http_proxy=http_proxy,
transport_type=transport_type)
elif isinstance(self.credential, EventHubSASTokenCredential):
token = self.credential.get_sas_token()
try:
expiry = int(parse_sas_token(token)['se'])
except (KeyError, TypeError, IndexError):
raise ValueError("Supplied SAS token has no valid expiry value.")
return authentication.SASTokenAuth(
self.auth_uri, self.auth_uri, token,
expires_at=expiry,
timeout=auth_timeout,
http_proxy=http_proxy,
# Excerpt of a method body; the enclosing `def` line and the original
# indentation are not part of this excerpt. The statements pick one of three
# auth objects from the `authentication` module and return it.
#
# Branch 1: a SAS token is set on the instance.
if self.sas_token:
# `self.sas_token` is used directly, or called with no arguments first when
# it is callable.
token = self.sas_token() if callable(self.sas_token) else self.sas_token
try:
# The 'se' entry of the parsed token is converted to int and used as the
# expiry passed to the auth object below.
expiry = int(parse_sas_token(token)['se'])
except (KeyError, TypeError, IndexError):
# NOTE(review): ValueError is not in the tuple above, so if 'se' is a
# non-numeric string the error raised by int() propagates with its own
# message instead of the one below -- confirm whether that is intended.
raise ValueError("Supplied SAS token has no valid expiry value.")
# `self.auth_uri` is deliberately passed as both of the first two positional
# arguments.
return authentication.SASTokenAsync(
self.auth_uri, self.auth_uri, token,
expires_at=expiry,
timeout=self.auth_timeout,
http_proxy=self.http_proxy)
# Branch 2: no SAS token. A truthy `username` / `password` already in scope
# (presumably the method's parameters -- header not visible here) is kept;
# otherwise the value stored in `self._auth_config` is used.
username = username or self._auth_config['username']
password = password or self._auth_config['password']
# A user name containing "@sas.root" is handed to SASLPlain together with the
# host name rather than the auth URI; note that no timeout is passed here.
if "@sas.root" in username:
return authentication.SASLPlain(
self.address.hostname, username, password, http_proxy=self.http_proxy)
# Fallback: build the auth object from the user name and password.
return authentication.SASTokenAsync.from_shared_access_key(
self.auth_uri, username, password, timeout=self.auth_timeout, http_proxy=self.http_proxy)
"""
Create an ~uamqp.authentication.SASTokenAuth instance to authenticate
the session.
:param username: The name of the shared access policy.
:type username: str
:param password: The shared access key.
:type password: str
"""
# Body of the method whose docstring precedes it (the `def` line and the
# original indentation are missing from this excerpt). Besides the
# SASTokenAuth named in that docstring, one path below returns SASLPlain.
#
# Preferred path: use the SAS token stored on the instance, if any.
if self.sas_token:
# Accepts either a ready token or a zero-argument callable that yields one.
token = self.sas_token() if callable(self.sas_token) else self.sas_token
try:
# Expiry comes from the token's parsed 'se' entry, coerced to int.
expiry = int(parse_sas_token(token)['se'])
except (KeyError, TypeError, IndexError):
# Lookup/type failures are reported as a single ValueError.
# NOTE(review): a ValueError raised by int() itself (non-numeric 'se') is
# not caught by this clause and propagates with int()'s own message.
raise ValueError("Supplied SAS token has no valid expiry value.")
# The auth URI is supplied twice, as the first and second positional args.
return authentication.SASTokenAuth(
self.auth_uri, self.auth_uri, token,
expires_at=expiry,
timeout=self.auth_timeout,
http_proxy=self.http_proxy)
# Otherwise authenticate with policy name + key. Falsy arguments (None, "")
# fall back to the entries kept in `self._auth_config`.
username = username or self._auth_config['username']
password = password or self._auth_config['password']
# "@sas.root" anywhere in the user name selects SASLPlain, which is given
# `self.address.hostname` instead of `self.auth_uri` and no timeout.
if "@sas.root" in username:
return authentication.SASLPlain(
self.address.hostname, username, password, http_proxy=self.http_proxy)
# Default: SASTokenAuth built from the shared access policy name and key.
return authentication.SASTokenAuth.from_shared_access_key(
self.auth_uri, username, password, timeout=self.auth_timeout, http_proxy=self.http_proxy)
# Headerless method-body excerpt (no `def` line or indentation survived in
# this excerpt). It has the same shape as the SASTokenAuth-based body just
# before it, but constructs `authentication.SASTokenAsync` objects instead.
#
# Case A: instance carries a SAS token.
if self.sas_token:
# Resolve the token: call `self.sas_token` when callable, else use as-is.
token = self.sas_token() if callable(self.sas_token) else self.sas_token
try:
# int() of the parsed token's 'se' entry becomes `expires_at` below.
expiry = int(parse_sas_token(token)['se'])
except (KeyError, TypeError, IndexError):
# NOTE(review): only these three exception types are translated; a
# ValueError from int() on a malformed 'se' value escapes unchanged.
raise ValueError("Supplied SAS token has no valid expiry value.")
# Both leading positional arguments are `self.auth_uri`.
return authentication.SASTokenAsync(
self.auth_uri, self.auth_uri, token,
expires_at=expiry,
timeout=self.auth_timeout,
http_proxy=self.http_proxy)
# Case B: no token -- credentials come from the in-scope `username` /
# `password` when truthy, else from `self._auth_config`.
# (assumes both names are parameters of the enclosing method -- TODO confirm)
username = username or self._auth_config['username']
password = password or self._auth_config['password']
# User names containing "@sas.root" use SASLPlain against the host name.
if "@sas.root" in username:
return authentication.SASLPlain(
self.address.hostname, username, password, http_proxy=self.http_proxy)
# Everything else: SASTokenAsync derived from the user name and password.
return authentication.SASTokenAsync.from_shared_access_key(
self.auth_uri, username, password, timeout=self.auth_timeout, http_proxy=self.http_proxy)