Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_autodiscover_cache(self, m):
    """Exercise the autodiscover cache: population, lookup by value, recovery and reuse.

    The test walks through four stages:

    1. After clearing the cache, ``discover()`` adds an entry for the account.
    2. The entry can be found with a freshly built, equal key (not the same objects).
    3. A cache entry pointing at a dead endpoint is replaced by ``discover()``.
    4. With ``_try_autodiscover`` stubbed out to raise, ``discover()`` still
       succeeds, i.e. the cached entry is used.

    ``m`` is the request-mocking object; it is used here only to register a
    404 response for POSTs to the dead endpoint.
    """
    # Empty the cache
    from exchangelib.autodiscover import _autodiscover_cache
    _autodiscover_cache.clear()
    email = self.account.primary_smtp_address
    credentials = self.account.protocol.credentials
    cache_key = (self.account.domain, credentials)
    # Not cached
    self.assertNotIn(cache_key, _autodiscover_cache)
    discover(email=email, credentials=credentials)
    # Now it's cached
    self.assertIn(cache_key, _autodiscover_cache)
    # Make sure the cache can be looked by value, not by id(). This is important for multi-threading/processing
    self.assertIn((
        email.split('@')[1],
        Credentials(credentials.username, credentials.password),
        True
    ), _autodiscover_cache)
    # Poison the cache. discover() must survive and rebuild the cache
    _autodiscover_cache[cache_key] = AutodiscoverProtocol(config=Configuration(
        service_endpoint='https://example.com/blackhole.asmx',
        credentials=Credentials('leet_user', 'cannaguess'),
        auth_type=NTLM,
        retry_policy=FailFast(),
    ))
    m.post('https://example.com/blackhole.asmx', status_code=404)
    discover(email=email, credentials=credentials)
    self.assertIn(cache_key, _autodiscover_cache)

    # Make sure that the cache is actually used on the second call to discover()
    _orig = exchangelib.autodiscover._try_autodiscover

    def _mock(*args, **kwargs):
        raise NotImplementedError()

    exchangelib.autodiscover._try_autodiscover = _mock
    try:
        discover(email=email, credentials=credentials)
    finally:
        # Always undo the monkey-patch so a failure above cannot leak the stub into other tests
        exchangelib.autodiscover._try_autodiscover = _orig
    # TODO: Fake that another thread added the cache entry into the persistent storage but we don't have it in
    # our in-memory cache. The cache should work anyway. (No code for this scenario is present in this test.)
def test_equality(self):
    """Credentials compare equal only when both constructor arguments match."""
    self.assertEqual(Credentials('a', 'b'), Credentials('a', 'b'))
    # Changing either argument must break equality
    for first, second in (('a', 'a'), ('b', 'b')):
        self.assertNotEqual(Credentials('a', 'b'), Credentials(first, second))
def test_type(self):
    """Check the ``type`` reported by Credentials for three differently shaped first arguments."""
    cases = (
        ('a', Credentials.UPN),
        ('a@example.com', Credentials.EMAIL),
        ('a\\n', Credentials.DOMAIN),
    )
    for first_arg, expected_type in cases:
        self.assertEqual(Credentials(first_arg, 'b').type, expected_type)
def test_magic(self):
    """Smoke test: ``str()`` and ``repr()`` of a Configuration must not raise."""
    creds = Credentials('foo', 'bar')
    version = Version(build=Build(15, 1, 2, 3), api_version='foo')
    config = Configuration(server='example.com', credentials=creds, auth_type=NTLM, version=version)
    # Just test that these work; the rendered text itself is not inspected
    for render in (str, repr):
        render(config)
def test_decrease_poolsize(self):
    """Shrink the session pool one step at a time until the minimum size is reached.

    The pool starts at ``Protocol.SESSION_POOLSIZE``. Each call to
    ``decrease_poolsize()`` must reduce the queue size by one until a single
    session is left; a further call must raise ``SessionPoolMinSizeReached``
    and leave the pool at size 1.
    """
    protocol = Protocol(config=Configuration(
        service_endpoint='https://example.com/Foo.asmx', credentials=Credentials('A', 'B'),
        auth_type=NTLM, version=Version(Build(15, 1)), retry_policy=FailFast()
    ))
    self.assertEqual(protocol._session_pool.qsize(), Protocol.SESSION_POOLSIZE)
    # Derive the expected sizes from SESSION_POOLSIZE rather than hard-coding 3, 2, 1, so the test keeps
    # working if the default pool size changes
    for expected_size in range(Protocol.SESSION_POOLSIZE - 1, 0, -1):
        protocol.decrease_poolsize()
        self.assertEqual(protocol._session_pool.qsize(), expected_size)
    # One session is the floor: a further decrease must fail and leave the pool untouched
    with self.assertRaises(SessionPoolMinSizeReached):
        protocol.decrease_poolsize()
    self.assertEqual(protocol._session_pool.qsize(), 1)
def test_hash(self):
    """Equal credentials hash alike; credentials differing in either argument hash differently."""
    # Test that we can use credentials as a dict key
    self.assertEqual(hash(Credentials('a', 'b')), hash(Credentials('a', 'b')))
    for first, second in (('a', 'a'), ('b', 'b')):
        self.assertNotEqual(hash(Credentials('a', 'b')), hash(Credentials(first, second)))
def fetch_free_busy(date, tz, uid):
    """Prepare the Exchange accounts and timezone for a free/busy lookup.

    Reads the Exchange provider settings from ``current_app.config``, builds an
    NTLM ``Configuration`` and two ``Account`` objects (the provider account
    and ``<uid>@<domain>``), and maps ``tz`` through ``NON_STANDARD_TZS`` when
    it has an entry there.

    Raises:
        RuntimeError: if the credentials, server or domain setting is empty.

    NOTE(review): the visible body ends after the timezone mapping; ``date``
    and ``accounts`` are not used within it and nothing is returned.
    """
    settings = current_app.config
    acc = settings['EXCHANGE_PROVIDER_ACCOUNT']
    creds = settings['EXCHANGE_PROVIDER_CREDENTIALS']
    server = settings['EXCHANGE_PROVIDER_SERVER']
    domain = settings['EXCHANGE_DOMAIN']
    if not (creds and server and domain):
        raise RuntimeError('Exchange provider not configured!')

    configuration = Configuration(server=server, auth_type=NTLM, credentials=Credentials(*creds))
    # Provider account first, then the account of the user being looked up
    provider_account = Account(acc, config=configuration, autodiscover=False)
    user_account = Account(f'{uid}@{domain}', autodiscover=False, config=configuration)
    accounts = [
        (provider_account, 'Organizer', False),
        (user_account, 'Optional', False),
    ]
    # Swap a non-standard timezone name for its mapped replacement
    tz = NON_STANDARD_TZS[tz] if tz in NON_STANDARD_TZS else tz
def connect_to_account(self, primary_smtp_address, impersonation=False):
    """Connect to the specified account and return it.

    Args:
        primary_smtp_address: address of the mailbox to open.
        impersonation: when true the account is opened with ``IMPERSONATION``
            access, otherwise with ``DELEGATE`` access.

    Returns:
        The ``Account`` built from this object's server, username and password.

    Raises:
        NoMailboxError: the mailbox does not exist (``ErrorNonExistentMailbox``).
        ServerConnectionError: connecting failed (``ConnectionError``).
        CredentialsError: the login was rejected (``UnauthorizedError``).
        ImpersonationError: impersonation was denied (``ErrorImpersonateUserDenied``).

    Each of these is chained to the exception that triggered it, so the
    underlying error stays available as ``__cause__``.
    """
    # Don't check certificates
    if not self.verify_cert:
        # This assigns a class attribute on BaseProtocol, so it is not limited to this one call
        BaseProtocol.HTTP_ADAPTER_CLS = NoVerifyHTTPAdapter

    # Decide whether or not to use impersonation access
    access_type = IMPERSONATION if impersonation else DELEGATE

    # Use credentials to get account
    try:
        credentials = Credentials(username=self.username, password=self.password)
        config = Configuration(server=self.server, credentials=credentials)
        # NOTE(review): autodiscover is driven by verify_cert here — confirm this coupling is intentional
        account = Account(primary_smtp_address=primary_smtp_address, config=config,
                          autodiscover=self.verify_cert, access_type=access_type)
    except ErrorNonExistentMailbox as err:
        raise NoMailboxError(primary_smtp_address) from err
    except ConnectionError as err:
        raise ServerConnectionError(self.server) from err
    except UnauthorizedError as err:
        raise CredentialsError() from err
    except ErrorImpersonateUserDenied as err:
        raise ImpersonationError(self.username, primary_smtp_address) from err

    return account
return None, credentials
else:
if 'outlook.office365.com' in EWS_SERVER.lower():
if not AUTH_METHOD_STR:
AUTH_METHOD_STR = 'Basic'
VERSION_STR = '2016'
else:
if MANUAL_USERNAME:
USERNAME = MANUAL_USERNAME
if not AUTH_METHOD_STR:
AUTH_METHOD_STR = 'ntlm'
if not VERSION_STR:
return_error('Exchange Server Version is required for on-premise Exchange Servers.')
version = get_version(VERSION_STR)
credentials = Credentials(username=USERNAME, password=PASSWORD)
config_args = {
'credentials': credentials,
'auth_type': get_auth_method(AUTH_METHOD_STR),
'version': version
}
if not EWS_SERVER:
return_error("Exchange Server Hostname or IP Address is required for manual configuration.")
elif 'http' in EWS_SERVER.lower():
config_args['service_endpoint'] = EWS_SERVER
else:
config_args['server'] = EWS_SERVER
return Configuration(**config_args), None