Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): no `field=` argument is passed to the decorator here; confirm
# that the kopf version in use accepts a field handler without one.
@kopf.on.field('group', 'version', 'plural')
def fn(**_):
    """Do nothing; placeholder field handler that ignores every keyword argument."""
@kopf.on.field(
    'batch',
    'v1',
    'jobs',
    field='status',
    labels={LABEL_PARENT_KIND: CRD_OPERATOR_CONFIG.name},
)
def benji_track_job_status_maintenance(**kwargs) -> Optional[Dict[str, Any]]:
    """Forward a Job ``status`` change to ``track_job_status`` for the operator config CRD.

    Registered for ``batch/v1`` ``jobs`` whose ``LABEL_PARENT_KIND`` label equals
    ``CRD_OPERATOR_CONFIG.name``.

    Args:
        **kwargs: Handler keyword arguments, passed through unchanged.

    Returns:
        Whatever ``track_job_status`` returns.
    """
    # The keyword arguments are used (forwarded), so they are named `kwargs`
    # rather than the throwaway `_`.
    return track_job_status(crd=CRD_OPERATOR_CONFIG, **kwargs)
@kopf.on.field('zalando.org', 'v1', 'kopfexamples', field='spec.field')
def field_fn(old, new, **kwargs):
    """Print the previous and current value of ``spec.field`` when it changes.

    Args:
        old: Value of the field before the change.
        new: Value of the field after the change.
        **kwargs: Remaining handler keyword arguments; unused.
    """
    print(f'FIELD CHANGED: {old} -> {new}')
@kopf.on.field(
    'batch',
    'v1',
    'jobs',
    field='status',
    labels={LABEL_PARENT_KIND: ClusterBenjiRetentionSchedule.kind},
)
def benji_track_job_status_cluster_retention_schedule(**kwargs) -> Optional[Dict[str, Any]]:
    """Forward a Job ``status`` change to ``track_job_status`` for ``ClusterBenjiRetentionSchedule``.

    Registered for ``batch/v1`` ``jobs`` whose ``LABEL_PARENT_KIND`` label equals
    ``ClusterBenjiRetentionSchedule.kind``.

    Args:
        **kwargs: Handler keyword arguments, passed through unchanged.

    Returns:
        Whatever ``track_job_status`` returns.
    """
    return track_job_status(crd=ClusterBenjiRetentionSchedule, **kwargs)
@kopf.on.field('zalando.org', 'v1', 'kopfexamples', field='spec.lst')
def update_lst(body, meta, spec, status, old, new, **kwargs):
    """Print the previous and current value of ``spec.lst`` when it changes.

    Args:
        body: Unused; kept so the handler signature is unchanged.
        meta: Unused; kept so the handler signature is unchanged.
        spec: Unused; kept so the handler signature is unchanged.
        status: Unused; kept so the handler signature is unchanged.
        old: Value of the field before the change.
        new: Value of the field after the change.
        **kwargs: Remaining handler keyword arguments; unused.
    """
    print(f'Handling the FIELD = {old} -> {new}')
@kopf.on.field(
    'batch',
    'v1',
    'jobs',
    field='status',
    labels={LABEL_PARENT_KIND: BenjiRetentionSchedule.kind},
)
def benji_track_job_status_retention_schedule(**kwargs) -> Optional[Dict[str, Any]]:
    """Forward a Job ``status`` change to ``track_job_status`` for ``BenjiRetentionSchedule``.

    Registered for ``batch/v1`` ``jobs`` whose ``LABEL_PARENT_KIND`` label equals
    ``BenjiRetentionSchedule.kind``.

    Args:
        **kwargs: Handler keyword arguments, passed through unchanged.

    Returns:
        Whatever ``track_job_status`` returns.
    """
    return track_job_status(crd=BenjiRetentionSchedule, **kwargs)
@kopf.on.field(*BenjiVersion.group_version_plural(), field='status.protected')
def benji_protect(name: str, status: Dict[str, Any], body: Dict[str, Any], **_) -> Optional[Dict[str, Any]]:
    """Apply a changed ``status.protected`` value of a version via the API client.

    Args:
        name: Passed to ``check_version_access`` and ``benji.protect``.
        status: Mapping from which the ``'protected'`` entry is read.
        body: Passed to ``check_version_access`` together with ``name``.
        **_: Remaining handler keyword arguments; ignored.

    Returns:
        The visible code has no ``return`` statement, so it falls through
        with ``None``.
    """
    benji = APIClient()
    # Run the access check before asking the API to change anything.
    check_version_access(benji, name, body)
    # A missing 'protected' entry is treated as False.
    protected = status.get('protected', False)
    benji.protect(name, protected)