Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
) -> BeaconState:
eth1_block_hash, eth1_timestamp, deposits = inputs
return initialize_beacon_state_from_eth1(
eth1_block_hash=eth1_block_hash,
eth1_timestamp=eth1_timestamp,
deposits=deposits,
config=config,
)
@staticmethod
def condition(output: BeaconState, expected_output: BeaconState) -> None:
    """Check a produced state against the expected one.

    Delegates entirely to ``validate_state(output, expected_output)`` and
    returns nothing itself.

    NOTE(review): how a mismatch is reported (presumably an exception raised
    by ``validate_state``) is not visible from here -- confirm against that
    helper.
    """
    validate_state(output, expected_output)
# Type of the handler pair used to parametrize ``TestType`` for genesis tests:
# a 2-tuple of the ValidityHandler class and the InitializationHandler class.
GenesisHandlerType = Tuple[Type[ValidityHandler], Type[InitializationHandler]]
class GenesisTestType(TestType[GenesisHandlerType]):
    """Test type registered under the name ``"genesis"``.

    Exposes the two handler classes (validity and initialization) as a tuple
    whose order matches ``GenesisHandlerType``.
    """

    # Identifier of this test type.
    name = "genesis"
    # Handler classes belonging to this test type, in declaration order.
    handlers = (ValidityHandler, InitializationHandler)
:param log_result_obj: log result of function call.
:type log_result_obj: bool
.. versionchanged:: 3.3.0 Extract func from log and do not use Union.
.. versionchanged:: 5.1.0 log_traceback parameter
.. versionchanged:: 8.0.0 pick up logger from target module if possible
"""
super().__init__(func=func)
# Typing fix:
if blacklisted_names is None:
self.__blacklisted_names: typing.List[str] = []
else:
self.__blacklisted_names = list(blacklisted_names)
if blacklisted_exceptions is None:
self.__blacklisted_exceptions: typing.List[typing.Type[Exception]] = []
else:
self.__blacklisted_exceptions = list(blacklisted_exceptions)
if isinstance(log, logging.Logger):
self.__logger: typing.Optional[logging.Logger] = log
else:
self.__logger = None
if func is not None: # Special case: we can prefetch logger
self.__logger = self._get_logger_for_func(func)
self.__log_level: int = log_level
self.__exc_level: int = exc_level
self.__max_indent: int = max_indent
self.__spec: typing.Optional[typing.Callable[..., FuncResultType]] = spec or self._func # type: ignore
self.__log_call_args: bool = log_call_args
def factory() -> SiteManagement:
    """Instantiate the site-management class matching the running edition.

    ``CRESiteManagement`` is used when ``cmk_version.is_raw_edition()`` is
    true, ``CEESiteManagement`` otherwise. The chosen class is called without
    arguments and the new instance is returned.
    """
    management_cls: Type[SiteManagement] = (
        CRESiteManagement if cmk_version.is_raw_edition() else CEESiteManagement
    )
    return management_cls()
pass
class InvalidCredentialsError(EufySecurityError):
    """Define an error for unauthenticated accounts."""
class RequestError(EufySecurityError):
    """Define an error related to invalid requests."""
# Maps a response code to the exception class ``raise_error`` raises for it.
# Codes that are not listed here fall back to ``EufySecurityError``.
ERRORS: Dict[int, Type[EufySecurityError]] = {26006: InvalidCredentialsError}
def raise_error(data: dict) -> None:
    """Raise the exception registered for a response's code.

    The ``"code"`` entry of ``data`` selects the exception class from
    ``ERRORS`` (``EufySecurityError`` when the code is not registered) and the
    ``"msg"`` entry is passed as the exception argument. This function always
    raises; it never returns normally.
    """
    response_code = data["code"]
    error_type = ERRORS.get(response_code, EufySecurityError)
    message = data["msg"]
    raise error_type(message)
def penalize_queen(self, peer: ETHPeer) -> None:
    """Interface stub: takes a peer and returns nothing; no behaviour is defined here.

    NOTE(review): going by the name, implementations presumably demote or
    penalize ``peer`` in its role as "queen" -- confirm against the concrete
    implementations.
    """
    ...
class BeamStateBackfill(BaseService, PeerSubscriber, QueenTrackerAPI):
"""
Use a very simple strategy to fill in state in the background.
Ask each peer in sequence for some nodes, ignoring the lowest RTT node.
Reduce memory pressure by using a depth-first strategy.
An intended side-effect is to build & maintain an accurate measurement of
the round-trip-time that peers take to respond to GetNodeData commands.
"""
# We are only interested in peers entering or leaving the pool
subscription_msg_types: FrozenSet[Type[CommandAPI[Any]]] = frozenset()
# This is a rather arbitrary value, but when the sync is operating normally we never see
# the msg queue grow past a few hundred items, so this should be a reasonable limit for
# now.
msg_queue_maxsize: int = 2000
_total_processed_nodes = 0
_num_added = 0
_num_missed = 0
_report_interval = 10
_num_requests_by_peer: typing.Counter[ETHPeer]
def __init__(
self,
db: AtomicDatabaseAPI,
...
@abstractmethod
def record_response(self,
                    elapsed: float,
                    request: TRequestCommand,
                    result: TResult) -> None:
    """Abstract hook that receives one completed request/response exchange.

    :param elapsed: time associated with the exchange
        (NOTE(review): presumably seconds between request and response -- confirm).
    :param request: the request command that was sent.
    :param result: the result obtained for ``request``.

    Subclasses must override this; the declaration here has no behaviour.
    """
    ...
class ResponseCandidateStreamAPI(AsyncioServiceAPI, Generic[TRequestCommand, TResponseCommand]):
response_timeout: float
pending_request: Optional[Tuple[float, 'asyncio.Future[TResponseCommand]']]
request_protocol_type: Type[ProtocolAPI]
response_cmd_type: Type[TResponseCommand]
last_response_time: float
@abstractmethod
def __init__(
self,
connection: ConnectionAPI,
request_protocol_type: Type[ProtocolAPI],
response_cmd_type: Type[TResponseCommand]) -> None:
...
@abstractmethod
def payload_candidates(
self,
request: TRequestCommand,
from typing import Type
from .assets.numba_kernels import gpu_datatile
from .charts.core.core_chart import BaseChart
class DataTile:
    """Settings for a data tile linking an active chart with a passive chart.

    The class-level attributes below are defaults; ``__init__`` stores the
    value of every constructor argument on the instance.
    """

    dtype: str = "pandas"
    cumsum: bool = True
    dimensions: int = 2
    active_chart: Type[BaseChart] = None
    passive_chart: Type[BaseChart] = None

    def __init__(
        self,
        active_chart: Type[BaseChart],
        passive_chart: Type[BaseChart],
        dtype: str = "pandas",
        dimensions: int = 2,
        cumsum: bool = True,
    ):
        """
        Store the chart pair and the tile options on the instance.

        :param active_chart: chart stored as ``self.active_chart``.
        :param passive_chart: chart stored as ``self.passive_chart``.
        :param dtype: stored as ``self.dtype``; defaults to ``"pandas"``.
        :param dimensions: stored as ``self.dimensions``; defaults to ``2``.
        :param cumsum: stored as ``self.cumsum``; defaults to ``True``.
        """
        self.dtype = dtype
        self.dimensions = dimensions
        self.active_chart = active_chart
        # Every constructor argument must be stored: without these two
        # assignments the instance silently keeps the class-level defaults
        # (``None`` / ``True``) regardless of what the caller passed.
        self.passive_chart = passive_chart
        self.cumsum = cumsum
def make_interval_emitter(
        center_xy: Point,
        filenames_and_textures: Sequence[FilenameOrTexture],
        emit_interval: float,
        emit_duration: float,
        particle_speed: float,
        particle_lifetime_min: float,
        particle_lifetime_max: float,
        particle_scale: float = 1.0,
        fade_particles: bool = True):
    """Returns an emitter that emits its particles at a constant rate for a given amount of time

    :param center_xy: passed to the emitter as its ``center_xy``.
    :param filenames_and_textures: pool from which one entry is picked at
        random for every particle created.
    :param emit_interval: first argument of ``arcade.EmitterIntervalWithTime``.
    :param emit_duration: second argument of ``arcade.EmitterIntervalWithTime``.
    :param particle_speed: second argument of ``arcade.rand_on_circle``, whose
        result becomes the particle's ``change_xy``.
    :param particle_lifetime_min: lower bound of the uniformly drawn lifetime.
    :param particle_lifetime_max: upper bound of the uniformly drawn lifetime.
    :param particle_scale: ``scale`` given to every particle.
    :param fade_particles: when true, particles are ``arcade.FadeParticle``
        instances; otherwise ``arcade.LifetimeParticle``.
    :return: the configured ``arcade.Emitter``.
    """
    particle_cls: Type[arcade.LifetimeParticle] = (
        arcade.FadeParticle if fade_particles else arcade.LifetimeParticle
    )

    def build_particle(emitter):
        # Called by the emitter for each new particle; the random draws are
        # made per call so every particle gets its own texture, direction and
        # lifetime. The ``emitter`` argument itself is not used.
        return particle_cls(
            filename_or_texture=random.choice(filenames_and_textures),
            change_xy=arcade.rand_on_circle((0.0, 0.0), particle_speed),
            lifetime=random.uniform(particle_lifetime_min, particle_lifetime_max),
            scale=particle_scale,
        )

    # The ``arcade.Emitter(...)`` call is now properly closed; the original
    # text left its opening parenthesis unbalanced.
    return arcade.Emitter(
        center_xy=center_xy,
        emit_controller=arcade.EmitterIntervalWithTime(emit_interval, emit_duration),
        particle_factory=build_particle,
    )
async def get_remote_auras(
    self, aura_groups: WeakAuras
) -> Tuple[Type[WeakAuras], List[RemoteAura]]:
    """Pair every entry of ``aura_groups`` with its remote metadata and import string.

    Returns a 2-tuple of ``aura_groups.__class__`` and a list built by zipping
    the entry values, the metadata from ``get_wago_metadata`` and the import
    strings fetched per metadata ``id``. When ``aura_groups.entries`` is
    empty, nothing is fetched and the list is empty.
    """
    if not aura_groups.entries:
        return aura_groups.__class__, []

    wago_metadata = await self.get_wago_metadata(aura_groups)
    # One import-string request per metadata item, awaited together.
    pending_imports = (self.get_wago_import_string(item.id) for item in wago_metadata)
    fetched_imports = await gather(pending_imports, False)

    remote_auras = list(zip(aura_groups.entries.values(), wago_metadata, fetched_imports))
    return aura_groups.__class__, remote_auras
ScheduleType,
SliceEmailSchedule,
)
from superset.models.slice import Slice
from superset.tasks.schedules import schedule_email_report
from superset.utils.core import get_email_address_list, json_iso_dttm_ser
from superset.views.core import json_success
from .base import DeleteMixin, SupersetModelView
class EmailScheduleView(
SupersetModelView, DeleteMixin
): # pylint: disable=too-many-ancestors
_extra_data = {"test_email": False, "test_email_recipients": None}
schedule_type: Optional[Type] = None
schedule_type_model: Optional[Type] = None
page_size = 20
add_exclude_columns = [
"user",
"created_on",
"changed_on",
"created_by",
"changed_by",
]
edit_exclude_columns = add_exclude_columns
description_columns = {
"deliver_as_group": "If enabled, send a single email to all "