Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def grabber_log(*args):
    """Append a timestamped, pretty-printed dump of ``args`` to grabber.log.

    Nothing is written unless the "errorlog" setting is enabled.
    """
    if not setting.getboolean("errorlog"):
        return
    timestamp = time.strftime("%Y-%m-%dT%H:%M:%S%z")
    # Timestamp line, then the pformat() output, then a blank separator line.
    entry = f"{timestamp}\n{pformat(args)}\n\n"
    content_write(profile("grabber.log"), entry, append=True)
# NOTE(review): these statements use `self`, but their enclosing `def` is not
# visible in this excerpt -- presumably the tail of a window initializer; confirm.
self.pre_url = None
self.create_view()
# Keyed by id() of mission_manager.view / mission_manager.library, so the
# matching table attribute can be looked up from the object itself.
self.pool_index = {
id(mission_manager.view): self.view_table,
id(mission_manager.library): self.library_table
}
self.bindevent()
self.register_listeners()
printer.add_listener(self.sp_callback)
# Start an update check only when "libraryautocheck" is on and the stored
# "lastcheckupdate" value (0 if unset) is more than 24 * 60 * 60 behind time()
# -- i.e. one day, assuming time() returns seconds.
if (setting.getboolean("libraryautocheck") and
time() - setting.getfloat("lastcheckupdate", 0) > 24 * 60 * 60):
download_manager.start_check_update()
# update_table() is called once for each of the two objects indexed above.
self.update_table(mission_manager.view)
self.update_table(mission_manager.library)
self.save()
self.update()
# NOTE(review): presumably a Tk main loop that blocks until the window closes -- confirm.
self.root.mainloop()
def debug_log(*args):
    """Append ``args`` as one comma-separated line to debug.log.

    Nothing is written unless the "errorlog" setting is enabled.

    Each argument is passed through ``str()`` before joining, so values that
    are not strings (numbers, exceptions, ``None``, ...) can be logged too;
    ``str.join`` alone raises ``TypeError`` on them. Output for all-string
    calls is unchanged.
    """
    if not setting.getboolean("errorlog"):
        return
    line = ", ".join(map(str, args)) + "\n"
    content_write(profile("debug.log"), line, append=True)
def debug_log(*args):
    """Append ``args`` as one comma-separated line to debug.log.

    Nothing is written unless the "errorlog" setting is enabled.

    Each argument is passed through ``str()`` before joining, so values that
    are not strings (numbers, exceptions, ``None``, ...) can be logged too;
    ``str.join`` alone raises ``TypeError`` on them. Output for all-string
    calls is unchanged.
    """
    if not setting.getboolean("errorlog"):
        return
    line = ", ".join(map(str, args)) + "\n"
    content_write(profile("debug.log"), line, append=True)
# NOTE(review): these statements use `self`, but their enclosing `def` is not
# visible in this excerpt -- presumably the tail of a window initializer; confirm.
self.create_view()
# Keyed by id() of mission_manager.view / mission_manager.library, so the
# matching table attribute can be looked up from the object itself.
self.pool_index = {
id(mission_manager.view): self.view_table,
id(mission_manager.library): self.library_table
}
self.bindevent()
self.register_listeners()
printer.add_listener(self.sp_callback)
# Start an update check only when "libraryautocheck" is on and the stored
# "lastcheckupdate" value (0 if unset) is more than 24 * 60 * 60 behind time()
# -- i.e. one day, assuming time() returns seconds.
if (setting.getboolean("libraryautocheck") and
time() - setting.getfloat("lastcheckupdate", 0) > 24 * 60 * 60):
download_manager.start_check_update()
# update_table() is called once for each of the two objects indexed above.
self.update_table(mission_manager.view)
self.update_table(mission_manager.library)
self.save()
self.update()
# NOTE(review): presumably a Tk main loop that blocks until the window closes -- confirm.
self.root.mainloop()
def save(self):
    """Save the missions now and schedule this method to run again.

    The delay is the "autosave" setting (default 5) scaled by 60 * 1000
    and truncated to an int before it is handed to ``self.root.after``.
    """
    mission_manager.save()
    interval = setting.getfloat("autosave", 5)
    delay = int(interval * 60 * 1000)
    self.root.after(delay, self.save)
# Event handler that runs the configured "runafterdownload" command(s) for a
# finished download.
# NOTE(review): this excerpt ends inside the `try:` below; the matching
# except/finally clause is not visible here.
def _(event):
"""After download, execute command."""
# Ignore events whose target is not this object's download thread.
if event.target is not self.download_thread:
return
# Per-module command first, then the global default from `setting`.
cmd = event.data.module.config.get("runafterdownload")
default_cmd = setting.get("runafterdownload")
commands = []
if cmd:
commands.append(cmd)
# Skip the default when it is empty or identical to the module command,
# so the same command is not run twice.
if default_cmd and default_cmd not in commands:
commands.append(default_cmd)
for command in commands:
# Append the download's folder (module "savepath" joined with the
# sanitized title) as one quoted argument.
# NOTE(review): `quote` is presumably shlex.quote -- confirm.
command += " " + quote(path_join(
profile(event.data.module.config["savepath"]),
safefilepath(event.data.title)
))
try:
# The command string is run through the shell (shell=True), with
# subprocess.call invoked via the await_ helper.
await_(subprocess.call, command, shell=True) # nosec
# NOTE(review): these statements use `self`, but their enclosing `def` is not
# visible in this excerpt -- presumably the tail of a window initializer; confirm.
self.bind_event()
self.thread = worker.current()
# The loop object is built from the root window and worker.update; it is
# started on the last line of this fragment.
self.loop = TkinterLoop(self.root, worker.update)
# Keyed by id() of mission_manager.view / mission_manager.library, so the
# matching table attribute can be looked up from the object itself.
self.pool_index = {
id(mission_manager.view): self.view_table,
id(mission_manager.library): self.library_table
}
self.register_listeners()
printer.add_listener(self.sp_callback)
# Start an update check only when "libraryautocheck" is on and the stored
# "lastcheckupdate" value (0 if unset) is further behind time() than
# "autocheck_interval" * 60 * 60 -- presumably an interval in hours compared
# against seconds; confirm.
if (setting.getboolean("libraryautocheck") and
time() - setting.getfloat("lastcheckupdate", 0) > setting.getfloat("autocheck_interval") * 60 * 60):
download_manager.start_check_update()
# update_table() is called once for each of the two objects indexed above.
self.update_table(mission_manager.view)
self.update_table(mission_manager.library)
self.save()
self.loop.start()
def start_check_update(self, missions=None):
"""Start checking library update."""
if self.library_thread:
print("Already checking update")
return
# set mission state to ANALYZE_INIT
setting["lastcheckupdate"] = str(time())
if missions is None:
missions = mission_manager.get_all("library", lambda m: m.state not in ("DOWNLOADING", "ANALYZING"))
for mission in missions:
mission.state = "ANALYZE_INIT"
missions = set(missions)
def gen_missions():
while True:
mission = mission_manager.get("library", lambda m: m.state in ("ANALYZE_INIT", "ERROR") and m in missions)
if not mission:
return
yield mission