Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
data: Either `"on_data"` or `"off_data"`, for `getattr(self, data)`
Returns:
Boolean indicating whether the state was set successfully
"""
req = urllib.request.Request(
url=cmd, data=data, headers=self.headers, method=self.method
)
try:
with self.urlopen(req) as resp:
if isinstance(resp, http.client.HTTPResponse):
return resp.status in (200, 201)
except HTTPError as e:
logger.warning(f"Error with request to {cmd}: {e}")
return False
success = True
logger.info(f"{self.plugin.name} returning friendly name")
if success:
soap_message = soap_format(
action=action, action_type=action_type, return_val=return_val
)
response = self.add_http_headers(soap_message)
logger.debug(response)
self.transport.write(response.encode())
else:
errmsg = (
f"Unable to complete command for {self.plugin.name}:\n{msg}"
)
logger.warning(errmsg)
self.transport.close()
def connection_lost(self, exc: Exception) -> None:
    """Handle lost connections.

    Logs a warning when the connection was closed because of an error and
    does nothing otherwise.

    Args:
        exc: The exception that caused the connection to close.
            NOTE(review): presumably ``None`` when the connection was
            closed cleanly, as with asyncio protocol callbacks -- confirm
            against the caller.
    """
    # Compare against None explicitly rather than relying on truthiness:
    # an exception class that defines __len__ or __bool__ can evaluate as
    # falsy and would otherwise be dropped without any log entry.
    if exc is not None:
        logger.warning(f"SSDPServer closed with exception: {exc}")
elif command_format("SetBinaryState").casefold() in msg.casefold():
action = "Set"
action_type = "BinaryState"
if "0" in msg:
logger.info(f"Attempting to turn off {self.plugin.name}")
return_val = "0"
success = self.plugin.off()
elif "1" in msg:
logger.info(f"Attempting to turn on {self.plugin.name}")
return_val = "1"
success = self.plugin.on()
else:
logger.warning(f"Unrecognized request:\n{msg}")
elif command_format("GetFriendlyName").casefold() in msg.casefold():
action = "Get"
action_type = "FriendlyName"
return_val = self.plugin.name
success = True
logger.info(f"{self.plugin.name} returning friendly name")
if success:
soap_message = soap_format(
action=action, action_type=action_type, return_val=return_val
)
response = self.add_http_headers(soap_message)
logger.debug(response)
self.transport.write(response.encode())