Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# First demo data set: z = sin(y) + x on an 11 x 51 grid, axes named
# 'x' and 'y'.
x = np.arange(11) - 5.0
y = np.linspace(0, 10, 51)
xx, yy = np.meshgrid(x, y, indexing='ij')
zz = xx + np.sin(yy)
data = MeshgridDataDict(
    x={'values': xx},
    y={'values': yy},
    z={'values': zz, 'axes': ['x', 'y']},
)
data.validate()

# Second demo data set: the same values built from fresh arrays, but the
# first axis is called 'reps' instead of 'x'.
x = np.arange(11) - 5.0
y = np.linspace(0, 10, 51)
xx, yy = np.meshgrid(x, y, indexing='ij')
zz = xx + np.sin(yy)
data2 = MeshgridDataDict(
    reps={'values': xx},
    y={'values': yy},
    z={'values': zz, 'axes': ['reps', 'y']},
)
data2.validate()

# Create the Qt application, then an autoplot window whose flowchart
# contains a single SubtractAverage node named 'sub'.
app = QtGui.QApplication([])
win, fc = flowchartAutoPlot([('sub', SubtractAverage)])
win.show()

# Feed both data sets into the flowchart, one after the other.
fc.setInput(dataIn=data)
fc.setInput(dataIn=data2)
def test_xy_selector(qtbot):
"""Basic XY selector node test."""
# NOTE(review): indentation was lost in this excerpt, and the final assert
# below is cut off mid-call -- the rest of the test is not visible here.
# The node class' uiClass is set to None before the flowchart is built
# (presumably so that no widget gets created for the node -- confirm).
XYSelector.uiClass = None
fc = linearFlowchart(('xysel', XYSelector))
node = fc.nodes()['xysel']
# Build a 3D grid with 5 x 5 x 2 points (x, y, z) and a single dependent
# 'vals' = x * y * z defined on all three axes.
x = np.arange(5.0)
y = np.linspace(0, 1, 5)
z = np.arange(4.0, 6.0, 1.0)
xx, yy, zz = np.meshgrid(x, y, z, indexing='ij')
vals = xx * yy * zz
data = MeshgridDataDict(
x=dict(values=xx),
y=dict(values=yy),
z=dict(values=zz),
vals=dict(values=vals, axes=['x', 'y', 'z'])
)
assert data.validate()
fc.setInput(dataIn=data)
# this should return None, because no x/y axes were set.
assert fc.outputValues()['dataOut'] is None
# now select two axes, and test that the other one is correctly selected
node.xyAxes = ('x', 'y')
# Compare the 'vals' values in the node output against a reference; the
# reference argument is not part of this excerpt.
assert num.arrays_equal(
fc.outputValues()['dataOut'].data_vals('vals'),
def makeData():
    """Create noisy sine test data on an (x, repetition) grid.

    The grid has 51 ``x`` points between 0 and 10 and 20 repetitions. The
    dependent ``sine`` is ``sinefunc`` evaluated on the x grid with
    ``amp=0.8``, ``freq=0.25``, ``phase=0.1``, plus normally distributed
    noise with scale 0.2.

    :returns: a ``MeshgridDataDict`` with axes ``x`` and ``repetition`` and
        the dependent ``sine``.
    """
    x_axis = np.linspace(0, 10, 51)
    rep_axis = np.arange(20)
    x_grid, rep_grid = np.meshgrid(x_axis, rep_axis, indexing='ij')

    # Clean signal first, then add the noise to it in place.
    signal = sinefunc(x_grid, amp=0.8, freq=0.25, phase=0.1)
    signal += np.random.normal(scale=0.2, size=signal.shape)

    return MeshgridDataDict(
        x={'values': x_grid},
        repetition={'values': rep_grid},
        sine={'values': signal, 'axes': ['x', 'repetition']},
    )
# NOTE(review): this is the body of a test whose ``def`` line is not part of
# this excerpt; indentation was lost and the last assert is cut off.
# Turn off the UI-related class attributes before the node is instantiated.
SubtractAverage.useUi = False
SubtractAverage.uiClass = None
fc = linearFlowchart(
('Subtract Average', SubtractAverage),
)
node = fc.nodes()['Subtract Average']
# Test data: z = sin(y) + x on an 11 x 51 grid with axes ('x', 'y').
x = np.arange(11) - 5.
y = np.linspace(0, 10, 51)
xx, yy = np.meshgrid(x, y, indexing='ij')
zz = np.sin(yy) + xx
# Reference for subtracting the mean along y: the x term is constant along
# y and cancels, leaving sin(y) minus its mean. Since sin(yy) does not vary
# along x, its global mean equals its mean along y.
zz_ref_avg_y = np.sin(yy) - np.sin(yy).mean()
data = MeshgridDataDict(
x = dict(values=xx),
y = dict(values=yy),
z = dict(values=zz, axes=['x', 'y'])
)
assert data.validate()
fc.setInput(dataIn=data)
# Before an averaging axis is selected, the output is expected to equal the
# input values.
assert num.arrays_equal(
zz,
fc.outputValues()['dataOut'].data_vals('z')
)
# After selecting 'y' as averaging axis, the output is expected to match
# the reference computed above.
node.averagingAxis = 'y'
assert num.arrays_equal(
zz_ref_avg_y,
fc.outputValues()['dataOut'].data_vals('z'),
# NOTE(review): interior of a processing method; the ``def`` line, the
# condition guarding this first ``return None`` and whatever follows the
# last line are not part of this excerpt. Indentation was lost.
return None
# Work on a copy of the 'dataOut' entry and publish its axes through the
# axesList signal.
data = data['dataOut'].copy()
self.axesList.emit(data.axes())
dout = None
# self._grid holds the selected gridding method together with its options.
method, opts = self._grid
# Non-gridded input: expand it as-is, or convert it to a meshgrid with a
# guessed or an explicitly specified shape.
# NOTE(review): if MeshgridDataDict were a subclass of DataDict, the second
# isinstance branch below could never be reached -- confirm the hierarchy.
if isinstance(data, DataDict):
if method is GridOption.noGrid:
dout = data.expand()
elif method is GridOption.guessShape:
dout = dd.datadict_to_meshgrid(data)
elif method is GridOption.specifyShape:
dout = dd.datadict_to_meshgrid(data, target_shape=opts['shape'])
# Input that is already gridded: convert back to a plain datadict for
# noGrid, otherwise pass it through unchanged.
elif isinstance(data, MeshgridDataDict):
if method is GridOption.noGrid:
dout = dd.meshgrid_to_datadict(data)
elif method is GridOption.guessShape:
dout = data
elif method is GridOption.specifyShape:
# A requested shape is not applied to gridded data; only a warning is
# logged.
self.logger().warning(
f"Data is already on grid. Ignore shape.")
dout = data
# Any other data type is logged as an error and produces no output.
else:
self.logger().error(
f"Unknown data type {type(data)}.")
return None
# dout stays None when ``method`` matched none of the options above.
if dout is None:
return None
# NOTE(review): interior of an initializer whose ``def`` line is not part of
# this excerpt; indentation was lost.
# Cached information about the data; nothing is known yet.
self._dataStructure = None
self._dataShapes = None
self._dataType = None
# Starts out empty; presumably maps a dimension name to its currently
# assigned role -- confirm against the code that fills it.
self._currentRoles = {}
#: This is a flag to control whether we need to emit signals when
#: a role has changed. broadly speaking, this is only desired when
#: the change comes from the user interacting with the UI, otherwise
#: it might lead to undesired recursion.
self.emitRoleChangeSignal = True
self.choices = {}
# Choices offered per data class. Only the DataDictBase entry has a
# predefined choice ('None'); the other two lists start empty.
self.availableChoices = OrderedDict({
DataDictBase: ['None', ],
DataDict: [],
MeshgridDataDict: [],
})
# NOTE(review): tail of a conversion function; its signature is not part of
# this excerpt and indentation was lost. ``data``, ``target_shape`` and
# ``inner_axis_order`` are presumably parameters -- confirm.
# guess what the shape likely is.
if target_shape is None:
shp_specs = guess_shape_from_datadict(data)
# NOTE(review): this unpacks every entry as an (order, shape) pair, so a
# ``None`` entry would already fail here, before the ``ret is None`` check
# below is reached -- confirm what the guess function can return.
shps = [shape for (order, shape) in shp_specs.values()]
# All data fields must agree on a single shape.
if len(set(shps)) > 1:
raise ValueError('Cannot determine unique shape for all data.')
ret = list(shp_specs.values())[0]
if ret is None:
raise ValueError('Shape could not be inferred.')
# the guess-function returns both axis order as well as shape.
inner_axis_order, target_shape = ret
# construct new data
newdata = MeshgridDataDict(**data.structure(add_shape=False))
# Axes of the first dependent; used as the target axis order below.
axlist = data.axes(data.dependents()[0])
for k, v in data.data_items():
# Put the values of each field onto the target shape (as a copy).
vals = num.array1d_to_meshgrid(v['values'], target_shape, copy=True)
# if an inner axis order is given, we transpose to transform from that
# to the specified order.
if inner_axis_order is not None:
transpose_idxs = misc.reorder_indices(
inner_axis_order, axlist)
vals = vals.transpose(transpose_idxs)
newdata[k]['values'] = vals
# Sanitize and validate the result before it is used further.
newdata = newdata.sanitize()
newdata.validate()
'C' order (1st the slowest, last the fastest axis) then the
'true' inner order can be specified as a list of axes names, which has
to match the specified axes in all but order. The data is then
transposed to conform to the specified order.
**Note**: if this is given, then `target_shape` needs to be given
in the order of this inner_axis_order. The output data will keep the
axis ordering specified in the `axes` property.
:param use_existing_shape: if ``True``, simply use the shape that the data
already has. For numpy-array data, this might already be present.
if ``False``, flatten and reshape.
:returns: the generated ``MeshgridDataDict``.
"""
# NOTE(review): start of a function body; the ``def`` line is not part of
# this excerpt, indentation was lost, and the body continues past the last
# line shown here.
# if the data is empty, return empty MeshgridData
if len([k for k, _ in data.data_items()]) == 0:
return MeshgridDataDict()
# Refuse to grid data whose axes are not compatible.
if not data.axes_are_compatible():
raise ValueError('Non-compatible axes, cannot grid that.')
# Either expand the data first, or take over the shape the data already has.
if not use_existing_shape and data.is_expandable():
data = data.expand()
elif use_existing_shape:
# NOTE(review): ``.shape`` is read directly from an element of
# ``data.dependents()``. Elsewhere such elements are passed around like
# field names (e.g. to ``data.axes(...)``); if they are strings, this
# attribute access would fail -- confirm.
target_shape = data.dependents()[0].shape
# guess what the shape likely is.
if target_shape is None:
shp_specs = guess_shape_from_datadict(data)
# Collect the guessed shape of every field; they all have to agree.
shps = [shape for (order, shape) in shp_specs.values()]
if len(set(shps)) > 1:
raise ValueError('Cannot determine unique shape for all data.')
def process(self, dataIn=None):
    """Subtract the mean along the averaging axis from every dependent.

    The subtraction only happens when the selected averaging axis is one of
    the data axes and the data type is ``MeshgridDataDict``; otherwise the
    copied input is passed through unchanged.

    :param dataIn: the incoming data.
    :returns: ``None`` if the parent class' ``process`` returns ``None``,
        otherwise a dict with the processed copy under ``'dataOut'``.
    """
    if super().process(dataIn=dataIn) is None:
        return None

    data = dataIn.copy()
    if self._averagingAxis in self.dataAxes and \
            self.dataType == MeshgridDataDict:
        axidx = self.dataAxes.index(self._averagingAxis)
        for dep in dataIn.dependents():
            vals = data.data_vals(dep)
            # keepdims=True keeps the averaged axis with length 1, so the
            # mean broadcasts against the full array.
            avg = vals.mean(axis=axidx, keepdims=True)
            # Assign a new array instead of using in-place ``-=``: the mean
            # is floating point, so an in-place subtraction raises a casting
            # error for integer-valued data, and it would also write into
            # the value buffer itself if the copy shares it with the input.
            data[dep]['values'] = vals - avg

    return dict(dataOut=data)