Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def discover_devices(args):
    """Browse for ``_arduino._tcp.local.`` services for ``args.timeout`` seconds.

    A ``Listener`` receives the discovery events. It is told to print each
    device immediately when it is discovered, unless ``args.sort`` is set
    (then output is meant to wait until discovery finishes).

    The process exits with status 1 when the service browser cannot be
    created or when the wait is interrupted with Ctrl-C.
    """
    listener = Listener(print_when_discovered=not args.sort)

    # Keep a handle on the Zeroconf instance: it has to be closed explicitly
    # once the discovery window is over.
    zc = None
    try:
        zc = zeroconf.Zeroconf()
        zeroconf.ServiceBrowser(zc, "_arduino._tcp.local.", listener)
    except (
        zeroconf.BadTypeInNameException,
        NotImplementedError,
        OSError,
        socket.error,
        zeroconf.NonUniqueNameException,
    ) as exc:
        if zc is not None:
            zc.close()
        print("! error when creating service discovery browser: {}".format(exc))
        sys.exit(1)

    try:
        # Discovery happens in the background while this thread sleeps.
        time.sleep(args.timeout)
    except KeyboardInterrupt:
        sys.exit(1)
    finally:
        # Runs on normal completion and on Ctrl-C alike.
        zc.close()
def onInit(self):
    """Prepare the Zeroconf service description for the AMQP endpoint.

    Stores the local IP address and hostname, builds the ``ServiceInfo``
    for ``_amqp._tcp.local.``, records it in ``self.services`` and creates
    the ``Zeroconf`` instance in ``self.zeroconf``.
    """
    log.info('Zeroconf publish init')
    self.ip_addr = self._get_ip()
    self.hostname = socket.gethostname()
    self.services = []
    self.desc = {'Description': 'Chains Home Automation service on rabbitmq'}
    self.amqp_info = ServiceInfo("_amqp._tcp.local.",
                                 "Chains Master AMQP %s._amqp._tcp.local." % self.hostname,
                                 socket.inet_aton(self.ip_addr), 5672, 0, 0,
                                 self.desc, "%s.local." % self.hostname)
    # The list created above is ``self.services``; appending to
    # ``self.service`` would fail with AttributeError.
    self.services.append(self.amqp_info)
    self.zeroconf = Zeroconf()
class ZCListener:
    """Service listener that mirrors announced service names into a list."""

    def __init__(self, names):
        # The caller's list is kept by reference and mutated in place, so
        # the caller sees additions and removals without asking the listener.
        self.names = names

    def addService(self, server, type, name):
        """Record the name of a service that has appeared."""
        self.names.append(name)

    def removeService(self, server, type, name):
        """Drop the name of a service that has gone away.

        Raises ValueError if ``name`` is not currently in the list.
        """
        self.names.remove(name)
REMOTE = '_tivo-videos._tcp.local.'
tivo_names = []
# Get the names of TiVos offering network remote control
try:
serv = zeroconf.Zeroconf()
browser = zeroconf.ServiceBrowser(serv, REMOTE, ZCListener(tivo_names))
except Exception, e:
Log("Error staring Zero Conf: %s" % e)
return oc
# Give them a second to respond
sleep(0.7)
# For proxied TiVos, remove the original and any black listed tivos
browseblacklist = (Prefs['browseblacklist'] or "").split(",")
for t in tivo_names[:]:
if t.split(".")[0] in browseblacklist:
tivo_names.remove(t)
continue
if t.startswith('Proxy('):
try:
0, 0, 0, # 3 unused LONGs
*self.axes, # 8 LONGs for axes and 7 unused LONGs
self.buttons & 0xffffffff, # 1 LONG for buttons
0, 0, 0, 0, # 4 DWORDs for hats
(self.buttons >> 32) & 0xffffffff,
(self.buttons >> 64) & 0xffffffff,
(self.buttons >> 96) & 0xffffffff # 3 LONGs for buttons
))
# This allows a very simple emit() definition:
self.buttons = 0
def close(self):
    """Close ``self.device``."""
    device = self.device
    device.close()
zeroconf = Zeroconf()
# Webserver to serve files to android client
from http.server import HTTPServer, SimpleHTTPRequestHandler
from threading import Thread
import socketserver
import os, urllib, posixpath
# TODO: This shim lets socketserver servers be used in a ``with`` statement
# on old versions of Python (such as the one shipped in Debian) whose
# socketserver.py does not provide the context-manager methods yet.
# Please delete once socketserver.py is updated in every major Linux distro.
if not hasattr(socketserver.BaseServer, "__enter__"):
    # Entering yields the server itself; leaving closes it.
    socketserver.BaseServer.__enter__ = lambda self: self
    socketserver.BaseServer.__exit__ = lambda self, *args: self.server_close()
class HTTPRequestHandler(SimpleHTTPRequestHandler):
def discover(self):
    """Find the MQTT broker via Zeroconf, or become it, then connect.

    Prints a summary of the hosted asset, starts broker discovery and waits
    ``max_sleep_time`` seconds for a result. If no broker name or IP was
    found, ``become_broker()`` is called and the method waits again before
    connecting. Any exception is logged rather than propagated, and the
    Zeroconf instance is always closed on the way out.
    """
    print("{status} Smart Module hosting asset {asset_id} {asset_type} {asset_context}.".format(
        status="Mock" if self.rtc.mock else "Real",
        asset_id=self.asset.id,
        asset_type=self.asset.type,
        asset_context=self.asset.context))

    max_sleep_time = 3  # Calling sleep should be reviewed.
    # Bound before the ``try`` so that the cleanup below can tell whether
    # the Zeroconf instance was actually created.
    zeroconf = None
    try:
        zeroconf = Zeroconf()
        Log.info("Performing Broker discovery...")
        self.find_broker(zeroconf)
        time.sleep(max_sleep_time)  # Wait for max_sleep_time to see if we found it.
        if self.comm.broker_name or self.comm.broker_ip:  # Found it.
            Log.info("MQTT Broker: {broker_name} IP: {broker_ip}.".format(
                broker_name=self.comm.broker_name,
                broker_ip=self.comm.broker_ip))
        else:  # Make necessary actions to become the broker.
            Log.info("Broker not found. Becoming the broker.")
            self.become_broker()
            time.sleep(max_sleep_time)
        self.comm.connect()  # Now it's time to connect to the broker.
    except Exception:
        # Log.exception records the active traceback; no name is needed.
        Log.exception("[Exiting] Trying to find or become the broker.")
    finally:
        Log.info("Closing Zeroconf connection.")
        if zeroconf is not None:
            zeroconf.close()
transfer. See :code:`urllib.urlretrieve` for callback parameters.
Optional.
timeout : int
Seconds to wait until process is aborted. A running transfer is not
aborted even when timeout was hit. Optional.
Raises
-------
TimeoutException
When a timeout occurred.
"""
basename = os.path.basename(filename)
filehash = hashlib.sha1(basename.encode('utf-8')).hexdigest()
zeroconf = Zeroconf()
listener = ServiceListener()
listener.filehash = filehash
utils.logger.debug(_("Looking for %s._zget._http._tcp.local.") % filehash)
browser = ServiceBrowser(zeroconf, "_zget._http._tcp.local.", listener)
start_time = time.time()
try:
while listener.address is None:
time.sleep(0.5)
if (
timeout is not None and
time.time() - start_time > timeout
):
zeroconf.close()
self.airplay_mirroring_server.parent = self
self.airtunes_server.parent = self
self.airplay_thread = threading.Thread(
target=self.airplay_server.serve_forever)
self.airplay_mirroring_thread = threading.Thread(
target=self.airplay_mirroring_server.serve_forever)
# self.airtunes_thead = threading.Thread(
# target=self.airtunes_server.serve_forever)
self.airplay_thread.start()
self.airplay_mirroring_thread.start()
# self.airtunes_thead.start()
# register with bonjour
self.zc = zeroconf.Zeroconf()
self.register_airtunes(self.airtunes_server.server_port)
self.register_airplay(self.airplay_port)
print 'Ready'
try:
self.airtunes_server.serve_forever()
finally:
for con in self.connections.itervalues():
con.shutdown()
self.airplay_server.shutdown()
self.airplay_mirroring_server.shutdown()
# self.airtunes_server.shutdown()
:param str ip_address: The IP address on which *Virtual Touchpad* is
reachable.
:param int port: The port on which to connect to *Virtual Touchpad*.
"""
import getpass
import socket
import types
try:
import zeroconf
except:
yield
return
zc = zeroconf.Zeroconf()
info = zeroconf.ServiceInfo(
SERVICE_NAME,
'%s@%s.%s' % (
getpass.getuser(),
socket.gethostname().replace('.', '_'),
SERVICE_NAME),
socket.inet_aton(ip_address),
port,
0, 0,
{
'version': '.'.join(str(v) for v in __version__)})
zc.register_service(info)
try:
yield
finally:
def __init__(self):
    """Create the Zeroconf handle and the empty discovery result lists."""
    # InterfaceChoice.All asks zeroconf to use every network interface.
    all_interfaces = zeroconf.InterfaceChoice.All
    self._zc = zeroconf.Zeroconf(interfaces=all_interfaces)
    self._found_types, self._found_services = [], []
# a separator, and some NSD clients can't handle names with it.
system = f"{platform.system()} {platform.node()}".replace('.', '_')
full_name = f"{self.SERVICE_NAME} - {system}"
# The name's maximum length is 64 bytes
if len(full_name) >= 64:
full_name = full_name[:60] + "..."
self.info = ServiceInfo(
self.SERVICE_TYPE,
f"{full_name}.{self.SERVICE_TYPE}",
addresses=[socket.inet_aton(self.address)],
port=self.port,
properties=desc)
logging.info("Registering Vidify service")
self.zeroconf = Zeroconf(ip_version=self.IP_VERSION)
self.zeroconf.register_service(self.info)