Source code for dipy.workflows.denoise

import logging
import shutil

import numpy as np

from dipy.core.gradients import gradient_table
from dipy.denoise.gibbs import gibbs_removal
from dipy.denoise.localpca import localpca, mppca
from dipy.denoise.nlmeans import nlmeans
from dipy.denoise.noise_estimate import estimate_sigma
from dipy.denoise.patch2self import patch2self
from dipy.denoise.pca_noise_estimate import pca_noise_estimate
from dipy.io.gradients import read_bvals_bvecs
from dipy.io.image import load_nifti, save_nifti
from dipy.workflows.workflow import Workflow


[docs] class Patch2SelfFlow(Workflow):
[docs] @classmethod def get_short_name(cls): return "patch2self"
[docs] def run( self, input_files, bval_files, model="ols", b0_threshold=50, alpha=1.0, verbose=False, patch_radius=0, b0_denoising=True, clip_negative_vals=False, shift_intensity=True, ver=3, out_dir="", out_denoised="dwi_patch2self.nii.gz", ): """Workflow for Patch2Self denoising method. See :footcite:p:`Fadnavis2020` for further details about the method. See :footcite:p:`Fadnavis2024` for further details about the new method. It applies patch2self denoising :footcite:p:`Fadnavis2020` on each file found by 'globing' ``input_file`` and ``bval_file``. It saves the results in a directory specified by ``out_dir``. Parameters ---------- input_files : string Path to the input volumes. This path may contain wildcards to process multiple inputs at once. bval_files : string bval file associated with the diffusion data. model : string, or initialized linear model object. This will determine the algorithm used to solve the set of linear equations underlying this model. If it is a string it needs to be one of the following: {'ols', 'ridge', 'lasso'}. Otherwise, it can be an object that inherits from `dipy.optimize.SKLearnLinearSolver` or an object with a similar interface from Scikit-Learn: `sklearn.linear_model.LinearRegression`, `sklearn.linear_model.Lasso` or `sklearn.linear_model.Ridge` and other objects that inherit from `sklearn.base.RegressorMixin`. Default: 'ols'. b0_threshold : int, optional Threshold for considering volumes as b0. alpha : float, optional Regularization parameter only for ridge regression model. verbose : bool, optional Show progress of Patch2Self and time taken. patch_radius : variable int, optional The radius of the local patch to be taken around each voxel b0_denoising : bool, optional Skips denoising b0 volumes if set to False. clip_negative_vals : bool, optional Sets negative values after denoising to 0 using `np.clip`. shift_intensity : bool, optional Shifts the distribution of intensities per volume to give non-negative values ver : int, optional Version of the Patch2Self algorithm to use between 1 or 3. out_dir : string, optional Output directory (default current directory) out_denoised : string, optional Name of the resulting denoised volume (default: dwi_patch2self.nii.gz) References ---------- .. footbibliography:: """ io_it = self.get_io_iterator() if isinstance(patch_radius, list) and len(patch_radius) == 1: patch_radius = int(patch_radius[0]) for fpath, bvalpath, odenoised in io_it: if self._skip: shutil.copy(fpath, odenoised) logging.warning("Denoising skipped for now.") else: logging.info("Denoising %s", fpath) data, affine, image = load_nifti(fpath, return_img=True) bvals = np.loadtxt(bvalpath) extra_args = {"patch_radius": patch_radius} if ver == 1 else {} denoised_data = patch2self( data, bvals, model=model, b0_threshold=b0_threshold, alpha=alpha, verbose=verbose, b0_denoising=b0_denoising, clip_negative_vals=clip_negative_vals, shift_intensity=shift_intensity, version=ver, **extra_args, ) save_nifti(odenoised, denoised_data, affine, hdr=image.header) logging.info("Denoised volumes saved as %s", odenoised)
[docs] class NLMeansFlow(Workflow):
[docs] @classmethod def get_short_name(cls): return "nlmeans"
[docs] def run( self, input_files, sigma=0, patch_radius=1, block_radius=5, rician=True, out_dir="", out_denoised="dwi_nlmeans.nii.gz", ): """Workflow wrapping the nlmeans denoising method. It applies nlmeans denoise :footcite:p:`Descoteaux2008a` on each file found by 'globing' ``input_files`` and saves the results in a directory specified by ``out_dir``. Parameters ---------- input_files : string Path to the input volumes. This path may contain wildcards to process multiple inputs at once. sigma : float, optional Sigma parameter to pass to the nlmeans algorithm. patch_radius : int, optional patch size is ``2 x patch_radius + 1``. block_radius : int, optional block size is ``2 x block_radius + 1``. rician : bool, optional If True the noise is estimated as Rician, otherwise Gaussian noise is assumed. out_dir : string, optional Output directory. (default current directory) out_denoised : string, optional Name of the resulting denoised volume. References ---------- .. footbibliography:: """ io_it = self.get_io_iterator() for fpath, odenoised in io_it: if self._skip: shutil.copy(fpath, odenoised) logging.warning("Denoising skipped for now.") else: logging.info("Denoising %s", fpath) data, affine, image = load_nifti(fpath, return_img=True) if sigma == 0: logging.info("Estimating sigma") sigma = estimate_sigma(data) logging.debug(f"Found sigma {sigma}") denoised_data = nlmeans( data, sigma=sigma, patch_radius=patch_radius, block_radius=block_radius, rician=rician, ) save_nifti(odenoised, denoised_data, affine, hdr=image.header) logging.info("Denoised volume saved as %s", odenoised)
[docs] class LPCAFlow(Workflow):
[docs] @classmethod def get_short_name(cls): return "lpca"
[docs] def run( self, input_files, bvalues_files, bvectors_files, sigma=0, b0_threshold=50, bvecs_tol=0.01, patch_radius=2, pca_method="eig", tau_factor=2.3, out_dir="", out_denoised="dwi_lpca.nii.gz", ): r"""Workflow wrapping LPCA denoising method. See :footcite:p:`Veraart2016a` for further details about the method. Parameters ---------- input_files : string Path to the input volumes. This path may contain wildcards to process multiple inputs at once. bvalues_files : string Path to the bvalues files. This path may contain wildcards to use multiple bvalues files at once. bvectors_files : string Path to the bvectors files. This path may contain wildcards to use multiple bvectors files at once. sigma : float, optional Standard deviation of the noise estimated from the data. 0 means sigma value estimation following the algorithm in :footcite:t:`Manjon2013`. b0_threshold : float, optional Threshold used to find b0 volumes. bvecs_tol : float, optional Threshold used to check that norm(bvec) = 1 +/- bvecs_tol b-vectors are unit vectors. patch_radius : int, optional The radius of the local patch to be taken around each voxel (in voxels) For example, for a patch radius with value 2, and assuming the input image is a 3D image, the denoising will take place in blocks of 5x5x5 voxels. pca_method : string, optional Use either eigenvalue decomposition ('eig') or singular value decomposition ('svd') for principal component analysis. The default method is 'eig' which is faster. However, occasionally 'svd' might be more accurate. tau_factor : float, optional Thresholding of PCA eigenvalues is done by nulling out eigenvalues that are smaller than: .. math:: \tau = (\tau_{factor} \sigma)^2 $\tau_{factor}$ can be change to adjust the relationship between the noise standard deviation and the threshold $\tau$. If $\tau_{factor}$ is set to None, it will be automatically calculated using the Marcenko-Pastur distribution :footcite:p`Veraart2016b`. out_dir : string, optional Output directory. (default current directory) out_denoised : string, optional Name of the resulting denoised volume. References ---------- .. footbibliography:: """ io_it = self.get_io_iterator() if isinstance(patch_radius, list) and len(patch_radius) == 1: patch_radius = int(patch_radius[0]) for dwi, bval, bvec, odenoised in io_it: logging.info("Denoising %s", dwi) data, affine, image = load_nifti(dwi, return_img=True) if not sigma: logging.info("Estimating sigma") bvals, bvecs = read_bvals_bvecs(bval, bvec) gtab = gradient_table( bvals, bvecs=bvecs, b0_threshold=b0_threshold, atol=bvecs_tol ) sigma = pca_noise_estimate(data, gtab, correct_bias=True, smooth=3) logging.debug("Found sigma %s", sigma) denoised_data = localpca( data, sigma=sigma, patch_radius=patch_radius, pca_method=pca_method, tau_factor=tau_factor, ) save_nifti(odenoised, denoised_data, affine, hdr=image.header) logging.info("Denoised volume saved as %s", odenoised)
[docs] class MPPCAFlow(Workflow):
[docs] @classmethod def get_short_name(cls): return "mppca"
[docs] def run( self, input_files, patch_radius=2, pca_method="eig", return_sigma=False, out_dir="", out_denoised="dwi_mppca.nii.gz", out_sigma="dwi_sigma.nii.gz", ): r"""Workflow wrapping Marcenko-Pastur PCA denoising method. See :footcite:p:`Veraart2016a` for further details about the method. Parameters ---------- input_files : string Path to the input volumes. This path may contain wildcards to process multiple inputs at once. patch_radius : variable int, optional The radius of the local patch to be taken around each voxel (in voxels) For example, for a patch radius with value 2, and assuming the input image is a 3D image, the denoising will take place in blocks of 5x5x5 voxels. pca_method : string, optional Use either eigenvalue decomposition ('eig') or singular value decomposition ('svd') for principal component analysis. The default method is 'eig' which is faster. However, occasionally 'svd' might be more accurate. return_sigma : bool, optional If true, a noise standard deviation estimate based on the Marcenko-Pastur distribution is returned :footcite:p:`Veraart2016b`. out_dir : string, optional Output directory. (default current directory) out_denoised : string, optional Name of the resulting denoised volume. out_sigma : string, optional Name of the resulting sigma volume. References ---------- .. footbibliography:: """ io_it = self.get_io_iterator() if isinstance(patch_radius, list) and len(patch_radius) == 1: patch_radius = int(patch_radius[0]) for dwi, odenoised, osigma in io_it: logging.info("Denoising %s", dwi) data, affine, image = load_nifti(dwi, return_img=True) denoised_data, sigma = mppca( data, patch_radius=patch_radius, pca_method=pca_method, return_sigma=True, ) save_nifti(odenoised, denoised_data, affine, hdr=image.header) logging.info("Denoised volume saved as %s", odenoised) if return_sigma: save_nifti(osigma, sigma, affine, hdr=image.header) logging.info("Sigma volume saved as %s", osigma)
[docs] class GibbsRingingFlow(Workflow):
[docs] @classmethod def get_short_name(cls): return "gibbs_ringing"
[docs] def run( self, input_files, slice_axis=2, n_points=3, num_processes=1, out_dir="", out_unring="dwi_unring.nii.gz", ): r"""Workflow for applying Gibbs Ringing method. See :footcite:p:`Kellner2016` and :footcite:p:`NetoHenriques2018` for further details about the method. Parameters ---------- input_files : string Path to the input volumes. This path may contain wildcards to process multiple inputs at once. slice_axis : int, optional Data axis corresponding to the number of acquired slices. Could be (0, 1, or 2): for example, a value of 2 would mean the third axis. n_points : int, optional Number of neighbour points to access local TV (see note). num_processes : int or None, optional Split the calculation to a pool of children processes. Only applies to 3D or 4D `data` arrays. Default is 1. If < 0 the maximal number of cores minus ``num_processes + 1`` is used (enter -1 to use as many cores as possible). 0 raises an error. out_dir : string, optional Output directory. (default current directory) out_unring : string, optional Name of the resulting denoised volume. References ---------- .. footbibliography:: """ io_it = self.get_io_iterator() for dwi, ounring in io_it: logging.info("Unringing %s", dwi) data, affine, image = load_nifti(dwi, return_img=True) unring_data = gibbs_removal( data, slice_axis=slice_axis, n_points=n_points, num_processes=num_processes, ) save_nifti(ounring, unring_data, affine, hdr=image.header) logging.info("Denoised volume saved as %s", ounring)