Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
See Also
--------
run_unit_tests
run_examples
run_system
run_release
'''
# Print test set welcome
print('\n-'.ljust(OUTPUT_WIDTH + 1, '-'))
print('Running Science Tests')
print('-'.ljust(OUTPUT_WIDTH, '-'))
# Get setup
config = read_config(config_file)
# drop invalid driver tests
config = drop_tests(config, driver)
test_results = OrderedDict()
# Run individual tests
for i, (testname, test_dict) in enumerate(config.items()):
# print out status info
print('Running test {0}/{1}: {2}'.format(i + 1, len(config.items()),
testname))
# Setup directories for test
dirs = setup_test_dirs(testname, out_dir,
mkdirs=['results', 'state', 'logs', 'plots'])
See Also
--------
run_unit_tests
run_system
run_science
run_release
'''
# Print test set welcome
print('\n-'.ljust(OUTPUT_WIDTH + 1, '-'))
print('Running Examples')
print('-'.ljust(OUTPUT_WIDTH, '-'))
# Get setup
config = read_config(config_file)
# drop invalid driver tests
config = drop_tests(config, driver)
test_results = OrderedDict()
# Run individual examples
for i, (testname, test_dict) in enumerate(config.items()):
# print out status info
print('Running test {0}/{1}: {2}'.format(i + 1, len(config.items()),
testname))
# Setup directories for test
dirs = setup_test_dirs(testname, out_dir,
mkdirs=['results', 'state', 'logs', 'plots'])
def check_for_nans(df):
    """Raise ``VICTestError`` if ``df`` contains any null value.

    ``df`` is expected to support the ``isnull().any().any()`` chain
    (e.g. a pandas DataFrame).  Returns ``None`` when nothing is null.
    """
    # Two reductions: the first ``any`` collapses the null mask once, the
    # second collapses that result down to a single truth value.
    has_nans = df.isnull().any().any()
    if not has_nans:
        return
    raise VICTestError('VIC output has nans in it!')
# -------------------------------------------------------------------- #
default='$WORKDIR/VIC_tests_{0}'.format(ymd))
parser.add_argument('--data_dir', type=str,
help='directory to find test data',
default=os.path.join(test_dir, '../samples/VIC_sample_data'))
args = parser.parse_args()
# Define test directories
data_dir = args.data_dir
out_dir = os.path.expandvars(args.output_dir)
os.makedirs(out_dir, exist_ok=True)
# Validate input directories
if not (len(args.tests) == 1 and args.tests[0] == 'unit'):
for d in [data_dir, test_dir]:
if not os.path.exists(d):
raise VICTestError('Directory: {0} does not exist'.format(d))
# Print welcome information
print(description)
print('\nStarting tests now...Start Time: {0}\n'.format(starttime))
print('Running Test Set: {0}'.format(', '.join(args.tests)))
# Setup VIC executable
if not (len(args.tests) == 1 and args.tests[0] == 'unit'):
vic_exe = VIC(args.vic_exe)
print('VIC version information:\n\n{0}'.format(vic_exe.version.decode()))
# run test sets
# unit
if any(i in ['all', 'unit'] for i in args.tests):
test_results['unit'] = run_unit_tests(test_dir)
def process_error(error, vic_exe):
'''Helper function to process possible error raised during testing'''
tail = None
if isinstance(error, VICRuntimeError):
test_comment = 'Test failed during simulation'
tail = vic_exe.stderr
elif isinstance(error, VICTestError):
test_comment = 'Test failed during testing of output files'
elif isinstance(error, VICValgrindError):
test_comment = 'Test failed due to memory error detected by valgrind'
tail = vic_exe.stderr
elif isinstance(error, VICReturnCodeError):
test_comment = 'Test failed due to incorrect return code'
tail = vic_exe.stderr
elif isinstance(error, AssertionError):
test_comment = 'AssertionError raised during testing'
else:
test_comment = 'Unknown test failure'
traceback.print_stack()
print('\t{0}'.format(test_comment))
print('\t{0}'.format(error))
if tail is not None:
def check_completed(df, start, end):
    """Check that ``df.index`` begins at ``start`` and finishes at ``end``.

    The first index entry is compared with ``start`` and the last with
    ``end``.  A ``VICTestError`` naming both values is raised for the first
    mismatch found; ``None`` is returned when both ends match.
    """
    # (index position, expected value, error template) for each end.
    expected_bounds = (
        (0, start, 'Start dates ({0} and {1}) do not match'),
        (-1, end, 'End dates ({0} and {1}) do not match'),
    )
    for position, expected, message in expected_bounds:
        if df.index[position] == expected:
            continue
        raise VICTestError(message.format(df.index[position], expected))
    return
days=1)).strftime('%Y%m%d'),
0))
if state_format == 'ASCII':
states = read_ascii_state(state_fname)
elif state_format == 'BINARY':
states = read_binary_state(state_fname)
# Compare split run states with full run
# --- If ASCII state file, check if almost the same ---#
if state_format == 'ASCII':
np.testing.assert_almost_equal(states, states_full_run, decimal=3,
err_msg='States are not a '
'close match')
# --- If BINARY state file, check if exactly the same ---#
elif state_format == 'BINARY':
if states != states_full_run:
raise VICTestError('Restart causes inexact state outputs!')
elif driver == 'image':
# Read the state file at the end of the last period of run
state_fname = os.path.join(
state_basedir,
'{}_{}'.format(
run_last_period_start_date.strftime('%Y%m%d'),
run_last_period_end_date.strftime('%Y%m%d')),
'states.{}_{:05d}.nc'.format(
(run_last_period_end_date +
datetime.timedelta(
days=1)).strftime('%Y%m%d'),
0))
ds_states = xr.open_dataset(state_fname)
# Compare split run states with full run
for var in ds_states.data_vars:
def run_scaling(args):
'''wrapper function for scaling tests'''
config = hosts[args.host]
vic_exe = VIC(args.vic_exe)
# write timing file header
header = string.Template(table_header)
header_kwargs = get_header_info(args.vic_exe, args.global_param)
header = header.safe_substitute(**header_kwargs)
with open(args.timing, 'w') as f:
f.write(header)
for i, kwargs in enumerate(config.profile):
if config.template:
# run on a cluster of some kind
# start by printing the template
print('-'.ljust(OUT_WIDTH, '-'))
print('{host} template'.format(
host=args.host).center(OUT_WIDTH))
print('-'.ljust(OUT_WIDTH, '-'))
os.makedirs(out_dir, exist_ok=True)
# Validate input directories
if not (len(args.tests) == 1 and args.tests[0] == 'unit'):
for d in [data_dir, test_dir]:
if not os.path.exists(d):
raise VICTestError('Directory: {0} does not exist'.format(d))
# Print welcome information
print(description)
print('\nStarting tests now...Start Time: {0}\n'.format(starttime))
print('Running Test Set: {0}'.format(', '.join(args.tests)))
# Setup VIC executable
if not (len(args.tests) == 1 and args.tests[0] == 'unit'):
vic_exe = VIC(args.vic_exe)
print('VIC version information:\n\n{0}'.format(vic_exe.version.decode()))
# run test sets
# unit
if any(i in ['all', 'unit'] for i in args.tests):
test_results['unit'] = run_unit_tests(test_dir)
# system
if any(i in ['all', 'system'] for i in args.tests):
test_results['system'] = run_system(args.system, vic_exe, data_dir,
os.path.join(out_dir, 'system'),
args.driver)
# science
if any(i in ['all', 'science'] for i in args.tests):
test_results['science'] = run_science(args.science, vic_exe, data_dir,
os.path.join(out_dir, 'science'),
See Also
--------
run_unit_tests
run_examples
run_science
run_release
'''
# Print test set welcome
print('\n-'.ljust(OUTPUT_WIDTH + 1, '-'))
print('Running System Tests')
print('-'.ljust(OUTPUT_WIDTH, '-'))
# Get setup
config = read_configobj(config_file)
# drop invalid driver tests
config = drop_tests(config, driver)
test_results = OrderedDict()
# Run individual system tests
for i, (testname, test_dict) in enumerate(config.items()):
# print out status info
print('Running test {0}/{1}: {2}'.format(i + 1, len(config.items()),
testname))
# Setup directories for test
dirs = setup_test_dirs(testname, out_dir,
mkdirs=['results', 'state', 'logs', 'plots'])