Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if dest_path:
image_url = url.rsplit('/', 2)[1] + '-' + url.rsplit('/', 1)[1]
if file_name:
image_filename = file_name
else:
image_filename = image_url.split('?')[0]
dest = os.path.join(dest_path, image_filename)
image.raise_for_status()
with open(dest, 'wb') as data:
image.raw.decode_content = True
shutil.copyfileobj(image.raw, data)
except (requests.exceptions.ConnectionError,
requests.exceptions.HTTPError):
raise NeatoRobotException("Unable to get robot map")
return image.raw
async def async_setup_entry(hass, entry):
"""Set up config entry."""
hub = NeatoHub(hass, entry.data, Account)
await hass.async_add_executor_job(hub.login)
if not hub.logged_in:
_LOGGER.debug("Failed to login to Neato API")
return False
try:
await hass.async_add_executor_job(hub.update_robots)
except NeatoRobotException:
_LOGGER.debug("Failed to connect to Neato API")
raise ConfigEntryNotReady
hass.data[NEATO_LOGIN] = hub
for component in ("camera", "vacuum", "switch", "sensor"):
hass.async_add_job(
hass.config_entries.async_forward_entry_setup(entry, component)
)
return True
"""
Sends message to robot with data from parameter 'json'
:param json: dict containing data to send
:return: server response
"""
try:
response = requests.post(self._url,
json=json,
verify=self._vendor.cert_path,
auth=Auth(self.serial, self.secret),
headers=self._headers)
response.raise_for_status()
except (requests.exceptions.ConnectionError,
requests.exceptions.HTTPError):
raise NeatoRobotException("Unable to communicate with robot")
return response
def return_to_base(self, **kwargs):
"""Set the vacuum cleaner to return to the dock."""
try:
if self._clean_state == STATE_CLEANING:
self.robot.pause_cleaning()
self._clean_state = STATE_RETURNING
self.robot.send_to_base()
except NeatoRobotException as ex:
_LOGGER.error("Neato vacuum connection error: %s", ex)
def refresh_robots(self):
"""
Get information about robots connected to account.
:return:
"""
try:
resp = requests.get(urljoin(self._endpoint, 'dashboard'),
headers=self._headers)
resp.raise_for_status()
except (requests.exceptions.ConnectionError,
requests.exceptions.HTTPError):
raise NeatoRobotException("Unable to refresh robots")
for robot in resp.json()['robots']:
if robot['mac_address'] is None:
continue # Ignore robots without mac-address
try:
self._robots.add(Robot(name=robot['name'],
vendor=self._vendor,
serial=robot['serial'],
secret=robot['secret_key'],
traits=robot['traits'],
endpoint=robot['nucleo_url']))
except NeatoRobotException:
print ("Your '{}' robot is offline.".format(robot['name']))
continue
"""
Get information about maps of the robots.
:return:
"""
try:
for robot in self.robots:
resp2 = (
requests.get(urljoin(self._endpoint, 'users/me/robots/{}/maps'.format(robot.serial)),
headers=self._headers))
resp2.raise_for_status()
self._maps.update({robot.serial: resp2.json()})
except (requests.exceptions.ConnectionError,
requests.exceptions.HTTPError):
raise NeatoRobotException("Unable to refresh robot maps")
def try_login(username, password, vendor):
"""Try logging in to device and return any errors."""
this_vendor = None
if vendor == "vorwerk":
this_vendor = Vorwerk()
else: # Neato
this_vendor = Neato()
try:
Account(username, password, this_vendor)
except NeatoLoginException:
return "invalid_credentials"
except NeatoRobotException:
return "unexpected_error"
return None
def turn_on(self, **kwargs):
"""Turn the switch on."""
if self.type == SWITCH_TYPE_SCHEDULE:
try:
self.robot.enable_schedule()
except NeatoRobotException as ex:
_LOGGER.error("Neato switch connection error: %s", ex)
Get information about persistent maps of the robots.
:return:
"""
try:
for robot in self._robots:
resp2 = (requests.get(urljoin(
self._endpoint,
'users/me/robots/{}/persistent_maps'.format(robot.serial)),
headers=self._headers))
resp2.raise_for_status()
self._persistent_maps.update({robot.serial: resp2.json()})
except (requests.exceptions.ConnectionError,
requests.exceptions.HTTPError):
raise NeatoRobotException("Unable to refresh persistent maps")
def try_login(username, password, vendor):
"""Try logging in to device and return any errors."""
this_vendor = None
if vendor == "vorwerk":
this_vendor = Vorwerk()
else: # Neato
this_vendor = Neato()
try:
Account(username, password, this_vendor)
except NeatoLoginException:
return "invalid_credentials"
except NeatoRobotException:
return "unexpected_error"
return None