Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Use the standard TFTP port; any user other than 'root' gets 20069 instead.
dut.tftp_port = 69
if getpass.getuser() != 'root':
    dut.tftp_port = 20069

# Make sure the working directories used by the TFTP client and server exist.
client_dir = './tftp_client_dir'
server_dir = './tftp_server_dir'
for tftp_dir in (client_dir, server_dir):
    if not os.path.exists(tftp_dir):
        os.mkdir(tftp_dir)

# Start the background TFTP server once per device object; the daemon flag
# keeps the thread from blocking interpreter exit.
if not hasattr(dut, 'tftp_server_thread'):
    dut.tftp_server_thread = threading.Thread(
        target=tftp_server_for_ever,
        args=(dut.tftp_port, server_dir),
    )
    dut.tftp_server_thread.daemon = True
    dut.tftp_server_thread.start()

# Push the certificate to the server under its bare file name.
local_address = socket.gethostbyname(socket.getfqdn())
tftp_client = tftpy.TftpClient(local_address, dut.tftp_port)
remote_name = cert_file.split('/')[-1]
tftp_client.upload(remote_name, cert_file)
# Fall back to the device-side name when no local file name was given.
if (filename == ''):
    filename = name

# Write the local staging file. The context manager closes the handle even
# if the write fails (the original leaked it on error).
with open(filename, 'w') as myfile:
    # TFTP doesn't allow empty file creation
    if (text == ''):
        myfile.write('\n')
    else:
        myfile.write(text)

# upload on TFTP server
if (server == ''):
    server = socket.gethostbyname(socket.getfqdn())
remote_name = filename.split('/')[-1]
tftp_client = tftpy.TftpClient(server, port)
tftp_client.upload(remote_name, filename)
self._tftp_port = port

# device commands (timeout of 60 seconds for each MB)
# Floor division keeps the timeout a whole number of milliseconds on
# Python 3 as well; plain `/` would yield a float there.
timeout = (os.path.getsize(filename) // 1048576 + 1) * 60000
create_cmd = 'copy {0}://{1}/{2} {3}'.format(protocol, server, remote_name, name)
# Raw string: same two characters as the original '\#', without relying on
# an invalid escape sequence.
cmds = {'cmds': [{'cmd': create_cmd, 'prompt': r'\#', 'timeout': timeout}]}
self._device.cmd(cmds, cache=False, flush_cache=True)
self._update_file()
m = r.match(url)
if m:
host = m.group(1)
port = m.group(2)
fname = m.group(3)
if port == "":
port = 69
f = {}
f["name"] = url.split("/")[-1].strip()
f["date"] = int(time.time())
f["len"] = 0
f["file"] = self.dir + str(f["date"]) + "_" + f["name"]
client = tftpy.TftpClient(host, int(port))
client.download(fname, f["file"])
h = hashlib.sha256()
with open(f["file"], 'rb') as fd:
chunk = fd.read(4096)
h.update(chunk)
f["sha256"] = h.hexdigest()
return f
else:
raise ValueError("Invalid tftp url")
# Fragment of a command-line TFTP client. The `if` that opens this branch
# chain is not part of this excerpt, and neither is the handler for the
# `try:` further down.
log.setLevel(logging.DEBUG)
# increase the verbosity of the formatter
debug_formatter = logging.Formatter('[%(asctime)s%(msecs)03d] %(levelname)s [%(name)s:%(lineno)s] %(message)s')
handler.setFormatter(debug_formatter)
elif options.quiet:
# Quiet mode: raise the logger threshold to WARNING.
log.setLevel(logging.WARNING)
# Progress callback handed to the transfer calls below; it reports via log.info.
progresshook = Progress(log.info).progresshook
# Collect only the TFTP options the user actually asked for.
tftp_options = {}
if options.blksize:
tftp_options['blksize'] = int(options.blksize)
if options.tsize:
# NOTE(review): tsize is always sent as 0 here; presumably a placeholder the
# transfer fills in -- confirm against the tftpy documentation.
tftp_options['tsize'] = 0
tclient = tftpy.TftpClient(options.host,
int(options.port),
tftp_options,
options.localip)
try:
if options.download:
# Without an explicit output path, save under the remote file's base name.
if not options.output:
options.output = os.path.basename(options.download)
tclient.download(options.download,
options.output,
progresshook)
elif options.upload:
# Without an explicit input path, read from the upload target's base name.
if not options.input:
options.input = os.path.basename(options.upload)
tclient.upload(options.upload,
options.input,
progresshook)
# Fetch self.file_to_get from self.hostname:self.port over TFTP into an
# Artifact, then record the file in self.fs.
# NOTE: this excerpt is cut off inside the `except` handler at the end.
def makeTftpRetrieval(self):
progresshook = Progress(self).progresshook
# Destination object that receives the downloaded bytes.
self.artifactFile = Artifact(self.file_to_get)
# Initialised up front so the exception handler can test whether the client
# was ever created.
tclient = None
url = ''
try:
tclient = tftpy.TftpClient(self.hostname, int(self.port))
# tftpy can't handle unicode string as filename
# so we have to convert unicode type to str type
tclient.download(str(self.file_to_get), self.artifactFile, progresshook)
url = 'tftp://%s/%s' % (self.hostname, self.file_to_get.strip('/'))
# Resolve the requested name against the session's current directory.
self.file_to_get = self.fs.resolve_path(self.file_to_get, self.protocol.cwd)
# Use the transferred byte count as the size when the client exposes transfer
# metrics, otherwise 0. 33188 == 0o100644; presumably a file mode
# (regular file, rw-r--r--) -- confirm against self.fs.mkfile.
if hasattr(tclient.context, 'metrics'):
self.fs.mkfile(self.file_to_get, 0, 0, tclient.context.metrics.bytes, 33188)
else:
self.fs.mkfile(self.file_to_get, 0, 0, 0, 33188)
except tftpy.TftpException:
# Only act when a client exists and its transfer file object is still open.
if tclient and tclient.context and not tclient.context.fileobj.closed:
@dbus.service.method(DBUS_NAME,
    in_signature='sss', out_signature='')
def updateFromTftp(self, flash, url, filename):
    """Download an image over TFTP and hand it to a flash device for update.

    Args:
        flash: Key of the target flash device in ``self.dbus_objects``.
        url: Address of the TFTP server. Despite the name, it is passed
            directly as the host argument of ``tftpy.TftpClient``.
        filename: Name of the remote file; also used as the local file name
            under ``DOWNLOAD_DIR``.

    Returns:
        None. Progress is reported through ``self.status[flash]``
        ("DOWNLOADING", "FLASHING", or "ERROR" on failure).
    """
    if flash not in self.dbus_objects:
        print("ERROR FlashManager: Not a valid flash device: " + flash)
        return
    try:
        ## need to make download async
        self.status[flash] = "DOWNLOADING"
        filename = str(filename)
        client = tftpy.TftpClient(url, TFTP_PORT)
        print("Downloading: " + filename + " from " + url)
        # NOTE(review): filename comes from the D-Bus caller and is joined
        # into the local path unchecked; a value containing "../" would
        # escape DOWNLOAD_DIR. Consider validating it.
        outfile = DOWNLOAD_DIR + "/" + filename
        client.download(filename, outfile)
        intf = self.getInterface(flash)
        self.status[flash] = "FLASHING"
        intf.update(outfile)
    except Exception as e:
        print("ERROR FlashManager: " + str(e))
        # Record the failure for this device only. The original assigned the
        # string to self.status itself, destroying the per-device mapping
        # and breaking every later self.status[flash] update.
        self.status[flash] = "ERROR"