Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_measurement_columns():
    """Test get_measurement_columns function.

    Configures a TrackObjects module for "Distance" tracking with a pixel
    radius of 10, then checks that ``get_measurement_columns`` returns as
    many columns as there are features in ``F_ALL`` plus ``F_IMAGE_ALL`` and
    that every expected feature name appears among the returned columns.
    """
    trackobjects = cellprofiler.modules.trackobjects
    module = trackobjects.TrackObjects()
    module.object_name.value = OBJECT_NAME
    module.tracking_method.value = "Distance"
    module.pixel_radius.value = 10
    columns = module.get_measurement_columns(None)
    assert len(columns) == len(trackobjects.F_ALL) + len(trackobjects.F_IMAGE_ALL)
    # Built once: the column names do not change while the features are
    # iterated, so there is no need to rebuild the list per feature.
    column_names = [column[1] for column in columns]
    for object_name, features in (
        (OBJECT_NAME, trackobjects.F_ALL),
        (cellprofiler.measurement.IMAGE, trackobjects.F_IMAGE_ALL),
    ):
        for feature in features:
            if object_name == OBJECT_NAME:
                name = "_".join((trackobjects.F_PREFIX, feature, "10"))
            else:
                # Columns for the other object name also embed OBJECT_NAME.
                name = "_".join(
                    (trackobjects.F_PREFIX, feature, OBJECT_NAME, "10")
                )
            # list.index raises ValueError when the expected column is
            # missing, which fails the test.
            index = column_names.index(name)
3
4
5
6
7
8
9
10
"""
pipeline, module, filename = make_pipeline(csv_text)
module.wants_rows.value = True
module.row_range.min = 4
module.row_range.max = 6
m = pipeline.run()
assert isinstance(m, cellprofiler.measurement.Measurements)
data = m.get_all_measurements(cellprofiler.measurement.IMAGE, "Test_Measurement")
assert numpy.all(data == numpy.arange(4, 7))
os.remove(filename)
prefix = "Foo_"
numpy.random.seed(14887)
module = cellprofiler.modules.exporttospreadsheet.ExportToSpreadsheet()
module.set_module_num(1)
module.wants_everything.value = False
module.wants_prefix.value = True
module.prefix.value = prefix
module.directory.dir_choice = (
cellprofiler.modules.exporttospreadsheet.ABSOLUTE_FOLDER_NAME
)
module.directory.custom_path = output_dir
module.wants_everything.value = True
m = cellprofiler.measurement.Measurements()
image_measurements = numpy.random.uniform(size=4)
m.add_all_measurements(
cellprofiler.measurement.IMAGE, "my_measurement", image_measurements
)
image_set_list = cellprofiler.image.ImageSetList()
image_set = image_set_list.get_image_set(0)
object_set = cellprofiler.object.ObjectSet()
workspace = cellprofiler.workspace.Workspace(
make_measurements_pipeline(m), module, image_set, object_set, m, image_set_list
)
module.post_run(workspace)
path = os.path.join(output_dir, prefix + "Image.csv")
assert os.path.exists(path)
workspace = self.make_workspace(primary_labels, secondary_labels)
module = workspace.module
module.run(workspace)
measurements = workspace.measurements
self.assertTrue("Image" in measurements.get_object_names())
count_feature = "Count_%s" % TERTIARY
self.assertTrue(count_feature in
measurements.get_feature_names("Image"))
value = measurements.get_current_measurement("Image", count_feature)
self.assertEqual(np.product(value.shape), 1)
self.assertEqual(value, 0)
self.assertTrue(TERTIARY in workspace.object_set.get_object_names())
output_objects = workspace.object_set.get_objects(TERTIARY)
self.assertTrue(np.all(output_objects.segmented == primary_labels))
columns = module.get_measurement_columns(workspace.pipeline)
for object_name in (cpm.IMAGE, PRIMARY, SECONDARY, TERTIARY):
ocolumns = [x for x in columns if x[0] == object_name]
features = measurements.get_feature_names(object_name)
self.assertEqual(len(ocolumns), len(features))
self.assertTrue(all([column[1] in features for column in ocolumns]))
module.image_directory.custom_path = test_path
m = cellprofiler.measurement.Measurements()
workspace = cellprofiler.workspace.Workspace(
pipeline,
module,
m,
cellprofiler.object.ObjectSet(),
m,
cellprofiler.image.ImageSetList(),
)
assert module.prepare_run(workspace)
assert (
m.get_measurement(cellprofiler.measurement.IMAGE, "FileName_DNA", 1)
== test_filename
)
path = m.get_measurement(cellprofiler.measurement.IMAGE, "PathName_DNA", 1)
assert path == test_path
assert m.get_measurement(
cellprofiler.measurement.IMAGE, "URL_DNA", 1
) == cellprofiler.modules.loadimages.pathname2url(
os.path.join(test_path, test_filename)
)
module.prepare_group(workspace, {}, [1])
module.run(workspace)
img = workspace.image_set.get_image("DNA", must_be_grayscale=True)
assert tuple(img.pixel_data.shape) == test_shape
def test_measurement_images():
    """Check get_measurement_images for the Xshift and Yshift measurements.

    For each of the two measurements the module must report exactly three
    image names, including "Aligned0", "Aligned1" and "Aligned2".
    """
    blank_images = tuple(numpy.zeros((10, 10)) for _ in range(3))
    workspace, module = make_workspace(blank_images, (None, None, None))
    assert isinstance(module, cellprofiler.modules.align.Align)
    expected_names = ["Aligned%d" % position for position in range(3)]
    for shift_feature in ("Xshift", "Yshift"):
        reported = module.get_measurement_images(
            workspace.pipeline,
            cellprofiler.measurement.IMAGE,
            cellprofiler.modules.align.C_ALIGN,
            shift_feature,
        )
        assert len(reported) == 3
        for expected in expected_names:
            assert expected in reported
def fetch_provider(self, name, measurements, is_image_name=True):
    """Build a LoadImagesImageProvider for the image or objects called *name*.

    The file location comes from the URL measurement stored for *name* in
    *measurements*. The series and frame are read from their measurements
    when those features exist and are passed on as None otherwise.

    name - suffix of the measurement feature names; also handed to the
           provider as its name
    measurements - measurements holding the URL / series / frame features
    is_image_name - True to use the image feature categories, False to use
                    the objects categories; rescaling is only requested
                    when this is True and self.rescale.value is set
    """
    # Not used below; the attribute read is kept so behavior is unchanged.
    path_base = self.image_path
    meas = cellprofiler.measurement
    if is_image_name:
        categories = (meas.C_URL, meas.C_SERIES, meas.C_FRAME)
    else:
        categories = (
            meas.C_OBJECTS_URL,
            meas.C_OBJECTS_SERIES,
            meas.C_OBJECTS_FRAME,
        )
    url_feature, series_feature, frame_feature = [
        category + "_" + name for category in categories
    ]

    url = measurements.get_measurement(meas.IMAGE, url_feature).encode("utf-8")
    path, filename = os.path.split(loadimages.url2pathname(url))

    series = (
        measurements[meas.IMAGE, series_feature]
        if measurements.has_feature(meas.IMAGE, series_feature)
        else None
    )
    frame = (
        measurements[meas.IMAGE, frame_feature]
        if measurements.has_feature(meas.IMAGE, frame_feature)
        else None
    )
    return loadimages.LoadImagesImageProvider(
        name,
        path,
        filename,
        rescale=self.rescale.value and is_image_name,
        series=series,
        index=frame,
    )
import six
import cellprofiler.module as cpm
import cellprofiler.measurement as cpmeas
import cellprofiler.setting as cps
from cellprofiler.measurement import R_PARENT
# Names of the available operations, one constant per operation.
O_MULTIPLY = "Multiply"
O_DIVIDE = "Divide"
O_ADD = "Add"
O_SUBTRACT = "Subtract"
O_NONE = "None"
# Every operation name defined above, in declaration order.
O_ALL = [O_MULTIPLY, O_DIVIDE, O_ADD, O_SUBTRACT, O_NONE]
# NOTE(review): the MC_* values look like the choice between an image
# measurement and an object measurement for an operand - confirm against
# the settings that use them.
MC_IMAGE = cpmeas.IMAGE
MC_OBJECT = "Object"
MC_ALL = [MC_IMAGE, MC_OBJECT]
# NOTE(review): presumably the measurement category name used for this
# module's results - confirm where C_MATH is referenced.
C_MATH = "Math"
class CalculateMath(cpm.Module):
module_name = "CalculateMath"
category = "Data Tools"
variable_revision_number = 2
def create_settings(self):
# XXX needs to use cps.SettingsGroup
class Operand(object):
"""Represents the collection of settings needed by each operand"""
int_features = []
int_data = []
string_features = []
string_data = []
metadata = [double_features, float_features, int_features, string_features]
for object_name, features in list(feature_dict.items()):
df = []
double_features.append((object_name, df))
ff = []
float_features.append((object_name, ff))
intf = []
int_features.append((object_name, intf))
sf = []
string_features.append((object_name, sf))
if object_name == cellprofiler.measurement.IMAGE:
object_counts = [] * n_image_sets
else:
object_numbers = m[
object_name, cellprofiler.measurement.OBJECT_NUMBER, image_numbers
]
object_counts = [len(x) for x in object_numbers]
for feature, data_type in features:
if data_type == "java.lang.String":
continue
if not m.has_feature(object_name, feature):
data = numpy.zeros(numpy.sum(object_counts))
else:
data = m[object_name, feature, image_numbers]
temp = []
for i, (di, count) in enumerate(zip(data, object_counts)):
if count == 0:
filter(
lambda x: image_set_start <= x <= image_set_end,
measurements.get_image_numbers(),
)
)
self.post_event(AnalysisStarted())
posted_analysis_started = True
# reset the status of every image set that needs to be processed
has_groups = measurements.has_groups()
if self.pipeline.requires_aggregation():
overwrite = True
if has_groups and not overwrite:
if not measurements.has_feature(
cellprofiler.measurement.IMAGE, self.STATUS
):
overwrite = True
else:
group_status = {}
for image_number in measurements.get_image_numbers():
group_number = measurements[
cellprofiler.measurement.IMAGE,
cellprofiler.measurement.GROUP_NUMBER,
image_number,
]
status = measurements[
cellprofiler.measurement.IMAGE, self.STATUS, image_number
]
if status != self.STATUS_DONE:
group_status[group_number] = self.STATUS_UNPROCESSED
elif group_number not in group_status: