Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def resolve_address(endpoint_type=PUBLIC):
    """Pick the address to use for the given endpoint type.

    Unclustered units take a fallback address (the first entry returned by
    ``get_ipv6_addr`` when 'prefer-ipv6' is set, otherwise the ``unit_get``
    value named by the endpoint's 'fallback' key) and pass it, together
    with the endpoint's configured network, to ``get_address_in_network``.

    Clustered units use the 'vip' config option: it is returned as-is when
    no network is configured for the endpoint type; otherwise every
    whitespace-separated vip is tested with ``is_address_in_network`` and
    the last one that matches is used.

    :param endpoint_type: key into ``_address_map`` selecting the endpoint
    :returns: the resolved address
    :raises ValueError: if no address could be resolved
    """
    address = None
    if not is_clustered():
        if config('prefer-ipv6'):
            fallback = get_ipv6_addr(exc_list=[config('vip')])[0]
        else:
            fallback = unit_get(_address_map[endpoint_type]['fallback'])
        address = get_address_in_network(
            config(_address_map[endpoint_type]['config']), fallback)
    elif config(_address_map[endpoint_type]['config']) is None:
        # Assume vip is simple and pass back directly
        address = config('vip')
    else:
        # No early exit: a later matching vip overrides an earlier one.
        for candidate in config('vip').split():
            network = config(_address_map[endpoint_type]['config'])
            if is_address_in_network(network, candidate):
                address = candidate

    if address is not None:
        return address
    raise ValueError('Unable to resolve a suitable IP address'
                     ' based on charm state and configuration')
ctxt['live_migration_downtime_steps'] = \
config('live-migration-downtime-steps')
ctxt['live_migration_downtime_delay'] = \
config('live-migration-downtime-delay')
ctxt['live_migration_permit_post_copy'] = \
config('live-migration-permit-post-copy')
ctxt['live_migration_permit_auto_converge'] = \
config('live-migration-permit-auto-converge')
if config('instances-path') is not None:
ctxt['instances_path'] = config('instances-path')
if config('disk-cachemodes'):
ctxt['disk_cachemodes'] = config('disk-cachemodes')
if config('use-multipath'):
ctxt['use_multipath'] = config('use-multipath')
if config('default-ephemeral-format'):
ctxt['default_ephemeral_format'] = \
config('default-ephemeral-format')
if config('cpu-mode'):
ctxt['cpu_mode'] = config('cpu-mode')
elif ctxt['arch'] in ('ppc64el', 'ppc64le', 'aarch64'):
ctxt['cpu_mode'] = 'host-passthrough'
elif ctxt['arch'] == 's390x':
ctxt['cpu_mode'] = 'none'
if config('cpu-model'):
ctxt['cpu_model'] = config('cpu-model')
def db_joined():
    """Set database, username and hostname on the relation.

    With 'prefer-ipv6' enabled the work is delegated entirely to
    ``sync_db_with_multi_ipv6_addresses``. Otherwise the hostname is
    obtained from ``get_relation_ip`` for 'shared-db', constrained by the
    first non-empty 'access-network' value found on a related unit.
    """
    if config('prefer-ipv6'):
        sync_db_with_multi_ipv6_addresses(config('database'),
                                          config('database-user'))
        return

    # Avoid churn check for access-network early
    network = None
    for remote_unit in related_units():
        network = relation_get(unit=remote_unit,
                               attribute='access-network')
        if network:
            break

    hostname = get_relation_ip('shared-db', cidr_network=network)
    relation_set(
        database=config('database'),
        username=config('database-user'),
        hostname=hostname,
    )
(default: 'openstack-origin')
:type source_key: Optional[str]
:returns: OpenStack release codename
:rtype: str
"""
source_key = source_key or 'openstack-origin'
if not base:
base = UBUNTU_OPENSTACK_RELEASE[lsb_release()['DISTRIB_CODENAME']]
global _os_rel
if reset_cache:
reset_os_release()
if _os_rel:
return _os_rel
_os_rel = (
get_os_codename_package(package, fatal=False) or
get_os_codename_install_source(config(source_key)) or
base)
return _os_rel
def set_privileged():
    """Sync the 'kubernetes-worker.privileged' state with charm config.

    The 'allow-privileged' option is read; when it is 'auto' the decision
    follows the 'kubernetes-worker.gpu.enabled' state. The privileged state
    is set only for an effective value of 'true' and removed otherwise.
    """
    setting = hookenv.config('allow-privileged')
    if setting == 'auto':
        # 'auto' means: privileged exactly when the GPU state is active.
        allow = is_state('kubernetes-worker.gpu.enabled')
    else:
        allow = setting == 'true'

    if allow:
        set_state('kubernetes-worker.privileged')
    else:
        remove_state('kubernetes-worker.privileged')
def git_reinstall():
    """Reinstall from source and restart services.

    If the openstack-origin-git config option was used to install openstack
    from source git repositories, then this action can be used to reinstall
    from updated git repositories, followed by a restart of services.

    The action is failed (without raising) when openstack-origin-git is not
    configured, or when the reinstall raises; in the latter case the
    formatted traceback is recorded in the action results under
    'traceback'.
    """
    if not git_install_requested():
        action_fail('openstack-origin-git is not configured')
        return

    try:
        git_install(config('openstack-origin-git'))
        config_changed()
    except Exception:
        # Catch Exception rather than using a bare except so that
        # SystemExit and KeyboardInterrupt still propagate instead of being
        # reported as an action failure.
        action_set({'traceback': traceback.format_exc()})
        action_fail('git-reinstall resulted in an unexpected error')
'replicas-container' options.
:param ring: ring name
:type ring: str
:returns: replicas for the ring
"""
if ring == 'account':
if config('replicas-account'):
return config('replicas-account')
else:
return config('replicas')
elif ring == 'container':
if config('replicas-container'):
return config('replicas-container')
else:
return config('replicas')
else:
return config('replicas')
def set_upgrade_needed():
    """Flag that a snap upgrade is needed, and possibly already specified.

    'kubernetes-worker.snaps.upgrade-needed' is always set. The
    'kubernetes-worker.snaps.upgrade-specified' state is set as well unless
    there is a previous 'channel' value and 'require-manual-upgrade' is
    truthy.
    """
    set_state('kubernetes-worker.snaps.upgrade-needed')

    charm_config = hookenv.config()
    old_channel = charm_config.previous('channel')
    manual_only = charm_config.get('require-manual-upgrade')

    if old_channel is not None and manual_only:
        # Leave the upgrade unspecified in this case.
        return
    set_state('kubernetes-worker.snaps.upgrade-specified')
def configure_easrsa():
    """Require the tls layer to generate certificates with "clientAuth".

    Also stores the list of extra subject alternative names (the SDN IP
    from ``get_sdn_ip`` plus the 'kubernetes' service names, plain and
    qualified with the 'dns_domain' config value) in the unit key/value
    store under 'extra_sans'.
    """
    # By default easyrsa generates the server certificates without
    # clientAuth. Setting this state before easyrsa is configured ensures
    # the tls layer is configured to generate certificates with client
    # authentication.
    set_state('tls.client.authorization.required')

    dns_domain = hookenv.config().get('dns_domain')
    network_cidr = hookenv.config().get('cidr')
    service_ip = get_sdn_ip(network_cidr)

    # Create extra sans that the tls layer will add to the server cert.
    qualified_name = 'kubernetes.{0}'.format(dns_domain)
    qualified_svc_name = 'kubernetes.default.svc.{0}'.format(dns_domain)
    sans = [
        service_ip,
        'kubernetes',
        qualified_name,
        'kubernetes.default',
        'kubernetes.default.svc',
        qualified_svc_name,
    ]
    unitdata.kv().set('extra_sans', sans)
def getStorageBackend():
    """Return the storage backend to use.

    The 'storage-backend' config value is returned unless it is 'auto', in
    which case the leader's 'auto_storage_backend' setting is returned.
    """
    configured = hookenv.config('storage-backend')
    if configured != 'auto':
        return configured
    return leader_get('auto_storage_backend')