Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'seealso': Any(None, seealso_schema),
'requirements': list_string_types,
'todo': Any(None, list_string_types, *string_types),
'options': Any(None, *list_dict_option_schema),
'extends_documentation_fragment': Any(list_string_types, *string_types)
}
if version_added:
doc_schema_dict[Required('version_added')] = Any(float, *string_types)
else:
# Optional
doc_schema_dict['version_added'] = Any(float, *string_types)
if deprecated_module:
deprecation_required_scheme = {
Required('deprecated'): Any(deprecation_schema),
}
doc_schema_dict.update(deprecation_required_scheme)
return Schema(
doc_schema_dict,
extra=PREVENT_EXTRA
)
Required('url'): Http,
'headers': {String: String},
'payload': Any(String, None),
'timeout': Any(Coerce(float), None),
'allow_redirects': Any(bool, None)
},
'cname': Any(String, None),
'countdown': Any(All(Coerce(float), v.Range(.0)), None),
'eta': Any(Coerce(DateTime), None),
'schedule': Any(Coerce(Schedule), None),
Any('on_success', 'on_failure', 'on_complete'):
Any('__report__', Http, NestedSchema('add_task_form'), None)
})
identifier_form = Schema(
Any(
All(v.Replace('^id:', ''), Coerce(int),
v.Range(max=2 ** 63 - 1), Coerce(IdentifierKind('id'))),
All(v.Match('^uuid:'), v.Replace('^uuid:', ''), Coerce(UUID),
Coerce(str), Coerce(IdentifierKind('uuid'))),
All(v.Match('^cname:'), v.Replace('^cname:', ''),
v.Length(min=3, max=96), Coerce(IdentifierKind('cname')))
)
SIGNAL_REMOVE_SENSOR,
)
from .device import Device
NOTIFICATION_ID = "upnp_notification"
NOTIFICATION_TITLE = "UPnP/IGD Setup"
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Optional(CONF_ENABLE_PORT_MAPPING, default=False): cv.boolean,
vol.Optional(CONF_ENABLE_SENSORS, default=True): cv.boolean,
vol.Optional(CONF_LOCAL_IP): vol.All(ip_address, cv.string),
vol.Optional(CONF_PORTS): vol.Schema(
{vol.Any(CONF_HASS, cv.port): vol.Any(CONF_HASS, cv.port)}
),
}
)
},
extra=vol.ALLOW_EXTRA,
)
def _substitute_hass_ports(ports, hass_port=None):
"""
Substitute 'hass' for the hass_port.
This triggers a warning when hass_port is None.
"""
ports = ports.copy()
long,
Doc('vcf_id', 'The ID of the Run this comment is associated with.'):
long,
Doc('sample_name', 'The name of the sample this comment is on.'):
basestring,
Doc('contig', 'The contig of the variant this comment is on.'):
basestring,
Doc('position', 'The position of the variant this comment is on.'):
int,
Doc('reference', 'The reference of the variant this comment is on.'):
basestring,
Doc('alternates',
'The alternate allele of the variant this comment is on.'):
basestring,
Doc('comment_text', 'The text of the comment.'):
Any(basestring, None),
Doc('user_id', 'The ID of the User this comment is associated with.'):
Any(long, None),
Doc('created',
'The time at which the comment was created (in epoch time).'):
Coerce(to_epoch),
Doc('last_modified',
'The last modified time of the comment (in epoch time).'):
Coerce(to_epoch)
})
class CommentList(Resource):
require_auth = True
@marshal_with(CommentFields, envelope='comments')
def get(self, run_id):
"""Get a list of all comments."""
Any(basestring, None),
Doc('genotype_count', 'The number of genotypes (calls) made in this Run.'):
Any(long, None),
Doc('created_at', 'Timestamp when the Run was created.'):
datetime.datetime,
Doc('notes', 'Any ancillary notes, parameters, etc of the Run.'):
Any(basestring, None),
Doc('vcf_header', 'The raw VCF text of the header.'):
Any(basestring, None),
Doc('vcf_release', 'ENSEMBL Release for the reference'):
Any(int, None),
Doc('project_id', 'The internal ID of the Project this Run belongs to.'):
long,
Doc('normal_bam_id',
'The internal ID of the normal BAM associated with the Run.'):
Any(long, None),
Doc('tumor_bam_id',
'The internal ID of the normal BAM associated with the Run.'):
Any(long, None)
}
RunFields = Schema(run_fields)
# Used because on GET /runs/ we need some extra fields [for now].
thick_run_fields = run_fields.copy()
thick_run_fields.update({
'spec': type,
'contigs': list,
'normal_bam': object,
'tumor_bam': object
})
ThickRunFields = Schema(thick_run_fields)
_LOGGER = logging.getLogger(__name__)
DATA_SISYPHUS = "sisyphus"
DOMAIN = "sisyphus"
AUTODETECT_SCHEMA = vol.Schema({})
TABLE_SCHEMA = vol.Schema(
{vol.Required(CONF_NAME): cv.string, vol.Required(CONF_HOST): cv.string}
)
TABLES_SCHEMA = vol.Schema([TABLE_SCHEMA])
CONFIG_SCHEMA = vol.Schema(
{DOMAIN: vol.Any(AUTODETECT_SCHEMA, TABLES_SCHEMA)}, extra=vol.ALLOW_EXTRA
)
async def async_setup(hass, config):
"""Set up the sisyphus component."""
class SocketIONoiseFilter(logging.Filter):
"""Filters out excessively verbose logs from SocketIO."""
def filter(self, record):
if "waiting for connection" in record.msg:
return False
return True
logging.getLogger("socketIO-client").addFilter(SocketIONoiseFilter())
tables = hass.data.setdefault(DATA_SISYPHUS, {})
def remote_ssl_no_validate():
return {Optional('remote_ssl_no_validate', default=False): Any(bool, All(Any(*string_types), Boolean()))}
vol.Optional(CONF_ENTITY_NAME): cv.string,
vol.Optional(CONF_ENTITY_HIDDEN): cv.boolean
})
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Optional(CONF_HOST_IP): cv.string,
vol.Optional(CONF_LISTEN_PORT, default=DEFAULT_LISTEN_PORT): cv.port,
vol.Optional(CONF_ADVERTISE_IP): cv.string,
vol.Optional(CONF_ADVERTISE_PORT): cv.port,
vol.Optional(CONF_UPNP_BIND_MULTICAST): cv.boolean,
vol.Optional(CONF_OFF_MAPS_TO_ON_DOMAINS): cv.ensure_list,
vol.Optional(CONF_EXPOSE_BY_DEFAULT): cv.boolean,
vol.Optional(CONF_EXPOSED_DOMAINS): cv.ensure_list,
vol.Optional(CONF_TYPE, default=DEFAULT_TYPE):
vol.Any(TYPE_ALEXA, TYPE_GOOGLE,TYPE_DINGDONG),
vol.Optional(CONF_AUTOLINK): cv.boolean,
vol.Optional(CONF_ENTITIES):
vol.Schema({cv.entity_id: CONFIG_ENTITY_SCHEMA}),
vol.Optional(CONF_NETMASK,default='255.255.255.0'): cv.string
})
}, extra=vol.ALLOW_EXTRA)
ATTR_EMULATED_HUE = 'emulated_hue'
ATTR_EMULATED_HUE_NAME = 'emulated_hue_name'
ATTR_EMULATED_HUE_HIDDEN = 'emulated_hue_hidden'
def setup(hass, yaml_config):
"""Activate the emulated_hue component."""
timezone = yaml_config.get("homeassistant").get("time_zone")
config = Config(hass, yaml_config.get(DOMAIN, {}), timezone)
def _validate_metadata_section(metadata):
any_str = Any(str, six.text_type)
schema = Schema({
Required(TemplateFields.NAME, msg=60): any_str,
TemplateFields.DESCRIPTION: any_str
})
return _validate_dict_schema(schema, metadata)
CIC_SERVICE = 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1'
UNITS = {
"Bytes": 1,
"KBytes": 1024,
"MBytes": 1024**2,
"GBytes": 1024**3,
}
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Optional(CONF_ENABLE_PORT_MAPPING, default=False): cv.boolean,
vol.Optional(CONF_UNITS, default="MBytes"): vol.In(UNITS),
vol.Optional(CONF_LOCAL_IP): vol.All(ip_address, cv.string),
vol.Optional(CONF_PORTS):
vol.Schema({vol.Any(CONF_HASS, cv.positive_int): cv.positive_int})
}),
}, extra=vol.ALLOW_EXTRA)
async def async_setup(hass, config):
"""Register a port mapping for Home Assistant via UPnP."""
config = config[DOMAIN]
host = config.get(CONF_LOCAL_IP)
if host is None:
host = get_local_ip()
if host == '127.0.0.1':
_LOGGER.error(
'Unable to determine local IP. Add it to your configuration.')
return False