Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_triangle_4b(self):
comp = Composition()
A = TransferMechanism(function=Linear(slope=5.0, intercept=2.0), name='scheduler-pytests-A')
B = TransferMechanism(function=Linear(intercept=4.0), name='scheduler-pytests-B')
C = TransferMechanism(function=Linear(intercept=1.5), name='scheduler-pytests-C')
for m in [A, B, C]:
comp.add_node(m)
comp.add_projection(MappingProjection(), A, B)
comp.add_projection(MappingProjection(), A, C)
sched = Scheduler(composition=comp)
sched.add_condition(A, EveryNPasses(1))
sched.add_condition(B, EveryNCalls(A, 2))
sched.add_condition(C, All(WhenFinished(A), AfterNCalls(B, 3)))
termination_conds = {}
termination_conds[TimeScale.RUN] = AfterNTrials(1)
termination_conds[TimeScale.TRIAL] = AfterNCalls(C, 1)
output = []
i = 0
A.is_finished_flag = False
for step in sched.run(termination_conds=termination_conds):
if i == 10:
A.is_finished_flag = True
map_nouns_h1 = MappingProjection(matrix=np.random.rand(8,8),
name="map_nouns_h1",
sender=nouns_in,
receiver=h1)
map_rels_h2 = MappingProjection(matrix=np.random.rand(3,15),
name="map_relh2",
sender=rels_in,
receiver=h2)
map_h1_h2 = MappingProjection(matrix=np.random.rand(8,15),
name="map_h1_h2",
sender=h1,
receiver=h2)
map_h2_I = MappingProjection(matrix=np.random.rand(15,8),
name="map_h2_I",
sender=h2,
receiver=out_sig_I)
map_h2_is = MappingProjection(matrix=np.random.rand(15,12),
name="map_h2_is",
sender=h2,
receiver=out_sig_is)
map_h2_has = MappingProjection(matrix=np.random.rand(15,9),
name="map_h2_has",
sender=h2,
receiver=out_sig_has)
map_h2_can = MappingProjection(matrix=np.random.rand(15,9),
name="map_h2_can",
sender=h2,
receiver=out_sig_has)
map_h2_can = MappingProjection(matrix=np.random.rand(15,9),
name="map_h2_can",
sender=h2,
receiver=out_sig_can)
# SET UP PROJECTIONS FOR SYSTEM
map_nouns_h1_sys = MappingProjection(matrix=map_nouns_h1.matrix.copy(),
name="map_nouns_h1_sys",
sender=nouns_in_sys,
receiver=h1_sys)
map_rels_h2_sys = MappingProjection(matrix=map_rels_h2.matrix.copy(),
name="map_relh2_sys",
sender=rels_in_sys,
receiver=h2_sys)
map_h1_h2_sys = MappingProjection(matrix=map_h1_h2.matrix.copy(),
name="map_h1_h2_sys",
sender=h1_sys,
receiver=h2_sys)
map_h2_I_sys = MappingProjection(matrix=map_h2_I.matrix.copy(),
name="map_h2_I_sys",
sender=h2_sys,
receiver=out_sig_I_sys)
map_h2_is_sys = MappingProjection(matrix=map_h2_is.matrix.copy(),
name="map_h2_is_sys",
def test_params_stay_separate(self,mode):
xor_in = TransferMechanism(name='xor_in',
default_variable=np.zeros(2))
xor_hid = TransferMechanism(name='xor_hid',
default_variable=np.zeros(10),
function=Logistic())
xor_out = TransferMechanism(name='xor_out',
default_variable=np.zeros(1),
function=Logistic())
hid_m = np.random.rand(2,10)
out_m = np.random.rand(10,1)
hid_map = MappingProjection(name='hid_map',
matrix=hid_m.copy(),
sender=xor_in,
receiver=xor_hid)
out_map = MappingProjection(name='out_map',
matrix=out_m.copy(),
sender=xor_hid,
receiver=xor_out)
xor = AutodiffComposition(param_init_from_pnl=True,
learning_rate=10.0,
optimizer_type="sgd")
xor.add_node(xor_in)
xor.add_node(xor_hid)
xor.add_node(xor_out)
outer_composition.add_node(inner_composition_2)
outer_composition.add_node(mechanism_d)
inner_composition_1._analyze_graph()
outer_composition.add_projection(projection=MappingProjection(), sender=inner_composition_1,
receiver=mechanism_d)
inner_composition_2._analyze_graph()
outer_composition.add_projection(projection=MappingProjection(), sender=inner_composition_2,
receiver=mechanism_d)
sched = Scheduler(composition=outer_composition)
# FIX: order of InputPorts on inner composition 1 is not stable
output = outer_composition.run(
inputs={
inner_composition_1: {A: [[2.0], [1.5], [2.5]],
B: [[1.0], [1.5], [1.5]]},
inner_composition_2: [[12.0], [11.5], [12.5]]},
scheduler=sched,
bin_execute=mode
)
# trial 0:
# inner composition 1 = (0.5*2.0 + 2.0*1.0) * 3.0 = 9.0
def test_WhenFinishedAny_1(self):
    """Check the step sequence when C is scheduled with WhenFinishedAny(A, B).

    A and B each project to C, and both have ``is_finished_flag`` set to
    True before the scheduler is run. A and B are scheduled with
    EveryNPasses(1). The run terminates after one trial, and the trial
    terminates once C has been called once. The recorded steps are
    expected to be ``{A, B}`` followed by ``C``.
    """
    comp = Composition()

    # Mechanisms are created in A, B, C order with explicit names.
    A = TransferMechanism(function=Linear(slope=5.0, intercept=2.0), name='A')
    B = TransferMechanism(function=Linear(intercept=4.0), name='B')
    C = TransferMechanism(function=Linear(intercept=1.5), name='C')

    # Both upstream mechanisms are flagged as finished before scheduling.
    for upstream in (A, B):
        upstream.is_finished_flag = True

    for node in (A, B, C):
        comp.add_node(node)

    # Wire A -> C, then B -> C (same order as the node list).
    for upstream in (A, B):
        comp.add_projection(MappingProjection(), upstream, C)

    scheduler = Scheduler(composition=comp)
    for upstream in (A, B):
        scheduler.add_condition(upstream, EveryNPasses(1))
    scheduler.add_condition(C, WhenFinishedAny(A, B))

    termination_conds = {
        TimeScale.RUN: AfterNTrials(1),
        TimeScale.TRIAL: AfterNCalls(C, 1),
    }

    actual = list(scheduler.run(termination_conds=termination_conds))
    expected = [{A, B}, C]
    assert actual == pytest.helpers.setify_expected_output(expected)
default_variable=np.zeros(9),
function=Logistic())
# SET UP PROJECTIONS FOR SEMANTIC NET
map_nouns_h1 = MappingProjection(matrix=np.random.rand(8,8),
name="map_nouns_h1",
sender=nouns_in,
receiver=h1)
map_rels_h2 = MappingProjection(matrix=np.random.rand(3,15),
name="map_relh2",
sender=rels_in,
receiver=h2)
map_h1_h2 = MappingProjection(matrix=np.random.rand(8,15),
name="map_h1_h2",
sender=h1,
receiver=h2)
map_h2_I = MappingProjection(matrix=np.random.rand(15,8),
name="map_h2_I",
sender=h2,
receiver=out_sig_I)
map_h2_is = MappingProjection(matrix=np.random.rand(15,12),
name="map_h2_is",
sender=h2,
receiver=out_sig_is)
map_h2_has = MappingProjection(matrix=np.random.rand(15,9),
name="map_h2_has",
out_sig_is_sys = TransferMechanism(name="sig_outs_is_sys",
default_variable=np.zeros(12),
function=Logistic())
out_sig_has_sys = TransferMechanism(name="sig_outs_has_sys",
default_variable=np.zeros(9),
function=Logistic())
out_sig_can_sys = TransferMechanism(name="sig_outs_can_sys",
default_variable=np.zeros(9),
function=Logistic())
# SET UP PROJECTIONS FOR SEMANTIC NET
map_nouns_h1 = MappingProjection(matrix=np.random.rand(8,8),
name="map_nouns_h1",
sender=nouns_in,
receiver=h1)
map_rels_h2 = MappingProjection(matrix=np.random.rand(3,15),
name="map_relh2",
sender=rels_in,
receiver=h2)
map_h1_h2 = MappingProjection(matrix=np.random.rand(8,15),
name="map_h1_h2",
sender=h1,
receiver=h2)
map_h2_I = MappingProjection(matrix=np.random.rand(15,8),
name="map_h2_I",
"""Tests same configuration as control of InputPort in tests/mechansims/test_identicalness_of_control_and_gating
"""
Input_Layer = TransferMechanism(name='Input Layer', function=Logistic, size=2)
Hidden_Layer_1 = TransferMechanism(name='Hidden Layer_1', function=Logistic, size=5)
Hidden_Layer_2 = TransferMechanism(name='Hidden Layer_2', function=Logistic, size=4)
Output_Layer = TransferMechanism(name='Output Layer', function=Logistic, size=3)
Gating_Mechanism = GatingMechanism(size=[1], gate=[Hidden_Layer_1, Hidden_Layer_2, Output_Layer])
Input_Weights_matrix = (np.arange(2 * 5).reshape((2, 5)) + 1) / (2 * 5)
Middle_Weights_matrix = (np.arange(5 * 4).reshape((5, 4)) + 1) / (5 * 4)
Output_Weights_matrix = (np.arange(4 * 3).reshape((4, 3)) + 1) / (4 * 3)
# This projection is specified in add_backpropagation_learning_pathway method below
Input_Weights = MappingProjection(name='Input Weights',matrix=Input_Weights_matrix)
# This projection is "discovered" by add_backpropagation_learning_pathway method below
Middle_Weights = MappingProjection(name='Middle Weights',sender=Hidden_Layer_1,receiver=Hidden_Layer_2,
matrix={
VALUE: Middle_Weights_matrix,
FUNCTION: AccumulatorIntegrator,
FUNCTION_PARAMS: {
DEFAULT_VARIABLE: Middle_Weights_matrix,
INITIALIZER: Middle_Weights_matrix,
RATE: Middle_Weights_matrix
},
}
)
Output_Weights = MappingProjection(sender=Hidden_Layer_2, receiver=Output_Layer, matrix=Output_Weights_matrix)
pathway = [Input_Layer, Input_Weights, Hidden_Layer_1, Hidden_Layer_2, Output_Layer]
comp = Composition()
learning_components = comp.add_backpropagation_learning_pathway(pathway=pathway,
loss_function=None)
"""
from psyneulink.core.components.projections.pathway.mappingprojection import MappingProjection
from psyneulink.core.components.ports.parameterport import ParameterPort
# this mirrors the transformation in _function
# it is a hack, and a general solution should be found
squeezed = np.array(self.defaults.variable)
if squeezed.ndim > 1:
squeezed = np.squeeze(squeezed)
size = safe_len(squeezed)
matrix = self.parameters.matrix._get(context)
if isinstance(matrix, MappingProjection):
matrix = matrix._parameter_ports[MATRIX]
elif isinstance(matrix, ParameterPort):
pass
else:
matrix = get_matrix(matrix, size, size)
self.parameters.matrix._set(matrix, context)
self._hollow_matrix = get_matrix(HOLLOW_MATRIX, size, size)
default_variable = [self.defaults.variable,
self.defaults.variable]
if self.metric == ENTROPY:
self.metric_fct = Distance(default_variable=default_variable, metric=CROSS_ENTROPY, normalize=self.normalize)
elif self.metric in DISTANCE_METRICS._set():