Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@kopf.on.create('group', 'version', 'plural')
def fn(**_):
pass
@kopf.on.create('sigproc.viasat.io', 'v1', 'triadsets')
def TriadSetCreate(spec, meta, **_):
logger = NHDCommon.GetLogger(__name__)
logger.info(f'Found new TriadSet for component {spec["serviceName"]} with {spec["replicas"]} replicas in namespace {meta["namespace"]}')
@kopf.on.create('', 'v1', 'pods',
annotations={'sigproc.viasat.io/cfg_type': 'triad'},
when=lambda spec, **_: spec.get('schedulerName') == NHD_SCHED_NAME)
def TriadPodCreate(spec, meta, **_):
logger = NHDCommon.GetLogger(__name__)
logger.info(f'Saw new Triad pod {meta["namespace"]}.{meta["name"]}')
k8sq = qinst # Get the watch queue so we can notify NHD of events from the controller
k8sq.put({"type": NHDWatchTypes.NHD_WATCH_TYPE_TRIAD_POD_CREATE, "pod": {"ns": meta["namespace"], "name": meta["name"]}})
@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def create_1(body, meta, spec, status, **kwargs):
children = _create_children(owner=body)
kopf.info(body, reason='AnyReason')
kopf.event(body, type='Warning', reason='SomeReason', message="Cannot do something")
kopf.event(children, type='Normal', reason='SomeReason', message="Created as part of the job1step")
return {'job1-status': 100}
@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def create_fn(body, **kwargs):
# The all-purpose function for the vent creation.
kopf.event(body, type="SomeType", reason="SomeReason", message="Some message")
# The shortcuts for the conventional events and common cases.
kopf.info(body, reason="SomeReason", message="Some message")
kopf.warn(body, reason="SomeReason", message="Some message")
try:
raise RuntimeError("Exception text.")
except:
kopf.exception(body, reason="SomeReason", message="Some exception:")
@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def instant_failure_with_only_a_message(**kwargs):
raise kopf.PermanentError("Fail once and for all.")
@kopf.on.create('', 'v1', 'pods', when=lambda meta, **_: meta['ownerReferences'][0]['kind']=='StatefulSet')
def create_fn(meta, spec, namespace, status, logger, **kwargs):
if 'statefulset.kubernetes.io/pod-name' in meta['labels']:
label_selector = {'statefulset.kubernetes.io/pod-name': meta['labels']['statefulset.kubernetes.io/pod-name']}
service_ports = []
logging.debug(spec['containers'])
for container in spec['containers']:
if 'ports' in container:
for port in container['ports']:
service_ports.append(client.models.V1ServicePort(port=port['containerPort'],protocol='UDP'))
api_response = create_node_port_service(namespace, label_selector, service_ports)
node_port_spec = api_response.spec
node_port_ports = node_port_spec.ports
#logging.debug(node_port_ports)
node_port = node_port_ports[0].node_port
#TODO Get nodePort from api_response
instance_id, public_ip = get_instance_id(spec['nodeName'])
@kopf.on.create('batch', 'v1', 'jobs', labels={LABEL_PARENT_KIND: CRD_BACKUP_SCHEDULE.name})
@kopf.on.resume('batch', 'v1', 'jobs', labels={LABEL_PARENT_KIND: CRD_BACKUP_SCHEDULE.name})
@kopf.on.delete('batch', 'v1', 'jobs', labels={LABEL_PARENT_KIND: CRD_BACKUP_SCHEDULE.name})
@kopf.on.field('batch', 'v1', 'jobs', field='status', labels={LABEL_PARENT_KIND: CRD_BACKUP_SCHEDULE.name})
def benji_track_job_status_backup_schedule(**_) -> Optional[Dict[str, Any]]:
return track_job_status(crd=CRD_BACKUP_SCHEDULE, **_)
@kopf.on.create('zalando.org', 'v1', 'kopfexamples', labels={'somelabel': 'somevalue'})
def create_with_labels_satisfied(logger, **kwargs):
logger.info("Label satisfied.")