Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
_LOGGER = logging.getLogger(__name__)
DEPENDENCIES = ['mqtt']
CONF_RELAY = 'relay'
COMMAND_FORMAT = '{}/{}'
DEFAULT_NAME = 'Home MQTT Switch'
DEFAULT_OPTIMISTIC = False
PLATFORM_SCHEMA = mqtt.MQTT_RW_PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_ICON): cv.icon,
vol.Required(CONF_RELAY): cv.positive_int,
vol.Required(CONF_PAYLOAD_OFF): cv.positive_int,
vol.Required(CONF_PAYLOAD_ON): cv.positive_int,
vol.Optional(CONF_OPTIMISTIC, default=DEFAULT_OPTIMISTIC): cv.boolean,
}).extend(mqtt.MQTT_AVAILABILITY_SCHEMA.schema)
async def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Set up a MQTT Switch."""
if discovery_info is not None:
config = PLATFORM_SCHEMA(discovery_info)
async_add_devices([HomeMqttSwitch(
config.get(CONF_NAME),
config.get(CONF_ICON),
COMMAND_FORMAT.format(config.get(CONF_COMMAND_TOPIC), config.get(CONF_RELAY)),
config.get(CONF_QOS),
{
_LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = 'Atag One Thermostat'
DEFAULT_TIMEOUT = 30
BASE_URL = 'http://{0}:{1}{2}'
MAC_ADDRESS = '01:23:45:67:89:01'
DEFAULT_MIN_TEMP = 4
DEFAULT_MAX_TEMP = 27
SUPPORT_FLAGS = (SUPPORT_TARGET_TEMPERATURE)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): config_validation.string,
vol.Optional(CONF_HOST): config_validation.string,
vol.Optional(CONF_PORT, default=10000): config_validation.positive_int,
})
HA_PRESET_TO_ATAG = {
PRESET_AWAY: 3,
PRESET_ECO: 2,
PRESET_HOME: 1
}
ATAG_PRESET_TO_HA = {v: k for k, v in HA_PRESET_TO_ATAG.items()}
# jsonPayload data templates
# update payload need to be in exact order. So, using string instead of json.dumps
UPDATE_MODE = '''{{
"update_message":{{
"seqnr":0,
"account_auth":{{
"user_account":"",
DEFAULT_NAME = 'broadlink'
STATE_CLOSING = 'closing'
STATE_OFFLINE = 'offline'
STATE_OPENING = 'opening'
STATE_STOPPED = 'stopped'
DEFAULT_TIMEOUT = 10
DEFAULT_RETRY = 3
COVER_SCHEMA = vol.Schema({
vol.Optional(CONF_COMMAND_OPEN, default=None): cv.string,
vol.Optional(CONF_COMMAND_CLOSE, default=None): cv.string,
vol.Optional(CONF_COMMAND_STOP, default=None): cv.string,
vol.Optional(CONF_TRIGGER_TIME, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Optional(CONF_FRIENDLY_NAME, default=DEFAULT_NAME): cv.string
})
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_IP_ADDRESS): cv.string,
vol.Required(CONF_MAC): cv.string,
vol.Required(CONF_COVERS): vol.Schema({cv.slug: COVER_SCHEMA}),
vol.Optional(CONF_FRIENDLY_NAME, default=DEFAULT_NAME): cv.string
})
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Set up the broadlink covers."""
covers = []
devices = config.get(CONF_COVERS)
ip_addr = config.get(CONF_IP_ADDRESS)
mac_addr = config.get(CONF_MAC)
REQUIREMENTS = []
_LOGGER = logging.getLogger(__name__)
CONF_KISMET_SERVER = 'host'
CONF_KISMET_PORT = 'port'
CONF_KISMET_USER = 'user'
CONF_KISMET_PASS = 'pass'
CONF_SSIDS = 'ssids'
CONF_CLIENTS = 'clients'
CONF_LOCAL_LATITUDE = 'latitude'
CONF_LOCAL_LONGITUDE = 'longitude'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_KISMET_SERVER, default='127.0.0.1'): cv.string,
vol.Required(CONF_KISMET_PORT, default=2501): cv.positive_int,
vol.Required(CONF_KISMET_USER, default='kismet'): cv.string,
vol.Required(CONF_KISMET_PASS, default='changeme'): cv.string,
vol.Optional(CONF_SSIDS, default=[]): cv.ensure_list,
vol.Optional(CONF_CLIENTS, default=[]): cv.ensure_list,
vol.Optional(CONF_LOCAL_LATITUDE, default=0.0): cv.latitude,
vol.Optional(CONF_LOCAL_LONGITUDE, default=0.0): cv.longitude,
#vol.Required(CONF_SCAN_INTERVAL): cv.time_period_seconds
#vol.Required(CONF_SCAN_INTERVAL): cv.positive_int
#vol.Required(CONF_SCAN_INTERVAL): cv.timedelta
})
def get_scanner(hass, config):
"""Validate the configuration and return a Nmap scanner."""
_LOGGER.debug("Called get_scanner")
return KismetDeviceScanner(hass, config[DOMAIN])
CONF_PINS = "pins"
CONF_TYPE = "digital"
CONF_NEGATE = "negate"
CONF_INITIAL = "initial"
PIN_SCHEMA = vol.Schema(
{
vol.Required(CONF_NAME): cv.string,
vol.Optional(CONF_INITIAL, default=False): cv.boolean,
vol.Optional(CONF_NEGATE, default=False): cv.boolean,
}
)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{vol.Required(CONF_PINS, default={}): vol.Schema({cv.positive_int: PIN_SCHEMA})}
)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Arduino platform."""
# Verify that Arduino board is present
if arduino.BOARD is None:
_LOGGER.error("A connection has not been made to the Arduino board")
return False
pins = config.get(CONF_PINS)
switches = []
for pinnum, pin in pins.items():
switches.append(ArduinoSwitch(pinnum, pin))
add_entities(switches)
"Bad Video": None,
"PIR Alarm": "motion",
"Face Detection": "motion",
"Scene Change Detection": "motion",
"I/O": None,
"Unattended Baggage": "motion",
"Attended Baggage": "motion",
"Recording Failure": None,
"Exiting Region": "motion",
"Entering Region": "motion",
}
CUSTOMIZE_SCHEMA = vol.Schema(
{
vol.Optional(CONF_IGNORED, default=DEFAULT_IGNORED): cv.boolean,
vol.Optional(CONF_DELAY, default=DEFAULT_DELAY): cv.positive_int,
}
)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_NAME): cv.string,
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
vol.Optional(CONF_SSL, default=False): cv.boolean,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_CUSTOMIZE, default={}): vol.Schema(
{cv.string: CUSTOMIZE_SCHEMA}
),
}
)
CONF_FS_TIMEOUT = "failsafe_timeout"
CONF_FS_FALLBACK = "failsafe_fallback"
CONF_FS_PERSIST = "failsafe_persist"
CONF_FS_INTERVAL = "refresh_interval"
MAX_POLLING_INTERVAL = 5 # in seconds
MAX_FAST_POLLING_COUNT = 4
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_RFID, default="00845500"): cv.string,
vol.Optional(CONF_FS, default=False): cv.boolean,
vol.Optional(CONF_FS_TIMEOUT, default=30): cv.positive_int,
vol.Optional(CONF_FS_FALLBACK, default=6): cv.positive_int,
vol.Optional(CONF_FS_PERSIST, default=0): cv.positive_int,
vol.Optional(CONF_FS_INTERVAL, default=5): cv.positive_int,
}
)
},
extra=vol.ALLOW_EXTRA,
)
_SERVICE_MAP = {
"request_data": "request_data",
"set_energy": "async_set_energy",
"set_current": "async_set_current",
"authorize": "async_start",
"deauthorize": "async_stop",
"enable": "async_enable_ev",
DEFAULT_TRANSITION = 60
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Optional(CONF_MIN_CT, default=DEFAULT_MIN_CT):
vol.All(vol.Coerce(int), vol.Range(min=1000, max=10000)),
vol.Optional(CONF_MAX_CT, default=DEFAULT_MAX_CT):
vol.All(vol.Coerce(int), vol.Range(min=1000, max=10000)),
vol.Optional(CONF_SUNRISE_OFFSET): cv.time_period_str,
vol.Optional(CONF_SUNSET_OFFSET): cv.time_period_str,
vol.Optional(CONF_SUNRISE_TIME): cv.time,
vol.Optional(CONF_SUNSET_TIME): cv.time,
vol.Optional(CONF_LATITUDE): cv.latitude,
vol.Optional(CONF_LONGITUDE): cv.longitude,
vol.Optional(CONF_ELEVATION): float,
vol.Optional(CONF_INTERVAL, default=DEFAULT_INTERVAL): cv.positive_int,
vol.Optional(ATTR_TRANSITION, default=DEFAULT_TRANSITION): VALID_TRANSITION
}),
}, extra=vol.ALLOW_EXTRA)
def setup(hass, config):
"""Set up the Circadian Lighting component."""
conf = config[DOMAIN]
min_colortemp = conf.get(CONF_MIN_CT)
max_colortemp = conf.get(CONF_MAX_CT)
sunrise_offset = conf.get(CONF_SUNRISE_OFFSET)
sunset_offset = conf.get(CONF_SUNSET_OFFSET)
sunrise_time = conf.get(CONF_SUNRISE_TIME)
sunset_time = conf.get(CONF_SUNSET_TIME)
latitude = conf.get(CONF_LATITUDE, hass.config.latitude)
longitude = conf.get(CONF_LONGITUDE, hass.config.longitude)
{vol.Required(ATTR_ENABLED): cv.boolean}
)
ENABLED_SIREN_SCHEMA = make_entity_service_schema(
{vol.Required(ATTR_ENABLED): cv.boolean}
)
DIAL_CONFIG_SCHEMA = make_entity_service_schema(
{
vol.Optional(ATTR_MIN_VALUE): vol.Coerce(int),
vol.Optional(ATTR_MAX_VALUE): vol.Coerce(int),
vol.Optional(ATTR_MIN_POSITION): cv.positive_int,
vol.Optional(ATTR_MAX_POSITION): cv.positive_int,
vol.Optional(ATTR_ROTATION): vol.In(ROTATIONS),
vol.Optional(ATTR_SCALE): vol.In(SCALES),
vol.Optional(ATTR_TICKS): cv.positive_int,
}
)
DIAL_STATE_SCHEMA = make_entity_service_schema(
{
vol.Required(ATTR_VALUE): vol.Coerce(int),
vol.Optional(ATTR_LABELS): cv.ensure_list(cv.string),
}
)
WINK_COMPONENTS = [
"binary_sensor",
"sensor",
"light",
"switch",
"lock",
from homeassistant.helpers.event import async_track_point_in_utc_time
import homeassistant.util.dt as dt_util
_LOGGER = logging.getLogger(__name__)
CONF_BUFFER_SIZE = 'buffer'
CONF_IMAGE_FIELD = 'field'
DEFAULT_NAME = "Push Camera"
ATTR_FILENAME = 'filename'
ATTR_LAST_TRIP = 'last_trip'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_BUFFER_SIZE, default=1): cv.positive_int,
vol.Optional(CONF_TIMEOUT, default=timedelta(seconds=5)): vol.All(
cv.time_period, cv.positive_timedelta),
vol.Optional(CONF_IMAGE_FIELD, default='image'): cv.string,
})
async def async_setup_platform(hass, config, async_add_devices,
discovery_info=None):
"""Set up the Push Camera platform."""
cameras = [PushCamera(config[CONF_NAME],
config[CONF_BUFFER_SIZE],
config[CONF_TIMEOUT])]
hass.http.register_view(CameraPushReceiver(cameras,
config[CONF_IMAGE_FIELD]))