Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ATTR_WATCHDOG,
CHANNEL_BETA,
CHANNEL_DEV,
CHANNEL_STABLE,
)
from .utils.validate import validate_timezone
# Repository spec: "<url>" optionally followed by "#<branch>".
# The named groups let callers read the two parts as match.group("url") and
# match.group("branch"); "branch" is None when no "#..." suffix is present.
RE_REPOSITORY = re.compile(r"^(?P<url>[^#]+)(?:#(?P<branch>[\w\-]+))?$")
# pylint: disable=no-value-for-parameter
# pylint: disable=invalid-name

# Integer validators: the value is coerced to int, then range-checked.
network_port = vol.All(
    vol.Coerce(int),
    vol.Range(min=1, max=65535),
)
wait_boot = vol.All(
    vol.Coerce(int),
    vol.Range(min=1, max=60),
)

# Closed sets of accepted values.
channels = vol.In([CHANNEL_STABLE, CHANNEL_BETA, CHANNEL_DEV])
log_level = vol.In(["debug", "info", "warning", "error", "critical"])

# Pattern-based string validators.
docker_image = vol.Match(r"^[\w{}]+/[\-\w{}]+$")
alsa_device = vol.Maybe(vol.Match(r"\d+,\d+"))  # None is accepted too
uuid_match = vol.Match(r"^[0-9a-f]{32}$")  # 32 lowercase hex characters
sha256 = vol.Match(r"^[0-9a-f]{64}$")  # 64 lowercase hex characters
token = vol.Match(r"^[0-9a-f]{32,256}$")  # 32 to 256 lowercase hex characters
def dns_url(url: str) -> str:
    """Validate that *url* is a ``dns://`` URL wrapping an IP address.

    The scheme is compared case-insensitively; everything after the
    six-character ``dns://`` prefix must parse with
    ``ipaddress.ip_address`` (IPv4 or IPv6).

    Returns:
        The original ``url`` unchanged, so the validated value is kept.

    Raises:
        vol.Invalid: if ``url`` is not a string, does not start with
            ``dns://``, or the remainder is not a valid IP address.
    """
    # Report non-string input as a validation error instead of letting
    # ``url.lower()`` fail with AttributeError.
    if not isinstance(url, str):
        raise vol.Invalid("DNS URL must be a string")
    if not url.lower().startswith("dns://"):
        raise vol.Invalid("Doesn't start with dns://")
    address: str = url[6:]  # strip the dns:// off
    try:
        ipaddress.ip_address(address)  # matches ipv4 or ipv6 addresses
    except ValueError as err:
        raise vol.Invalid("Invalid DNS URL: {}".format(url)) from err
    # Without this return a valid URL would be validated to None.
    return url
CHANNEL_DEV,
CHANNEL_STABLE,
)
from .utils.validate import validate_timezone
# Repository spec: "<url>" optionally followed by "#<branch>".
# The named groups let callers read the two parts as match.group("url") and
# match.group("branch"); "branch" is None when no "#..." suffix is present.
RE_REPOSITORY = re.compile(r"^(?P<url>[^#]+)(?:#(?P<branch>[\w\-]+))?$")
# pylint: disable=no-value-for-parameter
# pylint: disable=invalid-name

# Integer validators: the value is coerced to int, then range-checked.
network_port = vol.All(
    vol.Coerce(int),
    vol.Range(min=1, max=65535),
)
wait_boot = vol.All(
    vol.Coerce(int),
    vol.Range(min=1, max=60),
)

# Closed sets of accepted values.
channels = vol.In([CHANNEL_STABLE, CHANNEL_BETA, CHANNEL_DEV])
log_level = vol.In(["debug", "info", "warning", "error", "critical"])

# Pattern-based string validators.
docker_image = vol.Match(r"^[\w{}]+/[\-\w{}]+$")
alsa_device = vol.Maybe(vol.Match(r"\d+,\d+"))  # None is accepted too
uuid_match = vol.Match(r"^[0-9a-f]{32}$")  # 32 lowercase hex characters
sha256 = vol.Match(r"^[0-9a-f]{64}$")  # 64 lowercase hex characters
token = vol.Match(r"^[0-9a-f]{32,256}$")  # 32 to 256 lowercase hex characters
def dns_url(url: str) -> str:
    """Validate that *url* is a ``dns://`` URL wrapping an IP address.

    The scheme is compared case-insensitively; everything after the
    six-character ``dns://`` prefix must parse with
    ``ipaddress.ip_address`` (IPv4 or IPv6).

    Returns:
        The original ``url`` unchanged, so the validated value is kept.

    Raises:
        vol.Invalid: if ``url`` is not a string, does not start with
            ``dns://``, or the remainder is not a valid IP address.
    """
    # Report non-string input as a validation error instead of letting
    # ``url.lower()`` fail with AttributeError.
    if not isinstance(url, str):
        raise vol.Invalid("DNS URL must be a string")
    if not url.lower().startswith("dns://"):
        raise vol.Invalid("Doesn't start with dns://")
    address: str = url[6:]  # strip the dns:// off
    try:
        ipaddress.ip_address(address)  # matches ipv4 or ipv6 addresses
    except ValueError as err:
        # Chain the parser error so the cause stays visible in tracebacks.
        raise vol.Invalid("Invalid DNS URL: {}".format(url)) from err
    return url
GadgetYAML = Schema({
Optional('defaults'): {
str: {
str: object
}
},
Optional('connections'): [Schema({
Required('plug'): str,
Optional('slot'): str,
})
],
Optional('device-tree-origin', default='gadget'): str,
Optional('device-tree'): str,
Optional('format'): YAMLFormat,
Required('volumes'): {
Match('^[a-zA-Z0-9][-a-zA-Z0-9]*$'): Schema({
Optional('schema', default='gpt' if has_new_voluptuous()
else VolumeSchema.gpt):
Enumify(VolumeSchema),
Optional('bootloader'): Enumify(
BootLoader, preprocessor=methodcaller('replace', '-', '')),
Optional('id'): Coerce(Id),
Required('structure'): [Schema({
Optional('name'): str,
Optional('offset'): Coerce(as_size),
Optional('offset-write'): Any(
Coerce(Size32bit), RelativeOffset),
Required('size'): Coerce(as_size),
Required('type'): Any('mbr', 'bare', Coerce(HybridId)),
Optional('role'): Enumify(
StructureRole,
preprocessor=methodcaller('replace', '-', '_')),
return vol.All(
vol.Any(int, str), lambda v: util.expand_range_spec(v, min_value, max_value)
)
# "<letters/underscores>.<letters/digits/underscores>"
ENTITY_ID_VALIDATOR = vol.Match(r"^[A-Za-z_]+\.[A-Za-z0-9_]+$")

# Each date component is optional; when given it must be an int in range.
PARTIAL_DATE_SCHEMA = vol.Schema(
    {
        vol.Optional(component): vol.All(int, vol.Range(min=lowest, max=highest))
        for component, lowest, highest in (
            ("year", 1970, 2099),
            ("month", 1, 12),
            ("day", 1, 31),
        )
    }
)

# Match the raw string first, then hand it to the parser.
TIME_VALIDATOR = vol.All(
    vol.Match(util.TIME_REGEXP),
    util.parse_time_string,
)

# NOTE(review): the placeholders in this message look like they were lost
# (possibly stripped "<...>" markers) - confirm against the upstream text.
RULE_TIME_VALIDATOR = vol.All(
    vol.Match(util.RULE_TIME_REGEXP, msg="correct format: [:[:]][{+-}d]"),
    util.parse_rule_time_string,
)

# This schema does no real validation and default value insertion,
# it just ensures a dictionary containing string keys and dictionary
# values is given. Falsy inputs (outer or inner) are replaced by {}.
DICTS_IN_DICT_SCHEMA = vol.Schema(
    vol.All(
        lambda v: v or {},
        {
            util.CONF_STR_KEY: vol.All(lambda v: v or {}, dict),
        },
    )
)
def parse_watched_entity_str(value: str) -> T.Dict[str, T.Any]:
"""Parses the alternative :: strings."""
vol.Optional(ATTR_ENVIRONMENT): {vol.Match(r"\w*"): vol.Coerce(str)},
vol.Optional(ATTR_PRIVILEGED): [vol.In(PRIVILEGED_ALL)],
vol.Optional(ATTR_APPARMOR, default=True): vol.Boolean(),
vol.Optional(ATTR_FULL_ACCESS, default=False): vol.Boolean(),
vol.Optional(ATTR_AUDIO, default=False): vol.Boolean(),
vol.Optional(ATTR_GPIO, default=False): vol.Boolean(),
vol.Optional(ATTR_DEVICETREE, default=False): vol.Boolean(),
vol.Optional(ATTR_KERNEL_MODULES, default=False): vol.Boolean(),
vol.Optional(ATTR_HASSIO_API, default=False): vol.Boolean(),
vol.Optional(ATTR_HASSIO_ROLE, default=ROLE_DEFAULT): vol.In(ROLE_ALL),
vol.Optional(ATTR_HOMEASSISTANT_API, default=False): vol.Boolean(),
vol.Optional(ATTR_STDIN, default=False): vol.Boolean(),
vol.Optional(ATTR_LEGACY, default=False): vol.Boolean(),
vol.Optional(ATTR_DOCKER_API, default=False): vol.Boolean(),
vol.Optional(ATTR_AUTH_API, default=False): vol.Boolean(),
vol.Optional(ATTR_SERVICES): [vol.Match(RE_SERVICE)],
vol.Optional(ATTR_DISCOVERY): [valid_discovery_service],
vol.Optional(ATTR_SNAPSHOT_EXCLUDE): [vol.Coerce(str)],
vol.Required(ATTR_OPTIONS): dict,
vol.Required(ATTR_SCHEMA): vol.Any(
vol.Schema(
{
vol.Coerce(str): vol.Any(
SCHEMA_ELEMENT,
[
vol.Any(
SCHEMA_ELEMENT,
{
vol.Coerce(str): vol.Any(
SCHEMA_ELEMENT, [SCHEMA_ELEMENT]
)
},
from homeassistant.components.notify import (
ATTR_TARGET,
PLATFORM_SCHEMA,
BaseNotificationService,
)
from homeassistant.const import CONF_API_KEY, CONF_SENDER
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)

# Extend the base notify platform schema with this platform's options.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
    {
        # Mandatory: there is no default for the API key.
        vol.Required(CONF_API_KEY): cv.string,
        # Optional sender, defaulting to "HA". Must be a string matching
        # either an optional "+" plus 2-15 digits (no leading zero) or
        # 1-11 word characters.
        vol.Optional(CONF_SENDER, default="HA"): vol.All(
            cv.string,
            vol.Match(r"^(\+?[1-9]\d{1,14}|\w{1,11})$"),
        ),
    }
)
def get_service(hass, config, discovery_info=None):
    """Get the MessageBird notification service.

    Builds a MessageBird client from the configured API key and probes it
    with a balance request. Returns ``None`` (after logging an error) when
    that probe raises ``messagebird.client.ErrorException``; otherwise
    returns a ``MessageBirdNotificationService`` for the configured sender.
    """
    api_client = messagebird.Client(config[CONF_API_KEY])

    try:
        # The balance call is only used to validate the API key.
        api_client.balance()
    except messagebird.client.ErrorException:
        _LOGGER.error("The specified MessageBird API key is invalid")
        service = None
    else:
        sender = config.get(CONF_SENDER)
        service = MessageBirdNotificationService(sender, api_client)

    return service
Required('width'): All(int, Range(min=10)),
Required('height'): All(int, Range(min=10)),
Required('url'): All(str, Url()),
})
# One screenshot entry: a default flag, a source image, and optionally
# thumbnails and a caption.
schema_screenshots = Schema(
    {
        Required('default', default=False): All(bool),
        Required('source-image'): All(dict, Length(min=1), schema_image),
        'thumbnails': All(list, Length(min=1), [schema_image]),
        'caption': All(dict, Length(min=1), schema_translated),
    }
)

# Icon entry, keyed by icon kind.
schema_icon = Schema(
    {
        'stock': All(str, Length(min=1)),
        # Must contain a dot somewhere.
        'cached': All(
            str,
            Match(r'.*[.].*$'),
            msg='Icon entry is missing filename or extension',
        ),
        # Optionally quoted sequence of "/segment" parts.
        'local': All(
            str,
            Match(r'^[\'"]?(?:/[^/]+)*[\'"]?$'),
            msg='Icon entry should be an absolute path',
        ),
        'remote': All(str, Url()),
    }
)

# Every recognised URL kind must hold a valid URL string.
schema_url = Schema(
    {
        Any(
            'homepage',
            'bugtracker',
            'faq',
            'help',
            'donation',
        ): All(str, Url()),
    }
)
schema_component = Schema({
Required('Type'): All(str, Any('generic', 'desktop-app', 'web-app', 'addon', 'codec', 'inputmethod', 'font')),
Required('ID'): All(str, Length(min=1)),
Required('Name'): All(dict, Length(min=1), schema_translated),
Required('Packages'): All(list, [str], Length(min=1)),
return Schema({
Required('schema'): 1,
Required('bugzilla'): {
Required('product'): All(str, Length(min=1)),
Required('component'): All(str, Length(min=1)),
},
'origin': {
Required('name'): All(str, Length(min=1)),
Required('description'): All(str, Length(min=1)),
Required('url'): FqdnUrl(),
Required('license'): Msg(License(), msg='Unsupported License'),
Required('release'): All(str, Length(min=1)),
},
'vendoring': {
Required('url'): FqdnUrl(),
Required('revision'): Match(r'^[a-fA-F0-9]{12,40}$'),
'patches': Unique([str]),
'keep': Unique([str]),
'exclude': Unique([str]),
'include': Unique([str]),
'run_after': Unique([str]),
},
# --- HDR tone remapping -------------------------------------------------
CONF_HDR_TONE_REMAPPING = "hdr_tone_remapping"

SERVICE_HDR_TONE_REMAPPING_SCHEMA = vol.Schema(
    {
        vol.Required(ATTR_ENTITY_ID): cv.entity_ids,
        # Coerced to int, then restricted to 0 or 1.
        vol.Required(CONF_HDR_TONE_REMAPPING): vol.All(
            vol.Coerce(int),
            vol.Range(min=0, max=1),
        ),
    }
)

# --- Ambient colour -----------------------------------------------------
SERVICE_AMBIENT_COLOR = "set_ambient_color"
CONF_AMBIENT_COLOR = "color"

SERVICE_AMBIENT_COLOR_SCHEMA = vol.Schema(
    {
        vol.Required(ATTR_ENTITY_ID): cv.entity_ids,
        # "#" followed by 3 or 6 hex digits.
        vol.Required(CONF_AMBIENT_COLOR): vol.Match(r"^#(?:[0-9a-fA-F]{3}){1,2}$"),
    }
)

# --- Restart ------------------------------------------------------------
SERVICE_RESTART = "restart_device"

SERVICE_RESTART_SCHEMA = vol.Schema(
    {
        vol.Required(ATTR_ENTITY_ID): cv.entity_ids,
    }
)
SERVICE_TO_ATTRIBUTE = {
SERVICE_MODE: {
"attribute": "mode",
"schema": SERVICE_MODE_SCHEMA,
"param": CONF_MODE,
},
SERVICE_HDMI_SOURCE: {
"attribute": "hdmi_input",
"schema": SERVICE_HDMI_SOURCE_SCHEMA,
"param": CONF_HDMI_SOURCE,
# Configuration keys used by the per-device schema below.
CONF_TARGET_TEMP_STEP = 'target_temp_step'
CONF_TIMEOUT = 'update_timeout'
CONF_SYNC_CLOCK_TIME_ONCE_PER_DAY = 'sync_clock_time_per_day'
CONF_GETCURERNTTEMP_FROM_SENSOR = "current_temp_from_sensor_override"
CONF_DNSHOST = 'host_dns'
CONF_HOST_PORT = 'host_port'

# Platform configuration: an optional mapping of device id -> device options.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
    {
        vol.Optional(CONF_DEVICES, default={}): {
            cv.string: vol.Schema(
                {
                    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
                    vol.Optional(CONF_DNSHOST): cv.string,
                    # Dotted quad of 1-3 digit groups (octet values are not
                    # range-checked by this pattern).
                    vol.Optional(CONF_HOST): vol.Match(
                        r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$"
                    ),
                    # NOTE(review): no int coercion here, so non-integer
                    # numbers inside the range would pass - confirm intent.
                    vol.Optional(CONF_HOST_PORT, default=80): vol.Range(
                        min=1, max=65535
                    ),
                    # NOTE(review): pattern has no end anchor; presumably
                    # trailing characters after 12 hex digits are accepted
                    # - verify against vol.Match semantics.
                    vol.Required(CONF_MAC): vol.Match("(?:[0-9a-fA-F]:?){12}"),
                    vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): vol.Range(
                        min=0, max=99
                    ),
                    vol.Optional(
                        CONF_TARGET_TEMP, default=DEFAULT_TARGET_TEMP
                    ): vol.Range(min=5, max=99),
                    vol.Optional(
                        CONF_TARGET_TEMP_STEP, default=DEFAULT_TARGET_TEMP_STEP
                    ): vol.Coerce(float),
                    vol.Optional(
                        CONF_SYNC_CLOCK_TIME_ONCE_PER_DAY,
                        default=DEFAULT_CONF_SYNC_CLOCK_TIME_ONCE_PER_DAY,
                    ): cv.boolean,
                    vol.Optional(
                        CONF_GETCURERNTTEMP_FROM_SENSOR, default=-1
                    ): vol.Range(min=-1, max=1),
                }
            )
        },
    }
)
async def devices_from_config(domain_config, hass):
hass_devices = []
for device_id, config in domain_config[CONF_DEVICES].items():
# Get device-specific parameters
name = config.get(CONF_NAME)
dns_name = config.get(CONF_DNSHOST)
ip_addr = config.get(CONF_HOST)