Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
pass
def get_estimated_memory(self):
return 100
def get_estimated_time(self):
return 1
def get_dependencies(self):
if 'dependencies' in self.parameters:
return self.parameters['dependencies']
else:
return []
class SimpleParallelAnalysisTask(analysistask.ParallelAnalysisTask):
def __init__(self, dataSet, parameters=None, analysisName=None):
super().__init__(dataSet, parameters, analysisName)
def _run_analysis(self, fragmentIndex):
pass
def get_estimated_memory(self):
return 100
def get_estimated_time(self):
return 1
def get_dependencies(self):
if 'dependencies' in self.parameters:
return self.parameters['dependencies']
def test_save_environment(simple_task):
task1 = simple_task
task1.run()
environment = dict(os.environ)
if isinstance(simple_task, analysistask.ParallelAnalysisTask):
taskEnvironment = simple_task.dataSet.get_analysis_environment(
simple_task, 0)
else:
taskEnvironment = simple_task.dataSet.get_analysis_environment(
simple_task)
assert environment == taskEnvironment
from typing import List
from typing import Union
import numpy as np
from skimage import transform
from skimage import feature
import cv2
from merlin.core import analysistask
from merlin.util import aberration
class Warp(analysistask.ParallelAnalysisTask):
"""
An abstract class for warping a set of images so that the corresponding
pixels align between images taken in different imaging rounds.
"""
def __init__(self, dataSet, parameters=None, analysisName=None):
super().__init__(dataSet, parameters, analysisName)
if 'write_fiducial_images' not in self.parameters:
self.parameters['write_fiducial_images'] = False
if 'write_aligned_images' not in self.parameters:
self.parameters['write_aligned_images'] = False
self.writeAlignedFiducialImages = self.parameters[
'write_fiducial_images']
featureDB = self.get_feature_database()
featureDB.write_features(featureList, fragmentIndex)
def _read_and_filter_image_stack(self, fov: int, channelIndex: int,
filterSigma: float) -> np.ndarray:
filterSize = int(2*np.ceil(2*filterSigma)+1)
warpTask = self.dataSet.load_analysis_task(
self.parameters['warp_task'])
return np.array([cv2.GaussianBlur(
warpTask.get_aligned_image(fov, channelIndex, z),
(filterSize, filterSize), filterSigma)
for z in range(len(self.dataSet.get_z_positions()))])
class CleanCellBoundaries(analysistask.ParallelAnalysisTask):
'''
A task to construct a network graph where each cell is a node, and overlaps
are represented by edges. This graph is then refined to assign cells to the
fov they are closest to (in terms of centroid). This graph is then refined
to eliminate overlapping cells to leave a single cell occupying a given
position.
'''
def __init__(self, dataSet, parameters=None, analysisName=None):
super().__init__(dataSet, parameters, analysisName)
self.segmentTask = self.dataSet.load_analysis_task(
self.parameters['segment_task'])
self.alignTask = self.dataSet.load_analysis_task(
self.parameters['global_align_task'])
def fragment_count(self):
tasksComplete = {k: a for k, a in self.analysisTasks.items()
if a.is_complete()}
tasksWaiting = {k: a for k, a in self.analysisTasks.items()
if not a.is_complete() and not a.is_started()}
tasksRunning = {k: a for k, a in self.analysisTasks.items()
if a.is_running()}
tasksReady = {k: a for k, a in tasksWaiting.items()
if all([a2 in tasksComplete
for a2 in self.dependencyGraph[k]])}
parallelTaskRunning = any(
[a.is_parallel() for a in tasksRunning.values()])
for k,a in tasksReady.items():
if k not in self.tasksStarted:
if isinstance(a, analysistask.ParallelAnalysisTask):
if not parallelTaskRunning:
self._start_task(k)
parallelTaskRunning = True
else:
self._start_task(k)
import numpy as np
import pandas
import os
import tempfile
from merlin.core import dataset
from merlin.core import analysistask
from merlin.util import decoding
from merlin.util import barcodedb
from merlin.data.codebook import Codebook
class BarcodeSavingParallelAnalysisTask(analysistask.ParallelAnalysisTask):
"""
An abstract analysis class that saves barcodes into a barcode database.
"""
def __init__(self, dataSet: dataset.DataSet, parameters=None,
analysisName=None):
super().__init__(dataSet, parameters, analysisName)
def _reset_analysis(self, fragmentIndex: int = None) -> None:
super()._reset_analysis(fragmentIndex)
self.get_barcode_database().empty_database(fragmentIndex)
def get_barcode_database(self) -> barcodedb.BarcodeDB:
""" Get the barcode database this analysis task saves barcodes into.
featureList = [spatialfeature.SpatialFeature.feature_from_label_matrix(
(watershedCombinedOutput == i), fragmentIndex,
globalTask.fov_to_global_transform(fragmentIndex), zPos)
for i in np.unique(watershedCombinedOutput) if i != 0]
featureDB = self.get_feature_database()
featureDB.write_features(featureList, fragmentIndex)
def _read_image_stack(self, fov: int, channelIndex: int) -> np.ndarray:
warpTask = self.dataSet.load_analysis_task(
self.parameters['warp_task'])
return np.array([warpTask.get_aligned_image(fov, channelIndex, z)
for z in range(len(self.dataSet.get_z_positions()))])
class CleanCellBoundaries(analysistask.ParallelAnalysisTask):
'''
A task to construct a network graph where each cell is a node, and overlaps
are represented by edges. This graph is then refined to assign cells to the
fov they are closest to (in terms of centroid). This graph is then refined
to eliminate overlapping cells to leave a single cell occupying a given
position.
'''
def __init__(self, dataSet, parameters=None, analysisName=None):
super().__init__(dataSet, parameters, analysisName)
self.segmentTask = self.dataSet.load_analysis_task(
self.parameters['segment_task'])
self.alignTask = self.dataSet.load_analysis_task(
self.parameters['global_align_task'])
def fragment_count(self):
import os
import cv2
import numpy as np
from merlin.core import analysistask
from merlin.util import deconvolve
from merlin.util import aberration
from merlin.data import codebook
class Preprocess(analysistask.ParallelAnalysisTask):
"""
An abstract class for preparing data for barcode calling.
"""
def _image_name(self, fov):
destPath = self.dataSet.get_analysis_subdirectory(
self.analysisName, subdirectory='preprocessed_images')
return os.sep.join([destPath, 'fov_' + str(fov) + '.tif'])
def get_pixel_histogram(self, fov=None):
if fov is not None:
return self.dataSet.load_numpy_analysis_result(
'pixel_histogram', self.analysisName, fov, 'histograms')
pixelHistogram = np.zeros(self.get_pixel_histogram(
def _generate_current_task_inputs(self):
inputTasks = [self._analysisTask.dataSet.load_analysis_task(x)
for x in self._analysisTask.get_dependencies()]
if self._analysisTask.__class__ == ParallelTaskComplete:
inputString = []
for t in inputTasks:
if isinstance(t, analysistask.ParallelAnalysisTask):
inputString.append(self._expand_as_string(
t, t.fragment_count()))
else:
inputString.append(t.dataSet.analysis_done_filename(t))
inputString = ','.join(inputString)
else:
if len(inputTasks) > 0:
inputString = ','.join([self._add_quotes(
x.dataSet.analysis_done_filename(x)) for x in inputTasks])
else:
inputString = ''
return self._clean_string(inputString)
import pandas
import numpy as np
from merlin.core import analysistask
from merlin.util import spatialfeature
class PartitionBarcodes(analysistask.ParallelAnalysisTask):
"""
An analysis task that assigns RNAs and sequential signals to cells
based on the boundaries determined during the segment task.
"""
def __init__(self, dataSet, parameters=None, analysisName=None):
super().__init__(dataSet, parameters, analysisName)
def fragment_count(self):
return len(self.dataSet.get_fovs())
def get_estimated_memory(self):
return 2048
def get_estimated_time(self):