def test_windows_short_path_file(self):
    if platform.system() != 'Windows':
        return
    original_path = os.path.join(_TMPDIR, 'new path', 'my_file.csv')
    os.makedirs(os.path.split(original_path)[0], exist_ok=True)
    assert os.path.exists(os.path.split(original_path)[0])
    assert ' ' in original_path
    assert os.path.splitext(original_path)[1] == '.csv'
    short_path = windows_short_path(original_path)
    assert os.path.exists(os.path.split(short_path)[0])
    assert original_path != short_path
    assert ' ' not in short_path
    assert os.path.splitext(short_path)[1] == '.csv'
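# For context: windows_short_path maps a long path to its DOS 8.3 alias,
# which contains no spaces. A minimal sketch of how such a helper can be
# built on the Win32 GetShortPathNameW API via ctypes (an illustration
# under that assumption, not cmdstanpy's actual implementation):
import ctypes

def short_path_sketch(long_path: str) -> str:
    """Return the 8.3 short form of an existing path (Windows only)."""
    buf_len = 256
    while True:
        buf = ctypes.create_unicode_buffer(buf_len)
        needed = ctypes.windll.kernel32.GetShortPathNameW(
            long_path, buf, buf_len
        )
        if needed == 0:
            return long_path  # call failed, e.g. path does not exist
        if needed < buf_len:
            return buf.value  # success: needed chars were copied
        buf_len = needed + 1  # too small: grow to the size the API reported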
# an existing file passed as output_dir must be rejected
fname = os.path.join(_TMPDIR, 'file_in_the_way.csv')  # illustrative name
open(fname, 'x').close()
with self.assertRaises(ValueError):
    CmdStanArgs(
        model_name='bernoulli',
        model_exe='bernoulli.exe',
        chain_ids=[1, 2, 3, 4],
        output_dir=fname,
        method_args=sampler_args,
    )
if os.path.exists(fname):
    os.remove(fname)
# TODO: read-only dir test for Windows - set ACLs, not mode
if platform.system() in ('Darwin', 'Linux'):
    # set up the read-only dir outside the assertRaises block, so that a
    # failure in mkdir cannot masquerade as the expected ValueError
    read_only = os.path.join(_TMPDIR, 'read_only')
    os.mkdir(read_only, mode=0o444)
    with self.assertRaises(ValueError):
        CmdStanArgs(
            model_name='bernoulli',
            model_exe='bernoulli.exe',
            chain_ids=[1, 2, 3, 4],
            output_dir=read_only,
            method_args=sampler_args,
        )
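# The TODO above reflects a real platform gap: os.mkdir's mode argument only
# restricts POSIX permission bits and is largely ignored on Windows, where a
# truly read-only directory requires ACLs. A portable probe (a sketch with an
# assumed helper name, not the library's check) just attempts a file write,
# as the save-path code further below also does:
import os

def dir_is_writable_sketch(path: str) -> bool:
    """Probe writability by creating and deleting a scratch file."""
    probe = os.path.join(path, '.write_probe')
    try:
        with open(probe, 'w'):
            pass
        os.remove(probe)
        return True
    except OSError:
        return False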
dict_zero_vec = {'a': []}
file_zero_vec = os.path.join(_TMPDIR, 'empty_vec.json')
jsondump(file_zero_vec, dict_zero_vec)
with open(file_zero_vec) as fd:
    self.assertEqual(json.load(fd), dict_zero_vec)

dict_zero_matrix = {'a': [[], [], []]}
file_zero_matrix = os.path.join(_TMPDIR, 'empty_matrix.json')
jsondump(file_zero_matrix, dict_zero_matrix)
with open(file_zero_matrix) as fd:
    self.assertEqual(json.load(fd), dict_zero_matrix)

arr = np.zeros(shape=(5, 0))
dict_zero_matrix = {'a': arr}
file_zero_matrix = os.path.join(_TMPDIR, 'empty_matrix.json')
jsondump(file_zero_matrix, dict_zero_matrix)
with open(file_zero_matrix) as fd:
    # compare against the list form: JSON round-trips ndarrays as nested lists
    self.assertEqual(json.load(fd), {'a': arr.tolist()})
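# jsondump above is the library helper under test; for reference, a
# hypothetical equivalent can be built on json.dump with a default hook so
# numpy arrays serialize as nested lists (jsondump_sketch is our name, not
# the library's):
import json
import numpy as np

def jsondump_sketch(path: str, data: dict) -> None:
    """Write a data dict as JSON, converting numpy arrays to lists."""
    def convert(obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()  # shape (5, 0) becomes [[], [], [], [], []]
        raise TypeError('cannot serialize {}'.format(type(obj)))
    with open(path, 'w') as fd:
        json.dump(data, fd, default=convert)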
test_path = os.path.join(dir, '.write_probe')  # illustrative scratch name
try:
    os.makedirs(dir, exist_ok=True)
    with open(test_path, 'w'):
        pass
    os.remove(test_path)  # cleanup
except OSError as exc:
    raise ValueError('cannot save to path: {}'.format(dir)) from exc
for i in range(self.chains):
    if not os.path.exists(self._csv_files[i]):
        raise ValueError(
            'cannot access csv file {}'.format(self._csv_files[i])
        )
    path, filename = os.path.split(self._csv_files[i])
    if path == _TMPDIR:  # strip the random tempfile suffix from the name
        root, ext = os.path.splitext(filename)
        rlist = root.split('-')
        root = '-'.join(rlist[:-1])
        filename = ''.join([root, ext])
    to_path = os.path.join(dir, filename)
    if os.path.exists(to_path):
        raise ValueError(
            'file exists, not overwriting: {}'.format(to_path)
        )
    try:
        self._logger.debug(
            'saving tmpfile: "%s" as: "%s"', self._csv_files[i], to_path
        )
        shutil.move(self._csv_files[i], to_path)
        self._csv_files[i] = to_path
    except (IOError, OSError) as exc:
        raise ValueError('cannot save to file: {}'.format(to_path)) from exc
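# A hedged usage sketch of the method this loop belongs to; the fit object
# and method name are assumptions based on the code above, not confirmed by
# this snippet:
fit = model.sample(data=data_file)  # hypothetical fit owning the csv files
fit.save_csvfiles(dir='results')    # per-chain csvs leave _TMPDIR, minus
                                    # the random tempfile suffix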
def summary(self) -> pd.DataFrame:
    """
    Run cmdstan/bin/stansummary over all output csv files.
    Echo stansummary stdout/stderr to console.
    Assemble csv tempfile contents into a pandas DataFrame.
    """
    cmd_path = os.path.join(
        cmdstan_path(), 'bin', 'stansummary' + EXTENSION
    )
    tmp_csv_file = 'stansummary-{}-{}-chain-'.format(
        self.runset._args.model_name, self.runset.chains
    )
    tmp_csv_path = create_named_text_file(
        dir=_TMPDIR, prefix=tmp_csv_file, suffix='.csv'
    )
    cmd = [
        cmd_path,
        '--csv_file={}'.format(tmp_csv_path),
    ] + self.runset.csv_files
    do_command(cmd, logger=self.runset._logger)
    with open(tmp_csv_path, 'rb') as fd:
        summary_data = pd.read_csv(
            fd, delimiter=',', header=0, index_col=0, comment='#'
        )
    # keep lp__ but drop the other sampler diagnostics (names ending in '__')
    mask = [x == 'lp__' or not x.endswith('__') for x in summary_data.index]
    return summary_data[mask]
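# Usage sketch under the same assumptions: row labels come from the
# stansummary csv, so 'lp__' survives the mask while other names ending in
# '__' (e.g. 'accept_stat__') are dropped:
df = fit.summary()
print(df.loc['lp__'])  # the log-posterior summary row is kept
assert not any(n.endswith('__') and n != 'lp__' for n in df.index)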
csv_file = os.path.join(
    output_dir, '{}-{}.{}'.format(file_basename, i + 1, 'csv')
)
self._csv_files[i] = csv_file
stdout_file = ''.join(
    [os.path.splitext(csv_file)[0], '-stdout.txt']
)
self._stdout_files[i] = stdout_file
stderr_file = ''.join(
    [os.path.splitext(csv_file)[0], '-stderr.txt']
)
self._stderr_files[i] = stderr_file
if args.save_diagnostics:
    if args.output_dir is None:
        diag_file = create_named_text_file(
            dir=_TMPDIR,
            prefix='{}-diagnostic-{}-'.format(file_basename, i + 1),
            suffix='.csv',
        )
    else:
        diag_file = os.path.join(
            output_dir,
            '{}-diagnostic-{}.{}'.format(file_basename, i + 1, 'csv'),
        )
    self._diagnostic_files[i] = diag_file
self._cmds.append(
    args.compose_command(
        i, self._csv_files[i], self._diagnostic_files[i]
    )
)
if chains < 1:
    raise ValueError(
        'chains must be positive integer value, '
        'found {}'.format(chains)
    )
self._retcodes = [-1 for _ in range(chains)]
# stdout, stderr are written to text files
# prefix: ``<model_name>-<YYYYMMDDHHMM>-<chain_id>``
# suffixes: ``-stdout.txt``, ``-stderr.txt``
now = datetime.now()
now_str = now.strftime('%Y%m%d%H%M')
file_basename = '-'.join([args.model_name, now_str])
if args.output_dir is not None:
    output_dir = args.output_dir
else:
    output_dir = _TMPDIR
self._csv_files = [None for _ in range(chains)]
self._diagnostic_files = [None for _ in range(chains)]
self._stdout_files = [None for _ in range(chains)]
self._stderr_files = [None for _ in range(chains)]
self._cmds = []
for i in range(chains):
    if args.output_dir is None:
        csv_file = create_named_text_file(
            dir=output_dir,
            prefix='{}-{}-'.format(file_basename, i + 1),
            suffix='.csv',
        )
    else:
        csv_file = os.path.join(
            output_dir, '{}-{}.{}'.format(file_basename, i + 1, 'csv')
        )
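# To make the naming scheme concrete (timestamp illustrative):
#   model_name='bernoulli', run started 2019-06-01 10:30, chains=4
#   output_dir set:   bernoulli-201906011030-1.csv ... -4.csv
#   output_dir=None:  create_named_text_file appends a random suffix,
#   e.g. bernoulli-201906011030-1-abc123.csv, which the tmpfile-cleanup
#   branch shown earlier strips when the files are saved.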
def __init__(
    self, *objs: Union[str, dict], logger: Optional[logging.Logger] = None
):
    self._unlink = [False] * len(objs)
    self._paths = [''] * len(objs)
    self._logger = logger or get_logger()
    for i, obj in enumerate(objs):
        if isinstance(obj, dict):
            data_file = create_named_text_file(
                dir=_TMPDIR, prefix='', suffix='.json'
            )
            self._logger.debug('input tempfile: %s', data_file)
            # route dicts holding empty containers through rdump; check the
            # dict's values, not its keys, and avoid ndarray truthiness
            if any(
                item.size == 0
                if isinstance(item, np.ndarray)
                else len(item) == 0
                for item in obj.values()
                if isinstance(item, (Sequence, np.ndarray))
            ):
                rdump(data_file, obj)
            else:
                jsondump(data_file, obj)
            self._paths[i] = data_file
            self._unlink[i] = True
        elif isinstance(obj, str):
            if not os.path.exists(obj):
                raise ValueError("File doesn't exist: {}".format(obj))
            self._paths[i] = obj
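# In cmdstanpy this constructor belongs to a context-manager class
# (MaybeDictToFilePath in the library's utils). Assuming the usual
# __enter__/__exit__ that yield self._paths and unlink flagged tempfiles,
# usage would look like:
data = {'N': 2, 'y': [0, 1]}
with MaybeDictToFilePath(data, 'bernoulli.data.json') as (dict_path, file_path):
    pass  # dict_path: generated tempfile; file_path: the string unchanged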