Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Test setup: stacks FakeSwift -> KeystoneAuth -> AuthProtocol -> S3Token ->
# S3Middleware (each wrapping the previous one) and builds a PUT /bucket
# request that carries an AWS-style Authorization header.
# NOTE(review): this snippet is truncated -- the body of the innermost
# ``with`` (the request call and the assertions) is not visible, and the
# original indentation has been lost.  Judging by the name, the test is
# presumably about the token NOT being handed on to auth_token; confirm
# against the full test.
def test_s3api_with_s3_token_no_pass_token_to_auth_token(self):
# Innermost application of the stack.
self.swift = FakeSwift()
self.keystone_auth = KeystoneAuth(
self.swift, {'operator_roles': 'swift-user'})
# 'delay_auth_decision' is passed as the string 'True', not a bool.
self.auth_token = AuthProtocol(
self.keystone_auth, {'delay_auth_decision': 'True'})
self.s3_token = S3Token(
self.auth_token, {'auth_uri': 'https://fakehost/identity'})
# Outermost application: the S3 API middleware configured from CONF.
self.s3api = S3Middleware(self.s3_token, CONF)
req = Request.blank(
'/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS access:signature',
'Date': self.get_date_header()})
# Register the two backend requests on the fake: the bucket PUT and an
# account HEAD (presumably the canned responses FakeSwift replays).
self.swift.register('PUT', '/v1/AUTH_TENANT_ID/bucket',
swob.HTTPCreated, {}, None)
self.swift.register('HEAD', '/v1/AUTH_TENANT_ID',
swob.HTTPOk, {}, None)
# Replace S3Token._json_request and AuthProtocol._do_fetch_token with mocks
# for the duration of the (not visible) test body.
with patch.object(self.s3_token, '_json_request') as mock_req:
with patch.object(self.auth_token,
'_do_fetch_token') as mock_fetch:
# Test setup: stacks FakeSwift -> KeystoneAuth -> AuthProtocol -> S3Token ->
# S3Middleware (each wrapping the previous one) and builds a PUT /bucket
# request that carries an AWS-style Authorization header.
# NOTE(review): this snippet is truncated -- the body of the innermost
# ``with`` (the request call and the assertions) is not visible, and the
# original indentation has been lost.
def test_s3api_with_s3_token_and_auth_token(self):
# Innermost application of the stack.
self.swift = FakeSwift()
self.keystone_auth = KeystoneAuth(
self.swift, {'operator_roles': 'swift-user'})
# 'delay_auth_decision' is passed as the string 'True', not a bool.
self.auth_token = AuthProtocol(
self.keystone_auth, {'delay_auth_decision': 'True'})
self.s3_token = S3Token(
self.auth_token, {'auth_uri': 'https://fakehost/identity'})
# Outermost application: the S3 API middleware configured from CONF.
self.s3api = S3Middleware(self.s3_token, CONF)
req = Request.blank(
'/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS access:signature',
'Date': self.get_date_header()})
# Register the two backend requests on the fake: the bucket PUT and an
# account HEAD (presumably the canned responses FakeSwift replays).
self.swift.register('PUT', '/v1/AUTH_TENANT_ID/bucket',
swob.HTTPCreated, {}, None)
self.swift.register('HEAD', '/v1/AUTH_TENANT_ID',
swob.HTTPOk, {}, None)
# Replace S3Token._json_request and AuthProtocol._do_fetch_token with mocks
# for the duration of the (not visible) test body.
with patch.object(self.s3_token, '_json_request') as mock_req:
with patch.object(self.auth_token,
'_do_fetch_token') as mock_fetch:
# Fragment of an application-setup routine: ``app`` is repeatedly rebound to
# a new middleware object that wraps the previous ``app``.
# NOTE(review): the snippet is truncated at both ends -- the ``try:`` (and
# any enabling condition) that opens the audit section is not visible, nor is
# the body of the final healthcheck ``if``; indentation has also been lost.
app = audit_middleware.AuditMiddleware(
app,
audit_map_file=CONF.audit.audit_map_file,
ignore_req_list=CONF.audit.ignore_req_list
)
# Failures while setting up the audit middleware are re-raised as
# InputFileError naming the configured audit map file.
except (EnvironmentError, OSError,
audit_middleware.PycadfAuditApiConfigError) as e:
raise exception.InputFileError(
file_name=CONF.audit.audit_map_file,
reason=e
)
# With the keystone strategy, wrap the app in AuthPublicRoutes, handing it an
# AuthProtocol instance plus the public routes taken from the pecan config.
if CONF.auth_strategy == "keystone":
app = auth_public_routes.AuthPublicRoutes(
app,
auth=auth_token.AuthProtocol(
app, {"oslo_config_config": cfg.CONF}),
public_api_routes=pecan_config.app.acl_public_routes)
# Profiling middleware is added only when the profiler option is enabled.
if CONF.profiler.enabled:
app = osprofiler_web.WsgiMiddleware(app)
# NOTE(pas-ha) this registers oslo_middleware.enable_proxy_headers_parsing
# option, when disabled (default) this is noop middleware
app = http_proxy_to_wsgi.HTTPProxyToWSGI(app, CONF)
# add in the healthcheck middleware if enabled
# NOTE(jroll) this is after the auth token middleware as we don't want auth
# in front of this, and WSGI works from the outside in. Requests to
# /healthcheck will be handled and returned before the auth middleware
# is reached.
if CONF.healthcheck.enabled:
from keystoneauth1 import loading
from keystoneauth1 import session
from keystonemiddleware.auth_token import AuthProtocol
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import encodeutils
from six.moves import http_client as httplib
from webob import exc
LOG = logging.getLogger(__name__)
CFG_GROUP = "service_credentials"
class BasicAndKeystoneAuth(AuthProtocol):
def __init__(self, application, conf):
    """Initialise the parent middleware and read the service auth URL.

    :param application: forwarded to the parent initialiser and also kept
        on ``self.application``
    :param conf: forwarded unchanged to the parent initialiser
    """
    super(BasicAndKeystoneAuth, self).__init__(application, conf)
    self.application = application

    # Use a dedicated ConfigOpts instance rather than the global cfg.CONF;
    # it is invoked with an empty argument list for the 'vitrage' project.
    self.oslo_conf = oslo_conf = cfg.ConfigOpts()
    oslo_conf([], project='vitrage', validate_default_values=True)

    # Register the 'password' auth-plugin options under CFG_GROUP so that
    # the auth_url option can be looked up below.
    password_opts = loading.get_auth_plugin_conf_options('password')
    oslo_conf.register_opts(password_opts, group=CFG_GROUP)

    # Any falsy auth_url (e.g. unset) is normalised to an empty string.
    configured_url = oslo_conf.service_credentials.auth_url
    self.auth_url = configured_url if configured_url else ''
@property
def reject_auth_headers(self):
def auth_app(app):
    """Wrap *app* with request-id handling and the configured auth layer.

    ``cfg.CONF.auth_strategy`` selects the auth layer: ``'noauth'`` adds
    nothing beyond ``request_id.RequestId``, while ``'keystone'`` adds
    ``auth_token.AuthProtocol`` on top of it.

    :param app: the application object to wrap
    :returns: the wrapped application
    :raises t_exceptions.InvalidConfigurationOption: if ``auth_strategy``
        is neither ``'noauth'`` nor ``'keystone'``
    """
    wrapped = request_id.RequestId(app)
    strategy = cfg.CONF.auth_strategy

    if strategy == 'noauth':
        return wrapped

    if strategy == 'keystone':
        # NOTE(zhiyuan) pkg_resources will try to load tricircle to get
        # module version, passing "project" as empty string to bypass it
        return auth_token.AuthProtocol(wrapped, {'project': ''})

    raise t_exceptions.InvalidConfigurationOption(
        opt_name='auth_strategy', opt_value=strategy)
def __init__(self):
    """Serve ``echo_app`` behind ``auth_token.AuthProtocol`` on port 8000.

    Builds the server with ``simple_server.make_server``, prints a banner
    and then calls ``serve_forever()`` on it.
    """
    # hardcode any non-default configuration here
    # NOTE(review): the admin token is a fixed literal -- presumably meant
    # for local experimentation only; confirm before reusing elsewhere.
    protected_app = auth_token.AuthProtocol(
        echo_app, {'auth_protocol': 'http', 'admin_token': 'ADMIN'})
    httpd = simple_server.make_server('', 8000, protected_app)
    print('Serving on port 8000 (Ctrl+C to end)...')
    httpd.serve_forever()