#!/usr/bin/env python
"""
Classes and functions for fitting tensors.
"""
import functools
import warnings
import numpy as np
import scipy.optimize as opt
from dipy.core.geometry import vector_norm
from dipy.core.gradients import gradient_table
from dipy.core.onetime import auto_attr
from dipy.data import get_sphere
from dipy.reconst.base import ReconstModel
from dipy.reconst.vec_val_sum import vec_val_vect
from dipy.reconst.weights_method import (
weights_method_nlls_m_est,
weights_method_wls_m_est,
)
from dipy.testing.decorators import warning_for_keywords
MIN_POSITIVE_SIGNAL = 0.0001
ols_resort_msg = "Resorted to OLS solution in some voxels"
@warning_for_keywords()
def _roll_evals(evals, *, axis=-1):
"""Check evals shape.
Helper function to check that the evals provided to functions calculating
tensor statistics have the right shape
Parameters
----------
evals : array-like
Eigenvalues of a diffusion tensor. shape should be (...,3).
axis : int, optional
The axis of the array which contains the 3 eigenvals. Default: -1
Returns
-------
evals : array-like
Eigenvalues of a diffusion tensor, rolled so that the 3 eigenvals are
the last axis.
"""
    if evals.shape[axis] != 3:
        msg = f"Expecting 3 eigenvalues along `axis`, got {evals.shape[axis]}"
        raise ValueError(msg)
evals = np.rollaxis(evals, axis)
return evals
@warning_for_keywords()
def fractional_anisotropy(evals, *, axis=-1):
r"""Return Fractional anisotropy (FA) of a diffusion tensor.
Parameters
----------
evals : array-like
Eigenvalues of a diffusion tensor.
axis : int, optional
Axis of `evals` which contains 3 eigenvalues.
Returns
-------
fa : array
Calculated FA. Range is 0 <= FA <= 1.
Notes
-----
FA is calculated using the following equation:
.. math::
FA = \sqrt{\frac{1}{2}\frac{(\lambda_1-\lambda_2)^2+(\lambda_1-
\lambda_3)^2+(\lambda_2-\lambda_3)^2}{\lambda_1^2+
\lambda_2^2+\lambda_3^2}}
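
    Examples
    --------
    A worked example with illustrative eigenvalues (in mm^2/s):

    >>> import numpy as np
    >>> evals = np.array([1.5e-3, 4e-4, 4e-4])
    >>> float(np.round(fractional_anisotropy(evals), 3))
    0.686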
"""
evals = _roll_evals(evals, axis=axis)
# Make sure not to get nans
all_zero = (evals == 0).all(axis=0)
ev1, ev2, ev3 = evals
fa = np.sqrt(
0.5
* ((ev1 - ev2) ** 2 + (ev2 - ev3) ** 2 + (ev3 - ev1) ** 2)
/ ((evals * evals).sum(0) + all_zero)
)
return fa
@warning_for_keywords()
def geodesic_anisotropy(evals, *, axis=-1):
r"""
Geodesic anisotropy (GA) of a diffusion tensor.
Parameters
----------
evals : array-like
Eigenvalues of a diffusion tensor.
axis : int, optional
Axis of `evals` which contains 3 eigenvalues.
Returns
-------
ga : array
Calculated GA. In the range 0 to +infinity
Notes
-----
GA is calculated using the following equation given in
:footcite:p:`Batchelor2005`:
.. math::
GA = \sqrt{\sum_{i=1}^3
\log^2{\left ( \lambda_i/<\mathbf{D}> \right )}},
\quad \textrm{where} \quad <\mathbf{D}> =
(\lambda_1\lambda_2\lambda_3)^{1/3}
    Note that the notation $<D>$ is often used for the mean diffusivity (MD)
    of the diffusion tensor, which can lead to confusion in the literature
    (compare :footcite:p:`Batchelor2005`, :footcite:p:`Correia2011b` and
    :footcite:p:`Lee2008`, for example). :footcite:p:`Correia2011b` defines
    geodesic anisotropy (GA) with the MD as the $<D>$ in the denominator of
    the sum, which is incorrect. The original paper
    :footcite:p:`Batchelor2005` defines GA with $<D> = det(D)^{1/3}$, the
    isotropic part of the distance, which likely explains the confusion: the
    isotropic part of the diffusion tensor in Euclidean space is the MD,
    whereas the isotropic part of the tensor in log-Euclidean space is
    $det(D)^{1/3}$. The Appendix of :footcite:p:`Batchelor2005` and the
    log-Euclidean derivations in :footcite:p:`Lee2008` are clear on this.
    In short, $<D> = det(D)^{1/3}$ is used here in the GA definition, not
    the MD.
    See also :footcite:p:`Arsigny2006`.
References
----------
.. footbibliography::
"""
evals = _roll_evals(evals, axis=axis)
ev1, ev2, ev3 = evals
log1 = np.zeros(ev1.shape)
log2 = np.zeros(ev1.shape)
log3 = np.zeros(ev1.shape)
idx = np.nonzero(ev1)
# this is the definition in :footcite:p:`Batchelor2005`
detD = np.power(ev1 * ev2 * ev3, 1 / 3.0)
log1[idx] = np.log(ev1[idx] / detD[idx])
log2[idx] = np.log(ev2[idx] / detD[idx])
log3[idx] = np.log(ev3[idx] / detD[idx])
ga = np.sqrt(log1**2 + log2**2 + log3**2)
return ga
@warning_for_keywords()
def mean_diffusivity(evals, *, axis=-1):
r"""
Mean Diffusivity (MD) of a diffusion tensor.
Parameters
----------
evals : array-like
Eigenvalues of a diffusion tensor.
axis : int, optional
Axis of `evals` which contains 3 eigenvalues.
Returns
-------
md : array
Calculated MD.
Notes
-----
MD is calculated with the following equation:
.. math::
MD = \frac{\lambda_1 + \lambda_2 + \lambda_3}{3}
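
    Examples
    --------
    A worked example with illustrative eigenvalues (in mm^2/s):

    >>> import numpy as np
    >>> evals = np.array([1.5e-3, 4e-4, 4e-4])
    >>> float(np.round(mean_diffusivity(evals), 6))
    0.000767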
"""
evals = _roll_evals(evals, axis=axis)
return evals.mean(0)
@warning_for_keywords()
def axial_diffusivity(evals, *, axis=-1):
r"""
Axial Diffusivity (AD) of a diffusion tensor.
Also called parallel diffusivity.
Parameters
----------
evals : array-like
Eigenvalues of a diffusion tensor, must be sorted in descending order
along `axis`.
axis : int, optional
Axis of `evals` which contains 3 eigenvalues.
Returns
-------
ad : array
Calculated AD.
Notes
-----
AD is calculated with the following equation:
.. math::
AD = \lambda_1
"""
evals = _roll_evals(evals, axis=axis)
ev1, ev2, ev3 = evals
return ev1
@warning_for_keywords()
def radial_diffusivity(evals, *, axis=-1):
r"""
Radial Diffusivity (RD) of a diffusion tensor.
Also called perpendicular diffusivity.
Parameters
----------
evals : array-like
Eigenvalues of a diffusion tensor, must be sorted in descending order
along `axis`.
axis : int, optional
Axis of `evals` which contains 3 eigenvalues.
Returns
-------
rd : array
Calculated RD.
Notes
-----
RD is calculated with the following equation:
.. math::
RD = \frac{\lambda_2 + \lambda_3}{2}
"""
evals = _roll_evals(evals, axis=axis)
return evals[1:].mean(0)
@warning_for_keywords()
def trace(evals, *, axis=-1):
r"""
Trace of a diffusion tensor.
Parameters
----------
evals : array-like
Eigenvalues of a diffusion tensor.
axis : int, optional
Axis of `evals` which contains 3 eigenvalues.
Returns
-------
trace : array
Calculated trace of the diffusion tensor.
Notes
-----
Trace is calculated with the following equation:
.. math::
Trace = \lambda_1 + \lambda_2 + \lambda_3
"""
evals = _roll_evals(evals, axis=axis)
return evals.sum(0)
def color_fa(fa, evecs):
r"""Color fractional anisotropy of diffusion tensor
Parameters
----------
fa : array-like
Array of the fractional anisotropy (can be 1D, 2D or 3D)
evecs : array-like
eigen vectors from the tensor model
Returns
-------
    rgb : ndarray
        Colormap of the FA, with 3 channels as the last dimension: red for
        the x component, green for the y component and blue for the z
        component of the principal eigenvector.
Notes
-----
It is computed from the clipped FA between 0 and 1 using the following
formula
.. math::
rgb = abs(max(\vec{e})) \times fa
"""
if (fa.shape != evecs[..., 0, 0].shape) or ((3, 3) != evecs.shape[-2:]):
raise ValueError("Wrong number of dimensions for evecs")
return np.abs(evecs[..., 0]) * np.clip(fa, 0, 1)[..., None]
# The following are used to calculate the tensor mode:
def determinant(q_form):
"""
The determinant of a tensor, given in quadratic form
Parameters
----------
q_form : ndarray
The quadratic form of a tensor, or an array with quadratic forms of
tensors. Should be of shape (x, y, z, 3, 3) or (n, 3, 3) or (3, 3).
Returns
-------
det : array
The determinant of the tensor in each spatial coordinate
"""
# Following the conventions used here:
# https://en.wikipedia.org/wiki/Determinant
aei = q_form[..., 0, 0] * q_form[..., 1, 1] * q_form[..., 2, 2]
bfg = q_form[..., 0, 1] * q_form[..., 1, 2] * q_form[..., 2, 0]
cdh = q_form[..., 0, 2] * q_form[..., 1, 0] * q_form[..., 2, 1]
ceg = q_form[..., 0, 2] * q_form[..., 1, 1] * q_form[..., 2, 0]
bdi = q_form[..., 0, 1] * q_form[..., 1, 0] * q_form[..., 2, 2]
afh = q_form[..., 0, 0] * q_form[..., 1, 2] * q_form[..., 2, 1]
return aei + bfg + cdh - ceg - bdi - afh
def isotropic(q_form):
r"""
Calculate the isotropic part of the tensor.
See :footcite:p:`Ennis2006` for further details about the method.
Parameters
----------
q_form : ndarray
The quadratic form of a tensor, or an array with quadratic forms of
tensors. Should be of shape (x,y,z,3,3) or (n, 3, 3) or (3,3).
Returns
-------
A_hat: ndarray
The isotropic part of the tensor in each spatial coordinate
Notes
-----
The isotropic part of a tensor is defined as (equations 3-5 of
:footcite:p:`Ennis2006`):
.. math::
        \bar{A} = \frac{1}{3} tr(A) I
References
----------
.. footbibliography::
"""
tr_A = q_form[..., 0, 0] + q_form[..., 1, 1] + q_form[..., 2, 2]
my_I = np.eye(3)
tr_AI = tr_A.reshape(tr_A.shape + (1, 1)) * my_I
return (1 / 3.0) * tr_AI
def deviatoric(q_form):
r"""
Calculate the deviatoric (anisotropic) part of the tensor.
See :footcite:p:`Ennis2006` for further details about the method.
Parameters
----------
q_form : ndarray
The quadratic form of a tensor, or an array with quadratic forms of
tensors. Should be of shape (x,y,z,3,3) or (n, 3, 3) or (3,3).
Returns
-------
A_squiggle : ndarray
The deviatoric part of the tensor in each spatial coordinate.
Notes
-----
The deviatoric part of the tensor is defined as (equations 3-5 in
:footcite:p:`Ennis2006`):
.. math::
\widetilde{A} = A - \bar{A}
Where $A$ is the tensor quadratic form and $\bar{A}$ is the anisotropic
part of the tensor.
References
----------
.. footbibliography::
"""
A_squiggle = q_form - isotropic(q_form)
return A_squiggle
def norm(q_form):
r"""
Calculate the Frobenius norm of a tensor quadratic form
Parameters
----------
q_form: ndarray
The quadratic form of a tensor, or an array with quadratic forms of
tensors. Should be of shape (x,y,z,3,3) or (n, 3, 3) or (3,3).
Returns
-------
norm : ndarray
The Frobenius norm of the 3,3 tensor q_form in each spatial
coordinate.
Notes
-----
The Frobenius norm is defined as:
.. math::
||A||_F = [\sum_{i,j} abs(a_{i,j})^2]^{1/2}
See Also
--------
np.linalg.norm
"""
    return np.sqrt(np.sum(np.sum(np.abs(q_form) ** 2, -1), -1))
def mode(q_form):
r"""
Mode (MO) of a diffusion tensor.
See :footcite:p:`Ennis2006` for further details about the method.
Parameters
----------
q_form : ndarray
The quadratic form of a tensor, or an array with quadratic forms of
tensors. Should be of shape (x, y, z, 3, 3) or (n, 3, 3) or (3, 3).
Returns
-------
mode : array
Calculated tensor mode in each spatial coordinate.
Notes
-----
Mode ranges between -1 (planar anisotropy) and +1 (linear anisotropy)
with 0 representing isotropy. Mode is calculated with the following
equation (equation 9 in :footcite:p:`Ennis2006`):
.. math::
Mode = 3*\sqrt{6}*det(\widetilde{A}/norm(\widetilde{A}))
Where $\widetilde{A}$ is the deviatoric part of the tensor quadratic form.
References
----------
.. footbibliography::
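
    Examples
    --------
    Two illustrative unit-scale tensors, a purely isotropic one (mode 0)
    and a purely linear, prolate one (mode 1):

    >>> import numpy as np
    >>> q = np.stack([np.eye(3), np.diag([2.0, 1.0, 1.0])])
    >>> np.round(mode(q), 3)
    array([0., 1.])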
"""
A_squiggle = deviatoric(q_form)
A_s_norm = norm(A_squiggle)
mode = np.zeros_like(A_s_norm)
nonzero = A_s_norm != 0
A_squiggle_nonzero = A_squiggle[nonzero]
# Add two dims for the (3,3), so that it can broadcast on A_squiggle
A_s_norm_nonzero = A_s_norm[nonzero].reshape(-1, 1, 1)
mode_nonzero = 3 * np.sqrt(6) * determinant(A_squiggle_nonzero / A_s_norm_nonzero)
mode[nonzero] = mode_nonzero
return mode
@warning_for_keywords()
def linearity(evals, *, axis=-1):
r"""
The linearity of the tensor.
See :footcite:p:`Westin1997` for further details about the method.
Parameters
----------
evals : array-like
Eigenvalues of a diffusion tensor.
axis : int, optional
Axis of `evals` which contains 3 eigenvalues.
Returns
-------
linearity : array
Calculated linearity of the diffusion tensor.
Notes
-----
Linearity is calculated with the following equation:
.. math::
Linearity = \frac{\lambda_1-\lambda_2}{\lambda_1+\lambda_2+\lambda_3}
References
----------
.. footbibliography::
"""
evals = _roll_evals(evals, axis=axis)
ev1, ev2, ev3 = evals
return (ev1 - ev2) / evals.sum(0)
@warning_for_keywords()
def planarity(evals, *, axis=-1):
r"""
The planarity of the tensor.
See :footcite:p:`Westin1997` for further details about the method.
Parameters
----------
evals : array-like
Eigenvalues of a diffusion tensor.
axis : int, optional
Axis of `evals` which contains 3 eigenvalues.
Returns
-------
    planarity : array
        Calculated planarity of the diffusion tensor.
Notes
-----
Planarity is calculated with the following equation:
.. math::
Planarity =
\frac{2 (\lambda_2-\lambda_3)}{\lambda_1+\lambda_2+\lambda_3}
References
----------
.. footbibliography::
"""
evals = _roll_evals(evals, axis=axis)
ev1, ev2, ev3 = evals
return 2 * (ev2 - ev3) / evals.sum(0)
@warning_for_keywords()
def sphericity(evals, *, axis=-1):
r"""
The sphericity of the tensor.
See :footcite:p:`Westin1997` for further details about the method.
Parameters
----------
evals : array-like
Eigenvalues of a diffusion tensor.
axis : int, optional
Axis of `evals` which contains 3 eigenvalues.
Returns
-------
sphericity : array
Calculated sphericity of the diffusion tensor.
Notes
-----
Sphericity is calculated with the following equation:
.. math::
        Sphericity = \frac{3 \lambda_3}{\lambda_1+\lambda_2+\lambda_3}
References
----------
.. footbibliography::
"""
evals = _roll_evals(evals, axis=axis)
ev1, ev2, ev3 = evals
return (3 * ev3) / evals.sum(0)
def apparent_diffusion_coef(q_form, sphere):
r"""
Calculate the apparent diffusion coefficient (ADC) in each direction of a
sphere.
Parameters
----------
q_form : ndarray
The quadratic form of a tensor, or an array with quadratic forms of
tensors. Should be of shape (..., 3, 3)
sphere : a Sphere class instance
The ADC will be calculated for each of the vertices in the sphere
Notes
-----
The calculation of ADC, relies on the following relationship:
.. math::
ADC = \vec{b} Q \vec{b}^T
Where Q is the quadratic form of the tensor.
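
    Examples
    --------
    A sketch with an illustrative diagonal tensor (in mm^2/s); one ADC
    value is returned per sphere vertex:

    >>> import numpy as np
    >>> from dipy.data import get_sphere
    >>> sphere = get_sphere(name="symmetric362")
    >>> q_form = np.diag([1.5e-3, 4e-4, 4e-4])
    >>> apparent_diffusion_coef(q_form, sphere).shape
    (362,)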
"""
bvecs = sphere.vertices
bvals = np.ones(bvecs.shape[0])
gtab = gradient_table(bvals, bvecs=bvecs)
D = design_matrix(gtab)[:, :6]
return -np.dot(lower_triangular(q_form), D.T)
def tensor_prediction(dti_params, gtab, S0):
r"""
Predict a signal given tensor parameters.
Parameters
----------
dti_params : ndarray
Tensor parameters. The last dimension should have 12 tensor
parameters: 3 eigenvalues, followed by the 3 corresponding
eigenvectors.
gtab : a GradientTable class instance
The gradient table for this prediction
S0 : float or ndarray
The non diffusion-weighted signal in every voxel, or across all
        voxels.
Notes
-----
The predicted signal is given by:
.. math::
S(\theta, b) = S_0 * e^{-b ADC}
where $ADC = \theta Q \theta^T$, $\theta$ is a unit vector pointing at any
direction on the sphere for which a signal is to be predicted, $b$ is the b
value provided in the GradientTable input for that direction, $Q$ is the
quadratic form of the tensor determined by the input parameters.
"""
evals = dti_params[..., :3]
evecs = dti_params[..., 3:].reshape(dti_params.shape[:-1] + (3, 3))
qform = vec_val_vect(evecs, evals)
del evals, evecs
lower_tri = lower_triangular(qform, b0=S0)
del qform
D = design_matrix(gtab)
return np.exp(np.dot(lower_tri, D.T))
class TensorModel(ReconstModel):
"""Diffusion Tensor"""
def __init__(self, gtab, *args, fit_method="WLS", return_S0_hat=False, **kwargs):
"""A Diffusion Tensor Model.
See :footcite:p:`Basser1994b` and :footcite:p:`Basser1996` for further
details about the model.
Parameters
----------
gtab : GradientTable class instance
Gradient table.
fit_method : str or callable, optional
str can be one of the following:
'WLS' for weighted least squares
:func:`dti.wls_fit_tensor`
'LS' or 'OLS' for ordinary least squares
:func:`dti.ols_fit_tensor`
'NLLS' for non-linear least-squares
:func:`dti.nlls_fit_tensor`
'RT' or 'restore' or 'RESTORE' for RESTORE robust tensor
fitting :footcite:p:`Chang2005`
:func:`dti.restore_fit_tensor`
callable has to have the signature:
``fit_method(design_matrix, data, *args, **kwargs)``
return_S0_hat : bool, optional
Boolean to return (True) or not (False) the S0 values for the fit.
args, kwargs : arguments and key-word arguments passed to the
fit_method. See :func:`dti.wls_fit_tensor`,
:func:`dti.ols_fit_tensor` for details
min_signal : float, optional
The minimum signal value. Needs to be a strictly positive
number. Default: minimal signal in the data provided to `fit`.
Notes
-----
In order to increase speed of processing, tensor fitting is done
simultaneously over many voxels. Many fit_methods use the 'step'
parameter to set the number of voxels that will be fit at once in each
iteration. This is the chunk size as a number of voxels. A larger step
value should speed things up, but it will also take up more memory. It
is advisable to keep an eye on memory consumption as this value is
increased.
E.g., in :func:`iter_fit_tensor` we have a default step value of
1e4
References
----------
.. footbibliography::
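
        Examples
        --------
        A minimal end-to-end sketch (``data`` here stands for any 4D
        diffusion dataset acquired with the directions in ``gtab``; the
        names are illustrative)::

            tenmodel = TensorModel(gtab, fit_method="WLS")
            tenfit = tenmodel.fit(data, mask=mask)
            fa = tenfit.fa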
"""
ReconstModel.__init__(self, gtab)
if not callable(fit_method):
try:
if fit_method.upper() in ["NLS", "NLLS"] and "step" in kwargs:
_ = kwargs.pop("step")
warnings.warn(
"The 'step' parameter can not be used in the "
f"{fit_method.upper()} method. It will be ignored.",
UserWarning,
stacklevel=2,
)
fit_method = common_fit_methods[fit_method.upper()]
except KeyError as e:
e_s = '"' + str(fit_method) + '" is not a known fit '
e_s += "method, the fit method should either be a "
e_s += "function or one of the common fit methods"
raise ValueError(e_s) from e
self.fit_method = fit_method
self.return_S0_hat = return_S0_hat
self.design_matrix = design_matrix(self.gtab)
self.args = args
self.kwargs = kwargs
self.min_signal = self.kwargs.pop("min_signal", None)
if self.min_signal is not None and self.min_signal <= 0:
e_s = "The `min_signal` key-word argument needs to be strictly"
e_s += " positive."
raise ValueError(e_s)
self.extra = {}
@warning_for_keywords()
def fit(self, data, *, mask=None):
"""Fit method of the DTI model class
Parameters
----------
data : array
            The measured diffusion signal, with the last dimension holding
            the value for each gradient direction.
mask : array, optional
A boolean array used to mark the coordinates in the data that
should be analyzed that has the shape data.shape[:-1]
"""
S0_params = None
img_shape = data.shape[:-1]
if mask is not None:
# Check for valid shape of the mask
if mask.shape != img_shape:
raise ValueError("Mask is not the same shape as data.")
mask = np.asarray(mask, dtype=bool)
data_in_mask = np.reshape(data[mask], (-1, data.shape[-1]))
if self.min_signal is None:
min_signal = MIN_POSITIVE_SIGNAL
else:
min_signal = self.min_signal
data_in_mask = np.maximum(data_in_mask, min_signal)
params_in_mask, extra = self.fit_method(
self.design_matrix,
data_in_mask,
*self.args,
return_S0_hat=self.return_S0_hat,
**self.kwargs,
)
if self.return_S0_hat:
params_in_mask, model_S0 = params_in_mask
if mask is None:
out_shape = data.shape[:-1] + (-1,)
dti_params = params_in_mask.reshape(out_shape)
if self.return_S0_hat:
S0_params = model_S0.reshape(out_shape[:-1])
if extra is not None:
for key in extra:
self.extra[key] = extra[key].reshape(data.shape)
else:
dti_params = np.zeros(data.shape[:-1] + (12,))
dti_params[mask, :] = params_in_mask
if self.return_S0_hat:
S0_params = np.zeros(data.shape[:-1])
try:
S0_params[mask] = model_S0.squeeze(axis=-1)
except ValueError:
S0_params[mask] = model_S0
if extra is not None:
for key in extra:
self.extra[key] = np.zeros(data.shape)
self.extra[key][mask, :] = extra[key]
return TensorFit(self, dti_params, model_S0=S0_params)
@warning_for_keywords()
def predict(self, dti_params, *, S0=1.0):
"""
Predict a signal for this TensorModel class instance given parameters.
Parameters
----------
dti_params : ndarray
The last dimension should have 12 tensor parameters: 3
eigenvalues, followed by the 3 eigenvectors
S0 : float or ndarray, optional
The non diffusion-weighted signal in every voxel, or across all
voxels.
"""
return tensor_prediction(dti_params, self.gtab, S0)
class TensorFit:
@warning_for_keywords()
def __init__(self, model, model_params, *, model_S0=None):
"""Initialize a TensorFit class instance."""
self.model = model
self.model_params = model_params
self.model_S0 = model_S0
def __getitem__(self, index):
model_params = self.model_params
model_S0 = self.model_S0
N = model_params.ndim
if type(index) is not tuple:
index = (index,)
elif len(index) >= model_params.ndim:
raise IndexError("IndexError: invalid index")
index = index + (slice(None),) * (N - len(index))
if model_S0 is not None:
model_S0 = model_S0[index[:-1]]
return type(self)(self.model, model_params[index], model_S0=model_S0)
@property
def S0_hat(self):
return self.model_S0
@property
def shape(self):
return self.model_params.shape[:-1]
@property
def directions(self):
"""
For tracking - return the primary direction in each voxel
"""
return self.evecs[..., None, :, 0]
@property
def evals(self):
"""
Returns the eigenvalues of the tensor as an array
"""
return self.model_params[..., :3]
@property
def evecs(self):
"""
Returns the eigenvectors of the tensor as an array, columnwise
"""
evecs = self.model_params[..., 3:12]
return evecs.reshape(self.shape + (3, 3))
@property
def quadratic_form(self):
"""Calculates the 3x3 diffusion tensor for each voxel"""
# do `evecs * evals * evecs.T` where * is matrix multiply
# einsum does this with:
# np.einsum('...ij,...j,...kj->...ik', evecs, evals, evecs)
return vec_val_vect(self.evecs, self.evals)
@warning_for_keywords()
def lower_triangular(self, *, b0=None):
return lower_triangular(self.quadratic_form, b0=b0)
@auto_attr
def fa(self):
"""Fractional anisotropy (FA) calculated from cached eigenvalues."""
return fractional_anisotropy(self.evals)
@auto_attr
def color_fa(self):
"""Color fractional anisotropy of diffusion tensor"""
return color_fa(self.fa, self.evecs)
@auto_attr
def ga(self):
"""Geodesic anisotropy (GA) calculated from cached eigenvalues."""
return geodesic_anisotropy(self.evals)
@auto_attr
def mode(self):
"""
Tensor mode calculated from cached eigenvalues.
"""
return mode(self.quadratic_form)
@auto_attr
def md(self):
r"""
Mean diffusivity (MD) calculated from cached eigenvalues.
Returns
-------
md : array (V, 1)
Calculated MD.
Notes
-----
MD is calculated with the following equation:
.. math::
MD = \frac{\lambda_1+\lambda_2+\lambda_3}{3}
"""
return self.trace / 3.0
@auto_attr
def rd(self):
r"""
Radial diffusivity (RD) calculated from cached eigenvalues.
Returns
-------
rd : array (V, 1)
Calculated RD.
Notes
-----
RD is calculated with the following equation:
.. math::
RD = \frac{\lambda_2 + \lambda_3}{2}
"""
return radial_diffusivity(self.evals)
@auto_attr
def ad(self):
r"""
Axial diffusivity (AD) calculated from cached eigenvalues.
Returns
-------
ad : array (V, 1)
Calculated AD.
Notes
-----
AD is calculated with the following equation:
.. math::
AD = \lambda_1
"""
return axial_diffusivity(self.evals)
@auto_attr
def trace(self):
r"""
Trace of the tensor calculated from cached eigenvalues.
Returns
-------
trace : array (V, 1)
Calculated trace.
Notes
-----
The trace is calculated with the following equation:
.. math::
trace = \lambda_1 + \lambda_2 + \lambda_3
"""
return trace(self.evals)
@auto_attr
def planarity(self):
r"""
Returns
-------
        planarity : array
            Calculated planarity of the diffusion tensor
            :footcite:p:`Westin1997`.
        Notes
        -----
        Planarity is calculated with the following equation:
        .. math::
            Planarity =
                \frac{2 (\lambda_2 - \lambda_3)}{\lambda_1+\lambda_2+\lambda_3}
References
----------
.. footbibliography::
"""
return planarity(self.evals)
@auto_attr
def linearity(self):
r"""
Returns
-------
linearity : array
Calculated linearity of the diffusion tensor
:footcite:p:`Westin1997`.
Notes
-----
Linearity is calculated with the following equation:
.. math::
Linearity =
\frac{\lambda_1-\lambda_2}{\lambda_1+\lambda_2+\lambda_3}
References
----------
.. footbibliography::
"""
return linearity(self.evals)
@auto_attr
def sphericity(self):
r"""
Returns
-------
sphericity : array
Calculated sphericity of the diffusion tensor
:footcite:p:`Westin1997`.
Notes
-----
Sphericity is calculated with the following equation:
.. math::
Sphericity = \frac{3 \lambda_3}{\lambda_1+\lambda_2+\lambda_3}
References
----------
.. footbibliography::
"""
return sphericity(self.evals)
def odf(self, sphere):
r"""
The diffusion orientation distribution function (dODF). This is an
estimate of the diffusion distance in each direction
Parameters
----------
sphere : Sphere class instance.
The dODF is calculated in the vertices of this input.
Returns
-------
odf : ndarray
The diffusion distance in every direction of the sphere in every
voxel in the input data.
Notes
-----
This is based on equation 3 in :footcite:p:`Aganj2010`. To re-derive it
from scratch, follow steps in :footcite:p:`Descoteaux2008b`, Section 7.9
Equation 7.24 but with an $r^2$ term in the integral.
References
----------
.. footbibliography::
"""
odf = np.zeros((self.evals.shape[:-1] + (sphere.vertices.shape[0],)))
if len(self.evals.shape) > 1:
mask = np.where(
(self.evals[..., 0] > 0)
& (self.evals[..., 1] > 0)
& (self.evals[..., 2] > 0)
)
evals = self.evals[mask]
evecs = self.evecs[mask]
else:
evals = self.evals
evecs = self.evecs
lower = 4 * np.pi * np.sqrt(np.prod(evals, -1))
projection = np.dot(sphere.vertices, evecs)
projection /= np.sqrt(evals)
result = ((vector_norm(projection) ** -3) / lower).T
if len(self.evals.shape) > 1:
odf[mask] = result
else:
odf = result
return odf
def adc(self, sphere):
r"""
Calculate the apparent diffusion coefficient (ADC) in each direction on
the sphere for each voxel in the data
Parameters
----------
sphere : Sphere class instance
Sphere providing sample directions to compute the apparent diffusion
coefficient.
Returns
-------
adc : ndarray
The estimates of the apparent diffusion coefficient in every
direction on the input sphere
Notes
-----
The calculation of ADC, relies on the following relationship:
.. math::
ADC = \vec{b} Q \vec{b}^T
Where Q is the quadratic form of the tensor.
"""
return apparent_diffusion_coef(self.quadratic_form, sphere)
@warning_for_keywords()
def predict(self, gtab, *, S0=None, step=None):
r"""
Given a model fit, predict the signal on the vertices of a sphere
Parameters
----------
gtab : a GradientTable class instance
This encodes the directions for which a prediction is made
S0 : float array, optional
The mean non-diffusion weighted signal in each voxel. Default:
The fitted S0 value in all voxels if it was fitted. Otherwise 1 in
all voxels.
        step : int, optional
            The chunk size as a number of voxels. Default: the ``step``
            value stored in the model's keyword arguments, or all voxels at
            once if none was given.
In order to increase speed of processing, tensor fitting is done
simultaneously over many voxels. This parameter sets the number of
voxels that will be fit at once in each iteration. A larger step
value should speed things up, but it will also take up more memory.
It is advisable to keep an eye on memory consumption as this value
is increased.
Notes
-----
The predicted signal is given by:
.. math::
S(\theta, b) = S_0 * e^{-b ADC}
Where:
.. math::
ADC = \theta Q \theta^T
$\theta$ is a unit vector pointing at any direction on the sphere for
which a signal is to be predicted and $b$ is the b value provided in
the GradientTable input for that direction
"""
if S0 is None:
S0 = self.model_S0
if S0 is None: # if we didn't input or estimate S0 just use 1
S0 = 1.0
shape = self.model_params.shape[:-1]
size = np.prod(shape)
if step is None:
step = self.model.kwargs.get("step", size)
if step >= size:
return tensor_prediction(self.model_params[..., 0:12], gtab, S0=S0)
params = np.reshape(self.model_params, (-1, self.model_params.shape[-1]))
predict = np.empty((size, gtab.bvals.shape[0]))
if isinstance(S0, np.ndarray):
S0 = S0.ravel()
for i in range(0, size, step):
if isinstance(S0, np.ndarray):
this_S0 = S0[i : i + step]
else:
this_S0 = S0
predict[i : i + step] = tensor_prediction(
params[i : i + step], gtab, S0=this_S0
)
return predict.reshape(shape + (gtab.bvals.shape[0],))
@warning_for_keywords()
def iter_fit_tensor(*, step=1e4):
"""Wrap a fit_tensor func and iterate over chunks of data with given length
Splits data into a number of chunks of specified size and iterates the
decorated fit_tensor function over them. This is useful to counteract the
temporary but significant memory usage increase in fit_tensor functions
that use vectorized operations and need to store large temporary arrays for
their vectorized operations.
Parameters
----------
step : int, optional
The chunk size as a number of voxels. Optional parameter with default
value 10,000.
In order to increase speed of processing, tensor fitting is done
simultaneously over many voxels. This parameter sets the number of
voxels that will be fit at once in each iteration. A larger step value
should speed things up, but it will also take up more memory. It is
advisable to keep an eye on memory consumption as this value is
increased.
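
    Examples
    --------
    A sketch of decorating a fitting function so that it is applied in
    chunks (the function name and body are illustrative)::

        @iter_fit_tensor(step=5000)
        def my_fit_tensor(design_matrix, data, *args, **kwargs):
            ...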
"""
def iter_decorator(fit_tensor):
"""Actual iter decorator returned by iter_fit_tensor dec factory
Parameters
----------
fit_tensor : callable
A tensor fitting callable (most likely a function). The callable
has to have the signature:
``fit_method(design_matrix, data, *args, **kwargs)``
"""
@functools.wraps(fit_tensor)
def wrapped_fit_tensor(
design_matrix, data, *args, return_S0_hat=False, step=step, **kwargs
):
"""Iterate fit_tensor function over the data chunks
Parameters
----------
design_matrix : array (g, 7)
Design matrix holding the covariants used to solve for the
regression coefficients.
data : array ([X, Y, Z, ...], g)
Data or response variables holding the data. Note that the last
dimension should contain the data. It makes no copies of data.
return_S0_hat : bool, optional
Boolean to return (True) or not (False) the S0 values for the
fit.
step : int, optional
The chunk size as a number of voxels. Overrides `step` value
of `iter_fit_tensor`.
args : {list,tuple}
Any extra optional positional arguments passed to `fit_tensor`.
kwargs : dict
Any extra optional keyword arguments passed to `fit_tensor`.
"""
shape = data.shape[:-1]
size = np.prod(shape)
step = int(step) or size
weights = kwargs["weights"] if "weights" in kwargs else None
if weights is None:
kwargs.pop("weights", None)
if step >= size:
return fit_tensor(
design_matrix, data, *args, return_S0_hat=return_S0_hat, **kwargs
)
data = data.reshape(-1, data.shape[-1])
if weights is not None:
weights = weights.reshape(-1, weights.shape[-1])
if design_matrix.shape[-1] == 22: # DKI
sz = 22
else: # DTI
sz = 7 if kwargs.get("return_lower_triangular", False) else 12
dtiparams = np.empty((size, sz), dtype=np.float64)
if return_S0_hat:
S0params = np.empty(size, dtype=np.float64)
extra = {}
for i in range(0, size, step):
if weights is not None:
kwargs["weights"] = weights[i : i + step]
if return_S0_hat:
(dtiparams[i : i + step], S0params[i : i + step]), extra_i = (
fit_tensor(
design_matrix,
data[i : i + step],
*args,
return_S0_hat=return_S0_hat,
**kwargs,
)
)
else:
dtiparams[i : i + step], extra_i = fit_tensor(
design_matrix, data[i : i + step], *args, **kwargs
)
if extra_i is not None:
for key in extra_i:
if i == 0:
extra[key] = np.empty(data.shape)
extra[key][i : i + step] = extra_i[key]
if extra:
for key in extra:
extra[key] = extra[key].reshape(shape + (-1,))
if return_S0_hat:
return (
dtiparams.reshape(shape + (sz,)),
S0params.reshape(shape + (1,)),
), extra
else:
return dtiparams.reshape(shape + (sz,)), extra
return wrapped_fit_tensor
return iter_decorator
@iter_fit_tensor()
@warning_for_keywords()
def wls_fit_tensor(
design_matrix,
data,
*,
weights=None,
return_S0_hat=False,
return_lower_triangular=False,
return_leverages=False,
):
r"""
Computes weighted least squares (WLS) fit to calculate self-diffusion
tensor using a linear regression model.
See :footcite:p:`Chung2006` for further details about the method.
Parameters
----------
design_matrix : array (g, 7)
Design matrix holding the covariants used to solve for the regression
coefficients.
data : array ([X, Y, Z, ...], g)
Data or response variables holding the data. Note that the last
dimension should contain the data. It makes no copies of data.
weights : array ([X, Y, Z, ...], g), optional
Weights to apply for fitting. These weights must correspond to the
squared residuals such that $S = \sum_i w_i r_i^2$.
If not provided, weights are estimated as the squared predicted signal
from an initial OLS fit :footcite:p:`Chung2006`.
return_S0_hat : bool, optional
Boolean to return (True) or not (False) the S0 values for the fit.
return_lower_triangular : bool, optional
Boolean to return (True) or not (False) the coefficients of the fit.
return_leverages : bool, optional
Boolean to return (True) or not (False) the fitting leverages.
Returns
-------
eigvals : array (..., 3)
Eigenvalues from eigen decomposition of the tensor.
eigvecs : array (..., 3, 3)
Associated eigenvectors from eigen decomposition of the tensor.
Eigenvectors are columnar (e.g. eigvecs[:,j] is associated with
eigvals[j])
leverages : array (g)
Leverages of the fitting problem (if return_leverages is True)
See Also
--------
decompose_tensor
Notes
-----
In Chung, et al. 2006, the regression of the WLS fit needed an unbiased
preliminary estimate of the weights and therefore the ordinary least
squares (OLS) estimates were used. A "two pass" method was implemented:
1. calculate OLS estimates of the data
2. apply the OLS estimates as weights to the WLS fit of the data
This ensured heteroscedasticity could be properly modeled for various
types of bootstrap resampling (namely residual bootstrap).
.. math::
        y = \textrm{data} \\
        X = \textrm{design matrix} \\
        \hat{\beta}_\textrm{WLS} =
            \textrm{desired regression coefficients (e.g. tensor)} \\
        \\
        \hat{\beta}_\textrm{WLS} = (X^T W X)^{-1} X^T W y \\
        \\
        W = \textrm{diag}((X \hat{\beta}_\textrm{OLS})^2),
        \quad \textrm{where} \quad \hat{\beta}_\textrm{OLS} = (X^T X)^{-1} X^T y
References
----------
.. footbibliography::
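
    Examples
    --------
    A plain-NumPy sketch of the two-pass scheme above, where ``X`` is the
    design matrix and ``log_y`` the log-signal of a single voxel
    (illustration only)::

        beta_ols = np.linalg.pinv(X) @ log_y
        w = np.exp(X @ beta_ols)  # predicted signal, used as weights
        beta_wls = np.linalg.pinv(X * w[:, None]) @ (w * log_y)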
"""
tol = 1e-6
data = np.asarray(data)
log_s = np.log(data)
if weights is None: # calculate weights
fit_result, _ = ols_fit_tensor(
design_matrix, data, return_lower_triangular=True
)
w = np.exp(fit_result @ design_matrix.T)
else:
w = np.sqrt(weights)
    # The weighted design matrix (design_matrix * w) differs per voxel, so
    # the pseudo-inverse must be computed per voxel, making the problem
    # much larger
if return_leverages is False:
fit_result = np.einsum(
"...ij,...j", np.linalg.pinv(design_matrix * w[..., None]), w * log_s
)
leverages = None
else:
tmp = np.einsum(
"...ij,...j->...ij", np.linalg.pinv(design_matrix * w[..., None]), w
)
fit_result = np.einsum("...ij,...j", tmp, log_s)
leverages = np.einsum("ij,...ji->...i", design_matrix, tmp)
if leverages is not None:
leverages = {"leverages": leverages}
if return_lower_triangular:
return fit_result, leverages
if return_S0_hat:
return (
eig_from_lo_tri(fit_result, min_diffusivity=tol / -design_matrix.min()),
np.exp(-fit_result[:, -1]),
), leverages
else:
return eig_from_lo_tri(
fit_result, min_diffusivity=tol / -design_matrix.min()
), leverages
@iter_fit_tensor()
@warning_for_keywords()
def ols_fit_tensor(
design_matrix,
data,
*,
return_S0_hat=False,
return_lower_triangular=False,
return_leverages=False,
):
r"""
Computes ordinary least squares (OLS) fit to calculate self-diffusion
tensor using a linear regression model.
See :footcite:p:`Chung2006` for further details about the method.
Parameters
----------
design_matrix : array (g, 7)
Design matrix holding the covariants used to solve for the regression
coefficients.
data : array ([X, Y, Z, ...], g)
Data or response variables holding the data. Note that the last
dimension should contain the data. It makes no copies of data.
return_S0_hat : bool, optional
Boolean to return (True) or not (False) the S0 values for the fit.
return_lower_triangular : bool, optional
Boolean to return (True) or not (False) the coefficients of the fit.
return_leverages : bool, optional
Boolean to return (True) or not (False) the fitting leverages.
Returns
-------
eigvals : array (..., 3)
Eigenvalues from eigen decomposition of the tensor.
eigvecs : array (..., 3, 3)
Associated eigenvectors from eigen decomposition of the tensor.
Eigenvectors are columnar (e.g. eigvecs[:,j] is associated with
eigvals[j])
leverages : array (g)
Leverages of the fitting problem (if return_leverages is True)
See Also
--------
    wls_fit_tensor, decompose_tensor, design_matrix
Notes
-----
.. math::
y = \mathrm{data} \\
X = \mathrm{design matrix} \\
\hat{\beta}_\mathrm{OLS} = (X^T X)^{-1} X^T y
References
----------
.. footbibliography::
"""
tol = 1e-6
data = np.asarray(data)
if return_leverages is False:
fit_result = np.einsum(
"...ij,...j", np.linalg.pinv(design_matrix), np.log(data)
)
leverages = None
else:
tmp = np.linalg.pinv(design_matrix)
fit_result = np.einsum("...ij,...j", tmp, np.log(data))
leverages = np.einsum("ij,ji->i", design_matrix, tmp)
if leverages is not None:
leverages = {"leverages": leverages}
if return_lower_triangular:
return fit_result, leverages
if return_S0_hat:
return (
eig_from_lo_tri(fit_result, min_diffusivity=tol / -design_matrix.min()),
np.exp(-fit_result[:, -1]),
), leverages
else:
return eig_from_lo_tri(
fit_result, min_diffusivity=tol / -design_matrix.min()
), leverages
def _ols_fit_matrix(design_matrix):
"""
Helper function to calculate the ordinary least squares (OLS)
fit as a matrix multiplication. Mainly used to calculate WLS weights. Can
be used to calculate regression coefficients in OLS but not recommended.
See Also
--------
wls_fit_tensor, ols_fit_tensor
Examples
    --------
ols_fit = _ols_fit_matrix(design_mat)
ols_data = np.dot(ols_fit, data)
"""
U, S, V = np.linalg.svd(design_matrix, False)
return np.dot(U, U.T)
class _NllsHelper:
r"""Class with member functions to return nlls error and derivative."""
def err_func(self, tensor, design_matrix, data, weights=None):
r"""
Error function for the non-linear least-squares fit of the tensor.
Parameters
----------
        tensor : array (Npar,)
            The model parameters, ordered like the design matrix columns:
            the lower triangular tensor elements followed by -log(S0)
design_matrix : array
The design matrix
data : array
The voxel signal in all gradient directions
weights : array ([X, Y, Z, ...], g), optional
Weights to apply for fitting. These weights must correspond to the
squared residuals such that $S = \sum_i w_i r_i^2$.
"""
# This is the predicted signal given the params:
y = np.exp(np.dot(design_matrix, tensor))
self.y = y # cache the results
# Compute the residuals
residuals = data - y
# Set weights
if weights is None:
self.sqrt_w = 1 # cache weights for the *non-squared* residuals
            # Return the (unweighted) residuals:
            return residuals
else:
# Return the weighted residuals:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
self.sqrt_w = np.sqrt(weights)
ans = self.sqrt_w * residuals
if np.iterable(weights):
# cache the weights for the *non-squared* residuals
self.sqrt_w = self.sqrt_w[:, None]
return ans
def jacobian_func(self, tensor, design_matrix, data, weights=None):
r"""The Jacobian is the first derivative of the error function.
Parameters
----------
        tensor : array (Npar,)
            The model parameters, ordered like the design matrix columns:
            the lower triangular tensor elements followed by -log(S0)
design_matrix : array
The design matrix
data : array
The voxel signal in all gradient directions
weights : array ([X, Y, Z, ...], g), optional
Weights to apply for fitting. These weights must correspond to the
squared residuals such that $S = \sum_i w_i r_i^2$.
Notes
-----
This Jacobian correctly accounts for weights on the squared residuals
if provided.
        """
"""
# minus sign, because derivative of residuals = data - y
# sqrt(w) because w corresponds to the squared residuals
if weights is None:
return -self.y[:, None] * design_matrix
else:
return -self.y[:, None] * design_matrix * self.sqrt_w
@warning_for_keywords()
def _decompose_tensor_nan(tensor, tensor_alternative, *, min_diffusivity=0):
"""Helper function that expands the function decompose_tensor to deal
with tensor with nan elements.
Computes tensor eigen decomposition to calculate eigenvalues and
eigenvectors (Basser et al., 1994a). Some fit approaches can produce nan
tensor elements in background voxels (particularly non-linear approaches).
This function avoids the eigen decomposition errors of nan tensor elements
by replacing tensor with nan elements by a given alternative tensor
estimate.
Parameters
----------
tensor : array (3, 3)
Hermitian matrix representing a diffusion tensor.
tensor_alternative : array (3, 3)
Hermitian matrix representing a diffusion tensor obtain from an
approach that does not produce nan tensor elements
min_diffusivity : float, optional
Because negative eigenvalues are not physical and small eigenvalues,
much smaller than the diffusion weighting, cause quite a lot of noise
in metrics such as fa, diffusivity values smaller than
`min_diffusivity` are replaced with `min_diffusivity`.
Returns
-------
eigvals : array (3)
Eigenvalues from eigen decomposition of the tensor. Negative
eigenvalues are replaced by zero. Sorted from largest to smallest.
eigvecs : array (3, 3)
Associated eigenvectors from eigen decomposition of the tensor.
Eigenvectors are columnar (e.g. eigvecs[..., :, j] is associated with
eigvals[..., j])
"""
try:
evals, evecs = decompose_tensor(tensor[:6], min_diffusivity=min_diffusivity)
except np.linalg.LinAlgError:
evals, evecs = decompose_tensor(
tensor_alternative[:6], min_diffusivity=min_diffusivity
)
return evals, evecs
@warning_for_keywords()
def nlls_fit_tensor(
design_matrix,
data,
*,
weights=None,
jac=True,
return_S0_hat=False,
fail_is_nan=False,
return_lower_triangular=False,
return_leverages=False,
init_params=None,
):
r"""
Fit the cumulant expansion params (e.g. DTI, DKI) using non-linear
least-squares.
Parameters
----------
design_matrix : array (g, Npar)
Design matrix holding the covariants used to solve for the regression
coefficients. First six parameters of design matrix should correspond
to the six unique diffusion tensor elements in the lower triangular
        order (Dxx, Dxy, Dyy, Dxz, Dyz, Dzz), while the last column
        corresponds to -log(S0)
data : array ([X, Y, Z, ...], g)
Data or response variables holding the data. Note that the last
dimension should contain the data. It makes no copies of data.
weights : array ([X, Y, Z, ...], g), optional
Weights to apply for fitting. These weights must correspond to the
squared residuals such that $S = \sum_i w_i r_i^2$.
jac : bool, optional
        Whether to use the analytic Jacobian of the error function in the
        non-linear optimization.
return_S0_hat : bool, optional
Boolean to return (True) or not (False) the S0 values for the fit.
fail_is_nan : bool, optional
Boolean to set failed NL fitting to NaN (True) or LS (False, default).
return_lower_triangular : bool, optional
Boolean to return (True) or not (False) the coefficients of the fit.
return_leverages : bool, optional
Boolean to return (True) or not (False) the fitting leverages.
init_params : array ([X, Y, Z, ...], Npar), optional
Parameters in lower triangular form as initial optimization guess.
Returns
-------
nlls_params: the eigen-values and eigen-vectors of the tensor in each
voxel.
"""
tol = 1e-6
# Detect number of parameters to estimate from design_matrix length plus
# 5 due to diffusion tensor conversion to eigenvalue and eigenvectors
npa = design_matrix.shape[-1] + 5
# Detect if number of parameters corresponds to dti
dti = npa == 12
# Flatten for the iteration over voxels:
flat_data = data.reshape((-1, data.shape[-1]))
if init_params is None:
# Use the OLS method parameters as the starting point for nlls
D, extra = ols_fit_tensor(
design_matrix,
flat_data,
return_lower_triangular=True,
return_leverages=return_leverages,
)
if extra is not None:
leverages = extra["leverages"]
# Flatten for the iteration over voxels:
ols_params = np.reshape(D, (-1, D.shape[-1]))
else:
# Replace starting guess for opt (usually ols_params) with init_params
ols_params = init_params
# Initialize parameter matrix
params = np.empty((flat_data.shape[0], npa))
# Initialize parameter matrix for storing flattened parameters
flat_params = np.empty_like(ols_params)
# For warnings
resort_to_OLS = False
# Instance of _NllsHelper, need for nlls error func and jacobian
nlls = _NllsHelper()
err_func = nlls.err_func
jac_func = nlls.jacobian_func if jac else None
if return_S0_hat:
model_S0 = np.empty((flat_data.shape[0], 1))
for vox in range(flat_data.shape[0]):
if np.all(flat_data[vox] == 0):
raise ValueError("The data in this voxel contains only zeros")
start_params = ols_params[vox]
weights_vox = weights[vox] if weights is not None else None
try:
# Do the optimization in this voxel:
this_param, status = opt.leastsq(
err_func,
start_params,
args=(design_matrix, flat_data[vox], weights_vox),
Dfun=jac_func,
)
flat_params[vox] = this_param
# Convert diffusion tensor parameters to the evals and the evecs:
evals, evecs = decompose_tensor(
from_lower_triangular(this_param[:6]),
min_diffusivity=tol / -design_matrix.min(),
)
params[vox, :3] = evals
params[vox, 3:12] = evecs.ravel()
# If leastsq failed to converge and produced nans, we'll resort to the
# OLS solution in this voxel:
except (np.linalg.LinAlgError, TypeError):
resort_to_OLS = True
this_param = start_params
flat_params[vox] = this_param # NOTE: ignores fail_is_nan
if not fail_is_nan:
# Convert diffusion tensor parameters to evals and evecs
evals, evecs = decompose_tensor(
from_lower_triangular(this_param[:6]),
min_diffusivity=tol / -design_matrix.min(),
)
params[vox, :3] = evals
params[vox, 3:12] = evecs.ravel()
else:
# Set NaN values
this_param[:] = np.nan # so that S0_hat is NaN
params[vox, :] = np.nan
if return_S0_hat:
model_S0[vox] = np.exp(-this_param[-1])
if not dti:
md2 = evals.mean(0) ** 2
params[vox, 12:] = this_param[6:-1] / md2
if resort_to_OLS:
warnings.warn(ols_resort_msg, UserWarning, stacklevel=2)
if return_leverages:
leverages = {"leverages": leverages}
else:
leverages = None
if return_lower_triangular:
return flat_params, leverages
params.shape = data.shape[:-1] + (npa,)
if return_S0_hat:
model_S0.shape = data.shape[:-1] + (1,)
return [params, model_S0], None
else:
return params, None
@warning_for_keywords()
def restore_fit_tensor(
design_matrix, data, *, sigma=None, jac=True, return_S0_hat=False, fail_is_nan=False
):
"""Compute a robust tensor fit using the RESTORE algorithm.
Note that the RESTORE algorithm defined in :footcite:p:`Chang2005` does not define
Geman–McClure M-estimator weights as claimed (instead, Cauchy M-estimator
weights are defined), but this function does define correct Geman–McClure
M-estimator weights.
Parameters
----------
design_matrix : array of shape (g, 7)
Design matrix holding the covariants used to solve for the regression
coefficients.
data : array of shape ([X, Y, Z, n_directions], g)
Data or response variables holding the data. Note that the last
dimension should contain the data. It makes no copies of data.
sigma : float, optional
An estimate of the variance. :footcite:p:`Chang2005` recommend to use
1.5267 * std(background_noise), where background_noise is estimated
from some part of the image known to contain no signal (only noise).
If not provided, will be estimated per voxel as:
sigma = 1.4826 * sqrt(N / (N - p)) * MAD(residuals)
as in :footcite:p:`Chang2012` but with the additional correction factor
1.4826 required to link standard deviation to MAD.
jac : bool, optional
Whether to use the Jacobian of the tensor to speed the non-linear
optimization procedure used to fit the tensor parameters (see also
:func:`nlls_fit_tensor`).
return_S0_hat : bool, optional
Boolean to return (True) or not (False) the S0 values for the fit.
fail_is_nan : bool, optional
Boolean to set failed NL fitting to NaN (True) or LS (False).
Returns
-------
restore_params : an estimate of the tensor parameters in each voxel.
References
----------
.. footbibliography::
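
    Examples
    --------
    When ``sigma`` is not given, the per-voxel scale is estimated from the
    residuals of an initial fit, as sketched here (a NumPy illustration of
    the formula above, with ``residuals``, ``N`` and ``p`` as defined in
    this docstring)::

        mad = np.median(np.abs(residuals - np.median(residuals)))
        sigma = 1.4826 * np.sqrt(N / (N - p)) * mad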
"""
# Detect number of parameters to estimate from design_matrix length plus
# 5 due to diffusion tensor conversion to eigenvalue and eigenvectors
npa = design_matrix.shape[-1] + 5
# Detect if number of parameters corresponds to dti
dti = npa == 12
# define some constants
p = design_matrix.shape[-1]
N = data.shape[-1]
factor = 1.4826 * np.sqrt(N / (N - p))
# Flatten for the iteration over voxels:
flat_data = data.reshape((-1, data.shape[-1]))
# calculate OLS solution
D, _ = ols_fit_tensor(design_matrix, flat_data, return_lower_triangular=True)
# Flatten for the iteration over voxels:
ols_params = np.reshape(D, (-1, D.shape[-1]))
# Initialize parameter matrix
params = np.empty((flat_data.shape[0], npa))
# For storing whether image is used in final fit for each voxel
robust = np.ones(flat_data.shape, dtype=int)
# For warnings
resort_to_OLS = False
# Instance of _NllsHelper, need for nlls error func and jacobian
nlls = _NllsHelper()
err_func = nlls.err_func
jac_func = nlls.jacobian_func if jac else None
if return_S0_hat:
model_S0 = np.empty((flat_data.shape[0], 1))
for vox in range(flat_data.shape[0]):
if np.all(flat_data[vox] == 0):
raise ValueError("The data in this voxel contains only zeros")
start_params = ols_params[vox]
try:
# Do unweighted nlls in this voxel:
this_param, status = opt.leastsq(
err_func,
start_params,
args=(design_matrix, flat_data[vox]),
Dfun=jac_func,
)
# Get the residuals:
pred_sig = np.exp(np.dot(design_matrix, this_param))
residuals = flat_data[vox] - pred_sig
# estimate or set sigma
if sigma is not None:
C = sigma
else:
C = factor * np.median(np.abs(residuals - np.median(residuals)))
# If any of the residuals are outliers (using 3 sigma as a
# criterion following Chang et al., e.g page 1089):
test_sigma = np.any(np.abs(residuals) > 3 * C)
# test for doing robust reweighting
if test_sigma:
rdx = 1
while rdx <= 10: # NOTE: capped at 10 iterations
# GM weights (original Restore paper used Cauchy weights)
C = factor * np.median(np.abs(residuals - np.median(residuals)))
denominator = (C**2 + residuals**2) ** 2
gmm = np.divide(
C**2,
denominator,
out=np.zeros_like(denominator),
where=denominator != 0,
)
# Do nlls with GMM-weighting:
this_param, status = opt.leastsq(
err_func,
start_params,
args=(design_matrix, flat_data[vox], gmm),
Dfun=jac_func,
)
# Recalculate residuals given gmm fit
pred_sig = np.exp(np.dot(design_matrix, this_param))
residuals = flat_data[vox] - pred_sig
perc = (
100
* np.linalg.norm(this_param - start_params)
/ np.linalg.norm(this_param)
)
start_params = this_param
if perc < 0.1:
break
rdx = rdx + 1
cond = np.abs(residuals) > 3 * C
if np.any(cond):
# If you still have outliers, refit without those outliers:
non_outlier_idx = np.where(np.logical_not(cond))
clean_design = design_matrix[non_outlier_idx]
clean_data = flat_data[vox][non_outlier_idx]
robust[vox] = np.logical_not(cond)
# recalculate OLS solution with clean data
new_start, _ = ols_fit_tensor(
clean_design, clean_data, return_lower_triangular=True
)
this_param, status = opt.leastsq(
err_func,
new_start,
args=(clean_design, clean_data),
Dfun=jac_func,
)
# Convert diffusion tensor parameters to the evals and the evecs:
evals, evecs = decompose_tensor(from_lower_triangular(this_param[:6]))
params[vox, :3] = evals
params[vox, 3:12] = evecs.ravel()
# If leastsq failed to converge and produced nans, we'll resort to the
# OLS solution in this voxel:
except (np.linalg.LinAlgError, TypeError):
resort_to_OLS = True
this_param = start_params
if not fail_is_nan:
# Convert diffusion tensor parameters to evals and evecs:
evals, evecs = decompose_tensor(from_lower_triangular(this_param[:6]))
params[vox, :3] = evals
params[vox, 3:12] = evecs.ravel()
else:
# Set NaN values
this_param[:] = np.nan # so that S0_hat is NaN
params[vox, :] = np.nan
if return_S0_hat:
model_S0[vox] = np.exp(-this_param[-1])
if not dti:
md2 = evals.mean(0) ** 2
params[vox, 12:] = this_param[6:-1] / md2
if resort_to_OLS:
warnings.warn(ols_resort_msg, UserWarning, stacklevel=2)
params.shape = data.shape[:-1] + (npa,)
extra = {"robust": robust}
if return_S0_hat:
model_S0.shape = data.shape[:-1] + (1,)
return [params, model_S0], extra
else:
return params, extra
def iterative_fit_tensor(
design_matrix,
data,
*,
jac=True,
return_S0_hat=False,
fit_type=None,
num_iter=4,
weights_method=None,
):
"""Iteratively Reweighted fitting for the DTI/DKI model.
Parameters
----------
design_matrix : ndarray of shape (g, ...)
Design matrix holding the covariants used to solve for the regression
coefficients.
data : ndarray of shape ([X, Y, Z, n_directions], g)
Data or response variables holding the data. Note that the last
dimension should contain the data. It makes no copies of data.
jac : bool, optional
Use the Jacobian for NLLS fitting (does nothing for WLS fitting).
return_S0_hat : bool, optional
Boolean to return (True) or not (False) the S0 values for the fit.
fit_type : str, optional
Whether to use NLLS or WLS fitting scheme.
num_iter : int, optional
Number of times to iterate.
weights_method : callable, optional
A function with args and returns as follows::
(weights, robust) = weights_method(data, pred_sig, design_matrix,
leverages, idx, num_iter, robust)
Notes
-----
    Take care to supply a ``weights_method`` appropriate for the chosen
    ``fit_type``: it is possible to use NLLS fitting with weights designed
    for WLS fitting, but doing so is a user error.
"""
tol = 1e-6
if fit_type is None:
raise ValueError("fit_type must be provided")
if weights_method is None:
raise ValueError("weights_method must be provided")
if num_iter < 2: # otherwise, weights_method will not be utilized
raise ValueError("num_iter must be 2+")
if fit_type not in ["WLS", "NLLS"]:
raise ValueError("fit_type must be 'WLS' or 'NLLS'")
# Detect number of parameters to estimate from design_matrix length plus
# 5 due to diffusion tensor conversion to eigenvalue and eigenvectors
p = design_matrix.shape[-1]
N = data.shape[-1]
if N <= p:
raise ValueError("Fewer data points than parameters.")
# Detect if number of parameters corresponds to dti
npa = p + 5
dti = npa == 12
w, robust = None, None # w = None means wls_fit_tensor uses WLS weights
D, extra, leverages = None, None, None # initialize, for clarity
TDX = num_iter
for rdx in range(1, TDX + 1):
if rdx > 1:
log_pred_sig = np.dot(design_matrix, D.T).T
pred_sig = np.exp(log_pred_sig)
w, robust = weights_method(
data, pred_sig, design_matrix, leverages, rdx, TDX, robust
)
if fit_type == "WLS":
D, extra = wls_fit_tensor(
design_matrix,
data,
weights=w,
return_lower_triangular=True,
return_leverages=True,
)
leverages = extra["leverages"] # for WLS, update leverages
if fit_type == "NLLS":
D, extra = nlls_fit_tensor(
design_matrix,
data,
weights=w,
return_lower_triangular=True,
return_leverages=(rdx == 1),
jac=jac,
init_params=D,
)
if rdx == 1: # for NLLS, leverages from OLS, so they never change
leverages = extra["leverages"]
# Convert diffusion tensor parameters to the evals and the evecs:
evals, evecs = decompose_tensor(
from_lower_triangular(D[:, :6]), min_diffusivity=tol / -design_matrix.min()
)
params = np.empty((data.shape[0:-1] + (npa,)))
params[:, :3] = evals
params[:, 3:12] = evecs.reshape(params.shape[0:-1] + (-1,))
if return_S0_hat:
model_S0 = np.exp(-D[:, -1])
if not dti:
        md2 = evals.mean(axis=1)[:, None] ** 2  # mean over the 3 evals, per voxel
params[:, 12:] = D[:, 6:-1] / md2
extra = {"robust": robust}
if return_S0_hat:
model_S0.shape = data.shape[:-1] + (1,)
return [params, model_S0], extra
else:
return params, extra
def robust_fit_tensor_wls(design_matrix, data, *, return_S0_hat=False, num_iter=4):
"""Iteratively Reweighted fitting for WLS for the DTI/DKI model.
Parameters
----------
design_matrix : ndarray of shape (g, ...)
Design matrix holding the covariants used to solve for the regression
coefficients.
data : ndarray of shape ([X, Y, Z, n_directions], g)
Data or response variables holding the data. Note that the last
dimension should contain the data. It makes no copies of data.
return_S0_hat : bool, optional
Boolean to return (True) or not (False) the S0 values for the fit.
num_iter : int, optional
Number of times to iterate.
Notes
-----
This is a convenience function that does::
iterative_fit_tensor(*args, **kwargs, fit_type="WLS",
weights_method=weights_method_wls)
"""
return iterative_fit_tensor(
design_matrix,
data,
return_S0_hat=return_S0_hat,
fit_type="WLS",
num_iter=num_iter,
weights_method=weights_method_wls_m_est,
)
def robust_fit_tensor_nlls(
design_matrix, data, *, jac=True, return_S0_hat=False, num_iter=4
):
"""Iteratively Reweighted fitting for NLLS for the DTI/DKI model.
Parameters
----------
design_matrix : ndarray of shape (g, ...)
Design matrix holding the covariants used to solve for the regression
coefficients.
data : ndarray of shape ([X, Y, Z, n_directions], g)
Data or response variables holding the data. Note that the last
dimension should contain the data. It makes no copies of data.
jac : bool, optional
        Whether to use the analytic Jacobian of the error function in the
        non-linear optimization.
return_S0_hat : bool, optional
Boolean to return (True) or not (False) the S0 values for the fit.
num_iter : int, optional
Number of times to iterate.
Notes
-----
This is a convenience function that does::
iterative_fit_tensor(*args, **kwargs, fit_type="NLLS",
weights_method=weights_method_nlls)
"""
return iterative_fit_tensor(
design_matrix,
data,
jac=jac,
return_S0_hat=return_S0_hat,
fit_type="NLLS",
num_iter=num_iter,
weights_method=weights_method_nlls_m_est,
)
_lt_indices = np.array([[0, 1, 3], [1, 2, 4], [3, 4, 5]])
def from_lower_triangular(D):
"""Returns a tensor given the six unique tensor elements
Given the six unique tensor elements (in the order: Dxx, Dxy, Dyy, Dxz,
Dyz, Dzz) returns a 3 by 3 tensor. All elements after the sixth are
ignored.
Parameters
----------
D : array_like, (..., >6)
Unique elements of the tensors
Returns
-------
tensor : ndarray (..., 3, 3)
3 by 3 tensors
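
    Examples
    --------
    With arbitrary illustrative values:

    >>> import numpy as np
    >>> D = np.arange(1.0, 7.0)  # (Dxx, Dxy, Dyy, Dxz, Dyz, Dzz)
    >>> from_lower_triangular(D)
    array([[1., 2., 4.],
           [2., 3., 5.],
           [4., 5., 6.]])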
"""
return D[..., _lt_indices]
_lt_rows = np.array([0, 1, 1, 2, 2, 2])
_lt_cols = np.array([0, 0, 1, 0, 1, 2])
@warning_for_keywords()
def lower_triangular(tensor, *, b0=None):
"""
Returns the six lower triangular values of the tensor ordered as
(Dxx, Dxy, Dyy, Dxz, Dyz, Dzz) and a dummy variable if b0 is not None.
Parameters
----------
tensor : array_like (..., 3, 3)
a collection of 3, 3 diffusion tensors
b0 : float, optional
        if b0 is not None, log(b0) is returned as the dummy variable
Returns
-------
D : ndarray
        If b0 is None, the shape will be (..., 6); otherwise (..., 7)
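
    Examples
    --------
    Round trip through :func:`from_lower_triangular`, with arbitrary
    illustrative values:

    >>> import numpy as np
    >>> tensor = from_lower_triangular(np.arange(1.0, 7.0))
    >>> lower_triangular(tensor)
    array([1., 2., 3., 4., 5., 6.])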
"""
if tensor.shape[-2:] != (3, 3):
raise ValueError("Diffusion tensors should be (..., 3, 3)")
if b0 is None:
return tensor[..., _lt_rows, _lt_cols]
else:
D = np.empty(tensor.shape[:-2] + (7,), dtype=tensor.dtype)
D[..., 6] = -np.log(b0)
D[..., :6] = tensor[..., _lt_rows, _lt_cols]
return D
@warning_for_keywords()
def decompose_tensor(tensor, *, min_diffusivity=0):
"""Returns eigenvalues and eigenvectors given a diffusion tensor
Computes tensor eigen decomposition to calculate eigenvalues and
eigenvectors (Basser et al., 1994a).
Parameters
----------
tensor : array (..., 3, 3)
Hermitian matrix representing a diffusion tensor.
min_diffusivity : float, optional
Because negative eigenvalues are not physical and small eigenvalues,
much smaller than the diffusion weighting, cause quite a lot of noise
in metrics such as fa, diffusivity values smaller than
`min_diffusivity` are replaced with `min_diffusivity`.
Returns
-------
eigvals : array (..., 3)
Eigenvalues from eigen decomposition of the tensor. Negative
eigenvalues are replaced by zero. Sorted from largest to smallest.
eigvecs : array (..., 3, 3)
Associated eigenvectors from eigen decomposition of the tensor.
Eigenvectors are columnar (e.g. eigvecs[..., :, j] is associated with
eigvals[..., j])
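
    Examples
    --------
    An illustrative diagonal tensor; the eigenvalues come back sorted in
    descending order:

    >>> import numpy as np
    >>> evals, evecs = decompose_tensor(np.diag([1e-3, 2e-3, 3e-3]))
    >>> np.allclose(evals, [3e-3, 2e-3, 1e-3])
    True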
"""
    # np.linalg.eigh returns eigenvalues in ascending order; re-sort below
eigenvals, eigenvecs = np.linalg.eigh(tensor)
# need to sort the eigenvalues and associated eigenvectors
if eigenvals.ndim == 1:
# this is a lot faster when dealing with a single voxel
order = eigenvals.argsort()[::-1]
eigenvecs = eigenvecs[:, order]
eigenvals = eigenvals[order]
else:
# temporarily flatten eigenvals and eigenvecs to make sorting easier
shape = eigenvals.shape[:-1]
eigenvals = eigenvals.reshape(-1, 3)
eigenvecs = eigenvecs.reshape(-1, 3, 3)
size = eigenvals.shape[0]
order = eigenvals.argsort()[:, ::-1]
xi, yi = np.ogrid[:size, :3, :3][:2]
eigenvecs = eigenvecs[xi, yi, order[:, None, :]]
xi = np.ogrid[:size, :3][0]
eigenvals = eigenvals[xi, order]
eigenvecs = eigenvecs.reshape(shape + (3, 3))
eigenvals = eigenvals.reshape(shape + (3,))
eigenvals = eigenvals.clip(min=min_diffusivity)
# eigenvecs: each vector is columnar
return eigenvals, eigenvecs
@warning_for_keywords()
def design_matrix(gtab, *, dtype=None):
"""Constructs design matrix for DTI weighted least squares or
least squares fitting. (Basser et al., 1994a)
Parameters
----------
gtab : A GradientTable class instance
dtype : str, optional
Parameter to control the dtype of returned designed matrix
Returns
-------
design_matrix : array (g,7)
Design matrix or B matrix assuming Gaussian distributed tensor model
        design_matrix[j, :] = (Bxx, Bxy, Byy, Bxz, Byz, Bzz, dummy)
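
    Examples
    --------
    An illustrative four-measurement acquisition, one b=0 plus three
    diffusion-weighted directions:

    >>> import numpy as np
    >>> from dipy.core.gradients import gradient_table
    >>> bvals = np.array([0.0, 1000.0, 1000.0, 1000.0])
    >>> bvecs = np.array(
    ...     [[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]
    ... )
    >>> design_matrix(gradient_table(bvals, bvecs=bvecs)).shape
    (4, 7)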
"""
if gtab.btens is None:
B = np.zeros((gtab.gradients.shape[0], 7))
B[:, 0] = gtab.bvecs[:, 0] * gtab.bvecs[:, 0] * 1.0 * gtab.bvals # Bxx
B[:, 1] = gtab.bvecs[:, 0] * gtab.bvecs[:, 1] * 2.0 * gtab.bvals # Bxy
B[:, 2] = gtab.bvecs[:, 1] * gtab.bvecs[:, 1] * 1.0 * gtab.bvals # Byy
B[:, 3] = gtab.bvecs[:, 0] * gtab.bvecs[:, 2] * 2.0 * gtab.bvals # Bxz
B[:, 4] = gtab.bvecs[:, 1] * gtab.bvecs[:, 2] * 2.0 * gtab.bvals # Byz
B[:, 5] = gtab.bvecs[:, 2] * gtab.bvecs[:, 2] * 1.0 * gtab.bvals # Bzz
B[:, 6] = np.ones(gtab.gradients.shape[0])
else:
B = np.zeros((gtab.gradients.shape[0], 7))
B[:, 0] = gtab.btens[:, 0, 0] # Bxx
B[:, 1] = gtab.btens[:, 0, 1] * 2 # Bxy
B[:, 2] = gtab.btens[:, 1, 1] # Byy
B[:, 3] = gtab.btens[:, 0, 2] * 2 # Bxz
B[:, 4] = gtab.btens[:, 1, 2] * 2 # Byz
B[:, 5] = gtab.btens[:, 2, 2] # Bzz
B[:, 6] = np.ones(gtab.gradients.shape[0])
return -B
@warning_for_keywords()
def quantize_evecs(evecs, *, odf_vertices=None):
"""Find the closest orientation of an evenly distributed sphere
Parameters
----------
evecs : ndarray
Eigenvectors.
odf_vertices : ndarray, optional
If None, then set vertices from symmetric362 sphere. Otherwise use
passed ndarray as vertices
Returns
-------
IN : ndarray
"""
max_evecs = evecs[..., :, 0]
if odf_vertices is None:
odf_vertices = get_sphere(name="symmetric362").vertices
tup = max_evecs.shape[:-1]
mec = max_evecs.reshape(np.prod(np.array(tup)), 3)
IN = np.array([np.argmin(np.dot(odf_vertices, m)) for m in mec])
IN = IN.reshape(tup)
return IN
@warning_for_keywords()
def eig_from_lo_tri(data, *, min_diffusivity=0):
"""
Calculates tensor eigenvalues/eigenvectors from an array containing the
lower diagonal form of the six unique tensor elements.
Parameters
----------
data : array_like (..., 6)
diffusion tensors elements stored in lower triangular order
min_diffusivity : float, optional
See decompose_tensor()
Returns
-------
dti_params : array (..., 12)
Eigen-values and eigen-vectors of the same array.
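
    Examples
    --------
    An illustrative diagonal tensor in lower triangular order:

    >>> import numpy as np
    >>> D = np.array([3e-3, 0.0, 2e-3, 0.0, 0.0, 1e-3])
    >>> eig_from_lo_tri(D).shape
    (12,)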
"""
data = np.asarray(data)
evals, evecs = decompose_tensor(
from_lower_triangular(data), min_diffusivity=min_diffusivity
)
dti_params = np.concatenate((evals[..., None, :], evecs), axis=-2)
return dti_params.reshape(data.shape[:-1] + (12,))
common_fit_methods = {
"WLS": wls_fit_tensor,
"WLLS": wls_fit_tensor,
"LS": ols_fit_tensor,
"LLS": ols_fit_tensor,
"OLS": ols_fit_tensor,
"OLLS": ols_fit_tensor,
"NLS": nlls_fit_tensor,
"NLLS": nlls_fit_tensor,
"RESTORE": restore_fit_tensor,
"IRLS": iterative_fit_tensor,
"RWLS": robust_fit_tensor_wls,
"RNLLS": robust_fit_tensor_nlls,
}