Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import asyncio
import functools
import logging
import pathlib
import shutil
import socket
import ssl
import nose
import trustme
import aioftp
# Module-level TLS fixtures: a throwaway trustme CA signs a certificate for the
# loopback addresses, and one SSL context is prepared for each side.
ca = trustme.CA()
server_cert = ca.issue_server_cert("127.0.0.1", "::1")
# Server-side context (Purpose.CLIENT_AUTH) that receives the issued certificate.
ssl_server = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
server_cert.configure_cert(ssl_server)
# Client-side context (Purpose.SERVER_AUTH) configured to trust the throwaway CA.
ssl_client = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ca.configure_trust(ssl_client)
# NOTE(review): presumably the port the test server listens on — confirm against its users.
PORT = 8888
@nose.tools.nottest
def aioftp_setup(*, server_args=([], {}), client_args=([], {})):
def decorator(f):
# Generator that yields a single, newly created trustme.CA instance.
# NOTE(review): presumably registered as a test fixture — no decorator is visible here; confirm.
def CA():
yield trustme.CA()
import os
import pytest
import subprocess
import ssl
import time
import trustme
import bmemcached
import test_simple_functions
# Throwaway CA plus a certificate issued for the host named by the
# MEMCACHED_HOST environment variable; a missing variable raises KeyError here.
ca = trustme.CA()
# NOTE(review): the `+ u""` looks like a text-type coercion for Python 2 — confirm it is still needed.
server_cert = ca.issue_cert(os.environ["MEMCACHED_HOST"] + u"")
@pytest.yield_fixture(scope="module", autouse=True)
def memcached_tls():
key = server_cert.private_key_pem
cert = server_cert.cert_chain_pems[0]
with cert.tempfile() as c, key.tempfile() as k:
p = subprocess.Popen(
[
"memcached",
"-p5001",
"-Z",
"-o",
"ssl_key={}".format(k),
def setup_class(cls):
    """Create CA and server credentials on disk, then start the server.

    A temporary directory is created and stored on ``cls.tmpdir``. A fresh
    trustme CA issues a certificate carrying only the common name
    ``localhost``. The CA certificate, the server private key and the first
    certificate of the server chain are written into that directory, and the
    resulting paths are exposed as ``cls.ca_certs``, ``cls.server_key_path``,
    ``cls.server_cert_path`` and the ``cls.certs`` mapping before the parent
    class's ``_start_server`` is invoked.
    """
    workdir = tempfile.mkdtemp("certs")
    cls.tmpdir = workdir

    authority = trustme.CA()
    # only common name, no subject alternative names
    issued = authority.issue_cert(common_name=u"localhost")

    # Derive all three output locations from the temporary directory.
    cls.ca_certs, cls.server_cert_path, cls.server_key_path = [
        os.path.join(workdir, filename)
        for filename in ("ca.pem", "server.pem", "server.key")
    ]

    # Same write order as before: CA certificate, private key, leaf certificate.
    authority.cert_pem.write_to_path(cls.ca_certs)
    issued.private_key_pem.write_to_path(cls.server_key_path)
    leaf_pem = issued.cert_chain_pems[0]
    leaf_pem.write_to_path(cls.server_cert_path)

    cls.certs = dict(
        keyfile=cls.server_key_path,
        certfile=cls.server_cert_path,
    )
    super(TestHTTPS_NoSAN, cls)._start_server()
"--quiet",
action="store_true",
help="Doesn't print out helpful information for humans",
)
args = parser.parse_args(argv or sys.argv[1:])
if len(args.identities) < 1:
raise ValueError("Must include at least one identity")
cert_dir = pathlib.Path(args.dir)
if not cert_dir.is_dir():
raise ValueError(f"--dir={cert_dir} is not a directory")
common_name = args.common_name[0] if args.common_name else None
# Generate the CA certificate
trustme._KEY_SIZE = args.key_size
ca = trustme.CA()
cert = ca.issue_cert(*args.identities, common_name=common_name)
# Write the certificate and private key the server should use
server_key = cert_dir / "server.key"
server_cert = cert_dir / "server.pem"
cert.private_key_pem.write_to_path(path=str(server_key))
with server_cert.open(mode="w") as f:
f.truncate()
for blob in cert.cert_chain_pems:
blob.write_to_path(path=str(server_cert), append=True)
# Write the certificate the client should trust
client_cert = cert_dir / "client.pem"
ca.cert_pem.write_to_path(path=str(client_cert))
if not args.quiet:
"-q",
"--quiet",
action="store_true",
help="Doesn't print out helpful information for humans",
)
args = parser.parse_args(argv or sys.argv[1:])
if len(args.identities) < 1:
raise ValueError("Must include at least one identity")
cert_dir = pathlib.Path(args.dir)
if not cert_dir.is_dir():
raise ValueError(f"--dir={cert_dir} is not a directory")
common_name = args.common_name[0] if args.common_name else None
# Generate the CA certificate
trustme._KEY_SIZE = args.key_size
ca = trustme.CA()
cert = ca.issue_cert(*args.identities, common_name=common_name)
# Write the certificate and private key the server should use
server_key = cert_dir / "server.key"
server_cert = cert_dir / "server.pem"
cert.private_key_pem.write_to_path(path=str(server_key))
with server_cert.open(mode="w") as f:
f.truncate()
for blob in cert.cert_chain_pems:
blob.write_to_path(path=str(server_cert), append=True)
# Write the certificate the client should trust
client_cert = cert_dir / "client.pem"
ca.cert_pem.write_to_path(path=str(client_cert))