Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"major": 202,
"minor": 65535,
"offset": 1,
"programId": "102454523",
"rating": "No Rating",
"startTime": 1541876400,
"stationId": 3900947,
"title": "Using Home Assistant to automate your home",
"uniqueId": "12345",
"episodeTitle": "Configure DirecTV platform.",
}
# Configuration with a single "media_player" entry for the "directv"
# platform: host, display name, port and device all come from module
# constants, except the name which is fixed to "Main DVR".
WORKING_CONFIG = {
    "media_player": {
        "platform": "directv",
        CONF_HOST: IP_ADDRESS,
        CONF_NAME: "Main DVR",
        CONF_PORT: DEFAULT_PORT,
        CONF_DEVICE: DEFAULT_DEVICE,
    }
}
@pytest.fixture
def client_dtv():
    """Return a mocked client carrying RECORDING attributes, not in standby."""
    dtv_client = MockDirectvClass("mock_ip")
    # Prime the mock: attributes are the RECORDING payload and the standby
    # flag is cleared.
    dtv_client.attributes = RECORDING
    dtv_client._standby = False
    return dtv_client
vol.Optional(CONF_REPEAT): cv.positive_int,
}
)
# Schema for one sensor entry: every key is optional and falls back to a
# module-level default (DOMAIN, DEFAULT_HOST, DEFAULT_PORT).
SENSOR_SCHEMA = vol.Schema({
    # NOTE(review): vol.Exclusive is normally a key marker; used as a value
    # here it presumably just applies cv.string - confirm the intent.
    vol.Optional(CONF_NAME, default=DOMAIN): vol.Exclusive(cv.string, "sensors"),
    vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
    vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
})
# Schema for one remote entry: the host is mandatory, while name and port
# fall back to DOMAIN and DEFAULT_PORT respectively.
REMOTE_SCHEMA = vol.Schema({
    # NOTE(review): vol.Exclusive is normally a key marker; used as a value
    # here it presumably just applies cv.string - confirm the intent.
    vol.Optional(CONF_NAME, default=DOMAIN): vol.Exclusive(cv.string, "remotes"),
    vol.Required(CONF_HOST): cv.string,
    vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
})
# Top-level schema: the DOMAIN section may hold a list of sensor entries
# and a list of remote entries, both optional. Keys outside DOMAIN are
# allowed at the top level (extra=vol.ALLOW_EXTRA).
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema({
            vol.Optional(CONF_SENSORS): [SENSOR_SCHEMA],
            vol.Optional(CONF_REMOTES): [REMOTE_SCHEMA],
        })
    },
    extra=vol.ALLOW_EXTRA,
)
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the SNMP sensor.

The visible part of this function only reads options out of ``config``
and attaches ``hass`` to the value template when one is configured.
"""
# NOTE(review): config.get() is used for every option, so a key that is
# absent yields None unless validation elsewhere supplied a default -
# confirm against the platform schema.
# Target and identification options.
name = config.get(CONF_NAME)
host = config.get(CONF_HOST)
port = config.get(CONF_PORT)
community = config.get(CONF_COMMUNITY)
baseoid = config.get(CONF_BASEOID)
unit = config.get(CONF_UNIT_OF_MEASUREMENT)
version = config.get(CONF_VERSION)
# Credential and protocol options.
username = config.get(CONF_USERNAME)
authkey = config.get(CONF_AUTH_KEY)
authproto = config.get(CONF_AUTH_PROTOCOL)
privkey = config.get(CONF_PRIV_KEY)
privproto = config.get(CONF_PRIV_PROTOCOL)
# Error-handling and value post-processing options.
accept_errors = config.get(CONF_ACCEPT_ERRORS)
default_value = config.get(CONF_DEFAULT_VALUE)
value_template = config.get(CONF_VALUE_TEMPLATE)
# A configured template is handed the hass instance before any later use.
if value_template is not None:
value_template.hass = hass
# NOTE(review): this span is the interior of a setup function whose header
# is not visible here; it reads `config` and `discovery_info`.
show_all_sources = config.get(CONF_SHOW_ALL_SOURCES)
timeout = config.get(CONF_TIMEOUT)
# Get config option for additional zones
zones = config.get(CONF_ZONES)
if zones is not None:
# Build a mapping keyed by each entry's CONF_ZONE value; the mapped value
# is whatever the entry holds under CONF_NAME.
add_zones = {}
for entry in zones:
add_zones[entry[CONF_ZONE]] = entry.get(CONF_NAME)
else:
add_zones = None
# Start assignment of host and name
# Options 1 and 2 are independent checks, so both may append a host;
# option 3 only runs when neither a configured host nor discovery info
# is present.
new_hosts = []
# 1. option: manual setting
if config.get(CONF_HOST) is not None:
host = config.get(CONF_HOST)
name = config.get(CONF_NAME)
new_hosts.append(NewHost(host=host, name=name))
# 2. option: discovery using netdisco
if discovery_info is not None:
host = discovery_info.get("host")
name = discovery_info.get("name")
new_hosts.append(NewHost(host=host, name=name))
# 3. option: discovery using denonavr library
if config.get(CONF_HOST) is None and discovery_info is None:
d_receivers = denonavr.discover()
# More than one receiver could be discovered by that method
for d_receiver in d_receivers:
host = d_receiver["host"]
vol.Required(ATTR_LOCAL_PATH): cv.string,
}
)
# Payload schema for the upload service: all three fields are required -
# the target entity ids plus a device path and a local path, both strings.
SERVICE_UPLOAD_SCHEMA = vol.Schema({
    vol.Required(ATTR_ENTITY_ID): cv.entity_ids,
    vol.Required(ATTR_DEVICE_PATH): cv.string,
    vol.Required(ATTR_LOCAL_PATH): cv.string,
})
# Extend the base platform schema with this platform's options. Of the keys
# visible here only CONF_HOST is required; every other key is optional.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_HOST): cv.string,
# The device class must be one of the values in DEVICE_CLASSES.
vol.Optional(CONF_DEVICE_CLASS, default=DEFAULT_DEVICE_CLASS): vol.In(
DEVICE_CLASSES
),
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
# cv.isfile is applied to the ADB key value.
vol.Optional(CONF_ADBKEY): cv.isfile,
vol.Optional(CONF_ADB_SERVER_IP): cv.string,
vol.Optional(CONF_ADB_SERVER_PORT, default=DEFAULT_ADB_SERVER_PORT): cv.port,
vol.Optional(CONF_GET_SOURCES, default=DEFAULT_GET_SOURCES): cv.boolean,
# Apps: string keys mapped to either a string or None; empty by default.
vol.Optional(CONF_APPS, default=dict()): vol.Schema(
{cv.string: vol.Any(cv.string, None)}
),
vol.Optional(CONF_TURN_ON_COMMAND): cv.string,
vol.Optional(CONF_TURN_OFF_COMMAND): cv.string,
# State detection rules: string keys, each value checked by the validator
# returned from ha_state_detection_rules_validator(vol.Invalid).
vol.Optional(CONF_STATE_DETECTION_RULES, default={}): vol.Schema(
{cv.string: ha_state_detection_rules_validator(vol.Invalid)}
def __init__(self, sensor, config):
    """Store the sensor and connection settings for a Worx Landroid sensor.

    Host, pin, timeout and the allow-unreachable flag are read from
    ``config``; the data URL is derived from the host.
    """
    self._state = None
    self.sensor = sensor

    # Connection settings taken from the platform configuration.
    landroid_host = config.get(CONF_HOST)
    self.host = landroid_host
    self.pin = config.get(CONF_PIN)
    self.timeout = config.get(CONF_TIMEOUT)
    self.allow_unreachable = config.get(CONF_ALLOW_UNREACHABLE)

    # Endpoint built from the configured host.
    self.url = 'http://{}/jsondata.cgi'.format(landroid_host)
# NOTE(review): this span is the interior of a setup function whose header
# is not visible here; it uses `hass` and `config` and returns a bool.
# Start with an empty store under DOMAIN.
hass.data[DOMAIN] = {}
# Route SERVICE_HUE discovery callbacks to bridge_discovered, passing hass
# along with the service and discovery info.
discovery.listen(
hass,
SERVICE_HUE,
lambda service, discovery_info:
bridge_discovered(hass, service, discovery_info))
# Set up every bridge listed under CONF_BRIDGES (none when the key is absent).
bridges = config.get(CONF_BRIDGES, [])
for bridge in bridges:
filename = bridge.get(CONF_FILENAME)
allow_unreachable = bridge.get(CONF_ALLOW_UNREACHABLE)
allow_in_emulated_hue = bridge.get(CONF_ALLOW_IN_EMULATED_HUE)
allow_hue_groups = bridge.get(CONF_ALLOW_HUE_GROUPS)
host = bridge.get(CONF_HOST)
# No host in the bridge entry: try to obtain one from
# _find_host_from_config using the bridge's filename.
if host is None:
host = _find_host_from_config(hass, filename)
# Still no host: log and abort with False. Bridges handled in earlier
# iterations have already been passed to setup_bridge.
if host is None:
_LOGGER.error("No host found in configuration")
return False
setup_bridge(host, hass, filename, allow_unreachable,
allow_in_emulated_hue, allow_hue_groups)
return True
async def async_step_import(self, user_input=None):
    """Create a config entry for the host supplied in ``user_input``.

    The entry title is produced by ``format_title`` from the host, and the
    entry data stores only that host under ``CONF_HOST``.
    """
    imported_host = user_input[CONF_HOST]
    entry_title = format_title(imported_host)
    entry_data = {CONF_HOST: imported_host}
    return self.async_create_entry(title=entry_title, data=entry_data)
def setup(hass, config):
    """Set up for the Asterisk Voicemail box.

    Reads host, port and password from the DOMAIN section of ``config``,
    stores an ``AsteriskData`` built from them in ``hass.data[DOMAIN]``,
    and returns True.
    """
    domain_conf = config.get(DOMAIN)
    hass.data[DOMAIN] = AsteriskData(
        hass,
        domain_conf.get(CONF_HOST),
        domain_conf.get(CONF_PORT),
        domain_conf.get(CONF_PASSWORD),
        config,
    )
    return True
def __init__(self, config):
    """Initialize the scanner.

    Stores the configured host, starts with no results, and issues one
    request right away.

    Raises:
        ConnectionError: if that request's status code is not 200.
    """
    self.host = config[CONF_HOST]
    self.last_results = {}

    # Check if the access point is accessible
    probe = self._make_request()
    if probe.status_code != 200:
        raise ConnectionError("Cannot connect to Linksys Access Point")