Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
try:
a_device['port']
except KeyError:
a_device['port'] = 22
identifier = '{ip}:{port}'.format(**a_device)
return_data = {}
show_ver_command = 'show version'
SSHClass = netmiko.ssh_dispatcher(a_device['device_type'])
try:
net_connect = SSHClass(**a_device)
show_version = net_connect.send_command(show_ver_command)
except (NetMikoTimeoutException, NetMikoAuthenticationException) as e:
return_data[identifier] = (False, e)
# Add data to the queue (for parent process)
mp_queue.put(return_data)
return None
return_data[identifier] = (True, show_version)
mp_queue.put(return_data)
def open(self):
"""Open a connection to the device."""
try:
if self.transport == 'ssh':
device_type = 'huawei'
else:
raise ConnectionException("Unknown transport: {}".format(self.transport))
self.device = ConnectHandler(device_type=device_type,
host=self.hostname,
username=self.username,
password=self.password,
**self.netmiko_optional_args)
# self.device.enable()
except NetMikoTimeoutException:
raise ConnectionException('Cannot connect to {}'.format(self.hostname))
def enable(self, cmd="sudo su", pattern="ssword", re_flags=re.IGNORECASE):
"""Attempt to become root."""
delay_factor = self.select_delay_factor(delay_factor=0)
output = ""
if not self.check_enable_mode():
self.write_channel(self.normalize_cmd(cmd))
time.sleep(0.3 * delay_factor)
try:
output += self.read_channel()
if re.search(pattern, output, flags=re_flags):
self.write_channel(self.normalize_cmd(self.secret))
self.set_base_prompt()
except socket.timeout:
raise NetMikoTimeoutException(
"Timed-out reading channel, data not available."
)
if not self.check_enable_mode():
msg = (
"Failed to enter enable mode. Please ensure you pass "
"the 'secret' argument to ConnectHandler."
)
raise ValueError(msg)
return output
def open(self):
"""
Open a connection to an IOS-XR device.
Connects to the device using SSH and drops into XML mode.
"""
try:
self.device = ConnectHandler(device_type='cisco_xr',
ip=self.hostname,
port=self.port,
username=self.username,
password=self.password,
**self.netmiko_kwargs)
self.device.timeout = self.timeout
self._xml_agent_alive = True # successfully open thus alive
except NetMikoTimeoutException as t_err:
raise ConnectError(t_err.args[0])
except NetMikoAuthenticationException as au_err:
raise ConnectError(au_err.args[0])
self._cli_prompt = self.device.find_prompt() # get the prompt
self._enter_xml_mode()
# with connect(), which, I feel, behaves as it should.
# Otherwise, check to see if we are parsing by district/site
error = str(e).replace('\n', '-')
if ':22' in error:
port = 22
if 'Telnet' in error:
port = 23
return_data[host] = {
'port': port,
'status': 3,
'message': 'Authentication failure - provided credentials rejected.'
}
pass
except netmiko.ssh_exception.NetMikoTimeoutException as e:
# Error 3: Timeout
return_data[host] = {
'port': None,
'status': 4,
'message': str(e)
}
except SSHException as e:
# Error 4: SSH exception
return_data[host] = {
'port': 22,
'status': 5,
'message': str(e)
}
pass
def open(self):
try:
self.device = ConnectHandler(device_type='linux',
host=self.hostname,
username=self.username,
password=self.password,
**self.netmiko_optional_args)
# Enter root mode.
if self.netmiko_optional_args.get('secret'):
self.device.enable()
except NetMikoTimeoutException:
raise ConnectionException('Cannot connect to {}'.format(self.hostname))
except ValueError:
raise ConnectionException('Cannot become root.')
for hostname in devices_list:
print ('Connecting to device" ' + hostname)
ip_address_of_device = hostname
ios_device = {
'device_type': 'cisco_ios',
'ip': ip_address_of_device,
'username': username,
'password': password
}
try:
net_connect = ConnectHandler(**ios_device)
except (AuthenticationException):
print('Authentication failure: ' + ip_address_of_device)
continue
except (NetMikoTimeoutException):
print('Timeout to device: ' + ip_address_of_device)
continue
except (EOFError):
print("End of file while attempting device " + ip_address_of_device)
continue
except (SSHException):
print('SSH Issue. Are you sure SSH is enabled? ' + ip_address_of_device)
continue
except Exception as unknown_error:
print('Some other error: ' + str(unknown_error))
continue
if hostname == 'r1':
print("hostname == r1:", hostname == 'r1')
output = net_connect.send_config_set(commands_list_r1)
print(output)
output += new_data
new_data = self.read_channel()
# Search for password pattern / send password
if re.search(pattern, new_data, flags=re.I):
self.write_channel(self.secret + self.RETURN)
time.sleep(1 * delay_factor)
output += new_data
new_data = self.read_channel()
if 'Password has not been set' in new_data:
raise ValueError(msg)
if re.search(r'\<.+\>', new_data):
output += new_data
break
i += 1
except NetMikoTimeoutException:
raise ValueError(msg)
# FIX - Need a way to validate that we actually went into priv 15
# if not self.check_enable_mode():
# raise ValueError(msg)
return output
'''All the logic happens here. Take the data, process it, print results'''
sessiondict = {
'device_type': devicetype,
'ip': ipaddr,
'username': username,
'password': password,
'secret': secret,
'verbose': False
}
try:
# Start the session, enable, send the commands, capture terminal output and remove the connections
session = netmiko.ConnectHandler(**sessiondict)
session.enable()
session_return = session.send_command(clicomm)
session.disconnect()
except (netmiko.ssh_exception.NetMikoTimeoutException):
session_return = "----------DEVICE CONNECTION FAILED----------"
# Fancy formatting here for results
print("\n\n>>>>>>>>> {0} {1} <<<<<<<<<\n".format(hostname, ipaddr)
+ session_return
+ "\n>>>>>>>>> End <<<<<<<<<\n")