"""Robust and Unbiased Model-BAsed Spherical Deconvolution (RUMBA-SD)"""
import logging
import warnings
import numpy as np
from dipy.core.geometry import vec2vec_rotmat
from dipy.core.gradients import get_bval_indices, gradient_table, unique_bvals_tolerance
from dipy.core.onetime import auto_attr
from dipy.core.sphere import Sphere
from dipy.data import get_sphere
from dipy.reconst.csdeconv import AxSymShResponse
from dipy.reconst.odf import OdfFit, OdfModel
from dipy.reconst.shm import lazy_index, normalize_data
from dipy.segment.mask import bounding_box, crop
from dipy.sims.voxel import all_tensor_evecs, single_tensor
from dipy.testing.decorators import warning_for_keywords
# Machine precision for numerical stability in division
_EPS = np.finfo(float).eps
logger = logging.getLogger(__name__)
class RumbaSDModel(OdfModel):
@warning_for_keywords()
def __init__(
self,
gtab,
*,
wm_response=(1.7e-3, 0.2e-3, 0.2e-3),
gm_response=0.8e-3,
csf_response=3.0e-3,
n_iter=600,
recon_type="smf",
n_coils=1,
R=1,
voxelwise=True,
use_tv=False,
sphere=None,
verbose=False,
):
"""
Robust and Unbiased Model-BAsed Spherical Deconvolution (RUMBA-SD).
RUMBA-SD :footcite:p:`CanalesRodriguez2015` is a modification of the
Richardson-Lucy algorithm accounting for Rician and Noncentral Chi noise
distributions, which more accurately represent MRI noise. It computes a
maximum likelihood estimate of the fiber orientation density function
(fODF) at each voxel. Includes white matter compartments alongside
optional GM and CSF compartments to account for partial volume effects.
This fit can be performed voxelwise or globally. The global fit will
proceed more quickly than the voxelwise fit provided that the computer
has adequate RAM (>= 16 GB should be sufficient for most datasets).
The deconvolution kernel is constructed using a priori knowledge of the
white matter response function, as well as the mean diffusivity of GM
and/or CSF. RUMBA-SD is robust against impulse response imprecision, and
thus the default diffusivity values are often adequate
:footcite:p:`DellAcqua2007`.
Parameters
----------
gtab : GradientTable
Gradient table.
wm_response : 1d ndarray or 2d ndarray or AxSymShResponse, optional
Tensor eigenvalues as a (3,) ndarray, multishell eigenvalues as
a (len(unique_bvals_tolerance(gtab.bvals))-1, 3) ndarray in
order of smallest to largest b-value, or an AxSymShResponse.
Default: np.array([1.7e-3, 0.2e-3, 0.2e-3])
gm_response : float, optional
Mean diffusivity for GM compartment. If `None`, then grey
matter volume fraction is not computed. Default: 0.8e-3
csf_response : float, optional
Mean diffusivity for CSF compartment. If `None`, then CSF
volume fraction is not computed. Default: 3.0e-3
n_iter : int, optional
Number of iterations for fODF estimation. Must be a positive int.
Default: 600
recon_type : {'smf', 'sos'}, optional
MRI reconstruction method: spatial matched filter (SMF) or
sum-of-squares (SoS). SMF reconstruction generates Rician noise
while SoS reconstruction generates Noncentral Chi noise.
Default: 'smf'
n_coils : int, optional
Number of coils in MRI scanner -- only relevant in SoS
reconstruction. Must be a positive int. Default: 1
R : int, optional
Acceleration factor of the acquisition. For SIEMENS,
R = iPAT factor. For GE, R = ASSET factor. For PHILIPS,
R = SENSE factor. Typical values are 1 or 2. Must be a positive
int. Default: 1
voxelwise : bool, optional
If true, performs a voxelwise fit. If false, performs a global fit
on the entire brain at once. The global fit requires a 4D brain
volume in `fit`.
use_tv : bool, optional
If true, applies total variation regularization. This only takes
effect in a global fit (`voxelwise` is set to `False`). TV can only
be applied to 4D brain volumes with no singleton dimensions.
sphere : Sphere, optional
Sphere on which to construct fODF. If None, uses `repulsion724`.
verbose : bool, optional
If true, logs updates on estimated signal-to-noise ratio after each
iteration. This only takes effect in a global fit (`voxelwise` is
set to `False`).
References
----------
.. footbibliography::
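Examples
--------
A minimal sketch of typical usage on synthetic data; the gradient table,
signal values, and iteration count below are illustrative assumptions
only, not recommended settings.

>>> import numpy as np
>>> from dipy.core.gradients import gradient_table
>>> from dipy.reconst.rumba import RumbaSDModel
>>> rng = np.random.default_rng(0)
>>> bvecs = np.concatenate(([[0, 0, 0]], rng.normal(size=(32, 3))))
>>> bvecs[1:] /= np.linalg.norm(bvecs[1:], axis=1, keepdims=True)
>>> bvals = np.concatenate(([0], np.full(32, 1000.0)))
>>> gtab = gradient_table(bvals, bvecs=bvecs)
>>> model = RumbaSDModel(gtab, n_iter=20) # few iterations for brevity
>>> data = np.ones((2, 2, 2, 33)) # uniform synthetic signal
>>> model_fit = model.fit(data)
>>> model_fit.odf().shape # fODF sampled on the default repulsion724 sphere
(2, 2, 2, 724)
>>> model_fit.predict().shape # signal prediction on the original table
(2, 2, 2, 33)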
"""
if not np.any(gtab.b0s_mask):
raise ValueError("Gradient table has no b0 measurements")
self.gtab_orig = gtab # save for prediction
# Masks to extract b0/non-b0 measurements
self.where_b0s = lazy_index(gtab.b0s_mask)
self.where_dwi = lazy_index(~gtab.b0s_mask)
# Correct gradient table to contain b0 data at the beginning
bvals_cor = np.concatenate(([0], gtab.bvals[self.where_dwi]))
bvecs_cor = np.concatenate(([[0, 0, 0]], gtab.bvecs[self.where_dwi]))
gtab_cor = gradient_table(bvals_cor, bvecs=bvecs_cor)
# Initialize self.gtab
OdfModel.__init__(self, gtab_cor)
if isinstance(wm_response, tuple):
wm_response = np.asarray(wm_response)
# Store responses
self.wm_response = wm_response
self.gm_response = gm_response
self.csf_response = csf_response
# Initializing remaining parameters
if R < 1 or n_iter < 1 or n_coils < 1:
raise ValueError(
f"R, n_iter, and n_coils must be >= 1, but R={R},"
+ f"n_iter={n_iter}, and n_coils={n_coils} "
)
self.R = R
self.n_iter = n_iter
self.recon_type = recon_type
self.n_coils = n_coils
if voxelwise and use_tv:
raise ValueError("Total variation has no effect in voxelwise fit")
if voxelwise and verbose:
warnings.warn(
"Verbosity has no effect in voxelwise fit", UserWarning, stacklevel=2
)
self.voxelwise = voxelwise
self.use_tv = use_tv
self.verbose = verbose
if sphere is None:
self.sphere = get_sphere(name="repulsion724")
else:
self.sphere = sphere
if voxelwise:
self.fit = self._voxelwise_fit
else:
self.fit = self._global_fit
# Fitting parameters
self.kernel = None
@warning_for_keywords()
def _global_fit(self, data, *, mask=None):
"""
Fit fODF and GM/CSF volume fractions globally.
Parameters
----------
data : ndarray (x, y, z, N)
Signal values for each voxel. Must be 4D.
mask : ndarray (x, y, z), optional
Binary mask specifying voxels of interest with 1; results will only
be fit at these voxels (0 elsewhere). If `None`, fits all voxels.
Default: None.
Returns
-------
model_fit : RumbaFit
Fit object storing model parameters.
"""
# Checking data and mask shapes
if len(data.shape) != 4:
raise ValueError(f"Data should be 4D, received shape f{data.shape}")
if mask is None: # default mask includes all voxels
mask = np.ones(data.shape[:3])
if data.shape[:3] != mask.shape:
raise ValueError(
"Mask shape should match first 3 dimensions of "
+ f"data, but data dimensions are f{data.shape} "
+ f"while mask dimensions are f{mask.shape}"
)
# Signal repair, normalization
# Normalize data to mean b0 image
data = normalize_data(data, self.where_b0s, min_signal=_EPS)
# Rearrange data to match corrected gradient table
data = np.concatenate(
(np.ones([*data.shape[:3], 1]), data[..., self.where_dwi]), axis=3
)
data[data > 1] = 1 # clip values above 1 (data is non-negative after normalization)
# All arrays are converted to float32 to reduce memory load
data = data.astype(np.float32)
# Generate kernel
self.kernel = generate_kernel(
self.gtab,
self.sphere,
self.wm_response,
self.gm_response,
self.csf_response,
).astype(np.float32)
# Fit fODF
model_params = rumba_deconv_global(
data,
self.kernel,
mask,
n_iter=self.n_iter,
recon_type=self.recon_type,
n_coils=self.n_coils,
R=self.R,
use_tv=self.use_tv,
verbose=self.verbose,
)
model_fit = RumbaFit(self, model_params)
return model_fit
@warning_for_keywords()
def _voxelwise_fit(self, data, *, mask=None):
"""
Fit fODF and GM/CSF volume fractions voxelwise.
Parameters
----------
data : ndarray ([x, y, z], N)
Signal values for each voxel.
mask : ndarray ([x, y, z]), optional
Binary mask specifying voxels of interest with 1; results will only
be fit at these voxels (0 elsewhere). If `None`, fits all voxels.
Default: None.
Returns
-------
model_fit : RumbaFit
Fit object storing model parameters.
"""
if mask is None: # default mask includes all voxels
mask = np.ones(data.shape[:-1])
if data.shape[:-1] != mask.shape:
raise ValueError(
"Mask shape should match first dimensions of "
+ f"data, but data dimensions are f{data.shape} "
+ f"while mask dimensions are f{mask.shape}"
)
self.kernel = generate_kernel(
self.gtab,
self.sphere,
self.wm_response,
self.gm_response,
self.csf_response,
)
model_params = np.zeros(data.shape[:-1] + (len(self.sphere.vertices) + 2,))
for ijk in np.ndindex(data.shape[:-1]):
if mask[ijk]:
vox_data = data[ijk]
# Normalize data to mean b0 image
vox_data = normalize_data(vox_data, self.where_b0s, min_signal=_EPS)
# Rearrange data to match corrected gradient table
vox_data = np.concatenate(([1], vox_data[self.where_dwi]))
vox_data[vox_data > 1] = 1 # clip values above 1 (data is non-negative after normalization)
# Fitting
model_param = rumba_deconv(
vox_data,
self.kernel,
n_iter=self.n_iter,
recon_type=self.recon_type,
n_coils=self.n_coils,
)
model_params[ijk] = model_param
model_fit = RumbaFit(self, model_params)
return model_fit
class RumbaFit(OdfFit):
def __init__(self, model, model_params):
"""
Constructs fODF, GM/CSF volume fractions, and other derived results.
fODF and GM/CSF fractions are normalized to collectively sum to 1 for
each voxel.
Parameters
----------
model : RumbaSDModel
RumbaSDModel-SD model.
model_params : ndarray ([x, y, z], M)
fODF and GM/CSF volume fractions for each voxel.
"""
self.model = model
self.model_params = model_params
@warning_for_keywords()
def odf(self, *, sphere=None):
"""
Constructs fODF at discrete vertices on model sphere for each voxel.
Parameters
----------
sphere : Sphere, optional
Sphere on which to construct fODF. If specified, must be the same
sphere used by the `RumbaSDModel` model. Default: None.
Returns
-------
odf : ndarray ([x, y, z], M-2)
fODF computed at each vertex on model sphere.
"""
if sphere is not None and sphere != self.model.sphere:
raise ValueError(
"Reconstruction sphere must be the same as used"
+ " in the RUMBA-SD model."
)
odf = self.model_params[..., :-2]
return odf
@auto_attr
def f_gm(self):
"""
Constructs GM volume fraction for each voxel.
Returns
-------
f_gm : ndarray ([x, y, z])
GM volume fraction.
"""
f_gm = self.model_params[..., -2]
return f_gm
@auto_attr
def f_csf(self):
"""
Constructs CSF volume fraction for each voxel.
Returns
-------
f_csf : ndarray ([x, y, z])
CSF volume fraction.
"""
f_csf = self.model_params[..., -1]
return f_csf
@auto_attr
def f_wm(self):
"""
Constructs white matter volume fraction for each voxel.
Equivalent to sum of fODF.
Returns
-------
f_wm : ndarray ([x, y, z])
White matter volume fraction.
"""
f_wm = np.sum(self.odf(), axis=-1)
return f_wm
@auto_attr
def f_iso(self):
"""
Constructs isotropic volume fraction for each voxel.
Equivalent to sum of GM and CSF volume fractions.
Returns
-------
f_iso : ndarray ([x, y, z])
Isotropic volume fraction.
"""
f_iso = self.f_gm + self.f_csf
return f_iso
@auto_attr
def combined_odf_iso(self):
"""
Constructs fODF combined with isotropic volume fraction at discrete
vertices on model sphere.
Distributes isotropic compartments evenly along each fODF direction.
Sums to 1.
Returns
-------
combined : ndarray ([x, y, z], M-2)
fODF combined with isotropic volume fraction.
"""
odf = self.odf()
combined = odf + self.f_iso[..., None] / odf.shape[-1]
return combined
@warning_for_keywords()
def predict(self, *, gtab=None, S0=None):
"""
Compute the signal prediction on the model gradient table given the fODF
and GM/CSF volume fractions for each voxel.
Parameters
----------
gtab : GradientTable, optional
The gradients for which the signal will be predicted. Use the
model's gradient table if `None`. Default: None
S0 : ndarray ([x, y, z]) or float, optional
The non diffusion-weighted signal value for each voxel. If a float,
the same value is used for each voxel. If `None`, 1 is used for
each voxel. Default: None
Returns
-------
pred_sig : ndarray ([x, y, z], N)
The predicted signal.
"""
model_params = self.model_params
if gtab is None:
gtab = self.model.gtab_orig
pred_kernel = generate_kernel(
gtab,
self.model.sphere,
self.model.wm_response,
self.model.gm_response,
self.model.csf_response,
)
if S0 is None:
S0 = np.ones(model_params.shape[:-1] + (1,))
elif isinstance(S0, np.ndarray):
S0 = S0[..., None]
else:
S0 = S0 * np.ones(model_params.shape[:-1] + (1,))
pred_sig = np.empty(model_params.shape[:-1] + (len(gtab.bvals),))
for ijk in np.ndindex(model_params.shape[:-1]):
pred_sig[ijk] = S0[ijk] * np.dot(pred_kernel, model_params[ijk])
return pred_sig
@warning_for_keywords()
def rumba_deconv(data, kernel, *, n_iter=600, recon_type="smf", n_coils=1):
r"""
Fit fODF and GM/CSF volume fractions for a voxel using RUMBA-SD.
Deconvolves the kernel from the diffusion-weighted signal by computing a
maximum likelihood estimation of the fODF
:footcite:p:`CanalesRodriguez2015`. Minimizes the negative log-likelihood of
the data under Rician or Noncentral Chi noise distributions by adapting the
iterative technique developed in Richardson-Lucy deconvolution.
Parameters
----------
data : 1d ndarray (N,)
Signal values for a single voxel.
kernel : 2d ndarray (N, M)
Deconvolution kernel mapping volume fractions of the M compartments to
N-length signal. Last two columns should be for GM and CSF.
n_iter : int, optional
Number of iterations for fODF estimation. Must be a positive int.
Default: 600
recon_type : {'smf', 'sos'}, optional
MRI reconstruction method: spatial matched filter (SMF) or
sum-of-squares (SoS). SMF reconstruction generates Rician noise while
SoS reconstruction generates Noncentral Chi noise. Default: 'smf'
n_coils : int, optional
Number of coils in MRI scanner -- only relevant in SoS reconstruction.
Must be a positive int. Default: 1
Returns
-------
fit_vec : 1d ndarray (M,)
Vector containing fODF and GM/CSF volume fractions. First M-2
components are fODF while last two are GM and CSF respectively.
Notes
-----
The diffusion MRI signal measured at a given voxel is a sum of
contributions from each intra-voxel compartment, including parallel white
matter (WM) fiber populations in a given orientation as well as effects
from GM and CSF. The equation governing these contributions is:
$S_i = S_0\left(\sum_{j=1}^{M}f_j\exp(-b_i\textbf{v}_i^T\textbf{D}_j
\textbf{v}_i) + f_{GM}\exp(-b_iD_{GM})+f_{CSF}\exp(-b_iD_{CSF})\right)$
Where $S_i$ is the resultant signal along the diffusion-sensitizing
gradient unit vector $\textbf{v}_i; i = 1, ..., N$ with a b-value of $b_i$.
$f_j; j = 1, ..., M$ is the volume fraction of the $j^{th}$ fiber
population with an anisotropic diffusion tensor $\textbf{D}_j$.
$f_{GM}$ and $f_{CSF}$ are the volume fractions and $D_{GM}$ and $D_{CSF}$
are the mean diffusivities of GM and CSF respectively.
This equation is linear in $f_j, f_{GM}, f_{CSF}$ and can be simplified to
a single matrix multiplication:
$\textbf{S} = \textbf{Hf}$
Where $\textbf{S}$ is the signal vector at a certain voxel, $\textbf{H}$ is
the deconvolution kernel, and $\textbf{f}$ is the vector of volume
fractions for each compartment.
Modern MRI scanners produce noise following a Rician or Noncentral Chi
distribution, depending on their signal reconstruction technique
:footcite:p:`Constantinides1997`. Using this linear model, it can be shown
that the likelihood of a signal under a Noncentral Chi noise model is:
$P(\textbf{S}|\textbf{H}, \textbf{f}, \sigma^2, n) = \prod_{i=1}^{N}\left(
\frac{S_i}{\bar{S}_i}\right)^n\exp\left\{-\frac{1}{2\sigma^2}\left[
S_i^2 + \bar{S}_i^2\right]\right\}I_{n-1}\left(\frac{S_i\bar{S}_i}
{\sigma^2}\right)u(S_i)$
Where $S_i$ and $\bar{S}_i = (\textbf{Hf})_i$ are the measured and expected
signals respectively, $n$ is the number of coils in the scanner, and
$I_{n-1}$ is the modified Bessel function of first kind of order $n-1$.
This gives the likelihood under a Rician distribution when $n$ is set to 1.
By taking the negative log of this expression, differentiating with
respect to $\textbf{f}$, and setting the derivative to 0, the
likelihood-maximizing $\textbf{f}$ is found to be:
$\textbf{f} = \textbf{f} \circ \frac{\textbf{H}^T\left[\textbf{S}\circ
\frac{I_n(\textbf{S}\circ \textbf{Hf}/\sigma^2)} {I_{n-1}(\textbf{S}
\circ\textbf{Hf}/\sigma^2)} \right ]} {\textbf{H}^T\textbf{Hf}}$
The solution can be found using an iterative scheme, just as in the
Richardson-Lucy algorithm:
$\textbf{f}^{k+1} = \textbf{f}^k \circ \frac{\textbf{H}^T\left[\textbf{S}
\circ\frac{I_n(\textbf{S}\circ\textbf{Hf}^k/\sigma^2)} {I_{n-1}(\textbf{S}
\circ\textbf{Hf}^k/\sigma^2)} \right ]} {\textbf{H}^T\textbf{Hf}^k}$
In order to apply this, a reasonable estimate of $\sigma^2$ is required.
To find one, a separate iterative scheme is derived from the derivative of
the negative log-likelihood with respect to $\sigma^2$ and run in
parallel. Writing $\alpha = \sigma^2$, the variance update is:
$\alpha^{k+1} = \frac{1}{nN}\left\{ \frac{\textbf{S}^T\textbf{S} +
\textbf{f}^T\textbf{H}^T\textbf{Hf}}{2} - \textbf{1}^T_N\left[(\textbf{S}
\circ\textbf{Hf})\circ\frac{I_n(\textbf{S}\circ\textbf{Hf}/\alpha^k)}
{I_{n-1}(\textbf{S}\circ\textbf{Hf}/\alpha^k)} \right ]\right \}$
For more details, see :footcite:p:`CanalesRodriguez2015`.
References
----------
.. footbibliography::
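Examples
--------
A minimal sketch deconvolving one synthetic voxel; the gradient table and
test signal are illustrative assumptions only.

>>> import numpy as np
>>> from dipy.core.gradients import gradient_table
>>> from dipy.data import get_sphere
>>> from dipy.reconst.rumba import generate_kernel, rumba_deconv
>>> rng = np.random.default_rng(0)
>>> bvecs = np.concatenate(([[0, 0, 0]], rng.normal(size=(32, 3))))
>>> bvecs[1:] /= np.linalg.norm(bvecs[1:], axis=1, keepdims=True)
>>> bvals = np.concatenate(([0], np.full(32, 1000.0)))
>>> gtab = gradient_table(bvals, bvecs=bvecs)
>>> sphere = get_sphere(name="repulsion724")
>>> kernel = generate_kernel(
...     gtab, sphere, np.array([1.7e-3, 0.2e-3, 0.2e-3]), 0.8e-3, 3.0e-3
... )
>>> signal = kernel @ np.full(kernel.shape[1], 1 / kernel.shape[1])
>>> fit_vec = rumba_deconv(signal, kernel, n_iter=20)
>>> fit_vec.shape # 724 fODF components followed by GM and CSF fractions
(726,)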
"""
n_comp = kernel.shape[1] # number of compartments
n_grad = len(data) # gradient directions
fodf = np.ones((n_comp, 1)) # initial guess is iso-probable
fodf = fodf / np.sum(fodf, axis=0) # normalize initial guess
if recon_type == "smf":
n_order = 1 # Rician noise (same as Noncentral Chi with order 1)
elif recon_type == "sos":
n_order = n_coils # Noncentral Chi noise (order = # of coils)
else:
raise ValueError(
f"Invalid recon_type. Should be 'smf' or 'sos', received {recon_type}"
)
data = data.reshape(-1, 1)
reblurred = np.matmul(kernel, fodf)
# For use later
kernel_t = kernel.T
f_zero = 0 # minimum value allowed in fODF
# Initialize variance
sigma0 = 1 / 15 # initial sigma; with signal normalized to 1, assumes SNR of 15
sigma2 = sigma0**2 * np.ones(data.shape) # Expand into vector
reblurred_s = data * reblurred / sigma2
for _ in range(n_iter):
fodf_i = fodf
ratio = mbessel_ratio(n_order, reblurred_s)
rl_factor = np.matmul(kernel_t, data * ratio) / (
np.matmul(kernel_t, reblurred) + _EPS
)
fodf = fodf_i * rl_factor # result of iteration
fodf = np.maximum(f_zero, fodf) # apply positivity constraint
# Update other variables
reblurred = np.matmul(kernel, fodf)
reblurred_s = data * reblurred / sigma2
# Iterate variance
sigma2_i = (1 / (n_grad * n_order)) * np.sum(
(data**2 + reblurred**2) / 2 - (sigma2 * reblurred_s) * ratio, axis=0
)
sigma2_i = np.minimum((1 / 8) ** 2, np.maximum(sigma2_i, (1 / 80) ** 2)) # bound variance so SNR stays in [8, 80]
sigma2 = sigma2_i * np.ones(data.shape) # Expand into vector
# Normalize final result
fit_vec = np.squeeze(fodf / (np.sum(fodf, axis=0) + _EPS))
return fit_vec
def mbessel_ratio(n, x):
r"""
Fast computation of modified Bessel function ratio (first kind).
Computes:
$I_{n}(x) / I_{n-1}(x)$
using Perron's continued fraction equation where $I_n$ is the modified
Bessel function of first kind, order $n$ :footcite:p:`Gautschi1978`.
Parameters
----------
n : int
Order of Bessel function in numerator (denominator is of order n-1).
Must be a positive int.
x : float or ndarray
Value or array of values with which to compute ratio.
Returns
-------
y : float or ndarray
Result of ratio computation.
References
----------
.. footbibliography::
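Examples
--------
A quick sanity check against SciPy's exact modified Bessel functions; the
tolerance below is an illustrative assumption for this truncated
continued fraction, not a stated accuracy guarantee.

>>> import numpy as np
>>> from scipy.special import iv
>>> from dipy.reconst.rumba import mbessel_ratio
>>> x = np.array([0.5, 5.0, 50.0])
>>> exact = iv(1, x) / iv(0, x) # exact I_1(x) / I_0(x)
>>> bool(np.allclose(mbessel_ratio(1, x), exact, atol=1e-2))
True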
"""
y = x / (
(2 * n + x)
- (
2
* x
* (n + 1 / 2)
/ (
2 * n
+ 1
+ 2 * x
- (
2
* x
* (n + 3 / 2)
/ (2 * n + 2 + 2 * x - (2 * x * (n + 5 / 2) / (2 * n + 3 + 2 * x)))
)
)
)
)
return y
def generate_kernel(gtab, sphere, wm_response, gm_response, csf_response):
"""
Generate deconvolution kernel.
Compute kernel mapping orientation densities of white matter fiber
populations (along each vertex of the sphere) and isotropic volume
fractions to a diffusion weighted signal.
Parameters
----------
gtab : GradientTable
Gradient table.
sphere : Sphere
Sphere with which to sample discrete fiber orientations in order to
construct kernel
wm_response : 1d ndarray or 2d ndarray or AxSymShResponse
Tensor eigenvalues as a (3,) ndarray, multishell eigenvalues as
a (len(unique_bvals_tolerance(gtab.bvals))-1, 3) ndarray in
order of smallest to largest b-value, or an AxSymShResponse.
gm_response : float
Mean diffusivity for GM compartment. If `None`, the grey matter
compartment is set to all zeros.
csf_response : float
Mean diffusivity for CSF compartment. If `None`, the CSF
compartment is set to all zeros.
Returns
-------
kernel : 2d ndarray (N, M)
Computed kernel; can be multiplied with a vector consisting of volume
fractions for each of M-2 fiber populations as well as GM and CSF
fractions to produce a diffusion weighted signal.
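Examples
--------
A minimal sketch with a synthetic single-shell gradient table; all values
are illustrative assumptions only.

>>> import numpy as np
>>> from dipy.core.gradients import gradient_table
>>> from dipy.data import get_sphere
>>> from dipy.reconst.rumba import generate_kernel
>>> rng = np.random.default_rng(0)
>>> bvecs = np.concatenate(([[0, 0, 0]], rng.normal(size=(32, 3))))
>>> bvecs[1:] /= np.linalg.norm(bvecs[1:], axis=1, keepdims=True)
>>> bvals = np.concatenate(([0], np.full(32, 1000.0)))
>>> gtab = gradient_table(bvals, bvecs=bvecs)
>>> sphere = get_sphere(name="repulsion724")
>>> kernel = generate_kernel(
...     gtab, sphere, np.array([1.7e-3, 0.2e-3, 0.2e-3]), 0.8e-3, 3.0e-3
... )
>>> kernel.shape # N gradients x (724 fiber directions + GM + CSF)
(33, 726)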
"""
# Coordinates of sphere vertices
sticks = sphere.vertices
n_grad = len(gtab.gradients) # number of gradient directions
n_wm_comp = sticks.shape[0] # number of fiber populations
n_comp = n_wm_comp + 2 # plus isotropic compartments
kernel = np.zeros((n_grad, n_comp))
# White matter compartments
list_bvals = unique_bvals_tolerance(gtab.bvals)
n_bvals = len(list_bvals) - 1 # number of unique non-zero b-values
if isinstance(wm_response, AxSymShResponse):
# Data-driven response
where_dwi = lazy_index(~gtab.b0s_mask)
gradients = gtab.gradients[where_dwi]
gradients = gradients / np.linalg.norm(gradients, axis=1)[..., None]
S0 = wm_response.S0
for i in range(n_wm_comp):
# Response oriented along [0, 0, 1], so must rotate sticks[i]
rot_mat = vec2vec_rotmat(sticks[i], np.array([0, 0, 1]))
rot_gradients = np.dot(rot_mat, gradients.T).T
rot_sphere = Sphere(xyz=rot_gradients)
# Project onto rotated sphere and scale
rot_response = wm_response.on_sphere(rot_sphere) / S0
kernel[where_dwi, i] = rot_response
# Set b0 components
kernel[gtab.b0s_mask, :] = 1
elif wm_response.shape == (n_bvals, 3):
# Multi-shell response
bvals = gtab.bvals
bvecs = gtab.bvecs
for n, bval in enumerate(list_bvals[1:]):
indices = get_bval_indices(bvals, bval)
with warnings.catch_warnings(): # extract relevant b-value
warnings.simplefilter("ignore")
gtab_sub = gradient_table(bvals[indices], bvecs=bvecs[indices])
for i in range(n_wm_comp):
# Signal generated by WM-fiber for each gradient direction
S = single_tensor(
gtab_sub, evals=wm_response[n], evecs=all_tensor_evecs(sticks[i])
)
kernel[indices, i] = S
# Set b0 components
b0_indices = get_bval_indices(bvals, list_bvals[0])
kernel[b0_indices, :] = 1
else:
# Single-shell response
for i in range(n_wm_comp):
# Signal generated by WM-fiber for each gradient direction
S = single_tensor(
gtab, evals=wm_response, evecs=all_tensor_evecs(sticks[i])
)
kernel[:, i] = S
# Set b0 components
kernel[gtab.b0s_mask, :] = 1
# GM compartment
if gm_response is None:
S_gm = np.zeros(n_grad)
else:
S_gm = single_tensor(
gtab, evals=np.array([gm_response, gm_response, gm_response])
)
if csf_response is None:
S_csf = np.zeros(n_grad)
else:
S_csf = single_tensor(
gtab, evals=np.array([csf_response, csf_response, csf_response])
)
kernel[:, n_comp - 2] = S_gm
kernel[:, n_comp - 1] = S_csf
return kernel
@warning_for_keywords()
def rumba_deconv_global(
data,
kernel,
mask,
*,
n_iter=600,
recon_type="smf",
n_coils=1,
R=1,
use_tv=True,
verbose=False,
):
r"""
Fit fODF for all voxels simultaneously using RUMBA-SD.
Deconvolves the kernel from the diffusion-weighted signal at each voxel by
computing a maximum likelihood estimation of the fODF
:footcite:p:`CanalesRodriguez2015`. Global fitting also permits the use of
total variation regularization (RUMBA-SD + TV). The spatial dependence
introduced by TV promotes smoother solutions (i.e. prevents oscillations),
while still allowing for sharp discontinuities :footcite:p:`Rudin1992`. This
promotes smoothness and continuity along individual tracts while preventing
smoothing of adjacent tracts.
Generally, the global fit will proceed more quickly than the voxelwise
fit, provided that the computer has adequate RAM (>= 16 GB should be more
than sufficient).
Parameters
----------
data : 4d ndarray (x, y, z, N)
Signal values for entire brain. None of the volume dimensions x, y, z
can be 1 if TV regularization is required.
kernel : 2d ndarray (N, M)
Deconvolution kernel mapping volume fractions of the M compartments to
N-length signal. Last two columns should be for GM and CSF.
mask : 3d ndarray(x, y, z)
Binary mask specifying voxels of interest with 1; fODF will only be
fit at these voxels (0 elsewhere).
n_iter : int, optional
Number of iterations for fODF estimation. Must be a positive int.
recon_type : {'smf', 'sos'}, optional
MRI reconstruction method: spatial matched filter (SMF) or
sum-of-squares (SoS). SMF reconstruction generates Rician noise while
SoS reconstruction generates Noncentral Chi noise.
n_coils : int, optional
Number of coils in MRI scanner -- only relevant in SoS reconstruction.
Must be a positive int.
use_tv : bool, optional
If true, applies total variation regularization. This requires a brain
volume with no singleton dimensions.
verbose : bool, optional
If true, logs updates on estimated signal-to-noise ratio after each
iteration.
Returns
-------
fit_array : 4d ndarray (x, y, z, M)
fODF and GM/CSF volume fractions computed for each voxel. First M-2
components are fODF, while last two are GM and CSF respectively.
Notes
-----
TV modifies our cost function as follows:
.. math::
J(\textbf{f}) = -\log{P(\textbf{S}|\textbf{H}, \textbf{f}, \sigma^2, n)}+
\alpha_{TV}TV(\textbf{f})
where the first term is the negative log likelihood described in the notes
of `rumba_deconv`, and the second term is the TV energy, or the sum of
gradient absolute values for the fODF across the entire brain. This results
in a new multiplicative factor in the iterative scheme, now becoming:
.. math::
\textbf{f}^{k+1} = \textbf{f}^k \circ \frac{\textbf{H}^T\left[\textbf{S}
\circ\frac{I_n(\textbf{S}\circ\textbf{Hf}^k/\sigma^2)} {I_{n-1}(\textbf{S}
\circ\textbf{Hf}^k/\sigma^2)} \right ]} {\textbf{H}^T\textbf{Hf}^k}\circ
\textbf{R}^k
where $\textbf{R}^k$ is computed voxelwise by:
.. math::
(\textbf{R}^k)_j = \frac{1}{1 - \alpha_{TV}div\left(\frac{\triangledown[
\textbf{f}^k_{3D}]_j}{\lvert\triangledown[\textbf{f}^k_{3D}]_j \rvert}
\right)\biggr\rvert_{x, y, z}}
Here, $\triangledown$ is the symbol for the 3D gradient at any voxel.
The regularization strength, $\alpha_{TV}$ is updated after each iteration
by the discrepancy principle -- specifically, it is selected to match the
estimated variance after each iteration :footcite:p:`Chambolle2004`.
References
----------
.. footbibliography::
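Examples
--------
A minimal sketch on a tiny synthetic volume; real data should first be
b0-normalized as done by `RumbaSDModel`, and all values here are
illustrative assumptions only.

>>> import numpy as np
>>> from dipy.core.gradients import gradient_table
>>> from dipy.data import get_sphere
>>> from dipy.reconst.rumba import generate_kernel, rumba_deconv_global
>>> rng = np.random.default_rng(0)
>>> bvecs = np.concatenate(([[0, 0, 0]], rng.normal(size=(32, 3))))
>>> bvecs[1:] /= np.linalg.norm(bvecs[1:], axis=1, keepdims=True)
>>> bvals = np.concatenate(([0], np.full(32, 1000.0)))
>>> gtab = gradient_table(bvals, bvecs=bvecs)
>>> sphere = get_sphere(name="repulsion724")
>>> kernel = generate_kernel(
...     gtab, sphere, np.array([1.7e-3, 0.2e-3, 0.2e-3]), 0.8e-3, 3.0e-3
... ).astype(np.float32)
>>> data = np.ones((2, 2, 2, 33), dtype=np.float32)
>>> mask = np.ones((2, 2, 2))
>>> fit_array = rumba_deconv_global(data, kernel, mask, n_iter=10)
>>> fit_array.shape # fODF + GM/CSF fractions at every voxel
(2, 2, 2, 726)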
"""
# Crop data to reduce memory consumption
dim_orig = data.shape
ixmin, ixmax = bounding_box(mask)
data = crop(data, ixmin, ixmax)
mask = crop(mask, ixmin, ixmax)
if np.any(np.array(data.shape[:3]) == 1) and use_tv:
raise ValueError(
"Cannot use TV regularization if any spatial"
+ "dimensions are 1; "
+ f"provided dimensions were {data.shape[:3]}"
)
epsilon = 1e-7
n_grad = kernel.shape[0] # gradient directions
n_comp = kernel.shape[1] # number of compartments
dim = data.shape
n_v_tot = np.prod(dim[:3]) # total number of voxels
# Initial guess is iso-probable
fodf0 = np.ones((n_comp, 1), dtype=np.float32)
fodf0 = fodf0 / np.sum(fodf0, axis=0)
if recon_type == "smf":
n_order = 1 # Rician noise (same as Noncentral Chi with order 1)
elif recon_type == "sos":
n_order = n_coils # Noncentral Chi noise (order = # of coils)
else:
raise ValueError(
f"Invalid recon_type. Should be 'smf' or 'sos', received f{recon_type}"
)
mask_vec = np.ravel(mask)
# Indices of target voxels
index_mask = np.atleast_1d(np.squeeze(np.argwhere(mask_vec)))
n_v_true = len(index_mask) # number of target voxels
data_2d = np.zeros((n_v_true, n_grad), dtype=np.float32)
for i in range(n_grad):
data_2d[:, i] = np.ravel(data[:, :, :, i])[
index_mask
] # only keep voxels of interest
data_2d = data_2d.T
fodf = np.tile(fodf0, (1, n_v_true))
reblurred = np.matmul(kernel, fodf)
# For use later
kernel_t = kernel.T
f_zero = 0 # minimum value allowed in fODF
# Initialize algorithm parameters
sigma0 = 1 / 15 # initial sigma; with signal normalized to 1, assumes SNR of 15
sigma2 = sigma0**2
tv_lambda = sigma2 # initial guess for TV regularization strength
# Expand into matrix form for iterations
sigma2 = sigma2 * np.ones(data_2d.shape, dtype=np.float32)
tv_lambda_aux = np.zeros(n_v_tot, dtype=np.float32)
reblurred_s = data_2d * reblurred / sigma2
for i in range(n_iter):
fodf_i = fodf
ratio = mbessel_ratio(n_order, reblurred_s).astype(np.float32)
rl_factor = np.matmul(kernel_t, data_2d * ratio) / (
np.matmul(kernel_t, reblurred) + _EPS
)
if use_tv: # apply TV regularization
tv_factor = np.ones(fodf_i.shape, dtype=np.float32)
fodf_4d = _reshape_2d_4d(fodf_i.T, mask)
# Compute gradient, divergence
gr = _grad(fodf_4d)
d_inv = 1 / np.sqrt(epsilon**2 + np.sum(gr**2, axis=3))
gr_norm = gr * d_inv[:, :, :, None, :]
div_f = _divergence(gr_norm)
g0 = np.abs(1 - tv_lambda * div_f)
tv_factor_4d = 1 / (g0 + _EPS)
for j in range(n_comp):
tv_factor_1d = np.ravel(tv_factor_4d[:, :, :, j])[index_mask]
tv_factor[j, :] = tv_factor_1d
# Apply TV regularization to iteration factor
rl_factor = rl_factor * tv_factor
fodf = fodf_i * rl_factor # result of iteration
fodf = np.maximum(f_zero, fodf) # positivity constraint
# Update other variables
reblurred = np.matmul(kernel, fodf)
reblurred_s = data_2d * reblurred / sigma2
# Iterate variance
sigma2_i = (1 / (n_grad * n_order)) * np.sum(
(data_2d**2 + reblurred**2) / 2 - (sigma2 * reblurred_s) * ratio, axis=0
)
sigma2_i = np.minimum((1 / 8) ** 2, np.maximum(sigma2_i, (1 / 80) ** 2)) # bound variance so SNR stays in [8, 80]
if verbose:
logger.info("Iteration %d of %d", i + 1, n_iter)
snr_mean = np.mean(1 / np.sqrt(sigma2_i))
snr_std = np.std(1 / np.sqrt(sigma2_i))
logger.info(
"Mean SNR (S0/sigma) estimated to be %.3f +/- %.3f", snr_mean, snr_std
)
# Expand into matrix
sigma2 = np.tile(sigma2_i[None, :], (data_2d.shape[0], 1))
# Update TV regularization strength using the discrepancy principle
if use_tv:
if R == 1:
tv_lambda = np.mean(sigma2_i)
if tv_lambda < (1 / 30) ** 2:
tv_lambda = (1 / 30) ** 2
else: # different factor for each voxel
tv_lambda_aux[index_mask] = sigma2_i
tv_lambda = np.reshape(tv_lambda_aux, (*dim[:3], 1))
fodf = fodf.astype(np.float64)
fodf = fodf / (np.sum(fodf, axis=0)[None, ...] + _EPS) # normalize fODF
# Extract compartments
fit_array = np.zeros((*dim_orig[:3], n_comp))
_reshape_2d_4d(
fodf.T,
mask,
out=fit_array[ixmin[0] : ixmax[0], ixmin[1] : ixmax[1], ixmin[2] : ixmax[2]],
)
return fit_array
def _grad(M):
"""
Computes one way first difference
"""
x_ind = list(range(1, M.shape[0])) + [M.shape[0] - 1]
y_ind = list(range(1, M.shape[1])) + [M.shape[1] - 1]
z_ind = list(range(1, M.shape[2])) + [M.shape[2] - 1]
grad = np.zeros((*M.shape[:3], 3, M.shape[-1]), dtype=np.float32)
grad[:, :, :, 0, :] = M[x_ind, :, :, :] - M
grad[:, :, :, 1, :] = M[:, y_ind, :, :] - M
grad[:, :, :, 2, :] = M[:, :, z_ind, :] - M
return grad
def _divergence(F):
"""
Computes divergence of a 3-dimensional vector field (with one way
first difference)
"""
Fx = F[:, :, :, 0, :]
Fy = F[:, :, :, 1, :]
Fz = F[:, :, :, 2, :]
x_ind = [0] + list(range(F.shape[0] - 1))
y_ind = [0] + list(range(F.shape[1] - 1))
z_ind = [0] + list(range(F.shape[2] - 1))
fx = Fx - Fx[x_ind, :, :, :]
fx[0, :, :, :] = Fx[0, :, :, :] # edge conditions
fx[-1, :, :, :] = -Fx[-2, :, :, :]
fy = Fy - Fy[:, y_ind, :, :]
fy[:, 0, :, :] = Fy[:, 0, :, :]
fy[:, -1, :, :] = -Fy[:, -2, :, :]
fz = Fz - Fz[:, :, z_ind, :]
fz[:, :, 0, :] = Fz[:, :, 0, :]
fz[:, :, -1, :] = -Fz[:, :, -2, :]
return fx + fy + fz
@warning_for_keywords()
def _reshape_2d_4d(M, mask, *, out=None):
"""
Reshape a 2D array (masked voxels x compartments) into a 4D volume,
filling only the voxels where `mask` is nonzero.
"""
if out is None:
out = np.zeros((*mask.shape, M.shape[-1]), dtype=M.dtype)
n = 0
for i, j, k in np.ndindex(mask.shape):
if mask[i, j, k]:
out[i, j, k, :] = M[n, :]
n += 1
return out