Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_schema_extend_overrides():
    """Verify that Schema.extend can override required/extra parameters.

    Builds a base schema with ``required=True`` and extends it with
    ``required=False`` and ``extra=ALLOW_EXTRA``, then checks that the base
    schema is left untouched while the extended schema carries the overrides.
    """
    base = Schema({'a': int}, required=True)
    extended = base.extend(
        {'b': str}, required=False, extra=voluptuous.ALLOW_EXTRA
    )

    # The base schema must keep the settings it was created with.
    # Identity checks (``is``) are used for the booleans so that a merely
    # truthy/falsy value cannot satisfy the assertion.
    assert base.required is True
    assert base.extra == voluptuous.PREVENT_EXTRA

    # The extended schema must reflect the overrides given to extend().
    assert extended.required is False
    assert extended.extra == voluptuous.ALLOW_EXTRA
SERVICE_START_REMOTE_CONTROL,
SERVICE_STOP_REMOTE_CONTROL,
)
# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)

DEFAULT_NAME = "Xiaomi Vacuum cleaner"
DATA_KEY = "vacuum.xiaomi_miio"

# Extend the imported platform schema:
#   * host  - required, validated by cv.string
#   * token - required, must be a str of exactly 32 characters
#   * name  - optional, falls back to DEFAULT_NAME
# Keys not listed here are accepted (ALLOW_EXTRA).
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_HOST): cv.string,
        vol.Required(CONF_TOKEN): vol.All(
            str,
            vol.Length(min=32, max=32),
        ),
        vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
    },
    extra=vol.ALLOW_EXTRA,
)

# Fan-speed preset label -> numeric value.
# NOTE(review): presumably the raw value sent to the device - confirm.
FAN_SPEEDS = {
    "Quiet": 38,
    "Balanced": 60,
    "Turbo": 77,
    "Max": 90,
    "Gentle": 105,
}

# Attribute-name string constants (their use is outside this view).
ATTR_CLEAN_START = "clean_start"
ATTR_CLEAN_STOP = "clean_stop"
ATTR_CLEANING_TIME = "cleaning_time"

ATTR_DO_NOT_DISTURB = "do_not_disturb"
ATTR_DO_NOT_DISTURB_START = "do_not_disturb_start"
ATTR_DO_NOT_DISTURB_END = "do_not_disturb_end"

ATTR_MAIN_BRUSH_LEFT = "main_brush_left"
ATTR_SIDE_BRUSH_LEFT = "side_brush_left"
ATTR_FILTER_LEFT = "filter_left"
ATTR_SENSOR_DIRTY_LEFT = "sensor_dirty_left"

ATTR_CLEANING_COUNT = "cleaning_count"
ATTR_CLEANED_TOTAL_AREA = "total_cleaned_area"
DOMAIN = "skybell"
DEFAULT_ENTITY_NAMESPACE = "skybell"
DEFAULT_CACHEDB = "./skybell_cache.pickle"

NOTIFICATION_TITLE = "Skybell Sensor Setup"

# Configuration validation: inside the DOMAIN section both a username and a
# password are required and validated with cv.string. Other top-level keys
# are accepted (ALLOW_EXTRA).
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                vol.Required(CONF_USERNAME): cv.string,
                vol.Required(CONF_PASSWORD): cv.string,
            }
        )
    },
    extra=vol.ALLOW_EXTRA,
)
def setup(hass, config):
"""Set up the Skybell component."""
conf = config[DOMAIN]
username = conf.get(CONF_USERNAME)
password = conf.get(CONF_PASSWORD)
try:
cache = hass.config.path(DEFAULT_CACHEDB)
skybell = Skybell(
username=username, password=password, get_devices=True, cache_path=cache
)
hass.data[DOMAIN] = skybell
# Five-second interval.
# NOTE(review): presumably a throttle for event updates - usage not visible here.
MIN_TIME_BETWEEN_EVENT_UPDATES = timedelta(seconds=5)

# Configuration validation for the DOMAIN section: API key, password, secret
# key and username are required strings; the webhooks and discovery flags are
# optional booleans with module-level defaults. Other top-level keys are
# accepted (ALLOW_EXTRA).
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                vol.Required(CONF_API_KEY): cv.string,
                vol.Required(CONF_PASSWORD): cv.string,
                vol.Required(CONF_SECRET_KEY): cv.string,
                vol.Required(CONF_USERNAME): cv.string,
                vol.Optional(
                    CONF_WEBHOOKS, default=DEFAULT_WEBHOOKS
                ): cv.boolean,
                vol.Optional(
                    CONF_DISCOVERY, default=DEFAULT_DISCOVERY
                ): cv.boolean,
            }
        )
    },
    extra=vol.ALLOW_EXTRA,
)

# Service payload schemas: adding a webhook takes an optional URL string,
# dropping one takes no fields.
SCHEMA_SERVICE_ADDWEBHOOK = vol.Schema(
    {vol.Optional(CONF_URL): cv.string}
)
SCHEMA_SERVICE_DROPWEBHOOK = vol.Schema({})
def setup(hass, config):
"""Set up the Netatmo devices."""
hass.data[DATA_PERSONS] = {}
try:
auth = pyatmo.ClientAuth(
config[DOMAIN][CONF_API_KEY],
config[DOMAIN][CONF_SECRET_KEY],
config[DOMAIN][CONF_USERNAME],
{
# pylint: disable=no-value-for-parameter
vol.Optional(CONF_TITLE): cv.string,
vol.Optional(CONF_ICON): cv.icon,
vol.Optional(CONF_REQUIRE_ADMIN, default=False): cv.boolean,
vol.Required(CONF_URL): vol.Any(
vol.Match(
CONF_RELATIVE_URL_REGEX, msg=CONF_RELATIVE_URL_ERROR_MSG
),
vol.Url(),
),
}
)
)
},
extra=vol.ALLOW_EXTRA,
)
async def async_setup(hass, config):
    """Register a built-in "iframe" frontend panel for each configured entry.

    Walks the mapping stored under ``config[DOMAIN]``. For every item, the
    key (``url_path``) together with the entry's title, icon, URL and
    require-admin flag is handed to
    ``hass.components.frontend.async_register_built_in_panel``.

    Returns:
        True, once all entries have been processed.
    """
    for url_path, info in config[DOMAIN].items():
        register_panel = hass.components.frontend.async_register_built_in_panel

        # Title and icon are read with .get(), so a missing key yields None.
        title = info.get(CONF_TITLE)
        icon = info.get(CONF_ICON)

        # URL and the admin flag are indexed directly and must be present.
        panel_config = {"url": info[CONF_URL]}
        admin_only = info[CONF_REQUIRE_ADMIN]

        register_panel(
            "iframe",
            title,
            icon,
            url_path,
            panel_config,
            require_admin=admin_only,
        )

    return True
from homeassistant.helpers.dispatcher import async_dispatcher_send
from . import const
from .const import DATA_UNSUBSCRIBE, DOMAIN, PLATFORMS, TOPIC_OPENZWAVE
from .discovery import DISCOVERY_SCHEMAS, check_node_schema, check_value_schema
from .entity import (
ZWaveDeviceEntityValues,
create_device_id,
create_device_name,
create_value_id,
)
from .services import ZWaveServices
# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)

DATA_DEVICES = "zwave-mqtt-devices"

# The DOMAIN section is validated against an empty schema; other top-level
# keys are accepted (ALLOW_EXTRA).
CONFIG_SCHEMA = vol.Schema(
    {DOMAIN: vol.Schema({})},
    extra=vol.ALLOW_EXTRA,
)
async def async_setup(hass: HomeAssistant, config: dict):
    """Initialize basic config of the zwave_mqtt component.

    Places a fresh, empty dict under ``hass.data[DOMAIN]``. The ``config``
    argument is not read.

    Returns:
        True, always.
    """
    domain_data = {}
    hass.data[DOMAIN] = domain_data
    return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Set up zwave_mqtt from a config entry."""
@callback
def async_receive_message(msg):
manager.receive_message(msg.topic, msg.payload)
from .const import (
ATTR_ENDPOINTS,
ATTR_STREAMS,
CONF_DURATION,
CONF_LOOKBACK,
CONF_STREAM_SOURCE,
DOMAIN,
SERVICE_RECORD,
)
from .core import PROVIDERS
from .hls import async_setup_hls
# Module-level logger named after this module.
_LOGGER = logging.getLogger(__name__)

# The DOMAIN section is validated against an empty schema; other top-level
# keys are accepted (ALLOW_EXTRA).
CONFIG_SCHEMA = vol.Schema(
    {DOMAIN: vol.Schema({})},
    extra=vol.ALLOW_EXTRA,
)

# Base service schema: a stream source string is mandatory.
STREAM_SERVICE_SCHEMA = vol.Schema(
    {vol.Required(CONF_STREAM_SOURCE): cv.string}
)

# Record service schema: the base schema plus a required filename and two
# optional ints, duration (default 30) and lookback (default 0).
SERVICE_RECORD_SCHEMA = STREAM_SERVICE_SCHEMA.extend(
    {
        vol.Required(CONF_FILENAME): cv.string,
        vol.Optional(CONF_DURATION, default=30): int,
        vol.Optional(CONF_LOOKBACK, default=0): int,
    }
)

# Set log level to error for libav
_libav_logger = logging.getLogger("libav")
_libav_logger.setLevel(logging.ERROR)
del _libav_logger  # keep the module namespace unchanged
@bind_hass
def request_stream(hass, stream_source, *, fmt="hls", keepalive=False, options=None):
vol.Optional(CONF_BINARY_SENSORS): vol.All(
cv.ensure_list, [vol.In(BINARY_SENSORS)]
),
vol.Optional(CONF_SENSORS): vol.All(
cv.ensure_list, [vol.In(SENSORS)], _deprecated_sensor_values
),
vol.Optional(CONF_SWITCHES): vol.All(cv.ensure_list, [vol.In(SWITCHES)]),
vol.Optional(CONF_CONTROL_LIGHT, default=True): cv.boolean,
}
),
_deprecated_switches,
)
# The DOMAIN section is run through three validators in order (vol.All):
# cv.ensure_list, a list whose items must match AMCREST_SCHEMA, and finally
# _has_unique_names. Other top-level keys are accepted (ALLOW_EXTRA).
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.All(
            cv.ensure_list,
            [AMCREST_SCHEMA],
            _has_unique_names,
        )
    },
    extra=vol.ALLOW_EXTRA,
)
# pylint: disable=too-many-ancestors
class AmcrestChecker(Http):
"""amcrest.Http wrapper for catching errors."""
def __init__(self, hass, name, host, port, user, password):
"""Initialize."""
self._hass = hass
self._wrap_name = name
self._wrap_errors = 0
self._wrap_lock = threading.Lock()
self._unsub_recheck = None
super().__init__(
host, port, user, password, retries_connection=1, timeout_protocol=3.05
})
# Schema for a single meter entry:
#   * source sensor - required entity id
#   * name          - optional string
#   * meter type    - optional, must be one of METER_TYPES
#   * meter offset  - optional, validated by cv.positive_int, default 0
#   * tariffs       - optional list of strings, default empty list
METER_CONFIG_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_SOURCE_SENSOR): cv.entity_id,
        vol.Optional(CONF_NAME): cv.string,
        vol.Optional(CONF_METER_TYPE): vol.In(METER_TYPES),
        vol.Optional(CONF_METER_OFFSET, default=0): cv.positive_int,
        vol.Optional(CONF_TARIFFS, default=[]): vol.All(
            cv.ensure_list,
            [cv.string],
        ),
    }
)

# The DOMAIN section maps slug keys to meter entries; other top-level keys
# are accepted (ALLOW_EXTRA).
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                cv.slug: METER_CONFIG_SCHEMA,
            }
        ),
    },
    extra=vol.ALLOW_EXTRA,
)
async def async_setup(hass, config):
"""Set up an Utility Meter."""
component = EntityComponent(_LOGGER, DOMAIN, hass)
hass.data[DATA_UTILITY] = {}
for meter, conf in config.get(DOMAIN).items():
_LOGGER.debug("Setup %s.%s", DOMAIN, meter)
hass.data[DATA_UTILITY][meter] = conf
if not conf[CONF_TARIFFS]:
# only one entity is required
hass.async_create_task(discovery.async_load_platform(
hass, SENSOR_DOMAIN, DOMAIN,
ATTR_PAYLOAD_TEMPLATE = 'payload_template'

# { "subject": "subject", "eventtype":"eventtype","payload":{}, "name": "second" }

DEFAULT_EVENT_TYPE = 'HomeAssistant'
DEFAULT_DATA_VERSION = 1

# Schema for a single topic entry: both the host and the topic key are
# required strings.
TOPIC_CONFIG_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_HOST): cv.string,
        vol.Required(CONF_TOPIC_KEY): cv.string,
    }
)

# The DOMAIN section maps slug keys to topic entries; other top-level keys
# are accepted (ALLOW_EXTRA).
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema(
            {
                cv.slug: TOPIC_CONFIG_SCHEMA,
            }
        ),
    },
    extra=vol.ALLOW_EXTRA,
)

# Publish payload schema, built with required=True. Name and subject are
# required strings; payload and payload template share one exclusion group
# (CONF_PAYLOAD), so at most one of them may be supplied; event type and
# data version are optional with module-level defaults.
# NOTE(review): DEFAULT_DATA_VERSION is an int while its validator is
# cv.string - confirm the coercion is intended.
MQTT_PUBLISH_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_NAME): cv.string,
        vol.Required(ATTR_SUBJECT): cv.string,
        vol.Exclusive(ATTR_PAYLOAD, CONF_PAYLOAD): object,
        vol.Exclusive(ATTR_PAYLOAD_TEMPLATE, CONF_PAYLOAD): cv.string,
        vol.Optional(
            ATTR_EVENT_TYPE, default=DEFAULT_EVENT_TYPE
        ): cv.string,
        vol.Optional(
            ATTR_DATA_VERSION, default=DEFAULT_DATA_VERSION
        ): cv.string,
    },
    required=True,
)
async def async_setup(hass: HomeAssistantType, config: ConfigType) -> bool:
"""Set up the Azure Event Grid platform."""
try:
LOGGER.debug("async_setup")