Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@kopf.on.delete('group', 'version', 'plural')
def fn(**_):
    """Deletion handler for the 'group'/'version'/'plural' resource that does nothing.

    All keyword arguments supplied by the caller are accepted and ignored.
    """
@kopf.on.delete(
    'group',
    'version',
    'plural',
    id='id',
    registry=registry,
    errors=ErrorsMode.PERMANENT,
    timeout=123,
    retries=456,
    backoff=78,
    optional=optional,
    labels={'somelabel': 'somevalue'},
    annotations={'someanno': 'somevalue'},
)
def fn(**_):
    """Deletion handler registered with every decorator option spelled out.

    The handler itself does nothing: all keyword arguments are accepted and
    ignored. `registry` and `optional` come from the enclosing scope.
    """
def _owned_by_statefulset(meta, **_):
    """Return True when the object's first owner reference has kind 'StatefulSet'.

    Objects without ``ownerReferences`` (missing key or empty list) do not
    match; they no longer raise ``KeyError``/``IndexError`` inside the filter.
    """
    owners = meta.get('ownerReferences') or []
    return bool(owners) and owners[0].get('kind') == 'StatefulSet'


@kopf.on.delete('', 'v1', 'pods', when=_owned_by_statefulset)
def delete_fn(meta, spec, namespace, logger, **kwargs):
    """Delete the Service named after a StatefulSet pod when that pod is deleted.

    The Service name is read from the pod's
    ``statefulset.kubernetes.io/pod-name`` label and the Service is deleted in
    the pod's own namespace via ``api_instance.delete_namespaced_service``.

    Args:
        meta: Metadata of the pod being deleted; must carry the pod-name label.
        spec: Pod spec (unused).
        namespace: Namespace of the pod, also used for the Service.
        logger: Per-object logger supplied by the caller (unused).
        **kwargs: Remaining handler keyword arguments (unused).
    """
    name = meta['labels']['statefulset.kubernetes.io/pod-name']
    # NOTE(review): messages go through the root `logging` module rather than
    # the injected `logger`; kept as-is so log routing does not change.
    logging.info(namespace)
    logging.info(name)
    # `api_instance` is defined elsewhere; presumably a Kubernetes CoreV1Api
    # client -- TODO confirm.
    api_response = api_instance.delete_namespaced_service(name, namespace)
    logging.info(api_response)
@kopf.on.delete(
    'batch', 'v1', 'jobs',
    labels={LABEL_PARENT_KIND: ClusterBenjiRetentionSchedule.kind},
)
@kopf.on.field(
    'batch', 'v1', 'jobs',
    field='status',
    labels={LABEL_PARENT_KIND: ClusterBenjiRetentionSchedule.kind},
)
def benji_track_job_status_cluster_retention_schedule(**kwargs) -> Optional[Dict[str, Any]]:
    """Forward job deletion and status-change events to ``track_job_status``.

    Registered for batch/v1 jobs whose ``LABEL_PARENT_KIND`` label equals
    ``ClusterBenjiRetentionSchedule.kind``. All handler keyword arguments are
    passed through unchanged, with ``ClusterBenjiRetentionSchedule`` as ``crd``.

    Returns:
        Whatever ``track_job_status`` returns.
    """
    outcome = track_job_status(crd=ClusterBenjiRetentionSchedule, **kwargs)
    return outcome
@kopf.on.delete('sigproc.viasat.io', 'v1', 'triadsets')
def delete_fn(meta, **_):
    """Log that a delete request for a TriadSet was received.

    Args:
        meta: Metadata of the deleted object (unused).
        **_: Remaining handler keyword arguments (ignored).
    """
    # The logger is obtained from NHDCommon for this module on every call.
    NHDCommon.GetLogger(__name__).info('Received delete request for TriadSet')
@kopf.on.delete('zalando.org', 'v1', 'kopfexamples')
def delete_fn(**_):
    """Deletion handler for zalando.org/v1 kopfexamples that does nothing.

    All keyword arguments supplied by the caller are accepted and ignored.
    """
@kopf.on.delete(
    CRD_CLUSTER_BACKUP_SCHEDULE.api_group,
    CRD_CLUSTER_BACKUP_SCHEDULE.api_version,
    CRD_CLUSTER_BACKUP_SCHEDULE.plural,
)
def benji_backup_schedule_delete(
    name: str,
    namespace: str,
    body: Dict[str, Any],
    logger,
    **_
) -> Optional[Dict[str, Any]]:
    """Clean up after a deleted cluster backup schedule resource.

    Removes the scheduler job derived from the resource body, then deletes
    all dependant jobs of the resource.

    Args:
        name: Name of the deleted resource.
        namespace: Namespace passed on to ``delete_all_dependant_jobs``.
        body: Full resource body; its ``'kind'`` entry is read.
        logger: Logger passed on to ``delete_all_dependant_jobs``.
        **_: Remaining handler keyword arguments (ignored).

    Returns:
        None; the function has no explicit return.
    """
    scheduler = benji.k8s_operator.scheduler
    try:
        scheduler.remove_job(job_id=cr_to_job_name(body, 'scheduler'))
    except JobLookupError:
        # Best-effort removal: a lookup failure is deliberately ignored and
        # the dependant-job cleanup below still runs.
        pass

    delete_all_dependant_jobs(
        name=name,
        namespace=namespace,
        kind=body['kind'],
        logger=logger,
    )
@kopf.on.delete('zalando.org', 'v1', 'kopfexamples')
def delete(body, meta, spec, status, **kwargs):
    """Deletion handler for zalando.org/v1 kopfexamples that does nothing.

    Args:
        body: Unused.
        meta: Unused.
        spec: Unused.
        status: Unused.
        **kwargs: Remaining handler keyword arguments (unused).
    """
@kopf.on.delete(
    'batch', 'v1', 'jobs',
    labels={LABEL_PARENT_KIND: CRD_CLUSTER_BACKUP_SCHEDULE.name},
)
@kopf.on.field(
    'batch', 'v1', 'jobs',
    field='status',
    labels={LABEL_PARENT_KIND: CRD_CLUSTER_BACKUP_SCHEDULE.name},
)
def benji_track_job_status_cluster_backup_schedule(**kwargs) -> Optional[Dict[str, Any]]:
    """Forward job deletion and status-change events to ``track_job_status``.

    Registered for batch/v1 jobs whose ``LABEL_PARENT_KIND`` label equals
    ``CRD_CLUSTER_BACKUP_SCHEDULE.name``. All handler keyword arguments are
    passed through unchanged, with ``CRD_CLUSTER_BACKUP_SCHEDULE`` as ``crd``.

    The var-keyword parameter is named ``kwargs`` rather than ``_`` because
    the arguments are used (forwarded), not discarded.

    Returns:
        Whatever ``track_job_status`` returns.
    """
    return track_job_status(crd=CRD_CLUSTER_BACKUP_SCHEDULE, **kwargs)
@kopf.on.delete(*BenjiRestore.group_version_plural())
def benji_backup_schedule_delete(
    name: str,
    namespace: str,
    body: Dict[str, Any],
    logger,
    **_
) -> Optional[Dict[str, Any]]:
    """Delete all dependant jobs of a deleted ``BenjiRestore`` resource.

    NOTE(review): the function name says "backup_schedule" although the
    handler is registered for ``BenjiRestore`` -- confirm whether the name is
    a copy-paste leftover before renaming.

    Args:
        name: Name of the deleted resource.
        namespace: Namespace passed on to ``delete_all_dependant_jobs``.
        body: Full resource body; its ``'kind'`` entry is read.
        logger: Logger passed on to ``delete_all_dependant_jobs``.
        **_: Remaining handler keyword arguments (ignored).

    Returns:
        None; the function has no explicit return.
    """
    owner_kind = body['kind']
    delete_all_dependant_jobs(
        name=name,
        namespace=namespace,
        kind=owner_kind,
        logger=logger,
    )