Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def download_file(self,
url,
save_path,
save_name=None,
retry_limit=3,
print_progress=False,
replace=False):
if not os.path.exists(save_path):
utils.mkdir(save_path)
save_name = url.split('/')[-1] if save_name is None else save_name
file_name = os.path.join(save_path, save_name)
retry_times = 0
if replace and os.path.exists(file_name):
os.remove(file_name)
while not (os.path.exists(file_name)):
if retry_times < retry_limit:
retry_times += 1
else:
tips = "Cannot download {0} within retry limit {1}".format(
url, retry_limit)
return False, tips, None
r = requests.get(url, stream=True)
total_length = r.headers.get('content-length')
def __init__(self, config_file_path=None):
if not config_file_path:
config_file_path = os.path.join(CONF_HOME, 'config.json')
if not os.path.exists(CONF_HOME):
utils.mkdir(CONF_HOME)
if not os.path.exists(config_file_path):
with open(config_file_path, 'w+') as fp:
lock.flock(fp, lock.LOCK_EX)
fp.write(json.dumps(default_server_config))
lock.flock(fp, lock.LOCK_UN)
with open(config_file_path) as fp:
self.config = json.load(fp)
fp_lock = open(config_file_path)
lock.flock(fp_lock, lock.LOCK_EX)
utils.check_url(self.config['server_url'])
self.server_url = self.config['server_url']
self.request()
self._load_resource_list_file_if_valid()
lock.flock(fp_lock, lock.LOCK_UN)
with open(os.path.join(self.helper.model_path(), "__model__"),
"rb") as file:
program_desc_str = file.read()
rename_program = fluid.framework.Program.parse_from_string(
program_desc_str)
varlist = {
var: block
for block in rename_program.blocks for var in block.vars
if self.get_name_prefix() not in var
}
for var, block in varlist.items():
old_name = var
new_name = self.get_var_name_with_prefix(old_name)
block._rename_var(old_name, new_name)
utils.mkdir(self.helper.model_path())
with open(
os.path.join(self.helper.model_path(), "__model__"),
"wb") as f:
f.write(rename_program.desc.serialize_to_string())
for file in os.listdir(self.helper.model_path()):
if (file == "__model__" or self.get_name_prefix() in file):
continue
os.rename(
os.path.join(self.helper.model_path(), file),
os.path.join(self.helper.model_path(),
self.get_var_name_with_prefix(file)))
# create processor file
if self.processor:
self._dump_processor()
def _dump_processor(self):
import inspect
pymodule = inspect.getmodule(self.processor)
pycode = inspect.getsource(pymodule)
processor_path = self.helper.processor_path()
processor_md5 = utils.md5(pycode)
processor_md5 += str(time.time())
processor_name = utils.md5(processor_md5)
output_file = os.path.join(processor_path, processor_name + ".py")
utils.mkdir(processor_path)
with open(output_file, "w") as file:
file.write(pycode)
utils.from_pyobj_to_module_attr(
processor_name, self.desc.attr.map.data['processor_info'])