Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if request.path.split("/")[-1] not in cv_module:
return {"error": "Module {} is not available.".format(module_name)}
req_id = request.data.get("id")
global use_gpu, batch_size_dict
img_base64 = request.form.getlist("image")
extra_info = {}
for item in list(request.form.keys()):
extra_info.update({item: request.form.getlist(item)})
file_name_list = []
if img_base64 != []:
for item in img_base64:
ext = item.split(";")[0].split("/")[-1]
if ext not in ["jpeg", "jpg", "png"]:
return {"result": "Unrecognized file type"}
filename = req_id + "_" \
+ utils.md5(str(time.time())+item[0:20]) \
+ "." \
+ ext
img_data = base64.b64decode(item.split(',')[-1])
file_name_list.append(filename)
with open(filename, "wb") as fp:
fp.write(img_data)
else:
file = request.files.getlist("image")
for item in file:
file_name = req_id + "_" + item.filename
item.save(file_name)
file_name_list.append(file_name)
module = default_module_manager.get_module(module_name)
predict_func_name = cv_module_method.get(module_name, "")
if predict_func_name != "":
predict_func = eval(predict_func_name)
def predict_iamge(module_name):
global results_dict
req_id = request.data.get("id")
img_base64 = request.form.get("input_img", "")
received_file_name = request.form.get("input_file", "")
ext = received_file_name.split(".")[-1]
if ext == "":
return {"result": "Unrecognized file type"}
score = time.time()
filename = utils.md5(str(time.time()) + str(img_base64)) + "." + ext
base64_head = img_base64.split(',')[0]
img_data = base64.b64decode(img_base64.split(',')[-1])
with open(filename, "wb") as fp:
fp.write(img_data)
file_list = [filename]
if queues_dict[module_name].qsize(
) + 1 > queues_dict[module_name].get_attribute("maxsize"):
return {"result": "Too many visitors now, please come back later."}
data_2_item(file_list, req_id, score, module_name)
data_num = len(file_list)
results = []
result_len = 0
while result_len != data_num:
result_len = len(results_dict.get(req_id, []))
results = results_dict.get(req_id)
results = [i[1] for i in sorted(results, key=lambda k: k[0])]
def __init__(self, module_home=None):
self.local_modules_dir = module_home if module_home else MODULE_HOME
self.modules_dict = {}
if not os.path.exists(self.local_modules_dir):
utils.mkdir(self.local_modules_dir)
elif os.path.isfile(self.local_modules_dir):
raise ValueError("Module home should be a folder, not a file")
def _dump_assets(self):
utils.mkdir(self.helper.assets_path())
for asset in self.assets:
filename = os.path.basename(asset)
newfile = os.path.join(self.helper.assets_path(), filename)
copyfile(asset, newfile)
def request(self):
if not os.path.exists(CACHE_HOME):
utils.mkdir(CACHE_HOME)
try:
r = requests.get(self.get_server_url() + '/' + 'search')
data = json.loads(r.text)
cache_path = os.path.join(CACHE_HOME, RESOURCE_LIST_FILE)
with open(cache_path, 'w+') as fp:
yaml.safe_dump({'resource_list': data['data']}, fp)
return True
except:
if self.config.get('debug', False):
raise
else:
pass
try:
file_url = self.config[
'resource_storage_server_url'] + RESOURCE_LIST_FILE
result, tips, self.resource_list_file = default_downloader.download_file(
def _dump_processor(self):
import inspect
pymodule = inspect.getmodule(self.processor)
pycode = inspect.getsource(pymodule)
processor_path = self.helper.processor_path()
processor_md5 = utils.md5(pycode)
processor_md5 += str(time.time())
processor_name = utils.md5(processor_md5)
output_file = os.path.join(processor_path, processor_name + ".py")
utils.mkdir(processor_path)
with open(output_file, "w") as file:
file.write(pycode)
utils.from_pyobj_to_module_attr(
processor_name, self.desc.attr.map.data['processor_info'])
def _init_with_url(self, url):
utils.check_url(url)
result, tips, module_dir = default_downloader.download_file_and_uncompress(
url, save_path=".")
if not result:
logger.error(tips)
raise RuntimeError(tips)
else:
self._init_with_module_file(module_dir)
def execute(self, argv):
all_modules = default_module_manager.all_modules()
if utils.is_windows():
placeholders = [20, 40]
else:
placeholders = [25, 50]
tp = TablePrinter(
titles=["ModuleName", "Path"], placeholders=placeholders)
for module_name, module_dir in all_modules.items():
tp.add_line(contents=[module_name, module_dir[0]])
print(tp.get_text())
return True
def _check_img(img):
if isinstance(img, str):
utils.check_path(img)
img = Image.open(img)
return img
def before_request():
request.data = {"id": utils.md5(request.remote_addr + str(time.time()))}
pass