Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_start_tls_with_cipher(self):
    """Check that StartTLS succeeds when TLS is restricted to a cipher list.

    Nothing is executed when ``test_strategy`` is one of the mock strategies.

    The same ``Tls`` settings (no certificate validation, restricted cipher
    list) are applied both to a single server and to every server of a pool;
    previously pooled servers were created without any ``Tls`` object, so the
    cipher list was silently ignored in pooled runs.
    """
    # Alternative cipher strings that were tried previously:
    #   None
    #   '!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2'
    ciphers = 'HIGH:!aNULL:!RC4:!DSS'
    if test_strategy in [MOCK_SYNC, MOCK_ASYNC]:
        return

    if isinstance(test_server, (list, tuple)):
        server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
        for host in test_server:
            # Each pooled server gets its own Tls object carrying the cipher list.
            server.add(Server(host=host, port=test_port, tls=Tls(validate=ssl.CERT_NONE, ciphers=ciphers), allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
    else:
        server = Server(host=test_server, port=test_port, tls=Tls(validate=ssl.CERT_NONE, ciphers=ciphers), get_info=test_get_info, mode=test_server_mode)

    connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
    try:
        connection.open()
        connection.start_tls()
        self.assertFalse(connection.closed)
        # The negotiated cipher is not asserted; the original comparison
        # against connection.socket.cipher() was disabled.
    finally:
        # Always release the connection (and any pooled strategy), even when
        # StartTLS or the assertion fails.
        connection.unbind()
        if connection.strategy.pooled:
            connection.strategy.terminate()
def setUp(self):
    """Create three families of MOCK_SYNC connections and seed fixture entries.

    Family 1 uses a server built with ``Server.from_definition`` from the
    eDirectory schema and DSA info, family 2 a ``'dummy'`` server created with
    ``get_info=OFFLINE_EDIR_9_1_4`` and family 3 a ``'dummy'`` server without
    schema. Every family has a plain connection, a second plain connection
    (suffix ``b``) and one created with ``raise_exceptions=True`` (suffix ``c``).
    """
    global testcase_id
    testcase_id = random_id()

    def mock_connection(server, user, password, **extra):
        # All connections of this fixture share the MOCK_SYNC strategy.
        return Connection(server, user=user, password=password, client_strategy=MOCK_SYNC, **extra)

    # The mock server can be defined in two different ways, so tests are
    # duplicated; the connection_3 family is the one without schema.
    schema = SchemaInfo.from_json(edir_9_1_4_schema)
    info = DsaInfo.from_json(edir_9_1_4_dsa_info, schema)

    server_1 = Server.from_definition('MockSyncServer', info, schema)
    self.connection_1 = mock_connection(server_1, 'cn=user1,ou=test,o=lab', 'test1111')
    self.connection_1b = mock_connection(server_1, 'cn=user1,ou=test,o=lab', 'test1111')
    self.connection_1c = mock_connection(server_1, 'cn=user1,ou=test,o=lab', 'test1111', raise_exceptions=True)

    server_2 = Server('dummy', get_info=OFFLINE_EDIR_9_1_4)
    self.connection_2 = mock_connection(server_2, 'cn=user2,ou=test,o=lab', 'test2222')
    self.connection_2b = mock_connection(server_2, 'cn=user2,ou=test,o=lab', 'test2222')
    self.connection_2c = mock_connection(server_2, 'cn=user2,ou=test,o=lab', 'test2222', raise_exceptions=True)

    server_3 = Server('dummy')  # no schema
    self.connection_3 = mock_connection(server_3, 'cn=user3,ou=test,o=lab', 'test3333')
    self.connection_3b = mock_connection(server_3, 'cn=user3,ou=test,o=lab', 'test3333')
    self.connection_3c = mock_connection(server_3, 'cn=user3,ou=test,o=lab', 'test3333', raise_exceptions=True)

    # Fixtures: the same user0 entry goes into the first connection of every
    # family; a fresh attribute dict is built for each call.
    for seeded in (self.connection_1, self.connection_2, self.connection_3):
        seeded.strategy.add_entry(
            'cn=user0,o=lab',
            {'userPassword': 'test0000', 'sn': 'user0_sn', 'revision': 0, 'guid': '07039e68-4373-264d-a0a7-000000000000'},
        )
    self.connection_1.strategy.add_entry(
        'cn=user1,ou=test,o=lab',
        {'userPassword': 'test1111', 'sn': 'user1_sn', 'revision': 1, 'guid': '07039e68-4373-264d-a0a7-111111111111'},
    )
# Build one ldap3.Server per configured LDAP URL, then try every bind DN
# template in turn until one of them authenticates.
for host in ldap_host_list:
    # parse the url to extract scheme, host and port
    url_parsed = urllib.parse.urlparse(host)
    # ldaps:// selects SSL on port 636; anything else is plain LDAP, whose
    # standard port is 389 (the previous default of 386 was a typo).
    use_ssl = url_parsed.scheme == "ldaps"
    ldap_port = 636 if use_ssl else 389
    # .hostname / .port handle IPv6 literals ("[::1]:636") and userinfo,
    # which a bare netloc.split(":") cannot unpack into two values.
    ldap_host = url_parsed.hostname or url_parsed.netloc
    if url_parsed.port is not None:
        ldap_port = url_parsed.port
    server = ldap3.Server(ldap_host,
                          port=int(ldap_port),
                          use_ssl=use_ssl)
    servers_list.append(server)

# Result of the most recent bind attempt, kept for error reporting.
last_auth_err = ""
for bind_dn in ldap_dn_list:
    # bind_dn is a %-format template that receives the login name.
    c = ldap3.Connection(servers_list,
                         user=bind_dn % login,
                         password=password)
    # perform the Bind operation
    auth_success = c.bind()
    last_auth_err = c.result
    if auth_success:
        break
# NOTE(review): fragment of a command-line script. `parser` is created before
# this point, and the final `if` at the end of the fragment is cut off in this view.
parser.add_argument("-a", "--additional", action='store_true', help="Add the SPN via the msDS-AdditionalDnsHostName attribute")
args = parser.parse_args()
# Prompt for the password if it was not supplied
# authentication stays None unless a user name was given.
authentication = None
if args.user is not None:
# A user name selects NTLM authentication, which needs the DOMAIN\username form.
authentication = NTLM
if not '\\' in args.user:
print_f('Username must include a domain, use: DOMAIN\\username')
sys.exit(1)
if args.password is None:
# Ask interactively instead of requiring the password as an argument.
args.password = getpass.getpass()
# Control built with sdflags=0x04; it is not used in the visible lines.
# NOTE(review): the meaning of flag 0x04 is not shown here - confirm against the ldap3 docs.
controls = security_descriptor_control(sdflags=0x04)
# define the server and the connection
s = Server(args.host, get_info=ALL)
print_m('Connecting to host...')
c = Connection(s, user=args.user, password=args.password, authentication=authentication)
print_m('Binding to host')
# perform the Bind operation; on failure report c.result and exit with status 1
if not c.bind():
print_f('Could not bind with specified credentials')
print_f(c.result)
sys.exit(1)
print_o('Bind OK')
# Use the explicit target when given, otherwise the part after the backslash
# of the DOMAIN\username login.
if args.target:
targetuser = args.target
else:
# NOTE(review): args.user can be None here (the check above only runs when a
# user was given), which would raise AttributeError - confirm the parser setup.
targetuser = args.user.split('\\')[1]
if '.' in targetuser:
# NOTE(review): fragment; `config`, `logger`, `ldap_url` and `bind_dn` are
# defined outside this view.
# Translate the configured strategy name into the matching ldap3 constant.
client_strategy_string = config["client_strategy"]
client_strategy_map = {
"SYNC": ldap3.SYNC,
"ASYNC": ldap3.ASYNC,
"LDIF": ldap3.LDIF,
"RESTARTABLE": ldap3.RESTARTABLE,
"REUSABLE": ldap3.REUSABLE,
"MOCK_SYNC": ldap3.MOCK_SYNC
}
# A name that is not a key of the map raises KeyError here.
client_strategy = client_strategy_map[client_strategy_string]
# Keyword arguments for ldap3.Server; only the host is always set.
args = {'host': config["ldap_url"]}
# For the mock strategy, get_info is set to ldap3.OFFLINE_SLAPD_2_4.
if client_strategy == ldap3.MOCK_SYNC:
args['get_info'] = ldap3.OFFLINE_SLAPD_2_4
server = ldap3.Server(**args)
# Debug trace of the connection parameters.
msg = "Creating a new LDAP connection"
logger.debug(msg)
msg = "Using LDAP URL {}".format(ldap_url)
logger.debug(msg)
msg = "Using bind DN {}".format(bind_dn)
logger.debug(msg)
# Translate the configured auto-bind name into the matching ldap3 constant.
auto_bind_string = config["auto_bind"]
# NOTE(review): this dict literal is cut off in this view (no closing brace).
auto_bind_map = {
"AUTO_BIND_NONE": ldap3.AUTO_BIND_NONE,
"AUTO_BIND_NO_TLS": ldap3.AUTO_BIND_NO_TLS,
"AUTO_BIND_TLS_AFTER_BIND": ldap3.AUTO_BIND_TLS_AFTER_BIND,
"AUTO_BIND_TLS_BEFORE_BIND": ldap3.AUTO_BIND_TLS_BEFORE_BIND,
# Configure this object from ``app.config``: build the TLS settings and the
# LDAP server description, then register the object and its teardown on ``app``.
# NOTE(review): interior of a method (uses ``self`` and ``app``); the ``def``
# line is outside this view.
cfg = app.config
cfg.setdefault('FORCE_ATTRIBUTE_VALUE_AS_LIST', False)

# TLS parameters, each read from its LDAP_* configuration key.
tls_options = {
    'local_private_key_file': cfg['LDAP_CLIENT_PRIVATE_KEY'],
    'local_certificate_file': cfg['LDAP_CLIENT_CERT'],
    'validate': cfg['LDAP_REQUIRE_CERT'],
    'version': cfg['LDAP_TLS_VERSION'],
    'ca_certs_file': cfg['LDAP_CA_CERTS_FILE'],
    'valid_names': cfg['LDAP_VALID_NAMES'],
    'ca_certs_path': cfg['LDAP_CA_CERTS_PATH'],
    'ca_certs_data': cfg['LDAP_CA_CERTS_DATA'],
    'local_private_key_password': cfg['LDAP_PRIVATE_KEY_PASSWORD'],
}
self.tls = Tls(**tls_options)

# Server description; it reuses the Tls object created just above.
server_options = {
    'host': cfg['LDAP_SERVER'],
    'port': cfg['LDAP_PORT'],
    'use_ssl': cfg['LDAP_USE_SSL'],
    'connect_timeout': cfg['LDAP_CONNECT_TIMEOUT'],
    'tls': self.tls,
    'get_info': ALL,
}
self.ldap_server = Server(**server_options)

# Store ldap_conn object to extensions
app.extensions['ldap_conn'] = self
# Teardown appcontext
app.teardown_appcontext(self.teardown)
def __init__(self, ldap_server_host, ldap_admin_dn, ldap_admin_password, ldap_server_port=389, ldap_use_ssl=False):
    """Store the admin bind credentials and describe the LDAP server.

    :param ldap_server_host: host handed to ``Server`` as its first argument
    :param ldap_admin_dn: kept on ``self._ldap_admin_dn``
    :param ldap_admin_password: kept on ``self._ldap_admin_password``
    :param ldap_server_port: passed to ``Server`` as ``port`` (default 389)
    :param ldap_use_ssl: passed to ``Server`` as ``use_ssl`` (default False)
    """
    self._ldap_admin_dn, self._ldap_admin_password = ldap_admin_dn, ldap_admin_password
    self.ldap_server = Server(
        ldap_server_host,
        port=ldap_server_port,
        use_ssl=ldap_use_ssl,
    )
# NOTE(review): fragment from inside a method; `host`, `use_ssl`, `settings`,
# `user` and `password` are defined outside this view.
# NOTE(review): port stays 389 in the visible lines regardless of use_ssl - confirm.
port = 389
start_tls = False
# StartTLS is opt-in: the key must be present in settings and truthy.
if 'START_TLS' in settings and settings['START_TLS']:
start_tls = True
tls = None
# A Tls object is created when either SSL or StartTLS is requested.
if use_ssl or start_tls:
tls = ldap3.Tls()
# Optional CA certificate file taken from settings['TLS_CA'].
if 'TLS_CA' in settings and settings['TLS_CA']:
tls.ca_certs_file = settings['TLS_CA']
# A truthy REQUIRE_TLS setting switches validation to ssl.CERT_REQUIRED.
if 'REQUIRE_TLS' in settings and settings['REQUIRE_TLS']:
tls.validate = ssl.CERT_REQUIRED
s = ldap3.Server(host, port=port, use_ssl=use_ssl, tls=tls)
# Simple (DN + password) authentication through the configured connection class.
c = self._connection_class(
s, # client_strategy=ldap3.STRATEGY_SYNC_RESTARTABLE,
user=user, password=password, authentication=ldap3.SIMPLE)
# Restart tuning on the strategy: no sleep, a single try.
c.strategy.restartable_sleep_time = 0
c.strategy.restartable_tries = 1
# Make ldap3 raise exceptions so a failed bind reaches the handler below.
c.raise_exceptions = True
c.open()
# Upgrade the connection with StartTLS before binding.
if start_tls:
c.start_tls()
try:
c.bind()
# Any failure during bind releases the connection.
# NOTE(review): whether the exception is re-raised afterwards is not visible here.
except: # noqa: E722
c.unbind()
def ldap_connection(host):
    """Yield an ldap3 Connection to ``host``, using an SSL-enabled server.

    The connection is held inside a ``with`` block around the ``yield``, so
    the Connection's own context manager is exited once the generator
    finishes or is closed.

    Intended usage (this needs the generator to be wrapped as a context
    manager, e.g. by ``contextlib.contextmanager``; the decorator is not
    visible in this view -- TODO confirm)::

        with ldap_connection('ldap.ocf.berkeley.edu') as c:
            c.search(OCF_LDAP_PEOPLE, '(uid=ckuehl)', attributes=['uidNumber'])

    The ``ldap_ocf`` and ``ldap_ucb`` helpers may be more convenient to use.

    :param host: server hostname
    """
    with ldap3.Connection(ldap3.Server(host, use_ssl=True)) as conn:
        yield conn
def create_user(self, request):
log.info("Create user body:{}".format(request.data))
data = json.loads(request.data)
server = Server('10.120.46.80')
conn = Connection(server, "cn=zgrossbart,ou=services,o=corp", "N0v3ll123")
conn.bind()
cn = data["fName"].replace(" ", "").lower() + data["sName"].lower()[0]
attrs = {}
attrs['cn'] = cn
attrs['sn'] = data["sName"]
attrs['userPassword'] = data["password"]
attrs['clDoB'] = data["birthDate"]
attrs['mail'] = data["email"]
attrs['postalCode'] = data["postalCode"]
attrs['fullName'] = data["fName"] + ' ' + data["sName"]
attrs['givenName'] = data["fName"]
attrs['description'] = data["description"]
dn = "cn=" + cn + ",ou=SA,ou=cx,o=corp"
result = conn.add(dn, ['User', 'clCustomer'], attrs)