Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def append_service(info: ServiceInfo) -> None:
"""Append discovered zeroconf service to service list."""
name = info.name[: -(len(info.type) + 1)]
ip = info.parsed_addresses(IPVersion.V4Only)[0]
port = info.port
model = info.properties.get(b"name", "").decode("utf-8")
id = info.properties.get(b"id")
# handle id decode for various discovered use cases
if isinstance(id, bytes):
try:
int(id, 16)
except Exception:
id = id.hex()
else:
id = None
service = ZeroconfDevice(name, ip, port, model, id)
services.append(service)
def __init__(self):
super().__init__()
logging.getLogger("zeroconf").setLevel(logging.DEBUG)
self.periodic_callbacks = []
self.discovered_api_instances = {}
self.ip_version = IPVersion.V4Only # IPVersion.All
self.secure = not options.disable_ssl
self.network = f"{socket.getfqdn()}"
self.zeroconf = None
subdesc = "{}:{}".format(socket.gethostname(), options.name if options.name else options.server_port_nonssl)
desc = {
"httpEndpoint": f"http://{socket.getfqdn()}:{options.server_port_nonssl}/graphql",
"wsEndpoint": f"ws://{socket.getfqdn()}:{options.server_port_nonssl}/subscriptions",
"schemaEndpoint": f"http://{socket.getfqdn()}:{options.server_port_nonssl}/schema",
"websocketsOnly": False,
"uuid": api_instance_uuid,
"service_type": "maverick-api",
"name": subdesc,
"hostname": socket.getfqdn(),
}
if self.secure:
desc["httpsEndpoint"] = f"https://{socket.getfqdn()}:{options.server_port_ssl}/graphql"
def __init__(self, config):
self.ip_addr = AirDropUtil.get_ip_for_interface(config.interface, ipv6=True)
if self.ip_addr is None:
if config.interface == 'awdl0':
raise RuntimeError('Interface {} does not have an IPv6 address. '
'Make sure that `owl` is running.'.format(config.interface))
else:
raise RuntimeError('Interface {} does not have an IPv6 address'.format(config.interface))
self.zeroconf = Zeroconf(interfaces=[str(self.ip_addr)],
ip_version=IPVersion.V6Only,
apple_p2p=platform.system() == 'Darwin')
self.callback_add = None
self.callback_remove = None
self.browser = None
# The external player requires a YouTube URL rather than the direct URL,
# so that it complies with YouTube's Terms Of Service.
DIRECT_URL = False
# The name should be something like Vidify + system specs.
# This part should also check that there aren't services with the same
# name already.
SERVICE_NAME = "vidify"
# The service type includes the protocols used, like this:
# "_._".
# For now, the data is transmitted with TCP, so this is enough.
SERVICE_TYPE = "_vidify._tcp.local."
# Trying to use both IPv4 and IPv6
IP_VERSION = IPVersion.All
# Prefixes for the labels shown in the layout
_LABEL_PREFIXES = {
'url': '<b>URL:</b> ',
'relative_pos': '<b>Last relative position change:</b> ',
'absolute_pos': '<b>Last absolute position change:</b> ',
'is_playing': '<b>Is it playing?:</b> ',
'clients': '<b>Connected clients:</b> '
}
_CONFIRM_MSG = json.dumps({'success': True}).encode('utf-8')
def __init__(self, api_name: str) -> None:
"""
This initializes both the player widget and the TCP server.
"""
self.ServerClass = HTTPServerV6
self.ServerClass.allow_reuse_address = False
self.ip_addr = AirDropUtil.get_ip_for_interface(self.config.interface, ipv6=True)
if self.ip_addr is None:
if self.config.interface == 'awdl0':
raise RuntimeError('Interface {} does not have an IPv6 address. '
'Make sure that `owl` is running.'.format(self.config.interface))
else:
raise RuntimeError('Interface {} does not have an IPv6 address'.format(self.config.interface))
self.Handler = AirDropServerHandler
self.Handler.config = self.config
self.zeroconf = Zeroconf(interfaces=[str(self.ip_addr)],
ip_version=IPVersion.V6Only,
apple_p2p=platform.system() == 'Darwin')
self.http_server = self._init_server()
self.service_info = self._init_service()
def append_service(info: ServiceInfo) -> None:
name = info.name[: -(len(info.type) + 1)]
ip = info.parsed_addresses(IPVersion.V4Only)[0]
port = info.port
model = info.properties[b"name"].decode("utf-8")
# id = info.properties[b"id"]
# # handle id decode for various discovered use cases
# if isinstance(id, bytes):
# try:
# int(id, 16)
# except Exception:
# id = id.hex()
# else:
# id = None
service = ZeroconfDevice(name, ip, port, model, None)
services.append(service)