Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
params["password"] = self.sftp_password
# not empty sftp_public_key means that we should verify sftp server with it
cnopts = pysftp.CnOpts()
if self.sftp_public_key:
key = paramiko.RSAKey(data=base64.b64decode(self.sftp_public_key))
cnopts.hostkeys.add(self.sftp_server, 'ssh-rsa', key)
else:
cnopts.hostkeys = None
with pysftp.Connection(**params, cnopts=cnopts):
raise exceptions.Warning(_("Connection Test Succeeded!"))
except (pysftp.CredentialException,
pysftp.ConnectionException,
pysftp.SSHException):
_logger.info("Connection Test Failed!", exc_info=True)
raise exceptions.Warning(_("Connection Test Failed!"))
def upload(stop):
    """Upload ``.pcap`` files from ``incomingpath`` over SFTP until *stop* is set.

    Every pass lists ``incomingpath`` and sends each ``.pcap`` file through a
    single SFTP session to ``sshhost`` as ``sshuser`` (all three names are
    defined outside this function).  A local file is deleted only after its
    own upload has completed, so a failed transfer leaves the capture on disk
    for the next pass.

    Args:
        stop: Event-like object providing ``is_set()`` and ``wait(timeout)``
            (for example ``multiprocessing.Event``).  Setting it ends the
            loop and also cuts short any pending retry delay.
    """
    retry_delay = 300  # seconds

    while not stop.is_set():
        try:
            pcap_names = [
                fname for fname in os.listdir(incomingpath)
                if fname.endswith('.pcap')
            ]
            if not pcap_names:
                logging.info("no pcap found, will try again in 5 min")
                # Event.wait() returns early once stop is set, unlike sleep().
                stop.wait(retry_delay)
                continue

            try:
                with pysftp.Connection(host=sshhost, username=sshuser,
                                       private_key='~/.ssh/id_rsa') as sftp:
                    # Same remote working directory as before: the remote
                    # path mirrors the local incoming directory.
                    with sftp.cd(incomingpath):
                        for fname in pcap_names:
                            # join() works whether or not incomingpath ends
                            # with a path separator.
                            local_path = os.path.join(incomingpath, fname)
                            sftp.put(local_path)
                            logging.info("uploaded pcap")
                            # Remove only after put() returned without error.
                            os.remove(local_path)
            except pysftp.SSHException:
                logging.info("Unable to establish SSH connection will retry in 5 min")
                stop.wait(retry_delay)
        except KeyboardInterrupt:
            # Ctrl-C is deliberately ignored here; the loop ends only when
            # the stop event is set.
            pass
# Event object exposing is_set(); presumably the stop flag handed to
# upload() -- TODO confirm against the code that starts the worker.
stop = multiprocessing.Event()