Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
DEFAULT_PAYLOAD_OFF = "OFF"
SCAN_INTERVAL = timedelta(seconds=60)
CONF_COMMAND_TIMEOUT = "command_timeout"
DEFAULT_TIMEOUT = 15
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_COMMAND): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_PAYLOAD_OFF, default=DEFAULT_PAYLOAD_OFF): cv.string,
vol.Optional(CONF_PAYLOAD_ON, default=DEFAULT_PAYLOAD_ON): cv.string,
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
vol.Optional(CONF_COMMAND_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
}
)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Command line Binary Sensor."""
name = config.get(CONF_NAME)
command = config.get(CONF_COMMAND)
payload_off = config.get(CONF_PAYLOAD_OFF)
payload_on = config.get(CONF_PAYLOAD_ON)
device_class = config.get(CONF_DEVICE_CLASS)
value_template = config.get(CONF_VALUE_TEMPLATE)
command_timeout = config.get(CONF_COMMAND_TIMEOUT)
if value_template is not None:
value_template.hass = hass
data = CommandSensorData(hass, command, command_timeout)
ACTIVE_NAME = "Energy Usage"
DAILY_NAME = "Daily Energy Usage"
ACTIVE_TYPE = "active"
DAILY_TYPE = "daily"
ICON = "mdi:flash"
MIN_TIME_BETWEEN_DAILY_UPDATES = timedelta(seconds=150)
MIN_TIME_BETWEEN_ACTIVE_UPDATES = timedelta(seconds=10)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_API_KEY): cv.string,
vol.Required(CONF_API_SECRET): cv.string,
vol.Optional(CONF_SENSOR_ID): cv.string,
}
)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Neurio sensor."""
api_key = config.get(CONF_API_KEY)
api_secret = config.get(CONF_API_SECRET)
sensor_id = config.get(CONF_SENSOR_ID)
data = NeurioData(api_key, api_secret, sensor_id)
@Throttle(MIN_TIME_BETWEEN_DAILY_UPDATES)
def update_daily():
"""Update the daily power usage."""
data.get_daily_usage()
CONF_SET_SPEED_ACTION = "set_speed"
CONF_SET_OSCILLATING_ACTION = "set_oscillating"
CONF_SET_DIRECTION_ACTION = "set_direction"
_VALID_STATES = [STATE_ON, STATE_OFF]
_VALID_OSC = [True, False]
_VALID_DIRECTIONS = [DIRECTION_FORWARD, DIRECTION_REVERSE]
FAN_SCHEMA = vol.Schema(
{
vol.Optional(CONF_FRIENDLY_NAME): cv.string,
vol.Required(CONF_VALUE_TEMPLATE): cv.template,
vol.Optional(CONF_SPEED_TEMPLATE): cv.template,
vol.Optional(CONF_OSCILLATING_TEMPLATE): cv.template,
vol.Optional(CONF_DIRECTION_TEMPLATE): cv.template,
vol.Optional(CONF_AVAILABILITY_TEMPLATE): cv.template,
vol.Required(CONF_ON_ACTION): cv.SCRIPT_SCHEMA,
vol.Required(CONF_OFF_ACTION): cv.SCRIPT_SCHEMA,
vol.Optional(CONF_SET_SPEED_ACTION): cv.SCRIPT_SCHEMA,
vol.Optional(CONF_SET_OSCILLATING_ACTION): cv.SCRIPT_SCHEMA,
vol.Optional(CONF_SET_DIRECTION_ACTION): cv.SCRIPT_SCHEMA,
vol.Optional(
CONF_SPEED_LIST, default=[SPEED_LOW, SPEED_MEDIUM, SPEED_HIGH]
): cv.ensure_list,
vol.Optional(CONF_ENTITY_ID): cv.entity_ids,
}
)
PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend(
{vol.Required(CONF_FANS): cv.schema_with_slug_keys(FAN_SCHEMA)}
)
# Type of network
CONF_BAUDRATE = 'baudrate'
CONF_BYTESIZE = 'bytesize'
CONF_STOPBITS = 'stopbits'
CONF_PARITY = 'parity'
SERIAL_SCHEMA = {
vol.Required(CONF_BAUDRATE): cv.positive_int,
vol.Required(CONF_BYTESIZE): vol.Any(5, 6, 7, 8),
vol.Required(CONF_METHOD): vol.Any('rtu', 'ascii'),
vol.Required(CONF_PORT): cv.string,
vol.Required(CONF_PARITY): vol.Any('E', 'O', 'N'),
vol.Required(CONF_STOPBITS): vol.Any(1, 2),
vol.Required(CONF_TYPE): 'serial',
vol.Optional(CONF_TIMEOUT, default=3): cv.socket_timeout,
}
ETHERNET_SCHEMA = {
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT): cv.port,
vol.Required(CONF_TYPE): vol.Any('tcp', 'udp', 'rtuovertcp'),
vol.Optional(CONF_TIMEOUT, default=3): cv.socket_timeout,
}
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Any(SERIAL_SCHEMA, ETHERNET_SCHEMA)
}, extra=vol.ALLOW_EXTRA)
_LOGGER = logging.getLogger(__name__)
cv.GenerateID(): cv.declare_variable_id(RemoteReceiver),
vol.Optional(CONF_JVC): cv.Schema({
vol.Required(CONF_DATA): cv.hex_uint32_t,
}),
vol.Optional(CONF_LG): cv.Schema({
vol.Required(CONF_DATA): cv.hex_uint32_t,
vol.Optional(CONF_NBITS, default=28): cv.one_of(28, 32, int=True),
}),
vol.Optional(CONF_NEC): cv.Schema({
vol.Required(CONF_ADDRESS): cv.hex_uint16_t,
vol.Required(CONF_COMMAND): cv.hex_uint16_t,
}),
vol.Optional(CONF_SAMSUNG): cv.Schema({
vol.Required(CONF_DATA): cv.hex_uint32_t,
}),
vol.Optional(CONF_SONY): cv.Schema({
vol.Required(CONF_DATA): cv.hex_uint32_t,
vol.Optional(CONF_NBITS, default=12): cv.one_of(12, 15, 20, int=True),
}),
vol.Optional(CONF_PANASONIC): cv.Schema({
vol.Required(CONF_ADDRESS): cv.hex_uint16_t,
vol.Required(CONF_COMMAND): cv.hex_uint32_t,
}),
vol.Optional(CONF_RC5): cv.Schema({
vol.Required(CONF_ADDRESS): vol.All(cv.hex_int, vol.Range(min=0, max=0x1F)),
vol.Required(CONF_COMMAND): vol.All(cv.hex_int, vol.Range(min=0, max=0x3F)),
}),
vol.Optional(CONF_RAW): validate_raw,
vol.Optional(CONF_RC_SWITCH_RAW): RC_SWITCH_RAW_SCHEMA,
vol.Optional(CONF_RC_SWITCH_TYPE_A): RC_SWITCH_TYPE_A_SCHEMA,
vol.Optional(CONF_RC_SWITCH_TYPE_B): RC_SWITCH_TYPE_B_SCHEMA,
vol.Optional(CONF_RC_SWITCH_TYPE_C): RC_SWITCH_TYPE_C_SCHEMA,
SENSOR_TYPES = {
'application_name': ['Application name'],
'body': ['Body'],
'notification_id': ['Notification ID'],
'notification_tag': ['Notification tag'],
'package_name': ['Package name'],
'receiver_email': ['Receiver email'],
'sender_email': ['Sender email'],
'source_device_iden': ['Sender device ID'],
'title': ['Title'],
'type': ['Type'],
}
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_API_KEY): cv.string,
vol.Optional(CONF_MONITORED_CONDITIONS, default=['title', 'body']):
vol.All(cv.ensure_list, vol.Length(min=1), [vol.In(SENSOR_TYPES)]),
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Pushbullet Sensor platform."""
from pushbullet import PushBullet
from pushbullet import InvalidKeyError
try:
pushbullet = PushBullet(config.get(CONF_API_KEY))
except InvalidKeyError:
_LOGGER.error("Wrong API key for Pushbullet supplied")
return False
pbprovider = PushBulletNotificationProvider(pushbullet)
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(
CONF_PAYLOAD_CLEAN_SPOT, default=DEFAULT_PAYLOAD_CLEAN_SPOT
): cv.string,
vol.Optional(
CONF_PAYLOAD_LOCATE, default=DEFAULT_PAYLOAD_LOCATE
): cv.string,
vol.Optional(
CONF_PAYLOAD_RETURN_TO_BASE, default=DEFAULT_PAYLOAD_RETURN_TO_BASE
): cv.string,
vol.Optional(CONF_PAYLOAD_START, default=DEFAULT_PAYLOAD_START): cv.string,
vol.Optional(CONF_PAYLOAD_PAUSE, default=DEFAULT_PAYLOAD_PAUSE): cv.string,
vol.Optional(CONF_PAYLOAD_STOP, default=DEFAULT_PAYLOAD_STOP): cv.string,
vol.Optional(CONF_SEND_COMMAND_TOPIC): mqtt.valid_publish_topic,
vol.Optional(CONF_SET_FAN_SPEED_TOPIC): mqtt.valid_publish_topic,
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
vol.Optional(CONF_STATE_TOPIC): mqtt.valid_publish_topic,
vol.Optional(
CONF_SUPPORTED_FEATURES, default=DEFAULT_SERVICE_STRINGS
): vol.All(cv.ensure_list, [vol.In(STRING_TO_SERVICE.keys())]),
vol.Optional(CONF_UNIQUE_ID): cv.string,
vol.Optional(CONF_COMMAND_TOPIC): mqtt.valid_publish_topic,
vol.Optional(CONF_RETAIN, default=DEFAULT_RETAIN): cv.boolean,
}
)
.extend(mqtt.MQTT_AVAILABILITY_SCHEMA.schema)
.extend(mqtt.MQTT_JSON_ATTRS_SCHEMA.schema)
.extend(MQTT_VACUUM_SCHEMA.schema)
)
async def async_setup_entity_state(
list(_NETWORK_MON_COND.keys()) + \
list(_DRIVE_MON_COND.keys()) + \
list(_VOLUME_MON_COND.keys())
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_SSL, default=False): cv.boolean,
vol.Optional(CONF_VERIFY_SSL, default=True): cv.boolean,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_MONITORED_CONDITIONS):
vol.All(cv.ensure_list, [vol.In(_MONITORED_CONDITIONS)]),
vol.Optional(CONF_NICS): cv.ensure_list,
vol.Optional(CONF_DRIVES): cv.ensure_list,
vol.Optional(CONF_VOLUMES): cv.ensure_list,
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the QNAP NAS sensor."""
api = QNAPStatsAPI(config)
api.update()
if not api.data:
hass.components.persistent_notification.create(
'Error: Failed to set up QNAP sensor.<br>'
'Check the logs for additional information. '
'You will need to restart hass after fixing.',
title=NOTIFICATION_TITLE,
notification_id=NOTIFICATION_ID)
),
vol.Optional(CONF_SUNSET_CT, default=3000): vol.All(
vol.Coerce(int), vol.Range(min=1000, max=40000)
),
vol.Optional(CONF_STOP_CT, default=1900): vol.All(
vol.Coerce(int), vol.Range(min=1000, max=40000)
),
vol.Optional(CONF_BRIGHTNESS): vol.All(
vol.Coerce(int), vol.Range(min=0, max=255)
),
vol.Optional(CONF_DISABLE_BRIGHTNESS_ADJUST): cv.boolean,
vol.Optional(CONF_MODE, default=DEFAULT_MODE): vol.Any(
MODE_XY, MODE_MIRED, MODE_RGB
),
vol.Optional(CONF_INTERVAL, default=30): cv.positive_int,
vol.Optional(ATTR_TRANSITION, default=30): VALID_TRANSITION,
}
)
async def async_set_lights_xy(hass, lights, x_val, y_val, brightness, transition):
"""Set color of array of lights."""
for light in lights:
if is_on(hass, light):
service_data = {ATTR_ENTITY_ID: light}
if x_val is not None and y_val is not None:
service_data[ATTR_XY_COLOR] = [x_val, y_val]
if brightness is not None:
service_data[ATTR_BRIGHTNESS] = brightness
service_data[ATTR_WHITE_VALUE] = brightness
if transition is not None:
service_data[ATTR_TRANSITION] = transition
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.components import ecobee
from homeassistant.components.notify import (
BaseNotificationService, PLATFORM_SCHEMA)
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['ecobee']
CONF_INDEX = 'index'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_INDEX, default=0): cv.positive_int,
})
def get_service(hass, config, discovery_info=None):
"""Get the Ecobee notification service."""
index = config.get(CONF_INDEX)
return EcobeeNotificationService(index)
class EcobeeNotificationService(BaseNotificationService):
"""Implement the notification service for the Ecobee thermostat."""
def __init__(self, thermostat_index):
"""Initialize the service."""
self.thermostat_index = thermostat_index