Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_sockets(self):
    """Create one bound UDP socket per local address.

    For every address returned by ``zeroconf.get_all_addresses()`` an IPv4
    datagram socket is created, its multicast TTL is set to
    ``self.SSDP_MX`` and it is bound to that address on port 0 (the OS
    picks the port). Addresses for which the socket cannot be created,
    configured or bound are skipped on a best-effort basis.

    Returns:
        A list of the successfully bound sockets; empty if none succeeded.
        The caller owns the sockets and is responsible for closing them.
    """
    sockets = []
    for addr in zeroconf.get_all_addresses():
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        except socket.error:
            # Nothing was allocated, so there is nothing to clean up.
            continue
        try:
            # Set the time-to-live for messages for local network
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL,
                            self.SSDP_MX)
            sock.bind((addr, 0))
        except socket.error:
            # Close the half-configured socket instead of leaking its
            # file descriptor, then move on to the next address.
            sock.close()
            continue
        sockets.append(sock)
    return sockets
def scan(timeout=DISCOVER_TIMEOUT):
"""Send a message over the network to discover UPnP devices.
Inspired by Crimsdings
https://github.com/crimsdings/ChromeCast/blob/master/cc_discovery.py
Protocol explanation:
https://embeddedinn.wordpress.com/tutorials/upnp-device-architecture/
"""
ssdp_requests = ssdp_request(ST_ALL), ssdp_request(ST_ROOTDEVICE)
stop_wait = datetime.now() + timedelta(seconds=timeout)
sockets = []
for addr in zeroconf.get_all_addresses():
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Set the time-to-live for messages for local network
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL,
SSDP_MX)
sock.bind((addr, 0))
sockets.append(sock)
except socket.error:
pass
entries = {}
for sock in [s for s in sockets]:
try:
for req in ssdp_requests:
sock.sendto(req, SSDP_TARGET)
str(timeout / 1000.0)
)
)
return
id = _id_from_name(name)
ip = socket.inet_ntoa(info.address)
base_url = "http://{ip}:{port}/".format(ip=ip, port=info.port)
zeroconf_service = ZEROCONF_STATE.get("service")
is_self = zeroconf_service and zeroconf_service.id == id
instance = {
"id": id,
"ip": ip,
"local": ip in get_all_addresses(),
"port": info.port,
"host": info.server.strip("."),
"base_url": base_url,
"self": is_self,
}
device_info = {
bytes.decode(key): json.loads(val) for (key, val) in info.properties.items()
}
instance.update(device_info)
self.instances[id] = instance
if not is_self:
try:
try:
if preferred_database is not None:
database = provider.server.databases[preferred_database]
else:
database = provider.server.databases.values()[0]
except LookupError:
# The server may not have any databases (yet).
return
# The IP 0.0.0.0 tells this server to bind to all interfaces. However,
# Bonjour advertises itself to others, so others need an actual IP.
# There is definitely a better way, but it works.
if daap_server.ip == "0.0.0.0":
addresses = []
for address in zeroconf.get_all_addresses(socket.AF_INET):
if not address == "127.0.0.1":
addresses.append(socket.inet_aton(address))
else:
addresses = [socket.inet_aton(daap_server.ip)]
# Determine machine ID and database ID, depending on the provider. If
# the provider has no support for persistent IDs, generate a random
# ID.
if provider.supports_persistent_id:
machine_id = hex(provider.server.persistent_id)
database_id = hex(database.persistent_id)
else:
machine_id = hex(generate_persistent_id())
database_id = hex(generate_persistent_id())
# iTunes 11+ uses more properties, but this seems to be sufficient.