Source code for dipy.workflows.workflow

import inspect
import logging
import os

from dipy.testing.decorators import warning_for_keywords
from dipy.workflows.multi_io import _io_iterator


class Workflow:
    """Base class holding the behavior shared by every workflow.

    It stores the output-handling options, builds the input/output iterator
    consumed by ``run`` and guards against silently overwriting existing
    output files. Concrete workflows extend this class and override ``run``.
    """

    @warning_for_keywords()
    def __init__(
        self, *, output_strategy="absolute", mix_names=False, force=False, skip=False
    ):
        """Initialize the basic workflow object.

        This object takes care of any workflow operation that is common to all
        the workflows. Every new workflow should extend this class.

        Parameters
        ----------
        output_strategy : str, optional
            Forwarded unchanged to ``_io_iterator`` by ``get_io_iterator``.
        mix_names : bool, optional
            Forwarded unchanged to ``_io_iterator`` by ``get_io_iterator``.
        force : bool, optional
            Whether existing output files may be overwritten; it is the value
            returned by ``manage_output_overwrite`` when a clash is detected.
        skip : bool, optional
            Stored on the instance as ``_skip``; this class does not read it.
        """
        self._output_strategy = output_strategy
        self._mix_names = mix_names
        self.last_generated_outputs = None
        # Start with an empty list so ``manage_output_overwrite`` can be called
        # before ``get_io_iterator`` without raising AttributeError.
        self.flat_outputs = []
        self._force_overwrite = force
        self._skip = skip

    def get_io_iterator(self):
        """Create an iterator for IO.

        Inspect the caller's frame (values of its local variables) and hand it,
        together with this workflow's ``run`` method, to ``_io_iterator``.

        As a side effect, ``self.flat_outputs`` is set to the flattened list of
        outputs and ``self.last_generated_outputs`` to either a dict mapping
        the output keys to those outputs (when output keys are available) or
        to the flat list itself.

        Returns
        -------
        object or list
            The object built by ``_io_iterator`` when processing may proceed,
            or an empty list when ``manage_output_overwrite`` vetoes the run.
        """
        # Index 0 of the stack is this method; index 1 is whoever called it.
        # Stack entries are ``FrameInfo`` named tuples, so ``.frame`` is the
        # same object as element 0 and no version-specific branching is needed.
        caller_frame = inspect.stack()[1].frame

        io_it = _io_iterator(
            caller_frame,
            self.run,
            output_strategy=self._output_strategy,
            mix_names=self._mix_names,
        )

        # Make a list out of a list of lists.
        self.flat_outputs = [item for sublist in io_it.outputs for item in sublist]

        if io_it.out_keys:
            self.last_generated_outputs = dict(zip(io_it.out_keys, self.flat_outputs))
        else:
            self.last_generated_outputs = self.flat_outputs

        return io_it if self.manage_output_overwrite() else []

    def manage_output_overwrite(self):
        """Check if a file will be overwritten upon processing the inputs.

        If it is bound to happen, an action is taken depending on
        ``self._force_overwrite`` (or --force via command line). Whenever a
        clash is found, a log message lists the affected files.

        Returns
        -------
        bool
            True when none of ``self.flat_outputs`` exists on disk yet;
            otherwise the value of ``self._force_overwrite``.
        """
        duplicates = [
            output for output in self.flat_outputs if os.path.isfile(output)
        ]
        if not duplicates:
            return True

        if self._force_overwrite:
            logging.info("The following output files are about to be overwritten.")
        else:
            logging.info(
                "The following output files already exist, the "
                "workflow will not continue processing any "
                "further. Add the --force flag to allow output "
                "files overwrite."
            )

        for dup in duplicates:
            logging.info(dup)

        return self._force_overwrite

    def run(self, *args, **kwargs):
        """Execute the workflow.

        Since this is an abstract class, raise an exception if this code is
        reached (not implemented in child class or literally called on this
        class).

        Raises
        ------
        NotImplementedError
            Always. It is a subclass of ``Exception``, so callers catching
            ``Exception`` are unaffected.
        """
        raise NotImplementedError(
            f"Error: {self.__class__} does not have a run method."
        )

    def get_sub_runs(self):
        """Return no sub runs since this is a simple workflow.

        Returns
        -------
        list
            Always an empty list.
        """
        return []

    @classmethod
    def get_short_name(cls):
        """Return a short name for the workflow used to subdivide.

        The short name is used by CombinedWorkflows and the argparser to
        subdivide the commandline parameters avoiding the trouble of having
        subworkflows parameters with the same name.

        For example, a combined workflow with dti reconstruction and csd
        reconstruction might end up with the b0_threshold parameter. Using
        short names, we will have dti.b0_threshold and csd.b0_threshold
        available.

        Returns class name by default but it is strongly advised to set it to
        something shorter and easier to write on commandline.

        Returns
        -------
        str
            The class name (``cls.__name__``).
        """
        return cls.__name__