Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
enabled = overrides[:] or (config("harden") or "").split()
if enabled:
modules_to_run = []
# modules will always be performed in the following order
for module, func in six.iteritems(RUN_CATALOG):
if module in enabled:
enabled.remove(module)
modules_to_run.append(func)
if enabled:
log("Unknown hardening modules '%s' - ignoring" %
(', '.join(enabled)), level=WARNING)
for hardener in modules_to_run:
log("Executing hardening module '%s'" %
(hardener.__name__), level=DEBUG)
hardener()
else:
log("No hardening applied to '%s'" % (f.__name__), level=DEBUG)
return f(*args, **kwargs)
return _harden_inner2
release = ch_utils.os_release('nova-common')
cmp_os_release = ch_utils.CompareOpenStackReleases(release)
ctxt = {}
if cmp_os_release >= 'rocky':
ctxt.update(vdata_values)
ctxt['metadata_proxy_shared_secret'] = hookenv.leader_get(
'shared-metadata-secret')
ctxt['enable_metadata'] = True
else:
hookenv.log("Vendor metadata has been configured but is not "
"effective in nova-cloud-controller because release "
"{} is prior to Rocky.".format(release),
level=hookenv.DEBUG)
ctxt['enable_metadata'] = False
# NOTE(ganso): always propagate config value for nova-compute since
# we need to apply it there for all releases, and we cannot determine
# whether nova-compute is really the one serving the vendor metadata
for rid in hookenv.relation_ids('cloud-compute'):
hookenv.relation_set(relation_id=rid,
vendor_data=json.dumps(vdata_values))
return ctxt
'sudo',
'-u', ceph_user(),
'ceph',
'--name', 'mon.',
'--keyring',
'/var/lib/ceph/mon/ceph-{}/keyring'.format(
socket.gethostname()
),
'auth',
'get',
'client.{}'.format(name),
]).strip()
return parse_key(output)
except subprocess.CalledProcessError:
# Couldn't get the key, time to create it!
log("Creating new key for {}".format(name), level=DEBUG)
caps = caps or _default_caps
cmd = [
"sudo",
"-u",
ceph_user(),
'ceph',
'--name', 'mon.',
'--keyring',
'/var/lib/ceph/mon/ceph-{}/keyring'.format(
socket.gethostname()
),
'auth', 'get-or-create', 'client.{}'.format(name),
]
# Add capabilities
for subsystem, subcaps in caps.items():
if subsystem == 'osd':
if weight:
kwargs['percent_data'] = weight
if replicas:
kwargs['replicas'] = replicas
if app_name:
kwargs['app_name'] = app_name
pool = ReplicatedPool(service=service,
name=pool_name, **kwargs)
if not pool_exists(service=service, name=pool_name):
log("Creating pool '{}' (replicas={})".format(pool.name, replicas),
level=INFO)
pool.create()
else:
log("Pool '{}' already exists - skipping create".format(pool.name),
level=DEBUG)
# Set a quota if requested
if max_bytes or max_objects:
set_pool_quota(service=service, pool_name=pool_name,
max_bytes=max_bytes, max_objects=max_objects)
def import_key(keyid):
    """Import a PGP public key into the apt keyring via ``apt-key``.

    :param keyid: either a complete ASCII-armored public key block
                  (``-----BEGIN PGP PUBLIC KEY BLOCK-----`` ...
                  ``-----END PGP PUBLIC KEY BLOCK-----``) or a key
                  identifier to be fetched from keyserver.ubuntu.com.

    If ``apt-key`` exits non-zero the failure is reported through
    ``error_out``.
    """
    key = keyid.strip()
    is_armored = (key.startswith('-----BEGIN PGP PUBLIC KEY BLOCK-----') and
                  key.endswith('-----END PGP PUBLIC KEY BLOCK-----'))

    if is_armored:
        juju_log("PGP key found (looks like ASCII Armor format)", level=DEBUG)
        juju_log("Importing ASCII Armor PGP key", level=DEBUG)
        # apt-key reads the key from a path, so stage it in a temp file that
        # is removed automatically when the context exits.
        with tempfile.NamedTemporaryFile(mode='w') as keyfile:
            keyfile.write(key)
            keyfile.write("\n")
            # Make sure the data is on disk before apt-key opens the file.
            keyfile.flush()
            _run_apt_key(['apt-key', 'add', keyfile.name], key)
    else:
        juju_log("PGP key found (looks like Radix64 format)", level=DEBUG)
        juju_log("Importing PGP key from keyserver", level=DEBUG)
        # Previously this command was built but never executed, so keys
        # given by ID were silently not imported.
        _run_apt_key(['apt-key', 'adv', '--keyserver',
                      'hkp://keyserver.ubuntu.com:80', '--recv-keys', key],
                     key)


def _run_apt_key(cmd, key):
    """Run an ``apt-key`` command, reporting failure via ``error_out``."""
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        error_out("Error importing PGP key '%s'" % key)
def __call__(self):
# Start building the template context for the relation named by
# self.rel_name.
# NOTE(review): the visible body ends inside the per-unit loop without
# using the values it reads and without returning ctxt -- the remainder of
# this method appears to be missing from this excerpt (and indentation has
# been lost); confirm against the full file before relying on it.
log('Generating template context for ' + self.rel_name, level=DEBUG)
ctxt = {}
if self.service and self.service_user:
# This is required for pki token signing if we don't want /tmp to
# be used.
cachedir = '/var/cache/%s' % (self.service)
if not os.path.isdir(cachedir):
log("Creating service cache dir %s" % (cachedir), level=DEBUG)
# Directory is owned by the service user/group and restricted to the
# owner only (0o700).
mkdir(path=cachedir, owner=self.service_user,
group=self.service_user, perms=0o700)
# Expose the cache directory to templates as 'signing_dir'.
ctxt['signing_dir'] = cachedir
for rid in relation_ids(self.rel_name):
# Flag that at least one relation id exists for this relation name.
self.related = True
for unit in related_units(rid):
rdata = relation_get(rid=rid, unit=unit)
# Prefer the value returned by format_ipv6_addr(); fall back to the raw
# relation value when it returns something falsy.
serv_host = rdata.get('service_host')
serv_host = format_ipv6_addr(serv_host) or serv_host
auth_host = rdata.get('auth_host')
auth_host = format_ipv6_addr(auth_host) or auth_host
# Missing or empty relation settings fall back to 'http' for both
# protocols and '2.0' for the API version.
svc_protocol = rdata.get('service_protocol') or 'http'
auth_protocol = rdata.get('auth_protocol') or 'http'
api_version = rdata.get('api_version') or '2.0'
def run_mysql_checks():
    """Run every MySQL hardening audit returned by ``config.get_audits()``.

    Each audit's ``ensure_compliance()`` is invoked in the order the audits
    are returned; progress is logged at DEBUG level.
    """
    log("Starting MySQL hardening checks.", level=DEBUG)

    for audit in config.get_audits():
        audit_name = audit.__class__.__name__
        log("Running '%s' check" % (audit_name), level=DEBUG)
        audit.ensure_compliance()

    log("MySQL hardening checks complete.", level=DEBUG)
default = 'hmac-sha2-512,hmac-sha2-256,hmac-ripemd160'
macs = {'default': default,
'weak': default + ',hmac-sha1'}
default = ('hmac-sha2-512-etm@openssh.com,'
'hmac-sha2-256-etm@openssh.com,'
'hmac-ripemd160-etm@openssh.com,umac-128-etm@openssh.com,'
'hmac-sha2-512,hmac-sha2-256,hmac-ripemd160')
macs_66 = {'default': default,
'weak': default + ',hmac-sha1'}
# Use newer MACs on Ubuntu Trusty and above
_release = lsb_release()['DISTRIB_CODENAME'].lower()
if CompareHostReleases(_release) >= 'trusty':
log("Detected Ubuntu 14.04 or newer, using new macs", level=DEBUG)
macs = macs_66
return macs[weak_macs]
def run_ssh_checks():
    """Run every SSH hardening audit returned by ``config.get_audits()``.

    Each audit's ``ensure_compliance()`` is invoked in the order the audits
    are returned; progress is logged at DEBUG level.
    """
    log("Starting SSH hardening checks.", level=DEBUG)

    for audit in config.get_audits():
        audit_name = audit.__class__.__name__
        log("Running '%s' check" % (audit_name), level=DEBUG)
        audit.ensure_compliance()

    log("SSH hardening checks complete.", level=DEBUG)
'use high availability')
status_set('blocked', msg)
raise HAIncorrectConfig(msg)
# If dns-ha then one of os-*-hostname must be set
if dns:
dns_settings = ['os-internal-hostname', 'os-admin-hostname',
'os-public-hostname', 'os-access-hostname']
# At this point it is unknown if one or all of the possible
# network spaces are in HA. Validate at least one is set which is
# the minimum required.
for setting in dns_settings:
if config_get(setting):
log('DNS HA: At least one hostname is set {}: {}'
''.format(setting, config_get(setting)),
level=DEBUG)
return True
msg = ('DNS HA: At least one os-*-hostname(s) must be set to use '
'DNS HA')
status_set('blocked', msg)
raise HAIncompleteConfig(msg)
log('VIP HA: VIP is set {}'.format(vip), level=DEBUG)
return True