Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ServiceInfo passed in if it is not unique."""
now = currentTimeMillis()
nextTime = now
i = 0
while i < 3:
for record in self.cache.entriesWithName(info.type):
if (record.type == _TYPE_PTR and
not record.isExpired(now) and
record.alias == info.name):
if info.name.find('.') < 0:
info.name = '%s.[%s:%s].%s' % (info.name,
info.address, info.port, info.type)
self.checkService(info)
return
raise NonUniqueNameException
if now < nextTime:
self.wait(nextTime - now)
now = currentTimeMillis()
continue
out = DNSOutgoing(_FLAGS_QR_QUERY | _FLAGS_AA)
self.debug = out
out.addQuestion(DNSQuestion(info.type, _TYPE_PTR, _CLASS_IN))
out.addAuthorativeAnswer(DNSPointer(info.type, _TYPE_PTR,
_CLASS_IN, _DNS_TTL, info.name))
self.send(out)
i += 1
nextTime += _CHECK_TIME
ServiceInfo passed in if it is not unique."""
now = currentTimeMillis()
nextTime = now
i = 0
while i < 3:
for record in self.cache.entriesWithName(info.type):
if (record.type == _TYPE_PTR and
not record.isExpired(now) and
record.alias == info.name):
if info.name.find('.') < 0:
info.name = '%s.[%s:%s].%s' % (info.name,
info.address, info.port, info.type)
self.checkService(info)
return
raise NonUniqueNameException
if now < nextTime:
self.wait(nextTime - now)
now = currentTimeMillis()
continue
out = DNSOutgoing(_FLAGS_QR_QUERY | _FLAGS_AA)
self.debug = out
out.addQuestion(DNSQuestion(info.type, _TYPE_PTR, _CLASS_IN))
out.addAuthorativeAnswer(DNSPointer(info.type, _TYPE_PTR,
_CLASS_IN, _DNS_TTL, info.name))
self.send(out)
i += 1
nextTime += _CHECK_TIME
def discover_devices(args):
# Look for services and try to immediatly print the device when it is discovered
# (unless --sort is specified, then we will wait until discovery finishes
listener = Listener(print_when_discovered=not args.sort)
try:
browser = zeroconf.ServiceBrowser(
zeroconf.Zeroconf(), "_arduino._tcp.local.", listener
)
except (
zeroconf.BadTypeInNameException,
NotImplementedError,
OSError,
socket.error,
zeroconf.NonUniqueNameException,
) as exc:
print("! error when creating service discovery browser: {}".format(exc))
sys.exit(1)
try:
time.sleep(args.timeout)
except KeyboardInterrupt:
sys.exit(1)
try:
browser.zc.close()
except KeyboardInterrupt:
sys.exit(1)
if not listener.devices:
print("Nothing found!\n")
object. The CastListener object will contain information for the discovered
chromecasts. To stop discovery, call the stop_discovery method with the
ServiceBrowser object.
"""
listener = CastListener(add_callback, remove_callback)
service_browser = False
try:
service_browser = zeroconf.ServiceBrowser(
zeroconf.Zeroconf(), "_googlecast._tcp.local.", listener
)
except (
zeroconf.BadTypeInNameException,
NotImplementedError,
OSError,
socket.error,
zeroconf.NonUniqueNameException,
):
pass
return listener, service_browser
object. The CastListener object will contain information for the discovered
chromecasts. To stop discovery, call the stop_discovery method with the
ServiceBrowser object.
"""
listener = CastListener(add_callback, remove_callback)
service_browser = False
try:
service_browser = zeroconf.ServiceBrowser(
zeroconf.Zeroconf(), "_googlecast._tcp.local.", listener
)
except (
zeroconf.BadTypeInNameException,
NotImplementedError,
OSError,
socket.error,
zeroconf.NonUniqueNameException,
):
pass
return listener, service_browser
# attempt to create an mDNS service and register it on the network
try:
info = ServiceInfo(
SERVICE_TYPE,
name=".".join([id, SERVICE_TYPE]),
server=".".join([id, LOCAL_DOMAIN, ""]),
address=USE_IP_OF_OUTGOING_INTERFACE,
port=self.port,
properties=self.data,
)
ZEROCONF_STATE["zeroconf"].register_service(info, ttl=60)
self.info = info
except NonUniqueNameException:
# if there's a name conflict, append incrementing integer until no conflict
i += 1
id = "%s-%d" % (self.id, i)
if i > 100:
raise NonUniqueNameException()
self.id = id
return self
address=USE_IP_OF_OUTGOING_INTERFACE,
port=self.port,
properties=self.data,
)
ZEROCONF_STATE["zeroconf"].register_service(info, ttl=60)
self.info = info
except NonUniqueNameException:
# if there's a name conflict, append incrementing integer until no conflict
i += 1
id = "%s-%d" % (self.id, i)
if i > 100:
raise NonUniqueNameException()
self.id = id
return self
This method returns the CastListener object and the zeroconf ServiceBrowser
object. The CastListener object will contain information for the discovered
chromecasts. To stop discovery, call the stop_discovery method with the
ServiceBrowser object.
"""
listener = CastListener(callback)
service_browser = False
try:
service_browser = zeroconf.ServiceBrowser(zeroconf.Zeroconf(interfaces=zeroconf.InterfaceChoice.All),
"_googlecast._tcp.local.",
listener)
except (zeroconf.BadTypeInNameException,
NotImplementedError,
OSError,
socket.error,
zeroconf.NonUniqueNameException):
pass
return listener, service_browser