Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import numpy as np
from merlin.core import analysistask
'''This module contains dummy analysis tasks for running tests'''
class SimpleAnalysisTask(analysistask.AnalysisTask):
    """A dummy analysis task that performs no work, intended for tests.

    The resource estimates are fixed constants, and the dependencies are
    read from the ``'dependencies'`` entry of the task parameters.
    """

    def __init__(self, dataSet, parameters=None, analysisName=None):
        """Forward all arguments unchanged to the base class initializer."""
        super().__init__(dataSet, parameters, analysisName)

    def _run_analysis(self):
        """Do nothing; this task has no analysis to perform."""
        pass

    def get_estimated_memory(self):
        """Return the constant memory estimate (100) for this task."""
        return 100

    def get_estimated_time(self):
        """Return the constant time estimate (1) for this task."""
        return 1

    def get_dependencies(self):
        """Return the dependencies configured for this task.

        Returns:
            The value stored under ``'dependencies'`` in the task
            parameters, or an empty list when no such entry exists.
        """
        if 'dependencies' in self.parameters:
            return self.parameters['dependencies']
        # Without this explicit fallback the method would implicitly return
        # None, which breaks callers that iterate over the dependencies.
        return []
def _add_parallel_completion_tasks(self, analysisTasks):
updatedTasks = {}
for k, v in analysisTasks.items():
updatedTasks[k] = v
if isinstance(v, analysistask.ParallelAnalysisTask):
parameters = {'dependent_task': k}
newTask = ParallelTaskComplete(self._dataSet, parameters,
'{}Done'.format(k))
if newTask.get_analysis_name() not in analysisTasks:
newTask.save()
updatedTasks[newTask.get_analysis_name()] = newTask
return updatedTasks
def get_analysis_subdirectory(
        self, analysisTask: TaskOrName, subdirectory: str=None,
        create: bool=True) -> str:
    """
    Build the path of the directory belonging to an analysis task.

    analysisTask - either an AnalysisTask instance, whose analysis name
        is looked up, or the analysis name itself.
    subdirectory - optional name of a directory nested inside the
        task's directory; when None the task's directory is used.
    create - Flag indicating if the resulting directory should be
        created if it does not already exist.

    Returns the resulting path, joined with os.sep below
    self.analysisPath.
    """
    taskName = (analysisTask.get_analysis_name()
                if isinstance(analysisTask, analysistask.AnalysisTask)
                else analysisTask)

    # Collect the path components first so the join happens in one place.
    pathParts = [self.analysisPath, taskName]
    if subdirectory is not None:
        pathParts.append(subdirectory)
    fullPath = os.sep.join(pathParts)

    if create:
        os.makedirs(fullPath, exist_ok=True)
    return fullPath
import os
from matplotlib import pyplot as plt
from matplotlib import patches
import pandas
import merlin
# Use the matplotlib style sheet 'default.mplstyle' located in the 'ext'
# directory next to the merlin package's __init__ file.
plt.style.use(
os.sep.join([os.path.dirname(merlin.__file__),
'ext', 'default.mplstyle']))
import seaborn
import numpy as np
from merlin.core import analysistask
from merlin.analysis import filterbarcodes
from merlin.util import binary
class PlotPerformance(analysistask.AnalysisTask):
"""
An analysis task that generates plots depicting metrics of the MERFISH
decoding.
"""
# TODO all the plotting should be refactored. I do not like the way
# this class is structured as a long list of plotting functions. It would
# be more convenient if each plot could track its dependent tasks and
# be executed once those tasks are complete.
def __init__(self, dataSet, parameters=None, analysisName=None):
super().__init__(dataSet, parameters, analysisName)
#TODO - move this definition to run_analysis()
self.optimizeTask = self.dataSet.load_analysis_task(
def _generate_slurm_report(self, task: analysistask.AnalysisTask):
if isinstance(task, analysistask.ParallelAnalysisTask):
idList = [
self.dataSet.get_analysis_environment(task, i)['SLURM_JOB_ID']
for i in range(task.fragment_count())]
else:
idList = [
self.dataSet.get_analysis_environment(task)['SLURM_JOB_ID']]
queryResult = subprocess.run(
['sacct', '--format=AssocID,Account,Cluster,User,JobID,JobName,'
+ 'NodeList,AveCPU,AveCPUFreq,MaxPages,MaxDiskRead,MaxDiskWrite,'
+ 'MaxRSS,ReqMem,CPUTime,Elapsed,Submit,Start,End,Timelimit',
'--units=M', '-P', '-j', ','.join(idList)], stdout=subprocess.PIPE)
slurmJobDF = pandas.read_csv(
io.StringIO(queryResult.stdout.decode('utf-8')), sep='|')