import json
import os
from pathlib import Path
import sys
from time import time
import warnings
import numpy as np
from scipy.ndimage import binary_dilation
from dipy.core.gradients import gradient_table
from dipy.io import read_bvals_bvecs
from dipy.io.image import load_nifti, save_nifti
from dipy.io.peaks import load_pam
from dipy.io.streamline import load_tractogram
from dipy.reconst.dti import TensorModel
from dipy.segment.bundles import bundle_shape_similarity
from dipy.segment.mask import bounding_box, segment_from_cfa
from dipy.stats.analysis import (
anatomical_measures,
assignment_map,
buan_profile,
peak_values,
)
from dipy.testing.decorators import warning_for_keywords
from dipy.tracking.streamline import transform_streamlines
from dipy.utils.logging import logger
from dipy.utils.optpkg import optional_package
from dipy.workflows.workflow import Workflow
pd, have_pd, _ = optional_package("pandas")
smf, have_smf, _ = optional_package("statsmodels.formula.api")
matplt, have_matplotlib, _ = optional_package("matplotlib")
plt, have_plt, _ = optional_package("matplotlib.pyplot")
[docs]
class SNRinCCFlow(Workflow):
[docs]
@classmethod
def get_short_name(cls):
return "snrincc"
[docs]
def run(
self,
data_files,
bvals_files,
bvecs_files,
mask_file,
bbox_threshold=(0.6, 1, 0, 0.1, 0, 0.1),
out_dir="",
out_file="product.json",
out_mask_cc="cc.nii.gz",
out_mask_noise="mask_noise.nii.gz",
):
"""Compute the signal-to-noise ratio in the corpus callosum.
Parameters
----------
data_files : string or Path
Path to the dwi.nii.gz file. This path may contain wildcards to
process multiple inputs at once.
bvals_files : string or Path
Path of bvals.
bvecs_files : string or Path
Path of bvecs.
mask_file : string or Path
Path of a brain mask file.
bbox_threshold : variable float, optional
Threshold for bounding box, values separated with commas for ex.
[0.6,1,0,0.1,0,0.1].
out_dir : string or Path, optional
Where the resulting file will be saved.
out_file : string, optional
Name of the result file to be saved.
out_mask_cc : string, optional
Name of the CC mask volume to be saved.
out_mask_noise : string, optional
Name of the mask noise volume to be saved.
"""
io_it = self.get_io_iterator()
for (
dwi_path,
bvals_path,
bvecs_path,
mask_path,
out_path,
cc_mask_path,
mask_noise_path,
) in io_it:
data, affine = load_nifti(dwi_path)
bvals, bvecs = read_bvals_bvecs(bvals_path, bvecs_path)
gtab = gradient_table(bvals=bvals, bvecs=bvecs)
mask, affine = load_nifti(mask_path)
logger.info("Computing tensors...")
tenmodel = TensorModel(gtab)
tensorfit = tenmodel.fit(data, mask=mask)
logger.info("Computing worst-case/best-case SNR using the CC...")
if np.ndim(data) == 4:
CC_box = np.zeros_like(data[..., 0])
elif np.ndim(data) == 3:
CC_box = np.zeros_like(data)
else:
raise OSError("DWI data has invalid dimensions")
mins, maxs = bounding_box(mask)
mins = np.array(mins)
maxs = np.array(maxs)
diff = (maxs - mins) // 4
bounds_min = mins + diff
bounds_max = maxs - diff
CC_box[
bounds_min[0] : bounds_max[0],
bounds_min[1] : bounds_max[1],
bounds_min[2] : bounds_max[2],
] = 1
if len(bbox_threshold) != 6:
raise OSError("bbox_threshold should have 6 float values")
mask_cc_part, cfa = segment_from_cfa(
tensorfit, CC_box, bbox_threshold, return_cfa=True
)
if not np.count_nonzero(mask_cc_part.astype(np.uint8)):
logger.warning(
"Empty mask: corpus callosum not found."
" Update your data or your threshold"
)
save_nifti(cc_mask_path, mask_cc_part.astype(np.uint8), affine)
logger.info(f"CC mask saved as {cc_mask_path}")
masked_data = data[mask_cc_part]
mean_signal = 0
if masked_data.size:
mean_signal = np.mean(masked_data, axis=0)
mask_noise = binary_dilation(mask, iterations=10)
mask_noise[..., : mask_noise.shape[-1] // 2] = 1
mask_noise = ~mask_noise
save_nifti(mask_noise_path, mask_noise.astype(np.uint8), affine)
logger.info(f"Mask noise saved as {mask_noise_path}")
noise_std = 0
if np.count_nonzero(mask_noise.astype(np.uint8)):
noise_std = np.std(data[mask_noise, :])
logger.info(f"Noise standard deviation sigma= {noise_std}")
idx = np.sum(gtab.bvecs, axis=-1) == 0
gtab.bvecs[idx] = np.inf
axis_X = np.argmin(np.sum((gtab.bvecs - np.array([1, 0, 0])) ** 2, axis=-1))
axis_Y = np.argmin(np.sum((gtab.bvecs - np.array([0, 1, 0])) ** 2, axis=-1))
axis_Z = np.argmin(np.sum((gtab.bvecs - np.array([0, 0, 1])) ** 2, axis=-1))
SNR_output = []
SNR_directions = []
for direction in ["b0", axis_X, axis_Y, axis_Z]:
if direction == "b0":
SNR = mean_signal[0] / noise_std if noise_std else 0
logger.info(f"SNR for the b=0 image is : {SNR}")
else:
logger.info(
f"SNR for direction {direction} {gtab.bvecs[direction]} is: "
f"{SNR}"
)
SNR_directions.append(direction)
SNR = mean_signal[direction] / noise_std if noise_std else 0
SNR_output.append(SNR)
snr_str = f"{SNR_output[0]} {SNR_output[1]} {SNR_output[2]} {SNR_output[3]}"
dir_str = f"b0 {SNR_directions[0]} {SNR_directions[1]} {SNR_directions[2]}"
data = [{"data": snr_str, "directions": dir_str}]
with open(Path(out_dir) / out_path, "w") as myfile:
json.dump(data, myfile)
[docs]
@warning_for_keywords()
def buan_bundle_profiles(
model_bundle_folder,
bundle_folder,
orig_bundle_folder,
metric_folder,
group_id,
subject,
*,
no_disks=100,
out_dir="",
):
"""
Applies statistical analysis on bundles and saves the results
in a directory specified by ``out_dir``.
See :footcite:p:`Chandio2020a` for further details about the method.
Parameters
----------
model_bundle_folder : string or Path
Path to the input model bundle files. This path may contain
wildcards to process multiple inputs at once.
bundle_folder : string or Path
Path to the input bundle files in common space. This path may
contain wildcards to process multiple inputs at once.
orig_bundle_folder : string or Path
Path to the input bundle files in native space. This path may
contain wildcards to process multiple inputs at once.
metric_folder : string or Path
Path to the input dti metric or/and peak files. It will be used as
metric for statistical analysis of bundles.
group_id : integer
what group subject belongs to either 0 for control or 1 for patient.
subject : string
subject id e.g. 10001.
no_disks : integer, optional
Number of disks used for dividing bundle into disks.
out_dir : string or Path, optional
Output directory.
Notes
-----
This function uses :func:`~dipy.stats.analysis.anatomical_measures` to
store per-point metric values in HDF5 files, which are suitable for
downstream linear mixed model (LMM) analysis across a group of subjects.
This is intentionally different from :func:`~dipy.stats.analysis.buan_profile`,
which computes inverse-distance-weighted along-tract profiles for a single
subject and returns a ``numpy`` array. Use ``buan_bundle_profiles`` for
group studies and ``buan_profile`` for single-subject analysis.
References
----------
.. footbibliography::
"""
t = time()
_mb_folder = Path(model_bundle_folder)
mb = sorted(list(_mb_folder.glob("*.trk")) + list(_mb_folder.glob("*.trx")))
logger.info(mb)
_bd_folder = Path(bundle_folder)
bd = sorted(list(_bd_folder.glob("*.trk")) + list(_bd_folder.glob("*.trx")))
logger.info(bd)
_org_folder = Path(orig_bundle_folder)
org_bd = sorted(list(_org_folder.glob("*.trk")) + list(_org_folder.glob("*.trx")))
logger.info(org_bd)
n = len(mb)
for io in range(n):
mbundles = load_tractogram(
mb[io], reference="same", bbox_valid_check=False
).streamlines
bundles = load_tractogram(
bd[io], reference="same", bbox_valid_check=False
).streamlines
orig_bundles = load_tractogram(
org_bd[io], reference="same", bbox_valid_check=False
).streamlines
if len(orig_bundles) > 5:
_, indx = assignment_map(bundles, mbundles, no_disks)
ind = np.array(indx)
metric_files_names_dti = list(Path(metric_folder).glob("*.nii.gz"))
metric_files_names_csa = list(Path(metric_folder).glob("*.pam5"))
_, affine = load_nifti(metric_files_names_dti[0])
affine_r = np.linalg.inv(affine)
transformed_orig_bundles = transform_streamlines(orig_bundles, affine_r)
for mn in range(len(metric_files_names_dti)):
metric_name = Path(metric_files_names_dti[mn]).name
fm = metric_name[:-7]
bm = Path(mb[io]).name[:-4]
logger.info(f"bm = {bm}")
dt = {}
logger.info(f"metric = {metric_files_names_dti[mn]}")
metric, _ = load_nifti(metric_files_names_dti[mn])
anatomical_measures(
transformed_orig_bundles,
metric,
dt,
fm,
bm,
subject,
group_id,
ind,
out_dir,
)
for mn in range(len(metric_files_names_csa)):
metric_name = Path(metric_files_names_csa[mn]).name
fm = metric_name[:-5]
bm = Path(mb[io]).name[:-4]
logger.info(f"bm = {bm}")
logger.info(f"metric = {metric_files_names_csa[mn]}")
dt = {}
metric = load_pam(metric_files_names_csa[mn])
peak_values(
transformed_orig_bundles,
metric,
dt,
fm,
bm,
subject,
group_id,
ind,
out_dir,
)
logger.info(f"total time taken in minutes = {(-t + time()) / 60}")
[docs]
class BundleAnalysisTractometryFlow(Workflow):
[docs]
@classmethod
def get_short_name(cls):
return "ba"
[docs]
@warning_for_keywords()
def run(
self,
model_bundle_folder,
subject_folder,
*,
bundle_folder=None,
orig_bundle_folder=None,
metric_folder=None,
no_disks=100,
out_dir="",
):
"""Workflow of bundle analytics.
Applies statistical analysis on bundles of subjects and saves the
results in a directory specified by ``out_dir``.
Supports two modes auto-detected from the directory structure:
- **Group mode**: ``subject_folder`` contains ``patient/`` and
``control/`` subdirectories, each with per-subject subdirectories
containing ``rec_bundles/``, ``org_bundles/``, and
``anatomical_measures/``. Outputs HDF5 files suitable for linear
mixed model analysis.
- **Single-subject mode**: ``subject_folder`` directly contains
``rec_bundles/``, ``org_bundles/``, and ``anatomical_measures/``
(or the paths are overridden via ``bundle_folder``,
``orig_bundle_folder``, and ``metric_folder``). Outputs ``.npy``
profile arrays (one per bundle/metric pair).
See :footcite:p:`Chandio2020a` for further details about the method.
Parameters
----------
model_bundle_folder : string or Path
Path to the input model bundle files. This path may
contain wildcards to process multiple inputs at once.
subject_folder : string or Path
Path to the subject folder. Either a group-level directory
(containing ``patient/`` and ``control/`` subdirs) or a
single-subject directory (directly containing ``rec_bundles/``,
``org_bundles/``, and ``anatomical_measures/``).
bundle_folder : string or Path, optional
Override path for the registered bundles in common space
(replaces ``<subject_folder>/rec_bundles``). Only used in
single-subject mode.
orig_bundle_folder : string or Path, optional
Override path for the bundles in native space (replaces
``<subject_folder>/org_bundles``). Only used in single-subject
mode.
metric_folder : string or Path, optional
Override path for the metric files (replaces
``<subject_folder>/anatomical_measures``). Only used in
single-subject mode.
no_disks : integer, optional
Number of disks used for dividing bundle into disks.
out_dir : string or Path, optional
Output directory.
References
----------
.. footbibliography::
"""
subject_path = Path(subject_folder)
if not subject_path.is_dir():
raise ValueError("Invalid path to subjects")
has_overrides = any(
p is not None for p in (bundle_folder, orig_bundle_folder, metric_folder)
)
if has_overrides or (subject_path / "rec_bundles").is_dir():
self._run_single_subject(
model_bundle_folder,
subject_path,
no_disks,
out_dir,
bundle_folder=bundle_folder,
orig_bundle_folder=orig_bundle_folder,
metric_folder=metric_folder,
)
else:
self._run_group(model_bundle_folder, subject_path, no_disks, out_dir)
def _run_group(self, model_bundle_folder, subject_path, no_disks, out_dir):
"""Run group-mode tractometry (patient/control subdirectory structure).
Parameters
----------
model_bundle_folder : string or Path
Path to the input model bundle files.
subject_path : Path
Path object pointing to the group-level subject folder.
no_disks : integer
Number of disks used for dividing bundle into disks.
out_dir : string or Path
Output directory.
"""
groups = [p.name for p in subject_path.iterdir()]
groups.sort()
for group in groups:
group_dirname = subject_path / group
if group_dirname.is_dir():
logger.info(f"group = {group}")
all_subjects = os.listdir(group_dirname)
all_subjects.sort()
logger.info(all_subjects)
if group.lower() == "patient":
group_id = 1 # 1 means patient
elif group.lower() == "control":
group_id = 0 # 0 means control
else:
logger.info(group)
raise ValueError("Invalid group. Neither patient nor control")
for sub in all_subjects:
logger.info(sub)
pre = group_dirname / sub
logger.info(pre)
b = Path(pre) / "rec_bundles"
c = Path(pre) / "org_bundles"
d = Path(pre) / "anatomical_measures"
buan_bundle_profiles(
model_bundle_folder,
b,
c,
d,
group_id,
sub,
no_disks=no_disks,
out_dir=out_dir,
)
def _run_single_subject(
self,
model_bundle_folder,
subject_path,
no_disks,
out_dir,
*,
bundle_folder=None,
orig_bundle_folder=None,
metric_folder=None,
):
"""Run single-subject tractometry saving ``.npy`` profile arrays.
Parameters
----------
model_bundle_folder : string or Path
Path to the input model bundle files.
subject_path : Path
Path object pointing to the single-subject directory.
no_disks : integer
Number of disks used for dividing bundle into disks.
out_dir : string or Path
Output directory.
bundle_folder : string or Path, optional
Path to the registered bundles in common space. Defaults to
``<subject_path>/rec_bundles``.
orig_bundle_folder : string or Path, optional
Path to the bundles in native space. Defaults to
``<subject_path>/org_bundles``.
metric_folder : string or Path, optional
Path to the metric files. Defaults to
``<subject_path>/anatomical_measures``.
"""
bundle_folder = (
Path(bundle_folder)
if bundle_folder is not None
else subject_path / "rec_bundles"
)
orig_bundle_folder = (
Path(orig_bundle_folder)
if orig_bundle_folder is not None
else subject_path / "org_bundles"
)
metric_folder = (
Path(metric_folder)
if metric_folder is not None
else subject_path / "anatomical_measures"
)
_mb_folder = Path(model_bundle_folder)
mb_list = sorted(
list(_mb_folder.glob("*.trk")) + list(_mb_folder.glob("*.trx"))
)
if not mb_list:
logger.info("No model bundle files found in the specified folder")
sys.exit(1)
bd_list = sorted(
list(bundle_folder.glob("*.trk")) + list(bundle_folder.glob("*.trx"))
)
if not bd_list:
logger.info("No registered bundle files found in the specified folder")
sys.exit(1)
org_list = sorted(
list(orig_bundle_folder.glob("*.trk"))
+ list(orig_bundle_folder.glob("*.trx"))
)
if not org_list:
logger.info("No original bundle files found in the specified folder")
sys.exit(1)
metric_files = sorted(metric_folder.glob("*.nii.gz"))
Path(out_dir).mkdir(parents=True, exist_ok=True)
for mb_file, bd_file, org_file in zip(mb_list, bd_list, org_list):
mbundles = load_tractogram(
mb_file, reference="same", bbox_valid_check=False
).streamlines
bundles = load_tractogram(
bd_file, reference="same", bbox_valid_check=False
).streamlines
orig_bundles = load_tractogram(
org_file, reference="same", bbox_valid_check=False
).streamlines
if len(orig_bundles) <= 5:
continue
bname = Path(mb_file).stem
for metric_file in metric_files:
metric, affine = load_nifti(metric_file)
fname = Path(metric_file).stem.replace(".nii", "")
logger.info(f"Applying metric {metric_file} on bundle {bname}")
if metric.ndim > 3:
logger.info(
f"Skipping metric {metric_file} with 4D shape {metric.shape}"
)
continue
profile = buan_profile(
mbundles,
bundles,
orig_bundles,
metric,
affine,
no_disks=no_disks,
)
np.save(Path(out_dir) / f"{bname}_{fname}_profile.npy", profile)
[docs]
class LinearMixedModelsFlow(Workflow):
[docs]
@classmethod
def get_short_name(cls):
return "lmm"
[docs]
def get_metric_name(self, path):
"""Splits the path string and returns name of anatomical measure
(eg: fa), bundle name eg(AF_L) and bundle name with metric name
(eg: AF_L_fa)
Parameters
----------
path : string or Path
Path to the input metric files. This path may
contain wildcards to process multiple inputs at once.
"""
name = Path(path).name
count = 0
i = len(name) - 1
while i > 0:
if name[i] == ".":
count = i
break
i = i - 1
for j in range(len(name)):
if name[j] == "_":
if name[j + 1] != "L" and name[j + 1] != "R" and name[j + 1] != "F":
return name[j + 1 : count], name[:j], name[:count]
return " ", " ", " "
[docs]
def save_lmm_plot(self, plot_file, title, bundle_name, x, y):
"""Saves LMM plot with segment/disk number on x-axis and
-log10(pvalues) on y-axis in out_dir folder.
Parameters
----------
plot_file : string
Path to the plot file. This path may
contain wildcards to process multiple inputs at once.
title : string
Title for the plot.
bundle_name : string
Bundle name.
x : list
list containing segment/disk number for x-axis.
y : list
list containing -log10(pvalues) per segment/disk number for y-axis.
"""
n = len(x)
dotted = np.ones(n)
dotted[:] = 2
c1 = np.random.rand(1, 3)
y_pos = np.arange(n)
(l1,) = plt.plot(
y_pos,
dotted,
color="red",
marker=".",
linestyle="solid",
linewidth=0.6,
markersize=0.7,
label="p-value < 0.01",
)
(l2,) = plt.plot(
y_pos,
dotted + 1,
color="black",
marker=".",
linestyle="solid",
linewidth=0.4,
markersize=0.4,
label="p-value < 0.001",
)
first_legend = plt.legend(handles=[l1, l2], loc="upper right")
axes = plt.gca()
axes.add_artist(first_legend)
axes.set_ylim([0, 6])
l3 = plt.bar(y_pos, y, color=c1, alpha=0.5, label=bundle_name)
plt.legend(handles=[l3], loc="upper left")
plt.title(title.upper())
plt.xlabel("Segment Number")
plt.ylabel("-log10(Pvalues)")
plt.savefig(plot_file)
plt.clf()
[docs]
@warning_for_keywords()
def run(self, h5_files, *, no_disks=100, out_dir=""):
"""Workflow of linear Mixed Models.
Applies linear Mixed Models on bundles of subjects and saves the
results in a directory specified by ``out_dir``.
Parameters
----------
h5_files : string or Path
Path to the input metric files. This path may
contain wildcards to process multiple inputs at once.
no_disks : integer, optional
Number of disks used for dividing bundle into disks.
out_dir : string or Path, optional
Output directory.
"""
io_it = self.get_io_iterator()
for file_path in io_it:
logger.info(f"Applying metric {file_path}")
file_name, bundle_name, save_name = self.get_metric_name(file_path)
logger.info(f" file name = {file_name}")
logger.info(f"file path = {file_path}")
pvalues = np.zeros(no_disks)
warnings.filterwarnings("ignore")
# run mixed linear model for every disk
for i in range(no_disks):
disk_count = i + 1
df = pd.read_hdf(file_path, where="disk=disk_count")
logger.info(f"read the dataframe for disk number {disk_count}")
# check if data has significant data to perform LMM
if len(df) < 10:
raise ValueError("Dataset for Linear Mixed Model is too small")
criteria = file_name + " ~ group"
md = smf.mixedlm(criteria, df, groups=df["subject"])
mdf = md.fit()
pvalues[i] = mdf.pvalues["group"]
x = list(range(1, len(pvalues) + 1))
y = -1 * np.log10(pvalues)
save_file = Path(out_dir) / (save_name + "_pvalues.npy")
np.save(save_file, pvalues)
save_file = Path(out_dir) / (save_name + "_pvalues_log.npy")
np.save(save_file, y)
save_file = Path(out_dir) / (save_name + ".png")
self.save_lmm_plot(save_file, file_name, bundle_name, x, y)
[docs]
class BundleShapeAnalysis(Workflow):
[docs]
@classmethod
def get_short_name(cls):
return "BS"
[docs]
@warning_for_keywords()
def run(self, subject_folder, *, clust_thr=(5, 3, 1.5), threshold=6, out_dir=""):
"""Workflow of bundle analytics.
Applies bundle shape similarity analysis on bundles of subjects and
saves the results in a directory specified by ``out_dir``.
See :footcite:p:`Chandio2020a` for further details about the method.
Parameters
----------
subject_folder : string or Path
Path to the input subject folder. This path may contain
wildcards to process multiple inputs at once.
clust_thr : variable float, optional
list of bundle clustering thresholds used in QuickBundlesX.
threshold : float, optional
Bundle shape similarity threshold.
out_dir : string or Path, optional
Output directory.
References
----------
.. footbibliography::
"""
rng = np.random.default_rng()
all_subjects = []
if Path(subject_folder).is_dir():
groups = sorted([p.name for p in Path(subject_folder).iterdir()])
else:
raise ValueError("Not a directory")
for group in groups:
group_dirname = Path(subject_folder) / group
if group_dirname.is_dir():
subjects = sorted([p.name for p in Path(group_dirname).iterdir()])
logger.info(
"first "
+ str(len(subjects))
+ " subjects in matrix belong to "
+ group
+ " group"
)
for sub in subjects:
dpath = group_dirname / sub
if dpath.is_dir():
all_subjects.append(dpath)
N = len(all_subjects)
bundles = [p.name for p in (Path(all_subjects[0]) / "rec_bundles").iterdir()]
for bun in bundles:
# bundle shape similarity matrix
ba_matrix = np.zeros((N, N))
i = 0
logger.info(bun)
for sub in all_subjects:
j = 0
bundle1 = load_tractogram(
Path(sub) / "rec_bundles" / bun,
reference="same",
bbox_valid_check=False,
).streamlines
for subi in all_subjects:
logger.info(subi)
bundle2 = load_tractogram(
Path(subi) / "rec_bundles" / bun,
reference="same",
bbox_valid_check=False,
).streamlines
ba_value = bundle_shape_similarity(
bundle1, bundle2, rng, clust_thr=clust_thr, threshold=threshold
)
ba_matrix[i][j] = ba_value
j += 1
i += 1
logger.info("saving BA score matrix")
np.save(Path(out_dir) / (bun[:-4] + ".npy"), ba_matrix)
cmap = matplt.colormaps["Blues"]
plt.title(bun[:-4])
plt.imshow(ba_matrix, cmap=cmap)
plt.colorbar()
plt.clim(0, 1)
plt.savefig(Path(out_dir) / f"SM_{bun[:-4]}")
plt.clf()