Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
with self.assertRaises(ValueError):
PathManager.copy(
self._tmpfile, self._tmpfile, foo="foo" # type: ignore
)
with self.assertRaises(ValueError):
PathManager.exists(self._tmpfile, foo="foo") # type: ignore
with self.assertRaises(ValueError):
PathManager.get_local_path(self._tmpfile, foo="foo") # type: ignore
with self.assertRaises(ValueError):
PathManager.isdir(self._tmpfile, foo="foo") # type: ignore
with self.assertRaises(ValueError):
PathManager.isfile(self._tmpfile, foo="foo") # type: ignore
with self.assertRaises(ValueError):
PathManager.ls(self._tmpfile, foo="foo") # type: ignore
with self.assertRaises(ValueError):
PathManager.mkdirs(self._tmpfile, foo="foo") # type: ignore
with self.assertRaises(ValueError):
PathManager.open(self._tmpfile, foo="foo") # type: ignore
with self.assertRaises(ValueError):
PathManager.rm(self._tmpfile, foo="foo") # type: ignore
PathManager.set_strict_kwargs_checking(False)
PathManager.copy(
self._tmpfile, self._tmpfile, foo="foo" # type: ignore
)
PathManager.exists(self._tmpfile, foo="foo") # type: ignore
PathManager.get_local_path(self._tmpfile, foo="foo") # type: ignore
PathManager.isdir(self._tmpfile, foo="foo") # type: ignore
PathManager.isfile(self._tmpfile, foo="foo") # type: ignore
PathManager.ls(self._tmpdir, foo="foo") # type: ignore
PathManager.mkdirs(self._tmpdir, foo="foo") # type: ignore
self._remote_uri, self._remote_uri, foo="foo" # type: ignore
)
with self.assertRaises(NotImplementedError):
PathManager.exists(self._remote_uri, foo="foo") # type: ignore
with self.assertRaises(ValueError):
PathManager.get_local_path(
self._remote_uri, foo="foo" # type: ignore
)
with self.assertRaises(NotImplementedError):
PathManager.isdir(self._remote_uri, foo="foo") # type: ignore
with self.assertRaises(NotImplementedError):
PathManager.isfile(self._remote_uri, foo="foo") # type: ignore
with self.assertRaises(NotImplementedError):
PathManager.ls(self._remote_uri, foo="foo") # type: ignore
with self.assertRaises(NotImplementedError):
PathManager.mkdirs(self._remote_uri, foo="foo") # type: ignore
with self.assertRaises(ValueError):
PathManager.open(self._remote_uri, foo="foo") # type: ignore
with self.assertRaises(NotImplementedError):
PathManager.rm(self._remote_uri, foo="foo") # type: ignore
PathManager.set_strict_kwargs_checking(False)
PathManager.get_local_path(self._remote_uri, foo="foo") # type: ignore
f = PathManager.open(self._remote_uri, foo="foo") # type: ignore
f.close()
PathManager.set_strict_kwargs_checking(True)
with self.assertRaises(ValueError):
PathManager.open(self._tmpfile, foo="foo") # type: ignore
with self.assertRaises(ValueError):
PathManager.rm(self._tmpfile, foo="foo") # type: ignore
PathManager.set_strict_kwargs_checking(False)
PathManager.copy(
self._tmpfile, self._tmpfile, foo="foo" # type: ignore
)
PathManager.exists(self._tmpfile, foo="foo") # type: ignore
PathManager.get_local_path(self._tmpfile, foo="foo") # type: ignore
PathManager.isdir(self._tmpfile, foo="foo") # type: ignore
PathManager.isfile(self._tmpfile, foo="foo") # type: ignore
PathManager.ls(self._tmpdir, foo="foo") # type: ignore
PathManager.mkdirs(self._tmpdir, foo="foo") # type: ignore
f = PathManager.open(self._tmpfile, foo="foo") # type: ignore
f.close()
with open(os.path.join(self._tmpdir, "test_rm.txt"), "w") as f:
rm_file = f.name
f.write(self._tmpfile_contents)
f.flush()
PathManager.rm(rm_file, foo="foo") # type: ignore
def evaluate(self):
if self._distributed:
comm.synchronize()
self._predictions = comm.gather(self._predictions, dst=0)
self._predictions = list(itertools.chain(*self._predictions))
if not comm.is_main_process():
return {}
if len(self._predictions) == 0:
self._logger.warning("[COCOEvaluator] Did not receive valid predictions.")
return {}
if self._output_dir:
PathManager.mkdirs(self._output_dir)
file_path = os.path.join(self._output_dir, "instances_predictions.pth")
with PathManager.open(file_path, "wb") as f:
torch.save(self._predictions, f)
self._results = OrderedDict()
if "proposals" in self._predictions[0]:
self._eval_box_proposals()
if "instances" in self._predictions[0]:
self._eval_predictions(set(self._tasks))
# Copy so the caller can do whatever with results
return copy.deepcopy(self._results)
* Pixel Accuracy (pACC)
"""
if self._distributed:
synchronize()
conf_matrix_list = all_gather(self._conf_matrix)
self._predictions = all_gather(self._predictions)
self._predictions = list(itertools.chain(*self._predictions))
if not is_main_process():
return
self._conf_matrix = np.zeros_like(self._conf_matrix)
for conf_matrix in conf_matrix_list:
self._conf_matrix += conf_matrix
if self._output_dir:
PathManager.mkdirs(self._output_dir)
file_path = os.path.join(self._output_dir, "sem_seg_predictions.json")
with PathManager.open(file_path, "w") as f:
f.write(json.dumps(self._predictions))
acc = np.zeros(self._num_classes, dtype=np.float)
iou = np.zeros(self._num_classes, dtype=np.float)
tp = self._conf_matrix.diagonal()[:-1].astype(np.float)
pos_gt = np.sum(self._conf_matrix[:-1, :-1], axis=0).astype(np.float)
class_weights = pos_gt / np.sum(pos_gt)
pos_pred = np.sum(self._conf_matrix[:-1, :-1], axis=1).astype(np.float)
acc_valid = pos_gt > 0
acc[acc_valid] = tp[acc_valid] / pos_gt[acc_valid]
iou_valid = (pos_gt + pos_pred) > 0
union = pos_gt + pos_pred - tp
iou[acc_valid] = tp[acc_valid] / union[acc_valid]
macc = np.sum(acc) / np.sum(acc_valid)
def default_setup(cfg, args):
"""
Perform some basic common setups at the beginning of a job, including:
1. Set up the detectron2 logger
2. Log basic information about environment, cmdline arguments, and config
3. Backup the config to the output directory
Args:
cfg (CfgNode): the full config to be used
args (argparse.NameSpace): the command line arguments to be logged
"""
output_dir = cfg.OUTPUT_DIR
if comm.is_main_process() and output_dir:
PathManager.mkdirs(output_dir)
rank = comm.get_rank()
setup_logger(output_dir, distributed_rank=rank, name="fvcore")
logger = setup_logger(output_dir, distributed_rank=rank)
logger.info("Rank of current process: {}. World size: {}".format(rank, comm.get_world_size()))
logger.info("Environment info:\n" + collect_env_info())
logger.info("Command line arguments: " + str(args))
if hasattr(args, "config_file") and args.config_file != "":
logger.info(
"Contents of args.config_file={}:\n{}".format(
args.config_file, PathManager.open(args.config_file, "r").read()
)
)
abbrev_name=str(abbrev_name),
)
else:
formatter = plain_formatter
ch.setFormatter(formatter)
logger.addHandler(ch)
# file logging: all workers
if output is not None:
if output.endswith(".txt") or output.endswith(".log"):
filename = output
else:
filename = os.path.join(output, "log.txt")
if distributed_rank > 0:
filename = filename + ".rank{}".format(distributed_rank)
PathManager.mkdirs(os.path.dirname(filename))
fh = logging.StreamHandler(_cached_log_stream(filename))
fh.setLevel(logging.DEBUG)
fh.setFormatter(plain_formatter)
logger.addHandler(fh)
return logger
def evaluate(self):
if self._distributed:
comm.synchronize()
self._predictions = comm.gather(self._predictions, dst=0)
self._predictions = list(itertools.chain(*self._predictions))
if not comm.is_main_process():
return
if len(self._predictions) == 0:
self._logger.warning("[LVISEvaluator] Did not receive valid predictions.")
return {}
if self._output_dir:
PathManager.mkdirs(self._output_dir)
file_path = os.path.join(self._output_dir, "instances_predictions.pth")
with PathManager.open(file_path, "wb") as f:
torch.save(self._predictions, f)
self._results = OrderedDict()
if "proposals" in self._predictions[0]:
self._eval_box_proposals()
if "instances" in self._predictions[0]:
self._eval_predictions(set(self._tasks))
# Copy so the caller can do whatever with results
return copy.deepcopy(self._results)