Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
default: Union[_T, _UNSET] = _UNSET.token,
session: Optional[auth.APISession] = None, # injected by the decorator
) -> Union[bodies.Body, _T]:
if session is None:
raise RuntimeError("API instance is not injected by the decorator.")
is_namespaced = await discovery.is_namespaced(resource=resource, session=session)
namespace = namespace if is_namespaced else None
try:
response = await session.get(
url=resource.get_url(server=session.server, namespace=namespace, name=name),
)
response.raise_for_status()
respdata = await response.json()
return cast(bodies.Body, respdata)
except aiohttp.ClientResponseError as e:
if e.status in [403, 404] and not isinstance(default, _UNSET):
return default
raise
class ResourceCause(BaseCause):
resource: resources.Resource
patch: patches.Patch
body: bodies.Body
memo: containers.ObjectDict
@dataclasses.dataclass
class ResourceWatchingCause(ResourceCause):
"""
The raw event received from the API.
It is a read-only mapping with some extra properties and methods.
"""
type: bodies.EventType
raw: bodies.Event
@dataclasses.dataclass
class ResourceChangingCause(ResourceCause):
"""
The cause is what has caused the whole reaction as a chain of handlers.
Unlike the low-level Kubernetes watch-events, the cause is aware
of actual field changes, including multi-handler changes.
"""
initial: bool
reason: Reason
diff: diffs.Diff = diffs.EMPTY
old: Optional[bodies.BodyEssence] = None
new: Optional[bodies.BodyEssence] = None
def retrieve_essence(
body: bodies.Body,
) -> Optional[bodies.BodyEssence]:
if not has_essence_stored(body):
return None
essence_str: str = body['metadata']['annotations'][LAST_SEEN_ANNOTATION]
essence_obj: bodies.BodyEssence = json.loads(essence_str)
return essence_obj
async def read_obj(
*,
resource: resources.Resource,
namespace: Optional[str] = None,
name: Optional[str] = None,
default: Union[_T, _UNSET] = _UNSET.token,
session: Optional[auth.APISession] = None, # injected by the decorator
) -> Union[bodies.Body, _T]:
if session is None:
raise RuntimeError("API instance is not injected by the decorator.")
is_namespaced = await discovery.is_namespaced(resource=resource, session=session)
namespace = namespace if is_namespaced else None
try:
response = await session.get(
url=resource.get_url(server=session.server, namespace=namespace, name=name),
)
response.raise_for_status()
respdata = await response.json()
return cast(bodies.Body, respdata)
except aiohttp.ClientResponseError as e:
if e.status in [403, 404] and not isinstance(default, _UNSET):
def remove_owner_reference(
objs: K8sObjects,
owner: Optional[bodies.Body] = None,
) -> None:
"""
Remove an owner reference to the resource(s), if it is there.
Note: the owned objects are usually not the one being processed,
so the whole body can be modified, no patches are needed.
"""
real_owner = _guess_owner(owner)
owner_ref = bodies.build_owner_reference(real_owner)
for obj in cast(Iterator[K8sObject], dicts.walk(objs)):
refs = obj.setdefault('metadata', {}).setdefault('ownerReferences', [])
matching = [ref for ref in refs if ref.get('uid') == owner_ref['uid']]
for ref in matching:
refs.remove(ref)
@dataclasses.dataclass
class BaseCause:
logger: Union[logging.Logger, logging.LoggerAdapter]
@dataclasses.dataclass
class ActivityCause(BaseCause):
activity: Activity
@dataclasses.dataclass
class ResourceCause(BaseCause):
resource: resources.Resource
patch: patches.Patch
body: bodies.Body
memo: containers.ObjectDict
@dataclasses.dataclass
class ResourceWatchingCause(ResourceCause):
"""
The raw event received from the API.
It is a read-only mapping with some extra properties and methods.
"""
type: bodies.EventType
raw: bodies.Event
@dataclasses.dataclass
class ResourceChangingCause(ResourceCause):
async def streaming_watch(
*,
resource: resources.Resource,
namespace: Optional[str],
) -> AsyncIterator[bodies.Event]:
"""
Stream the watch-events from one single API watch-call.
"""
# First, list the resources regularly, and get the list's resource version.
# Simulate the events with type "None" event - used in detection of causes.
items, resource_version = await fetching.list_objs_rv(resource=resource, namespace=namespace)
for item in items:
yield {'type': None, 'object': item}
# Repeat through disconnects of the watch as long as the resource version is valid (no errors).
# The individual watching API calls are disconnected by timeout even if the stream is fine.
while True:
# Then, watch the resources starting from the list's resource version.
stream = watch_objs(
def append_owner_reference(
objs: K8sObjects,
owner: Optional[bodies.Body] = None,
) -> None:
"""
Append an owner reference to the resource(s), if it is not yet there.
Note: the owned objects are usually not the one being processed,
so the whole body can be modified, no patches are needed.
"""
real_owner = _guess_owner(owner)
owner_ref = bodies.build_owner_reference(real_owner)
for obj in cast(Iterator[K8sObject], dicts.walk(objs)):
refs = obj.setdefault('metadata', {}).setdefault('ownerReferences', [])
matching = [ref for ref in refs if ref.get('uid') == owner_ref['uid']]
if not matching:
refs.append(owner_ref)
async def __call__(
self,
*,
event: bodies.Event,
replenished: asyncio.Event,
) -> None: ...
# An end-of-stream marker sent from the watcher to the workers.
# See: https://www.python.org/dev/peps/pep-0484/#support-for-singleton-types-in-unions
class EOS(enum.Enum):
token = enum.auto()
if TYPE_CHECKING:
WatchEventQueue = asyncio.Queue[Union[bodies.Event, EOS]]
else:
WatchEventQueue = asyncio.Queue
class Stream(NamedTuple):
""" A single object's stream of watch-events, with some extra helpers. """
watchevents: WatchEventQueue
replenished: asyncio.Event # means: "hurry up, there are new events queued again"
ObjectUid = NewType('ObjectUid', str)
ObjectRef = Tuple[resources.Resource, ObjectUid]
Streams = MutableMapping[ObjectRef, Stream]
# TODO: add the label_selector support for the dev-mode?
raw: bodies.Event
@dataclasses.dataclass
class ResourceChangingCause(ResourceCause):
"""
The cause is what has caused the whole reaction as a chain of handlers.
Unlike the low-level Kubernetes watch-events, the cause is aware
of actual field changes, including multi-handler changes.
"""
initial: bool
reason: Reason
diff: diffs.Diff = diffs.EMPTY
old: Optional[bodies.BodyEssence] = None
new: Optional[bodies.BodyEssence] = None
@property
def event(self) -> Reason:
warnings.warn("`cause.event` is deprecated; use `cause.reason`.", DeprecationWarning)
return self.reason
@property
def deleted(self) -> bool:
""" Used to conditionally skip/select the @on.resume handlers if the object is deleted. """
return finalizers.is_deleted(self.body)
def detect_resource_watching_cause(
event: bodies.Event,
**kwargs: Any,
) -> ResourceWatchingCause: