Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:raises BulbException: When the bulb indicates an error condition.
:returns: The response from the bulb.
"""
command = {"id": self._cmd_id, "method": method, "params": params}
_LOGGER.debug("%s > %s", self, command)
try:
self._socket.send((json.dumps(command) + "\r\n").encode("utf8"))
except socket.error as ex:
# Some error occurred, remove this socket in hopes that we can later
# create a new one.
self.__socket.close()
self.__socket = None
raise_from(BulbException("A socket error occurred when sending the command."), ex)
if self._music_mode:
# We're in music mode, nothing else will happen.
return {"result": ["ok"]}
# The bulb will send us updates on its state in addition to responses,
# so we want to make sure that we read until we see an actual response.
response = None
while response is None:
try:
data = self._socket.recv(16 * 1024)
except socket.error:
# An error occurred, let's close and abort...
self.__socket.close()
self.__socket = None
response = {"error": "Bulb closed the connection."}
:raises BulbException: When the bulb indicates an error condition.
:returns: The response from the bulb.
"""
command = {"id": self._cmd_id, "method": method, "params": params}
_LOGGER.debug("%s > %s", self, command)
try:
self._socket.send((json.dumps(command) + "\r\n").encode("utf8"))
except socket.error as ex:
# Some error occurred, remove this socket in hopes that we can later
# create a new one.
self.__socket.close()
self.__socket = None
raise_from(BulbException("A socket error occurred when sending the command."), ex)
if self._music_mode:
# We're in music mode, nothing else will happen.
return {"result": ["ok"]}
# The bulb will send us updates on its state in addition to responses,
# so we want to make sure that we read until we see an actual response.
response = None
while response is None:
try:
data = self._socket.recv(16 * 1024)
except socket.error:
# An error occurred, let's close and abort...
self.__socket.close()
self.__socket = None
response = {"error": "Bulb closed the connection."}
continue
try:
line = json.loads(line.decode("utf8"))
_LOGGER.debug("%s < %s", self, line)
except ValueError:
line = {"result": ["invalid command"]}
if line.get("method") != "props":
# This is probably the response we want.
response = line
else:
self._last_properties.update(line["params"])
if "error" in response:
raise BulbException(response["error"])
return response
continue
try:
line = json.loads(line.decode("utf8"))
_LOGGER.debug("%s < %s", self, line)
except ValueError:
line = {"result": ["invalid command"]}
if line.get("method") != "props":
# This is probably the response we want.
response = line
else:
self._last_properties.update(line["params"])
if "error" in response:
raise BulbException(response["error"])
return response