Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def ftp_burst(host, port):
    """Try to find working FTP credentials for ``host:port``.

    Steps, in order:

    1. Return immediately when ``port_check(host, port)`` is falsy.
    2. If ``anonymous_login(host, port)`` succeeds, log the attempt, put
       ``('anonymous', '')`` on the module-level ``result_queue`` and return
       without trying anything else.
    3. Otherwise call ``task_init(host, port)`` and then
       ``run_threads(4, task_thread)``.

    Any exception raised in step 3 is logged and suppressed, so the function
    never raises to its caller because of a failed burst attempt. It always
    returns ``None``; results are delivered through ``result_queue``.
    """
    if not port_check(host, port):
        return

    if anonymous_login(host, port):
        logger.info('try burst {}:{} use username:{} password:{}'.format(
            host, port, 'anonymous', ''))
        result_queue.put(('anonymous', ''))
        return

    try:
        task_init(host, port)
        run_threads(4, task_thread)
    except Exception as exc:
        # Stay best-effort (do not propagate), but leave a trace so a broken
        # burst run is distinguishable from "no credentials found".
        logger.error('burst {}:{} failed: {}'.format(host, port, exc))
def task_thread(task_queue, result_queue):
    """Worker loop: try queued credentials until ``task_queue`` is drained.

    Each item taken from ``task_queue`` is unpacked as
    ``(host, port, username, password)``, logged, and passed to ``ssh_login``.
    When ``ssh_login`` returns a truthy value, the remaining queued items are
    discarded (while holding the queue's own mutex) and
    ``(username, password)`` is put on ``result_queue``.

    Items are fetched with ``get_nowait()`` instead of an ``empty()`` check
    followed by a blocking ``get()``. With several workers sharing the queue,
    another worker can take the last item, or clear the queue after a
    successful login, between the check and the ``get()``, which would leave
    this worker blocked forever.

    Returns ``None`` once the queue is empty.
    """
    import queue  # local import: only needed for the queue.Empty exception

    while True:
        try:
            task = task_queue.get_nowait()
        except queue.Empty:
            # Nothing left to try (drained, or cleared by a successful worker).
            return

        host, port, username, password = task
        logger.info('try burst {}:{} use username:{} password:{}'.format(
            host, port, username, password))
        if ssh_login(host, port, username, password):
            # Drop all pending attempts so the other workers stop as well.
            with task_queue.mutex:
                task_queue.queue.clear()
            result_queue.put((username, password))
cidr_set.add(i)
conf.url = []
else:
cidr_text = input("Please input CIDR address:")
cidr_set.add(cidr_text)
count = 0
for i in cidr_set:
try:
network = ip_network(i, strict=False)
for host in network.hosts():
self.add_target(host.exploded)
count += 1
except ValueError:
logger.error("[PLUGIN] error format from " + i)
info_msg = "[PLUGIN] get {0} target(s) from CIDR".format(count)
logger.info(info_msg)
def init_censys_api(self):
    """Create the Censys client and log the credits it reports.

    Builds a ``Censys`` object from ``conf.censys_uid`` and
    ``conf.censys_secret`` and stores it on ``self.censys``. If the client's
    ``get_resource_info()`` call returns a truthy value, the value of
    ``self.censys.credits`` is written to the log at info level; otherwise
    nothing is logged.
    """
    self.censys = Censys(uid=conf.censys_uid, secret=conf.censys_secret)

    # Nothing to report when the resource lookup does not succeed.
    if not self.censys.get_resource_info():
        return

    logger.info(
        "[PLUGIN] Censys credits limit {0}".format(self.censys.credits)
    )