import multiprocessing as mp
import warnings
import numpy as np
from scipy.ndimage import affine_transform
from dipy.testing.decorators import warning_for_keywords
from dipy.utils.multiproc import determine_num_processes
def _affine_transform(kwargs):
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message=".*scipy.*18.*", category=UserWarning)
return affine_transform(**kwargs)
def _lanczos_kernel(values, num_lobes):
"""Lanczos kernel: sinc(x) * sinc(x/a) for |x| < a, else 0.
Parameters
----------
values : array-like
Input values.
num_lobes : int
Kernel support half-width. Values where ``|x| >= num_lobes`` are
set to 0 regardless of position in the array. Typically 2 or 3.
Returns
-------
result : ndarray
Kernel values at values.
"""
values = np.asarray(values, dtype=np.float64)
mask = np.abs(values) < num_lobes
return np.where(mask, np.sinc(values) * np.sinc(values / num_lobes), 0.0)
def _lanczos_resample_1d_axis0(
data, scale, out_size, *, num_lobes=2, mode="constant", cval=0
):
"""Apply 1D Lanczos resampling along axis 0 of an N-dimensional array.
Parameters
----------
data : ndarray, shape (in_size, ...)
Input array. Must be float64.
scale : float
Ratio new_zoom / old_zoom for this axis.
out_size : int
Number of output samples along axis 0.
num_lobes : int, optional
Lanczos kernel lobes (2 or 3).
mode : str, optional
Boundary mode: 'constant', 'nearest', 'reflect', or 'wrap'.
cval : float, optional
Fill value for mode='constant'.
Returns
-------
out : ndarray, shape (out_size, ...), float64
Resampled array.
"""
valid_modes = ("constant", "nearest", "reflect", "wrap")
if mode not in valid_modes:
raise ValueError(f"Invalid mode {mode!r}. Valid modes: {valid_modes}")
in_size = data.shape[0]
extra_dims = data.ndim - 1
bc_shape = (-1,) + (1,) * extra_dims
out_idx = np.arange(out_size, dtype=np.float64)
in_coords = out_idx * scale
floor_coords = np.floor(in_coords).astype(np.intp)
result = np.zeros((out_size,) + data.shape[1:], dtype=np.float64)
weight_sum = np.zeros(out_size, dtype=np.float64)
for delta in range(-(num_lobes - 1), num_lobes + 1):
neigh = floor_coords + delta
w = _lanczos_kernel(in_coords - neigh, num_lobes)
if mode == "wrap":
sample = data[np.mod(neigh, in_size)]
elif mode == "reflect":
period = 2 * (in_size - 1) if in_size > 1 else 1
idx = np.mod(neigh, period)
idx = np.where(idx >= in_size, period - idx, idx)
sample = data[idx]
else:
sample = data[np.clip(neigh, 0, in_size - 1)]
if mode == "constant":
invalid = (neigh < 0) | (neigh >= in_size)
if invalid.any():
sample = sample.copy()
sample[invalid] = float(cval)
result += w.reshape(bc_shape) * sample
weight_sum += w
nonzero = (np.abs(weight_sum) > 1e-10).reshape(bc_shape)
weight_sum_bc = weight_sum.reshape(bc_shape)
fill = float(cval) if mode == "constant" else 0.0
return np.where(nonzero, result / np.where(nonzero, weight_sum_bc, 1.0), fill)
def _lanczos_resample_3d(
data, scale, output_shape, *, num_lobes=2, mode="constant", cval=0
):
"""Resample a 3D volume using separable Lanczos interpolation.
Applies 1D Lanczos resampling along each spatial axis in sequence.
Separability of the Lanczos kernel makes this equivalent to the full
3D tensor-product kernel at a fraction of the cost.
Parameters
----------
data : ndarray, shape (I, J, K)
Input 3D volume.
scale : ndarray, shape (3,)
Scaling factors per axis (new_zooms / zooms).
output_shape : tuple of int
Shape of the output volume.
num_lobes : int, optional
Lanczos kernel lobes (2 or 3).
mode : str, optional
Boundary mode: 'constant', 'nearest', 'reflect', or 'wrap'.
cval : float, optional
Fill value for mode='constant'.
Returns
-------
out : ndarray, shape output_shape, dtype matches data.dtype
Resampled volume.
"""
vol = data.astype(np.float64)
for axis in range(3):
vol = np.moveaxis(vol, axis, 0)
vol = _lanczos_resample_1d_axis0(
vol,
scale[axis],
output_shape[axis],
num_lobes=num_lobes,
mode=mode,
cval=cval,
)
vol = np.moveaxis(vol, 0, axis)
return vol.astype(data.dtype)
[docs]
@warning_for_keywords()
def reslice(
data,
affine,
zooms,
new_zooms,
*,
order=1,
mode="constant",
cval=0,
num_processes=1,
new_shape=None,
):
"""Reslice data with new voxel resolution defined by ``new_zooms``.
Parameters
----------
data : array, shape (I,J,K) or (I,J,K,N)
3d volume or 4d volume with datasets
affine : array, shape (4,4)
mapping from voxel coordinates to world coordinates
zooms : tuple, shape (3,)
voxel size for (i,j,k) dimensions
new_zooms : tuple, shape (3,)
new voxel size for (i,j,k) after resampling
order : int or str
Interpolation order. Integer 0–5 selects spline order via scipy
(0 nearest, 1 trilinear, …). String values ``'lanczos'`` or
``'lanczos2'`` select a 2-lobe Lanczos kernel; ``'lanczos3'``
selects a 3-lobe Lanczos kernel.
mode : string ('constant', 'nearest', 'reflect' or 'wrap')
Points outside the boundaries of the input are filled according
to the given mode.
cval : float
Value used for points outside the boundaries of the input if
mode='constant'.
num_processes : int, optional
Split the calculation to a pool of children processes. This only
applies to 4D `data` arrays. Default is 1. If < 0 the maximal number
of cores minus ``num_processes + 1`` is used (enter -1 to use as many
cores as possible). 0 raises an error. Ignored when order is
``'lanczos'``, ``'lanczos2'``, or ``'lanczos3'``.
new_shape : tuple, shape (3,), optional
Sets the shape the image should take after affine transformation.
If None, it is calculated through the affine matrix and current shape.
Returns
-------
data2 : array, shape (I,J,K) or (I,J,K,N)
datasets resampled into isotropic voxel size
affine2 : array, shape (4,4)
new affine for the resampled image
Examples
--------
>>> from dipy.io.image import load_nifti
>>> from dipy.align.reslice import reslice
>>> from dipy.data import get_fnames
>>> f_name = get_fnames(name="aniso_vox")
>>> data, affine, zooms = load_nifti(f_name, return_voxsize=True)
>>> data.shape == (58, 58, 24)
True
>>> zooms
(4.0, 4.0, 5.0)
>>> new_zooms = (3.,3.,3.)
>>> new_zooms
(3.0, 3.0, 3.0)
>>> data2, affine2 = reslice(data, affine, zooms, new_zooms)
>>> data2.shape == (77, 77, 40)
True
"""
num_processes = determine_num_processes(num_processes)
lanczos_lobes = None
if isinstance(order, str):
if order in ("lanczos", "lanczos2"):
lanczos_lobes = 2
elif order == "lanczos3":
lanczos_lobes = 3
else:
raise ValueError(
f"Unknown interpolation order {order!r}. "
"Valid string values: 'lanczos', 'lanczos2', 'lanczos3'."
)
elif not (0 <= order <= 5):
raise ValueError(f"order must be 0-5, got {order!r}.")
_valid_modes_lanczos = ("constant", "nearest", "reflect", "wrap")
_valid_modes_scipy = ("constant", "nearest", "reflect", "mirror", "wrap")
_valid_modes = (
_valid_modes_lanczos if lanczos_lobes is not None else _valid_modes_scipy
)
if mode not in _valid_modes:
raise ValueError(f"Invalid mode {mode!r}. Valid modes: {_valid_modes}")
# We are suppressing warnings emitted by scipy >= 0.18,
# described in https://github.com/dipy/dipy/issues/1107.
# These warnings are not relevant to us, as long as our offset
# input to scipy's affine_transform is [0, 0, 0]
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message=".*scipy.*18.*", category=UserWarning)
new_zooms = np.array(new_zooms, dtype="f8")
zooms = np.array(zooms, dtype="f8")
scale = new_zooms / zooms
if new_shape is None:
new_shape = zooms / new_zooms * np.array(data.shape[:3])
new_shape = tuple(np.round(new_shape).astype("i8"))
if data.ndim not in (3, 4):
raise ValueError(
f"dimension of data should be 3 or 4 but you provided {data.ndim}"
)
if lanczos_lobes is not None:
if data.ndim == 3:
data2 = _lanczos_resample_3d(
data,
scale,
new_shape,
num_lobes=lanczos_lobes,
mode=mode,
cval=cval,
)
else:
data2 = np.zeros(new_shape + (data.shape[-1],), data.dtype)
for vol_idx in range(data.shape[-1]):
data2[..., vol_idx] = _lanczos_resample_3d(
data[..., vol_idx],
scale,
new_shape,
num_lobes=lanczos_lobes,
mode=mode,
cval=cval,
)
else:
kwargs = {
"matrix": scale,
"output_shape": new_shape,
"order": order,
"mode": mode,
"cval": cval,
}
if data.ndim == 3:
data2 = affine_transform(input=data, **kwargs)
else:
data2 = np.zeros(new_shape + (data.shape[-1],), data.dtype)
if num_processes == 1:
for i in range(data.shape[-1]):
affine_transform(
input=data[..., i], output=data2[..., i], **kwargs
)
else:
params = []
for i in range(data.shape[-1]):
_kwargs = {"input": data[..., i]}
_kwargs.update(kwargs)
params.append(_kwargs)
mp.set_start_method("spawn", force=True)
pool = mp.Pool(num_processes)
for i, res in enumerate(pool.imap(_affine_transform, params)):
data2[..., i] = res
pool.close()
affine_scale = np.eye(4)
affine_scale[:3, :3] = np.diag(scale)
affine2 = np.dot(affine, affine_scale)
return (data2, affine2)