Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@kopf.on.create('zalando.org', 'v1', 'kopfexamples', labels={'somelabel': 'othervalue'})
def create_with_labels_not_satisfied(logger, **kwargs):
logger.info("Label not satisfied.")
@kopf.on.create('zalando.org', 'v1', 'kopfexamples', retries=3, backoff=1.0)
def eventual_failure_with_tracebacks(**kwargs):
raise MyException("An error that is supposed to be recoverable.")
@kopf.on.event('zalando.org', 'v1', 'kopfexamples')
def normal_event_fn(event, **kwargs):
print(f"Event received: {event!r}")
@kopf.on.login(errors=kopf.ErrorsMode.PERMANENT)
async def login_fn(**kwargs):
print('Logging in in 2s...')
await asyncio.sleep(2.0)
# An equivalent of kopf.login_via_pykube(), but shrinked for demo purposes.
config = pykube.KubeConfig.from_env()
ca = config.cluster.get('certificate-authority')
cert = config.user.get('client-certificate')
pkey = config.user.get('client-key')
return kopf.ConnectionInfo(
server=config.cluster.get('server'),
ca_path=ca.filename() if ca else None, # can be a temporary file
insecure=config.cluster.get('insecure-skip-tls-verify'),
username=config.user.get('username'),
password=config.user.get('password'),
scheme='Bearer',
@kopf.on.event(runtime.operator_domain, runtime.api_version, 'anarchysubjects')
def handle_subject_event(event, logger, **_):
'''
Anarchy uses on.event instead of on.delete because Anarchy needs custom
for removing finalizers. The finalizer will be removed immediately if the
AnarchyGovernor does not have a delete subject event handler. If there is
a delete subject event handler then it is up to the governor logic to remove
the finalizer.
'''
obj = event.get('object')
if obj and obj.get('apiVersion') == runtime.api_group_version:
if event['type'] in ['ADDED', 'MODIFIED', None]:
subject = AnarchySubject(obj)
if subject.is_pending_delete:
subject.handle_delete(runtime)
@kopf.on.delete('batch', 'v1', 'jobs', labels={LABEL_PARENT_KIND: CRD_OPERATOR_CONFIG.name})
@kopf.on.field('batch', 'v1', 'jobs', field='status', labels={LABEL_PARENT_KIND: CRD_OPERATOR_CONFIG.name})
def benji_track_job_status_maintenance(**_) -> Optional[Dict[str, Any]]:
return track_job_status(crd=CRD_OPERATOR_CONFIG, **_)