Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"chemicalformula": "H",
"computer": "localhost",
"hamilton": "Test",
"hamversion": "0.1",
"job": "testing",
"parentid": 0,
"project": "database.testing",
"projectpath": "/TESTING",
"status": "initialized",
"timestart": datetime(2016, 5, 2, 11, 31, 4, 253377),
"timestop": datetime(2016, 5, 2, 11, 31, 4, 371165),
"totalcputime": 0.117788,
"username": "Test",
}
cls.job_id = cls.database.add_item_dict(par_dict)
cls.jobstatus_database = JobStatus(db=cls.database, job_id=cls.job_id)
def setUpClass(cls):
cls.jobstatus = JobStatus()
cls.database = DatabaseAccess("sqlite:///test_job_status.db", "simulation")
par_dict = {
"chemicalformula": "H",
"computer": "localhost",
"hamilton": "Test",
"hamversion": "0.1",
"job": "testing",
"parentid": 0,
"project": "database.testing",
"projectpath": "/TESTING",
"status": "initialized",
"timestart": datetime(2016, 5, 2, 11, 31, 4, 253377),
"timestop": datetime(2016, 5, 2, 11, 31, 4, 371165),
"totalcputime": 0.117788,
"username": "Test",
}
def refresh_job_status(self):
"""
Refresh job status by updating the job status with the status from the database if a job ID is available.
"""
if self.job_id:
self._status = JobStatus(
initial_status=self.project.db.get_item_by_id(self.job_id)["status"],
db=self.project.db,
job_id=self.job_id,
)
def __init__(self, project, job_name):
super(GenericJob, self).__init__(project, job_name)
self.__name__ = "GenericJob"
self.__version__ = "0.4"
self._server = Server()
self._logger = s.logger
self._executable = None
self._import_directory = None
self._status = JobStatus(db=project.db, job_id=self.job_id)
self.refresh_job_status()
self._restart_file_list = list()
self._restart_file_dict = dict()
self._exclude_nodes_hdf = list()
self._exclude_groups_hdf = list()
self._process = None
self._compress_by_default = False
self._python_only_job = False
self.interactive_cache = None
self.error = GenericError(job=self)
for sig in intercepted_signals:
signal.signal(sig, self.signal_intercept)
def ref_job(self):
"""
Get the reference job template from which all jobs within the ParallelMaster are generated.
Returns:
GenericJob: reference job
"""
if self._ref_job:
return self._ref_job
try:
ref_job = self[0]
if isinstance(ref_job, GenericJob):
self._ref_job = ref_job
self._ref_job._job_id = None
self._ref_job._status = JobStatus(db=self.project.db)
return self._ref_job
else:
return None
except IndexError:
return None
def refresh_job_status(self):
"""
Refresh job status by updating the job status with the status from the database if a job ID is available.
"""
if self.job_id:
self._status = JobStatus(
initial_status=self.project.db.get_item_by_id(self.job_id)["status"],
db=self.project.db,
job_id=self.job_id,
)
def reset_job_id(self, job_id=None):
"""
Reset the job id sets the job_id to None in the GenericJob as well as all connected modules like JobStatus.
"""
if job_id is not None:
job_id = int(job_id)
self._job_id = job_id
self._status = JobStatus(db=self.project.db, job_id=self._job_id)
def reset_job_id(self, job_id=None):
"""
Reset the job id sets the job_id to None in the GenericJob as well as all connected modules like JobStatus.
"""
if job_id is not None:
job_id = int(job_id)
self._job_id = job_id
self._status = JobStatus(db=self.project.db, job_id=self._job_id)