Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test(self):
    """
    Run the external handler with ``--test`` and report whether it passed.

    The handler's standard output is captured and kept on
    ``self._test_output``.

    Returns True when the handler process exits with status 0.
    Raises BrokenHandlerException when the handler file cannot be executed
    (OSError with errno ENOEXEC); any other OSError propagates unchanged.
    """
    # Persist pending key/value changes first so the external process sees
    # the flags as they currently are (per the original note, flushing also
    # releases the lock).
    unitdata.kv().flush()
    try:
        handler = subprocess.Popen(
            [self._filepath, '--test'],
            stdout=subprocess.PIPE,
            env=os.environ,
        )
    except OSError as exc:
        if exc.errno != errno.ENOEXEC:
            raise
        # The file exists but is not in an executable format.
        raise BrokenHandlerException(self._filepath)
    captured = handler.communicate()
    # Only stdout is kept; the second element of the pair is discarded.
    self._test_output, _ = captured
    return handler.returncode == 0
def get_flags():
    """
    Return a sorted list of the names of all flags which are set.

    Flags are read from the unit key/value store under the
    ``reactive.states.`` prefix, with that prefix stripped from the names.
    """
    stored = unitdata.kv().getrange('reactive.states.', strip=True)
    if not stored:
        # Nothing stored (or a falsy result): treat as "no flags set".
        stored = {}
    return sorted(stored.keys())
def snap_resources_changed():
    '''
    Check whether the snap resource files have changed since the last call.

    Returns: "yes" if any snap resource file has changed,
             "no" if the snap resources are the same as on the last call,
             "unknown" if this is the first time this method is called.
    '''
    kv_store = unitdata.kv()
    resource_names = ['kubectl', 'kube-apiserver', 'kube-controller-manager',
                      'kube-scheduler', 'cdk-addons']
    paths = [hookenv.resource_get(name) for name in resource_names]

    if not kv_store.get('snap.resources.fingerprint.initialised'):
        # First call: record that initialisation happened and run the
        # change check once, ignoring its result.
        kv_store.set('snap.resources.fingerprint.initialised', True)
        any_file_changed(paths)
        return 'unknown'

    if any_file_changed(paths):
        return 'yes'
    return 'no'
"{} percona units reporting clustered".format(min_size),
DEBUG)
return False
elif len(set(uuids)) > 1:
log("Found inconsistent bootstrap uuids: "
"{}".format(uuids), level=WARNING)
return False
else:
log("All {} percona units reporting clustered".format(min_size),
DEBUG)
elif not seeded():
# Single unit deployment but not yet bootstrapped
return False
# Set INITIAL_CLUSTERED_KEY as the cluster has fully bootstrapped
kvstore = kv()
if not kvstore.get(INITIAL_CLUSTERED_KEY, False):
kvstore.set(key=INITIAL_CLUSTERED_KEY, value=True)
kvstore.flush()
return True
# Setting this state before easyrsa is configured ensures the tls layer is
# configured to generate certificates with client authentication.
set_state('tls.client.authorization.required')
# Read the 'dns_domain' and 'cidr' options from the hookenv config.
domain = hookenv.config().get('dns_domain')
cidr = hookenv.config().get('cidr')
# get_sdn_ip is defined elsewhere; it derives an address from the cidr.
sdn_ip = get_sdn_ip(cidr)
# Create the extra Subject Alternative Names (SANs) that the tls layer will
# add to the server cert: the SDN address plus the 'kubernetes' service
# names, both bare and qualified with the configured DNS domain.
extra_sans = [
sdn_ip,
'kubernetes',
'kubernetes.{0}'.format(domain),
'kubernetes.default',
'kubernetes.default.svc',
'kubernetes.default.svc.{0}'.format(domain)
]
# Store the list in the unit key/value store under the 'extra_sans' key.
unitdata.kv().set('extra_sans', extra_sans)
def configure_kubernetes_service(service, base_args, extra_args_key):
    """
    Build the ``snap set`` command line that configures *service*.

    The argument set is layered in three steps, later layers overriding
    earlier ones:

    1. every argument name stored from a previous run is set to 'null',
       which removes arguments that are no longer wanted;
    2. the entries of *base_args*;
    3. the extra arguments obtained from ``parse_extra_args(extra_args_key)``.

    NOTE(review): in the visible code the command is only built, never run
    or returned, and the stored previous arguments are not updated. This
    looks like a truncated excerpt; confirm against the full file.
    """
    store = unitdata.kv()
    prev_args_key = 'kubernetes-worker.prev_args.' + service
    previous = store.get(prev_args_key) or {}
    extras = parse_extra_args(extra_args_key)

    # Start with every previous argument nulled out, then overlay the rest.
    args = dict.fromkeys(previous, 'null')
    args.update(base_args.items())
    args.update(extras.items())

    cmd = ['snap', 'set', service]
    cmd.extend('%s=%s' % (key, value) for key, value in args.items())
def config_value_changed(option):
    """
    Determine if a config value changed since the last call to this function.

    The current value of *option* is always written to the unit key/value
    store under the key *option*. Returns False when no value was stored
    before (first call); otherwise returns whether the current value
    differs from the stored one.
    """
    with unitdata.HookData()():
        store = unitdata.kv()
        current_value = config(option)
        previous_value = store.get(option)
        store.set(option, current_value)
        # A missing previous value means "first observation", not a change.
        return previous_value is not None and current_value != previous_value
def gather_sdn_data():
    '''Collect the Software Defined Network (SDN) information.

    Returns a dictionary with a single 'pillar' entry holding 'dns_server'
    and 'dns_domain'. The 'pillar' name is a construct of the k8s template
    files; using it lets the upstream kubedns templates be reused.
    '''
    # SDN providers pass their data through the unitdata.kv store.
    sdn_subnet = unitdata.kv().get('sdn_subnet')
    if sdn_subnet:
        # Preferred: derive the DNS address from the SDN subnet.
        dns_server = get_dns_ip(sdn_subnet)
    else:
        # No SDN subnet stored: fall back to the 'cidr' config option.
        dns_server = get_dns_ip(hookenv.config().get('cidr'))

    pillar = {
        'dns_server': dns_server,
        # The dns_domain value is used in kubedns-rc.yaml.
        'dns_domain': hookenv.config().get('dns_domain'),
    }
    return {'pillar': pillar}
def freeze_service_cidr():
    '''Freeze the service CIDR.

    Records the current result of service_cidr() in the unit key/value
    store under 'kubernetes-master.service-cidr'. Once the apiserver has
    started, this value can no longer be changed safely.
    '''
    kv_store = unitdata.kv()
    frozen_cidr = service_cidr()
    kv_store.set('kubernetes-master.service-cidr', frozen_cidr)