Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_no_server(loop, *args, **kwargs):
async with aioftp.ClientSession("127.0.0.1", loop=loop):
pass
async def prove(self):
anonymous = False
flag = 3
usernamedic = self.read_file(self.parameter['U']) if 'U' in self.parameter.keys() else self.read_file(os.path.join(paths.DICT_PATH, 'ftp_usernames.txt'))
passworddic = self.read_file(self.parameter['P']) if 'P' in self.parameter.keys() else self.read_file(os.path.join(paths.DICT_PATH, 'ftp_passwords.txt'))
async for (username, password) in self.generate_dict(usernamedic, passworddic):
try:
if username == 'anonymous':
if anonymous:
continue
else:
anonymous = True
async with aioftp.ClientSession(self.target_host, self.target_port, username, password) as client:
self.flag = 1
self.req.append({"username": username, "password": password})
self.res.append({"info": username + "/" + password, "key": "ftp burst"})
except Exception as e:
if "timed out" in str(e):
if flag == 0:
return
flag -= 1
async def get_ftp_listing(self, url):
"""Returns list of files at FTP **url**"""
logger.debug("FTP: listing %s", url)
if self.cache and url in self.cache["ftp_list"]:
return self.cache["ftp_list"][url]
parsed = urlparse(url)
async with aioftp.ClientSession(parsed.netloc,
password=self.USER_AGENT+"@") as client:
res = [str(path) for path, _info in await client.list(parsed.path)]
if self.cache:
self.cache["ftp_list"][url] = res
return res
async def get_checksum_from_ftp(self, url, _desc=None):
"""Compute sha256 checksum of content at ftp **url**
Does not show progress monitor at this time (would need to
get file size first)
"""
parsed = urlparse(url)
checksum = sha256()
async with aioftp.ClientSession(parsed.netloc,
password=self.USER_AGENT+"@") as client:
async with client.download_stream(parsed.path) as stream:
async for block in stream.iter_by_block():
checksum.update(block)
return checksum.hexdigest()
async def get_ftp_listing(self, url):
"""Returns list of files at FTP **url**"""
logger.debug("FTP: listing %s", url)
if self.cache and url in self.cache["ftp_list"]:
return self.cache["ftp_list"][url]
parsed = urlparse(url)
async with aioftp.ClientSession(parsed.netloc,
password=self.USER_AGENT+"@") as client:
res = [str(path) for path, _info in await client.list(parsed.path)]
if self.cache:
self.cache["ftp_list"][url] = res
return res