# Assumed imports for these excerpts (Dataloop SDK plus stdlib); the original snippets omit them.
# Project-level helpers such as maybe_download_pred_data, download_and_organize and
# precision_recall_compute are defined elsewhere in the same repository.
import json
import logging
import os
from copy import deepcopy

import dtlpy as dl

logger = logging.getLogger(__name__)


# Excerpt: __init__ of a prediction/evaluation service class, shown here as a standalone snippet.
def __init__(self, configs, time, test_dataset_id, query):
    logger.info('dtlpy version: ' + str(dl.__version__))
    logger.info('dtlpy info: ' + str(dl.info()))
    time = int(time)
    dl.setenv('prod')
    configs = json.loads(configs)
    query = json.loads(query)
    self.configs_input = dl.FunctionIO(type='Json', name='configs', value=configs)
    self.service = dl.services.get('zazu')
    project_name = configs['dataloop']['project']
    self.project = dl.projects.get(project_name)
    test_dataset = self.project.datasets.get(dataset_id=test_dataset_id)
    maybe_download_pred_data(dataset_obj=test_dataset, val_query=query)

    # add ground-truth annotations
    filters = dl.Filters()
    filters.custom_filter = query
    dataset_name = test_dataset.name
    path_to_dataset = os.path.join(os.getcwd(), dataset_name)
    # only download if the dataset does not already exist locally
    if not os.path.exists(path_to_dataset):
        download_and_organize(path_to_dataset=path_to_dataset, dataset_obj=test_dataset, filters=filters)
    json_file_path = os.path.join(path_to_dataset, 'json')
    self.model_obj = self.project.models.get(model_name='retinanet')
    self.adapter = self.model_obj.build(local_path=os.getcwd())
    logger.info('model built')
    while True:
        self.compute = precision_recall_compute()
        self.compute.add_dataloop_local_annotations(json_file_path)
logger.info("running new execution")
if self.remote:
dataset_obj = get_dataset_obj(optimal_model.dataloop)
self.project = dl.projects.get(project_id=dataset_obj.projects[0])
self.dataset_id = dataset_obj.id
try:
self.train_query = optimal_model.dataloop['train_query']
except:
self.train_query = dl.Filters().prepare()['filter']
try:
# TODO: TRAIN QUERY IS STILL BEING COPPIED
try:
self.val_query = deepcopy(self.train_query)
except:
self.val_query = dl.Filters().prepare()
self.val_query['filter']['$and'][0]['dir'] = optimal_model.dataloop['test_dir']
except:
try:
self.val_query = optimal_model.dataloop['val_query']
except:
self.val_query = dl.Filters().prepare()['filter']
with open('global_configs.json', 'r') as fp:
global_project_name = json.load(fp)['project']
self.global_project = dl.projects.get(project_name=global_project_name)
# TODO: dont convert here
if self.optimal_model.name == 'yolov3':
if self.optimal_model.data['annotation_type'] == 'coco':
self._convert_coco_to_yolo_format()
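
# Minimal sketch (an assumption for illustration, not code from the original repo): the raw DQL
# query dicts handled above come from dl.Filters().prepare()['filter'] and can be fed back into
# a dl.Filters object through its custom_filter attribute, the same pattern the snippets below use.
def _example_build_query():
    default_query = dl.Filters().prepare()['filter']  # bare DQL filter dict, e.g. {'$and': [...]}
    filters = dl.Filters()
    filters.custom_filter = default_query             # reuse the raw dict as a custom filter
    logger.info('example query: ' + json.dumps(default_query))
    return filters
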
def download_and_organize_pred_data(path_to_dataset, dataset_obj, filters=None):
    if filters is None:
        query = dl.Filters().prepare()['filter']
        filters = dl.Filters()
        filters.custom_filter = query
    if not os.path.exists(os.path.dirname(path_to_dataset)):
        os.mkdir(os.path.dirname(path_to_dataset))
    os.mkdir(path_to_dataset)
    dataset_obj.items.download(local_path=path_to_dataset, filters=filters)
    images_folder = os.path.join(path_to_dataset, 'items')
    logger.info('downloaded ' + str(len(os.listdir(images_folder))) + ' to ' + images_folder)
    # move images and annotations into a fixed directory layout
    for path, subdirs, files in os.walk(images_folder):
        for name in files:
            filename, ext = os.path.splitext(name)
            if ext.lower() not in ['.jpg', '.jpeg', '.png']:
                continue
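
# Hedged usage sketch for download_and_organize_pred_data; the project name, dataset name and
# target folder are illustrative assumptions, not values taken from the original repo.
def _example_pred_download():
    project = dl.projects.get(project_name='my_project')          # assumed project name
    dataset = project.datasets.get(dataset_name='my_dataset')     # assumed dataset name
    target = os.path.join(os.getcwd(), 'data', 'predict_on')
    download_and_organize_pred_data(path_to_dataset=target, dataset_obj=dataset)
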
def download_and_organize(path_to_dataset, dataset_obj, filters=None):
    if filters is None:
        query = dl.Filters().prepare()['filter']
        filters = dl.Filters()
        filters.custom_filter = query
    if not os.path.exists(os.path.dirname(path_to_dataset)):
        os.mkdir(os.path.dirname(path_to_dataset))
    os.mkdir(path_to_dataset)
    dataset_obj.items.download(local_path=path_to_dataset, filters=filters)
    dataset_obj.download_annotations(local_path=path_to_dataset, filters=filters)
    images_folder = os.path.join(path_to_dataset, 'items')
    json_folder = os.path.join(path_to_dataset, 'json')
    logger.info('downloaded ' + str(len(os.listdir(images_folder))) + ' to ' + images_folder)
    logger.info('downloaded ' + str(len(os.listdir(json_folder))) + ' to ' + json_folder)
    # move images and annotations into a fixed directory layout
    for path, subdirs, files in os.walk(images_folder):
        for name in files:
            filename, ext = os.path.splitext(name)
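
# Small sketch (an assumption about the layout) of what download_and_organize leaves on disk:
# images under 'items' and Dataloop annotation JSONs under 'json', with matching relative paths
# and file stems, so each image can be paired with its annotation file.
def _example_pair_images_with_annotations(path_to_dataset):
    images_folder = os.path.join(path_to_dataset, 'items')
    json_folder = os.path.join(path_to_dataset, 'json')
    pairs = []
    for path, subdirs, files in os.walk(images_folder):
        for name in files:
            filename, ext = os.path.splitext(name)
            if ext.lower() not in ['.jpg', '.jpeg', '.png']:
                continue
            rel_dir = os.path.relpath(path, images_folder)
            json_path = os.path.join(json_folder, rel_dir, filename + '.json')
            if os.path.exists(json_path):
                pairs.append((os.path.join(path, name), json_path))
    return pairs
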
def maybe_download_data(dataset_obj, train_query, val_query):
    # check whether the data has already been downloaded; if not, download it
    train_filters = dl.Filters()
    train_filters.custom_filter = train_query
    val_filters = dl.Filters()
    val_filters.custom_filter = val_query
    logger.info('train query: ' + str(train_query))
    logger.info('filters: ' + str(train_filters.prepare()))
    logger.info('val query: ' + str(val_query))
    logger.info('filters: ' + str(val_filters.prepare()))
    parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
    path_to_put_data = os.path.join(parent_dir, 'data')
    if not os.path.exists(path_to_put_data):
        os.mkdir(path_to_put_data)
    # NOTE: data_format is assumed to be defined at module scope in the original source
    if data_format == 'dataloop':
        dataset_name = dataset_obj.name
        path_to_dataset = os.path.join(path_to_put_data, dataset_name, 'train')
        path_to_val_dataset = os.path.join(path_to_put_data, dataset_name, 'val')
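
# Hedged sketch of calling maybe_download_data with directory-scoped queries; the '/train' and
# '/val' remote directories are assumptions for illustration only.
def _example_train_val_queries(dataset_obj):
    train_filters = dl.Filters()
    train_filters.add(field='dir', values='/train')               # assumed remote train folder
    val_filters = dl.Filters()
    val_filters.add(field='dir', values='/val')                   # assumed remote val folder
    maybe_download_data(dataset_obj=dataset_obj,
                        train_query=train_filters.prepare()['filter'],
                        val_query=val_filters.prepare()['filter'])
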
def maybe_download_pred_data(dataset_obj, val_query):
    # check whether the data has already been downloaded; if not, download it
    val_filters = dl.Filters()
    val_filters.custom_filter = val_query
    logger.info('val query: ' + str(val_query))
    logger.info('filters: ' + str(val_filters.prepare()))
    parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
    path_to_put_data = os.path.join(parent_dir, 'data')
    if not os.path.exists(path_to_put_data):
        os.mkdir(path_to_put_data)
    if data_format == 'dataloop':
        dataset_name = dataset_obj.name
        path_to_pred_dataset = os.path.join(path_to_put_data, dataset_name, 'predict_on')
        # download d.names (class-names file)
        try:
            dataset_obj.items.get('/d.names').download(local_path=os.path.join(path_to_put_data, dataset_name))
        except Exception:
            # assumption: the excerpt is truncated here; ignore a missing d.names file so the snippet stays runnable
            logger.info('no d.names file found in dataset')
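
# Hedged follow-up sketch: d.names is assumed to follow the YOLO-style convention of one class
# name per line, so after the download above it can be read into an ordered list of labels.
def _example_read_class_names(path_to_put_data, dataset_name):
    names_path = os.path.join(path_to_put_data, dataset_name, 'd.names')
    with open(names_path, 'r') as f:
        class_names = [line.strip() for line in f if line.strip()]
    return class_names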