Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Mechanism D: an IntegratorMechanism driven by a SimpleIntegrator with rate=1.
D = IntegratorMechanism(
    name='D',
    default_variable=[0],
    function=SimpleIntegrator(rate=1),
)

# Four two-node pathways pairing each first node (A, B) with each second
# node (C, D).  A, B and C are defined outside this span.
p = Process(default_variable=[0], pathway=[A, C], name='p')
p1 = Process(default_variable=[0], pathway=[A, D], name='p1')
q = Process(default_variable=[0], pathway=[B, C], name='q')
q1 = Process(default_variable=[0], pathway=[B, D], name='q1')
def test_udf_system_origin(self):
    """Run a System whose first mechanism wraps a user-defined function.

    The custom function returns the first two elements of the first row of
    its input in swapped order, so the input [1, 3, 5] is expected to yield
    [3, 1] as the first result of the run.
    """

    def myFunction(variable, params, context):
        return [variable[0][1], variable[0][0]]

    myMech = ProcessingMechanism(function=myFunction, size=3, name='myMech')
    transfer = TransferMechanism(size=2, function=Linear)
    udf_process = Process(pathway=[myMech, transfer])
    udf_system = System(processes=[udf_process])

    udf_system.run(inputs={myMech: [[1, 3, 5]]})

    first_result = udf_system.results[0][0]
    assert np.allclose(first_result, [3, 1])
def test_reinitialize_one_mechanism_at_trial_2_condition(self):
A = TransferMechanism(name='A')
B = TransferMechanism(
name='B',
integrator_mode=True,
integration_rate=0.5
)
C = TransferMechanism(name='C')
abc_process = Process(pathway=[A, B, C])
abc_system = System(processes=[abc_process])
# Set reinitialization condition
B.reinitialize_when = AtTrial(2)
C.log.set_log_conditions('value')
abc_system.run(
inputs={A: [1.0]},
reinitialize_values={B: [0.]},
num_trials=5
)
# Trial 0: 0.5, Trial 1: 0.75, Trial 2: 0.5, Trial 3: 0.75. Trial 4: 0.875
assert np.allclose(
C.log.nparray_dictionary('value')[abc_system.default_execution_id]['value'],
# Compare two ways of building a LeabraMechanism (from a size specification
# and from a pre-built network) against a reference Leabra network.
# NOTE(review): the enclosing function header and the definitions of in_size,
# out_size, num_hidden, train, num_trials, inputs and train_data are outside
# this span.
precision = 0.000000001 # how far we accept error between PNL and Leabra output
random_seed = 3 # because Leabra network initializes with small random weights
# The RNG is re-seeded with the same value before each construction below,
# presumably so both networks start from identical random weights.
random.seed(random_seed)
L_spec = LeabraMechanism(input_size=in_size, output_size=out_size, hidden_layers=num_hidden, training_flag=train)
random.seed(random_seed)
leabra_net = build_leabra_network(in_size, out_size, num_hidden, None, train)
# L_net is built from a deep copy, so leabra_net itself stays a separate
# object that is trained directly in the loop below.
leabra_net2 = copy.deepcopy(leabra_net)
L_net = LeabraMechanism(leabra_net2)
# leabra_net should be identical to the network inside L_net
# One (T1, T2) pair of Linear TransferMechanisms per LeabraMechanism:
# T1 is sized to the input, T2 to the output (it carries train_data below).
T1_spec = TransferMechanism(name='T1', size=in_size, function=Linear)
T2_spec = TransferMechanism(name='T2', size=out_size, function=Linear)
T1_net = TransferMechanism(name='T1', size=in_size, function=Linear)
T2_net = TransferMechanism(name='T2', size=out_size, function=Linear)
# System around L_spec: T1 feeds it through the default pathway, while T2 is
# connected by an explicit projection to L_spec.input_states[1].
p1_spec = Process(pathway=[T1_spec, L_spec])
proj_spec = MappingProjection(sender=T2_spec, receiver=L_spec.input_states[1])
p2_spec = Process(pathway=[T2_spec, proj_spec, L_spec])
s_spec = System(processes=[p1_spec, p2_spec])
# Same wiring for the network-built mechanism L_net.
p1_net = Process(pathway=[T1_net, L_net])
proj_net = MappingProjection(sender=T2_net, receiver=L_net.input_states[1])
p2_net = Process(pathway=[T2_net, proj_net, L_net])
s_net = System(processes=[p1_net, p2_net])
# Per trial: run both systems and the reference network on the same
# input/training pair, then take element-wise absolute differences between
# each PNL output and the reference output.
# NOTE(review): indentation is missing in this copy, so the extent of the
# loop body cannot be confirmed from here.
for i in range(num_trials): # training round
out_spec = s_spec.run(inputs={T1_spec: inputs[i], T2_spec: train_data[i]})
pnl_output_spec = out_spec[-1][0]
leabra_output = train_leabra_network(leabra_net, inputs[i], train_data[i])
diffs_spec = np.abs(np.array(pnl_output_spec) - np.array(leabra_output))
out_net = s_net.run(inputs={T1_net: inputs[i], T2_net: train_data[i]})
pnl_output_net = out_net[-1][0]
diffs_net = np.abs(np.array(pnl_output_net) - np.array(leabra_output))
name='B',
default_variable=[0],
function=SimpleIntegrator(
rate=1
)
)
# Mechanism C: an IntegratorMechanism driven by a SimpleIntegrator with rate=1.
C = IntegratorMechanism(
    name='C',
    default_variable=[0],
    function=SimpleIntegrator(rate=1),
)

# Two pathways that both end in C (A and B are defined outside this span).
p = Process(default_variable=[0], pathway=[A, C], name='p')
q = Process(default_variable=[0], pathway=[B, C], name='q')

# System combining both pathways.
s = System(processes=[p, q], name='s')
def test_reinitialize_run(self):
    """Run a one-mechanism System for two trials with reinitialization disabled.

    Builds a RecurrentTransferMechanism in integrator mode (initial_value 0.5,
    integration_rate 0.1, auto 1.0, no noise), sets reinitialize_when to
    Never(), checks that the integrator's previous_value starts at 0.5, and
    then runs two trials with a constant input of 1.0.
    """
    recurrent = RecurrentTransferMechanism(
        name="R",
        initial_value=0.5,
        integrator_mode=True,
        integration_rate=0.1,
        auto=1.0,
        noise=0.0,
    )
    process = Process(name="P", pathway=[recurrent])
    system = System(name="S", processes=[process])

    recurrent.reinitialize_when = Never()

    # Before any execution the integrator holds the configured initial value.
    assert np.allclose(recurrent.integrator_function.previous_value, 0.5)

    system.run(
        inputs={recurrent: 1.0},
        num_trials=2,
        initialize=True,
        initial_values={recurrent: 0.0},
    )

    # Expected trajectory:
    #   Trial 1 | variable = 1.0 + 0.0
    #     integration: 0.9*0.5 + 0.1*1.0 + 0.0 = 0.55  ---> previous value = 0.55
    #     linear fn:   0.55*1.0 = 0.55
    #   Trial 2 | variable = 1.0 + 0.55
    #     integration: 0.9*0.55 + 0.1*1.55 + 0.0 = 0.65 ---> previous value = 0.65
def test_input_not_provided_to_run(self):
    """Call System.run() with no inputs and check the resulting values.

    Asserts that the first mechanism's value equals its default_variable and
    that the run result is [[2.0, 4.0]], where the second mechanism is a
    Linear function with slope 2.0.
    """
    source = TransferMechanism(
        name='T',
        default_variable=[[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
    )
    doubler = TransferMechanism(
        name='T2',
        function=Linear(slope=2.0),
        default_variable=[[0.0, 0.0]],
    )
    process = Process(pathway=[source, doubler])
    system = System(processes=[process])

    # No `inputs` argument is passed to run().
    run_result = system.run()

    source_value = source.parameters.value.get(system)
    assert np.allclose(source_value, [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
    assert np.allclose(run_result, [[np.array([2.0, 4.0])]])
def test_heterogeneous_variables_drop_outer_list(self):
    """Run a System whose mechanism has default_variable rows of unequal length.

    The mechanism's default_variable is [[0.0], [0.0, 0.0]] and the input
    passed to run() is [[1.0], [2.0, 2.0]].  There are no assertions: the
    test passes if run() completes without raising.
    """
    a = TransferMechanism(name='a', default_variable=[[0.0], [0.0, 0.0]])
    process = Process(pathway=[a])
    system = System(processes=[process])

    ragged_inputs = {a: [[1.0], [2.0, 2.0]]}
    system.run(ragged_inputs)
name='p'
)
# Remaining pathways (A, B, C, D and p are defined outside this span).
p1 = Process(default_variable=[0], pathway=[A, D], name='p1')
q = Process(default_variable=[0], pathway=[B, C], name='q')
q1 = Process(default_variable=[0], pathway=[B, D], name='q1')

# System combining all four pathways.
s = System(processes=[p, p1, q, q1], name='s')

# Trial termination condition: both C and D satisfy AfterNCalls(..., 1).
term_conds = {
    TimeScale.TRIAL: All(
        AfterNCalls(C, 1),
        AfterNCalls(D, 1),
    ),
}
stim_list = {A: [[1]], B: [[1]]}

# Scheduler for the system: B is conditioned on every 2 calls of A,
# C on every call of A.
sched = Scheduler(system=s)
sched.add_condition(B, EveryNCalls(A, 2))
sched.add_condition(C, EveryNCalls(A, 1))
output_states=[
DECISION_VARIABLE,
RESPONSE_TIME,
PROBABILITY_UPPER_THRESHOLD
],
name='Decision'
)
# Processes:
# Pathway from Input to Decision, with IDENTITY_MATRIX given between them.
TaskExecutionProcess = Process(
    default_variable=[0],
    pathway=[Input, IDENTITY_MATRIX, Decision],
    name='TaskExecutionProcess',
)

# Single-node pathway containing only the Reward mechanism.
RewardProcess = Process(
    default_variable=[0],
    pathway=[Reward],
    name='RewardProcess',
)
# System:
mySystem = System(
processes=[TaskExecutionProcess, RewardProcess],
controller=EVCControlMechanism,
enable_controller=True,
monitor_for_control=[
Reward,
Decision.PROBABILITY_UPPER_THRESHOLD,
(Decision.RESPONSE_TIME, -1, 1)
],
control_signals=[