Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
TODO
"""
self.state.prepare_states(self.inputs)
self.state.prepare_inputs()
if state_index is not None:
inputs_copy = deepcopy(self.inputs)
for key, ind in self.state.inputs_ind[state_index].items():
setattr(
inputs_copy,
key.split(".")[1],
getattr(inputs_copy, key.split(".")[1])[ind],
)
input_hash = inputs_copy.hash
if is_workflow(self):
con_hash = hash_function(self._connections)
hash_list = [input_hash, con_hash]
checksum_ind = create_checksum(
self.__class__.__name__, self._checksum_wf(input_hash)
)
else:
checksum_ind = create_checksum(self.__class__.__name__, input_hash)
return checksum_ind
else:
checksum_list = []
for ind in range(len(self.state.inputs_ind)):
checksum_list.append(self.checksum_states(state_index=ind))
return checksum_list
inp_dict = {}
for field in attr_fields(self):
if field.name in ["_graph_checksums", "bindings"] or field.metadata.get(
"output_file_template"
):
continue
# removing values that are notset from hash calculation
if getattr(self, field.name) is attr.NOTHING:
continue
inp_dict[field.name] = hash_value(
value=getattr(self, field.name), tp=field.type, metadata=field.metadata
)
inp_hash = hash_function(inp_dict)
if hasattr(self, "_graph_checksums"):
return hash_function((inp_hash, self._graph_checksums))
else:
return inp_hash
def checksum(self):
    """Compute, cache and return the unique checksum of the task.

    The checksum is used to name the directory of a task that is run and
    to build the node checksums needed for graph checksums (before the
    tasks have inputs etc.).

    The value is derived from the class name and the hash of the inputs;
    when the task has a state, the hash of the state's splitter is
    combined with the input hash first. The result is stored on
    ``self._checksum``.
    """
    content_hash = self.inputs.hash
    if self.state is not None:
        # stateful task: fold the splitter hash into the hashed content
        content_hash = hash_function(
            [content_hash, hash_function(self.state.splitter)]
        )
    self._checksum = create_checksum(self.__class__.__name__, content_hash)
    return self._checksum
def _checksum_wf(self, input_hash, with_splitter=False):
    """Build the hash value for a workflow.

    Combines ``input_hash`` with the hash of ``self._connections``. When
    ``with_splitter`` is true and the workflow has a state, the hash of
    the state's splitter is included as well.
    """
    parts = [input_hash, hash_function(self._connections)]
    if with_splitter and self.state:
        # the splitter takes part in the hash only on request
        parts.append(hash_function(self.state.splitter))
    return hash_function(parts)
def checksum(self):
    """Compute, cache and return the unique checksum of the task.

    The checksum is used to name the directory of a task that is run and
    to build the node checksums needed for graph checksums (before the
    tasks have inputs etc.).

    Without a state, the checksum is built from the class name and the
    hash of the inputs. With a state, the input hash and the hash of the
    state's splitter are hashed together first. The result is stored on
    ``self._checksum``.
    """
    inputs_digest = self.inputs.hash
    if self.state is None:
        payload = inputs_digest
    else:
        splitter_digest = hash_function(self.state.splitter)
        payload = hash_function([inputs_digest, splitter_digest])
    self._checksum = create_checksum(self.__class__.__name__, payload)
    return self._checksum
def _checksum_wf(self, input_hash, with_splitter=False):
    """Create the hash value for a workflow.

    The hash always covers ``input_hash`` and the hash of
    ``self._connections``; the hash of the state's splitter is appended
    only if ``with_splitter`` is true and ``self.state`` is truthy.
    """
    components = [input_hash, hash_function(self._connections)]
    if with_splitter and self.state:
        components.append(hash_function(self.state.splitter))
    return hash_function(components)
def _checksum_wf(self, input_hash, with_splitter=False):
    """Return the workflow hash built from inputs and connections.

    ``input_hash`` is combined with the hash of ``self._connections``.
    If ``with_splitter`` is true and the workflow has a state, the hash of
    the state's splitter is added to the hashed list.
    """
    wf_hashes = [input_hash, hash_function(self._connections)]
    if with_splitter and self.state:
        # optional third element: the splitter hash
        wf_hashes.append(hash_function(self.state.splitter))
    return hash_function(wf_hashes)
def _checksum_wf(self, input_hash, with_splitter=False):
    """Hash a workflow from its input hash and its connections.

    The returned value is the hash of a list holding ``input_hash`` and
    the hash of ``self._connections``, extended with the hash of the
    state's splitter when ``with_splitter`` is true and a state is set.
    """
    to_hash = [input_hash, hash_function(self._connections)]
    if with_splitter and self.state:
        to_hash.append(hash_function(self.state.splitter))
    return hash_function(to_hash)