Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_recurrent_mech_system_auto_and_hetero_change(self):
    """Check both mechanisms' values over three runs as hetero, then auto, change.

    A four-element RecurrentTransferMechanism feeds a five-element linear
    TransferMechanism inside a single-process System.  Between runs ``hetero``
    is set to 0 and then ``auto`` is set to all zeros; after every run the
    value of each mechanism in the system's context is compared with the
    expected array.
    """
    recurrent = RecurrentTransferMechanism(
        size=4,
        auto=[1, 2, 3, 4],
        hetero=[[-1, -2, -3, -4]] * 4,
    )
    readout = TransferMechanism(size=5, function=Linear)
    quiet_prefs = TestRecurrentTransferMechanismInSystem.simple_prefs
    proc = Process(size=4, pathway=[recurrent, readout], prefs=quiet_prefs)
    system = System(processes=[proc], prefs=quiet_prefs)

    def run_and_check(stimulus, expected_recurrent, expected_readout):
        # One run with a single input row, then compare both mechanisms' values.
        system.run(inputs={recurrent: [stimulus]})
        np.testing.assert_allclose(
            recurrent.parameters.value.get(system), [expected_recurrent]
        )
        # Every element of the readout is expected to hold the same number.
        np.testing.assert_allclose(
            readout.parameters.value.get(system), [[expected_readout] * 5]
        )

    run_and_check([1, 2, 3, -0.5], [1., 2., 3., -0.5], 5.5)

    recurrent.parameters.hetero.set(0, system)
    run_and_check([-1.5, 0, 1, 2], [-.5, 4, 10, 0], 13.5)

    recurrent.parameters.auto.set([0, 0, 0, 0], system)
    run_and_check([12, 11, 10, 9], [12, 11, 10, 9], 42)
# Processes: Input -> Decision through an identity matrix, and a separate
# single-mechanism pathway for Reward.
TaskExecutionProcess = Process(
    size=1,
    pathway=[Input, IDENTITY_MATRIX, Decision],
    name='TaskExecutionProcess',
)
RewardProcess = Process(size=1, pathway=[Reward], name='RewardProcess')

# System: both processes under an EVCControlMechanism controller that monitors
# Reward and two Decision outputs (the RESPONSE_TIME entry is given as a tuple
# with two extra numbers, -1 and 1).
mySystem = System(
    processes=[TaskExecutionProcess, RewardProcess],
    controller=EVCControlMechanism,
    enable_controller=True,
    monitor_for_control=[
        Reward,
        Decision.PROBABILITY_UPPER_THRESHOLD,
        (Decision.RESPONSE_TIME, -1, 1),
    ],
    name='EVC Test System',
)

# Parameter validation is switched off for both processes and for the system.
TaskExecutionProcess.prefs.paramValidationPref = False
RewardProcess.prefs.paramValidationPref = False
mySystem.prefs.paramValidationPref = False

# Simulation recording is enabled by attribute here rather than through a
# ``prefs`` argument to System.
mySystem.recordSimulationPref = True
learning=LEARNING,
target=[2, 2],
name='Color Naming',
prefs=process_prefs,
)
# Word pathway: words -> hidden through the WH_Weights projection, with
# learning specified and its own target.
word_reading_process = Process(
    default_variable=[.5, 3],
    pathway=[words, WH_Weights, hidden],
    name='Word Reading',
    learning=LEARNING,
    target=[3, 3],
    prefs=process_prefs,
)

# The model combines the color-naming and word-reading processes in one system.
s = System(
    processes=[color_naming_process, word_reading_process],
    targets=[20, 20],
    name='Stroop Model',
    prefs=system_prefs,
)
def show_target():
    """Print inputs, targets, the response value and the three weight matrices.

    Everything is read in the context of system ``s``: the input-port values of
    ``colors`` and ``words``, ``s.targets``, the output-port value of
    ``response``, and the modulated matrices of the three weight projections.
    """
    def port_values(mech):
        # Input-port values of ``mech`` converted to plain lists for printing.
        return [np.ndarray.tolist(port.parameters.value.get(s)) for port in mech.input_ports]

    color_inputs = port_values(colors)
    print('\nColor Naming\n\tInput: {}\n\tTarget: {}'.format(color_inputs, s.targets))
    word_inputs = port_values(words)
    print('Wording Reading:\n\tInput: {}\n\tTarget: {}\n'.format(word_inputs, s.targets))
    print('Response: \n', response.output_port.parameters.value.get(s))

    labelled_weights = (
        ('Hidden-Output:', HO_Weights),
        ('Color-Hidden:', CH_Weights),
        ('Word-Hidden:', WH_Weights),
    )
    for heading, projection in labelled_weights:
        print(heading)
        print(projection.get_mod_matrix(s))
# Compare two LeabraMechanisms (one built from a size specification, one built
# from an existing network) against a stand-alone reference network, trial by trial.
# NOTE(review): indentation was lost in this excerpt and the enclosing function
# header is not visible; names such as in_size, train, inputs and precision are
# defined outside the lines shown.
# Mechanism built from sizes, hidden-layer count and training flag.
L_spec = LeabraMechanism(input_size=in_size, output_size=out_size, hidden_layers=num_hidden, training_flag=train)
# Re-seed before building the reference network -- presumably so it starts from
# the same random state as L_spec did; the earlier seeding is not visible here.
random.seed(random_seed)
leabra_net = build_leabra_network(in_size, out_size, num_hidden, None, train)
# L_net wraps a deep copy, so leabra_net itself is never handed to a mechanism.
leabra_net2 = copy.deepcopy(leabra_net)
L_net = LeabraMechanism(leabra_net2)
# leabra_net should be identical to the network inside L_net
# One T1/T2 sender pair per LeabraMechanism; both pairs use the same names and sizes.
T1_spec = TransferMechanism(name='T1', size=in_size, function=Linear)
T2_spec = TransferMechanism(name='T2', size=out_size, function=Linear)
T1_net = TransferMechanism(name='T1', size=in_size, function=Linear)
T2_net = TransferMechanism(name='T2', size=out_size, function=Linear)
# System around L_spec: T1 through a plain pathway, T2 through an explicit
# projection onto the mechanism's second input state (input_states[1]).
p1_spec = Process(pathway=[T1_spec, L_spec])
proj_spec = MappingProjection(sender=T2_spec, receiver=L_spec.input_states[1])
p2_spec = Process(pathway=[T2_spec, proj_spec, L_spec])
s_spec = System(processes=[p1_spec, p2_spec])
# Same wiring around L_net.
p1_net = Process(pathway=[T1_net, L_net])
proj_net = MappingProjection(sender=T2_net, receiver=L_net.input_states[1])
p2_net = Process(pathway=[T2_net, proj_net, L_net])
s_net = System(processes=[p1_net, p2_net])
# Per trial: run each system and train the reference network on the same
# input/training pair, then compare outputs elementwise against `precision`.
# NOTE(review): the loop body presumably extends through the assert -- confirm
# against the properly indented original.
for i in range(num_trials): # training round
out_spec = s_spec.run(inputs={T1_spec: inputs[i], T2_spec: train_data[i]})
# First item of the last entry returned by this run.
pnl_output_spec = out_spec[-1][0]
leabra_output = train_leabra_network(leabra_net, inputs[i], train_data[i])
diffs_spec = np.abs(np.array(pnl_output_spec) - np.array(leabra_output))
out_net = s_net.run(inputs={T1_net: inputs[i], T2_net: train_data[i]})
pnl_output_net = out_net[-1][0]
diffs_net = np.abs(np.array(pnl_output_net) - np.array(leabra_output))
# Both mechanisms must match the reference output within `precision`.
assert all(diffs_spec < precision) and all(diffs_net < precision)
# assert np.sum(np.abs(pnl_output_spec - np.array(train_data[0]))) < 0.1
def test_dict_of_floats(self):
    """Output labels collected after each trial match the labels dictionary.

    The system is run twice: first with label strings only, then with a mix of
    numbers and label strings.  ``results`` is expected to hold the trials of
    both runs, while the label list is restarted before the second run.
    """
    input_labels_dict = {"red": 1, "green": 0}
    output_labels_dict = {"red": 1, "green": 0}
    mech = ProcessingMechanism(
        params={
            INPUT_LABELS_DICT: input_labels_dict,
            OUTPUT_LABELS_DICT: output_labels_dict,
        }
    )
    proc = Process(pathway=[mech])
    system = System(processes=[proc])

    one_pass_results = [[[1.]], [[0.]], [[0.]], [[1.]]]
    one_pass_labels = [['red'], ['green'], ['green'], ['red']]
    seen_labels = []

    def call_after_trial():
        # Record the mechanism's output labels as they stand after each trial.
        seen_labels.append(mech.get_output_labels(system))

    system.run(
        inputs=['red', 'green', 'green', 'red'],
        call_after_trial=call_after_trial,
    )
    assert np.allclose(system.results, one_pass_results)
    assert seen_labels == one_pass_labels

    # Start a fresh label list; the callback appends to whatever list the name
    # is bound to when it is called.
    seen_labels = []
    system.run(
        inputs=[1, 'green', 0, 'red'],
        call_after_trial=call_after_trial,
    )
    assert np.allclose(system.results, one_pass_results * 2)
    assert seen_labels == one_pass_labels
function=Linear(slope=1.0),
)
# Two processes over shared mechanisms: both pathways start at A and end at D,
# one going through B and the other through C.
# NOTE(review): A, B, C and D are constructed outside the lines shown.
p = Process(
default_variable=[0],
pathway=[A, B, D],
name='p'
)
q = Process(
default_variable=[0],
pathway=[A, C, D],
name='q'
)
s = System(
processes=[p, q],
name='s'
)
# Condition keyed by TimeScale.TRIAL; passed to s.run as termination_processing.
term_conds = {TimeScale.TRIAL: AfterNCalls(D, 1)}
# A is the only mechanism given an input.
stim_list = {A: [[1]]}
# Custom scheduler for the system, with one condition registered per
# downstream mechanism: B and C are each keyed to calls of A (every 1 and
# every 2 respectively), and D to either B or C reaching 3 calls.
# NOTE(review): exact firing semantics are defined by the Condition classes -- confirm there.
sched = Scheduler(system=s)
sched.add_condition(B, EveryNCalls(A, 1))
sched.add_condition(C, EveryNCalls(A, 2))
sched.add_condition(D, Any(EveryNCalls(B, 3), EveryNCalls(C, 3)))
# Install the scheduler on the system before running it.
s.scheduler_processing = sched
s.run(
inputs=stim_list,
termination_processing=term_conds
def test_system_run_with_condition(self):
    """A runtime ``noise`` value tied to ``AtTrial(1)`` shows up on trial 1 only.

    The first run supplies noise of 10.0 as a conditional runtime parameter
    over four trials; the second run supplies none.  Only the mechanism's
    value is expected to reflect the runtime parameter.
    """
    # Construction
    mech = TransferMechanism()
    proc = Process(pathway=[mech])
    system = System(processes=[proc])

    # Run with the conditional runtime param
    system.run(
        inputs={mech: 2.0},
        runtime_params={mech: {"noise": (10.0, AtTrial(1))}},
        num_trials=4,
    )
    # Run without any runtime param
    system.run(inputs={mech: 2.0})

    expected_values = (
        2.,   # Trial 0 - condition not satisfied yet
        12.,  # Trial 1 - condition satisfied
        2.,   # Trial 2 - condition no longer satisfied (not sticky)
        2.,   # Trial 3 - condition no longer satisfied (not sticky)
        2.,   # New run (runtime param no longer applies)
    )
    expected_results = [[np.array([value])] for value in expected_values]
    assert np.allclose(system.results, expected_results)
def test_dict_of_floats(self):
    """Input labels collected after each trial match the labels dictionary.

    The first run uses label strings only and records the mechanism's input
    labels after every trial; the second run mixes numbers and label strings
    and only checks that ``results`` now holds the trials of both runs.
    """
    input_labels_dict = {"red": 1, "green": 0}
    mech = ProcessingMechanism(params={INPUT_LABELS_DICT: input_labels_dict})
    proc = Process(pathway=[mech])
    system = System(processes=[proc])

    one_pass_results = [[[1.]], [[0.]], [[0.]], [[1.]]]
    seen_labels = []

    def call_after_trial():
        # Record the mechanism's input labels as they stand after each trial.
        seen_labels.append(mech.get_input_labels(system))

    system.run(
        inputs=['red', 'green', 'green', 'red'],
        call_after_trial=call_after_trial,
    )
    assert np.allclose(system.results, one_pass_results)
    assert seen_labels == [['red'], ['green'], ['green'], ['red']]

    system.run(inputs=[1, 'green', 0, 'red'])
    assert np.allclose(system.results, one_pass_results * 2)
function=Linear(slope=2.0),
)
# Two processes that converge on the same final mechanism: A -> C and B -> C.
# NOTE(review): A, B and C are constructed outside the lines shown.
p = Process(
default_variable=[0],
pathway=[A, C],
name='p'
)
q = Process(
default_variable=[0],
pathway=[B, C],
name='q'
)
s = System(
processes=[p, q],
name='s'
)
# Condition keyed by TimeScale.TRIAL; passed to s.run below as termination_processing.
term_conds = {TimeScale.TRIAL: AfterNCalls(C, 2)}
# Both first mechanisms receive an input.
stim_list = {A: [[1]], B: [[2]]}
# Custom scheduler: C's condition combines, via All, one EveryNCalls condition
# on A and one on B.
# NOTE(review): exact firing semantics are defined by the Condition classes -- confirm there.
sched = Scheduler(system=s)
sched.add_condition(C, All(EveryNCalls(A, 1), EveryNCalls(B, 1)))
# Install the scheduler on the system before running it.
s.scheduler_processing = sched
s.run(
inputs=stim_list,
termination_processing=term_conds
)
def test_branch(self):
    """Check learningGraph entries for two learning-enabled processes ending at M4.

    Process A runs M1 -> M2 -> M3 -> M4 and Process B runs M5 -> M6 -> M4.  For
    each sender, the learning mechanism of its first efferent projection is
    looked up in ``S.learningGraph`` and the names stored under it are checked.
    """
    from psyneulink.core.globals.keywords import ENABLED

    # M1..M6, each mechanism's size equal to its number.
    mech_1, mech_2, mech_3, mech_4, mech_5, mech_6 = [
        TransferMechanism(name='M{}'.format(number), size=number)
        for number in range(1, 7)
    ]

    process_A = Process(
        pathway=[mech_1, mech_2, mech_3, mech_4],
        learning=ENABLED,
        name='Process A',
    )
    process_B = Process(
        pathway=[mech_5, mech_6, mech_4],
        learning=ENABLED,
        name='Process B',
    )
    S = System(processes=[process_A, process_B])

    def graph_entry_names(mech):
        # Names stored in the learning graph under the learning mechanism of
        # ``mech``'s first efferent projection.
        learning_mech = mech.efferents[0].learning_mechanism
        return [node.name for node in S.learningGraph[learning_mech]]

    # Process A chain
    assert 'LearningMechanism for MappingProjection from M2 to M3' in graph_entry_names(mech_1)
    assert 'LearningMechanism for MappingProjection from M3 to M4' in graph_entry_names(mech_2)
    assert 'M4 ComparatorMechanism' in graph_entry_names(mech_3)

    # The owner of the receiver of M4's first efferent must itself be a graph key.
    comparator = mech_4.efferents[0].receiver.owner
    assert comparator in S.learningGraph.keys()

    # Process B chain
    assert 'LearningMechanism for MappingProjection from M6 to M4' in graph_entry_names(mech_5)
    assert 'M4 ComparatorMechanism' in graph_entry_names(mech_6)