Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def worker_cmd(a_device, mp_queue, cmd='show arp'):
'''
Return a dictionary where the key is the device identifier
Value is (success|fail(boolean), return_string)
'''
identifier = '{ip}:{port}'.format(**a_device)
return_data = {}
try:
net_connect = ConnectHandler(**a_device)
output = net_connect.send_command(cmd)
return_data[identifier] = (True, output)
except (NetMikoTimeoutException, NetMikoAuthenticationException) as e:
return_data[identifier] = (False, e)
# Add data to the queue (for parent process)
mp_queue.put(return_data)
def send_show(device_dict, command):
start_msg = '===> {} Connection: {}'
received_msg = '<=== {} Received: {}'
ip = device_dict['ip']
logging.info(start_msg.format(datetime.now().time(), ip))
if ip == '192.168.100.1': time.sleep(5)
try:
with ConnectHandler(**device_dict) as ssh:
ssh.enable()
result = ssh.send_command(command)
logging.info(received_msg.format(datetime.now().time(), ip))
return result
except NetMikoAuthenticationException as err:
logging.warning(err)
try:
# Establish a connection to the device
result['connection'] = handler(
device_type=netmiko_platform,
ip=ip,
username=cred['username'],
password=cred['password'],
secret=cred['password'],
)
result['cred'] = cred
log('Successful ssh auth to %s using %s, %s' % (ip, cred['username'], cred['password'][:2]), ip=ip, proc=proc, v=logging.N)
return result
except NetMikoAuthenticationException:
log ('SSH auth error to %s using %s, %s' % (ip, cred['username'], cred['password'][:2]), ip=ip, proc=proc, v=logging.A)
continue
except NetMikoTimeoutException:
log('SSH to %s timed out.' % ip, ip=ip, proc=proc, v=logging.A)
# If the device is unavailable, don't try any other credentials
break
except Exception as e:
log('SSH to [{}] failed due to [{}] error: [{}]'.format(ip, type(e).__name__, str(e)))
break
# Check to see if port 23 (telnet) is open
if not result['TCP_23']:
log('Port 23 is closed on %s' % ip, ip=ip, proc=proc, v=logging.I)
elif port is None or port is 23:
for cred in _credList:
try:
def send_show(device_dict, command):
start_msg = '===> {} Connection: {}'
received_msg = '<=== {} Received: {}'
ip = device_dict['ip']
logging.info(start_msg.format(datetime.now().time(), ip))
if ip == '192.168.100.1': time.sleep(5)
try:
with ConnectHandler(**device_dict) as ssh:
ssh.enable()
result = ssh.send_command(command)
logging.info(received_msg.format(datetime.now().time(), ip))
return result
except NetMikoAuthenticationException as err:
logging.warning(err)
def worker_cmd(a_device, mp_queue, cmd='show arp'):
"""
Return a dictionary where the key is the device identifier
Value is (success|fail(boolean), return_string)
"""
identifier = '{ip}:{port}'.format(**a_device)
return_data = {}
try:
net_connect = ConnectHandler(**a_device)
output = net_connect.send_command(cmd)
return_data[identifier] = (True, output)
except (NetMikoTimeoutException, NetMikoAuthenticationException) as e:
return_data[identifier] = (False, e)
# Add data to the queue (for parent process)
mp_queue.put(return_data)
nm_connect_direct.disconnect()
except (NetMikoTimeoutException, NetmikoTimeoutError) as scrape_error:
log.error(
"Timeout connecting to device {loc}: {e}",
loc=self.device.name,
e=str(scrape_error),
)
raise DeviceTimeout(
params.messages.connection_error,
device_name=self.device.display_name,
proxy=self.device.proxy.name,
error=params.messages.request_timeout,
)
except (NetMikoAuthenticationException, NetmikoAuthError) as auth_error:
log.error(
"Error authenticating to device {loc}: {e}",
loc=self.device.name,
e=str(auth_error),
)
raise AuthError(
params.messages.connection_error,
device_name=self.device.display_name,
proxy=self.device.proxy.name,
error=params.messages.authentication_error,
) from None
except sshtunnel.BaseSSHTunnelForwarderError:
raise ScrapeError(
params.messages.connection_error,
device_name=self.device.display_name,
proxy=self.device.proxy.name,