Source code for dipy.workflows.stats

from glob import glob
import json
import logging
import os
from time import time
import warnings

import numpy as np
from scipy.ndimage import binary_dilation

from dipy.core.gradients import gradient_table
from dipy.io import read_bvals_bvecs
from dipy.io.image import load_nifti, save_nifti
from dipy.io.peaks import load_peaks
from dipy.io.streamline import load_tractogram
from dipy.reconst.dti import TensorModel
from dipy.segment.bundles import bundle_shape_similarity
from dipy.segment.mask import bounding_box, segment_from_cfa
from dipy.stats.analysis import anatomical_measures, assignment_map, peak_values
from dipy.testing.decorators import warning_for_keywords
from dipy.tracking.streamline import transform_streamlines
from dipy.utils.optpkg import optional_package
from dipy.workflows.workflow import Workflow

pd, have_pd, _ = optional_package("pandas")
smf, have_smf, _ = optional_package("statsmodels.formula.api")
matplt, have_matplotlib, _ = optional_package("matplotlib")
plt, have_plt, _ = optional_package("matplotlib.pyplot")


[docs] class SNRinCCFlow(Workflow):
[docs] @classmethod def get_short_name(cls): return "snrincc"
[docs] def run( self, data_files, bvals_files, bvecs_files, mask_file, bbox_threshold=(0.6, 1, 0, 0.1, 0, 0.1), out_dir="", out_file="product.json", out_mask_cc="cc.nii.gz", out_mask_noise="mask_noise.nii.gz", ): """Compute the signal-to-noise ratio in the corpus callosum. Parameters ---------- data_files : string Path to the dwi.nii.gz file. This path may contain wildcards to process multiple inputs at once. bvals_files : string Path of bvals. bvecs_files : string Path of bvecs. mask_file : string Path of a brain mask file. bbox_threshold : variable float, optional Threshold for bounding box, values separated with commas for ex. [0.6,1,0,0.1,0,0.1]. out_dir : string, optional Where the resulting file will be saved. (default current directory) out_file : string, optional Name of the result file to be saved. out_mask_cc : string, optional Name of the CC mask volume to be saved. out_mask_noise : string, optional Name of the mask noise volume to be saved. """ io_it = self.get_io_iterator() for ( dwi_path, bvals_path, bvecs_path, mask_path, out_path, cc_mask_path, mask_noise_path, ) in io_it: data, affine = load_nifti(dwi_path) bvals, bvecs = read_bvals_bvecs(bvals_path, bvecs_path) gtab = gradient_table(bvals=bvals, bvecs=bvecs) mask, affine = load_nifti(mask_path) logging.info("Computing tensors...") tenmodel = TensorModel(gtab) tensorfit = tenmodel.fit(data, mask=mask) logging.info("Computing worst-case/best-case SNR using the CC...") if np.ndim(data) == 4: CC_box = np.zeros_like(data[..., 0]) elif np.ndim(data) == 3: CC_box = np.zeros_like(data) else: raise OSError("DWI data has invalid dimensions") mins, maxs = bounding_box(mask) mins = np.array(mins) maxs = np.array(maxs) diff = (maxs - mins) // 4 bounds_min = mins + diff bounds_max = maxs - diff CC_box[ bounds_min[0] : bounds_max[0], bounds_min[1] : bounds_max[1], bounds_min[2] : bounds_max[2], ] = 1 if len(bbox_threshold) != 6: raise OSError("bbox_threshold should have 6 float values") mask_cc_part, cfa = segment_from_cfa( tensorfit, CC_box, bbox_threshold, return_cfa=True ) if not np.count_nonzero(mask_cc_part.astype(np.uint8)): logging.warning( "Empty mask: corpus callosum not found." " Update your data or your threshold" ) save_nifti(cc_mask_path, mask_cc_part.astype(np.uint8), affine) logging.info(f"CC mask saved as {cc_mask_path}") masked_data = data[mask_cc_part] mean_signal = 0 if masked_data.size: mean_signal = np.mean(masked_data, axis=0) mask_noise = binary_dilation(mask, iterations=10) mask_noise[..., : mask_noise.shape[-1] // 2] = 1 mask_noise = ~mask_noise save_nifti(mask_noise_path, mask_noise.astype(np.uint8), affine) logging.info(f"Mask noise saved as {mask_noise_path}") noise_std = 0 if np.count_nonzero(mask_noise.astype(np.uint8)): noise_std = np.std(data[mask_noise, :]) logging.info(f"Noise standard deviation sigma= {noise_std}") idx = np.sum(gtab.bvecs, axis=-1) == 0 gtab.bvecs[idx] = np.inf axis_X = np.argmin(np.sum((gtab.bvecs - np.array([1, 0, 0])) ** 2, axis=-1)) axis_Y = np.argmin(np.sum((gtab.bvecs - np.array([0, 1, 0])) ** 2, axis=-1)) axis_Z = np.argmin(np.sum((gtab.bvecs - np.array([0, 0, 1])) ** 2, axis=-1)) SNR_output = [] SNR_directions = [] for direction in ["b0", axis_X, axis_Y, axis_Z]: if direction == "b0": SNR = mean_signal[0] / noise_std if noise_std else 0 logging.info(f"SNR for the b=0 image is : {SNR}") else: logging.info( f"SNR for direction {direction} {gtab.bvecs[direction]} is: " f"{SNR}" ) SNR_directions.append(direction) SNR = mean_signal[direction] / noise_std if noise_std else 0 SNR_output.append(SNR) snr_str = f"{SNR_output[0]} {SNR_output[1]} {SNR_output[2]} {SNR_output[3]}" dir_str = f"b0 {SNR_directions[0]} {SNR_directions[1]} {SNR_directions[2]}" data = [{"data": snr_str, "directions": dir_str}] with open(os.path.join(out_dir, out_path), "w") as myfile: json.dump(data, myfile)
[docs] @warning_for_keywords() def buan_bundle_profiles( model_bundle_folder, bundle_folder, orig_bundle_folder, metric_folder, group_id, subject, *, no_disks=100, out_dir="", ): """ Applies statistical analysis on bundles and saves the results in a directory specified by ``out_dir``. See :footcite:p:`Chandio2020a` for further details about the method. Parameters ---------- model_bundle_folder : string Path to the input model bundle files. This path may contain wildcards to process multiple inputs at once. bundle_folder : string Path to the input bundle files in common space. This path may contain wildcards to process multiple inputs at once. orig_bundle_folder : string Path to the input bundle files in native space. This path may contain wildcards to process multiple inputs at once. metric_folder : string Path to the input dti metric or/and peak files. It will be used as metric for statistical analysis of bundles. group_id : integer what group subject belongs to either 0 for control or 1 for patient. subject : string subject id e.g. 10001. no_disks : integer, optional Number of disks used for dividing bundle into disks. out_dir : string, optional Output directory. (default current directory) References ---------- .. footbibliography:: """ t = time() mb = glob(os.path.join(model_bundle_folder, "*.trk")) print(mb) mb.sort() bd = glob(os.path.join(bundle_folder, "*.trk")) bd.sort() print(bd) org_bd = glob(os.path.join(orig_bundle_folder, "*.trk")) org_bd.sort() print(org_bd) n = len(mb) for io in range(n): mbundles = load_tractogram( mb[io], reference="same", bbox_valid_check=False ).streamlines bundles = load_tractogram( bd[io], reference="same", bbox_valid_check=False ).streamlines orig_bundles = load_tractogram( org_bd[io], reference="same", bbox_valid_check=False ).streamlines if len(orig_bundles) > 5: indx = assignment_map(bundles, mbundles, no_disks) ind = np.array(indx) metric_files_names_dti = glob(os.path.join(metric_folder, "*.nii.gz")) metric_files_names_csa = glob(os.path.join(metric_folder, "*.pam5")) _, affine = load_nifti(metric_files_names_dti[0]) affine_r = np.linalg.inv(affine) transformed_orig_bundles = transform_streamlines(orig_bundles, affine_r) for mn in range(len(metric_files_names_dti)): ab = os.path.split(metric_files_names_dti[mn]) metric_name = ab[1] fm = metric_name[:-7] bm = os.path.split(mb[io])[1][:-4] logging.info(f"bm = {bm}") dt = {} logging.info(f"metric = {metric_files_names_dti[mn]}") metric, _ = load_nifti(metric_files_names_dti[mn]) anatomical_measures( transformed_orig_bundles, metric, dt, fm, bm, subject, group_id, ind, out_dir, ) for mn in range(len(metric_files_names_csa)): ab = os.path.split(metric_files_names_csa[mn]) metric_name = ab[1] fm = metric_name[:-5] bm = os.path.split(mb[io])[1][:-4] logging.info(f"bm = {bm}") logging.info(f"metric = {metric_files_names_csa[mn]}") dt = {} metric = load_peaks(metric_files_names_csa[mn]) peak_values( transformed_orig_bundles, metric, dt, fm, bm, subject, group_id, ind, out_dir, ) print("total time taken in minutes = ", (-t + time()) / 60)
[docs] class BundleAnalysisTractometryFlow(Workflow):
[docs] @classmethod def get_short_name(cls): return "ba"
[docs] @warning_for_keywords() def run(self, model_bundle_folder, subject_folder, *, no_disks=100, out_dir=""): """Workflow of bundle analytics. Applies statistical analysis on bundles of subjects and saves the results in a directory specified by ``out_dir``. See :footcite:p:`Chandio2020a` for further details about the method. Parameters ---------- model_bundle_folder : string Path to the input model bundle files. This path may contain wildcards to process multiple inputs at once. subject_folder : string Path to the input subject folder. This path may contain wildcards to process multiple inputs at once. no_disks : integer, optional Number of disks used for dividing bundle into disks. out_dir : string, optional Output directory. (default current directory) References ---------- .. footbibliography:: """ if os.path.isdir(subject_folder) is False: raise ValueError("Invalid path to subjects") groups = os.listdir(subject_folder) groups.sort() for group in groups: if os.path.isdir(os.path.join(subject_folder, group)): logging.info(f"group = {group}") all_subjects = os.listdir(os.path.join(subject_folder, group)) all_subjects.sort() logging.info(all_subjects) if group.lower() == "patient": group_id = 1 # 1 means patient elif group.lower() == "control": group_id = 0 # 0 means control else: print(group) raise ValueError("Invalid group. Neither patient nor control") for sub in all_subjects: logging.info(sub) pre = os.path.join(subject_folder, group, sub) logging.info(pre) b = os.path.join(pre, "rec_bundles") c = os.path.join(pre, "org_bundles") d = os.path.join(pre, "anatomical_measures") buan_bundle_profiles( model_bundle_folder, b, c, d, group_id, sub, no_disks=no_disks, out_dir=out_dir, )
[docs] class LinearMixedModelsFlow(Workflow):
[docs] @classmethod def get_short_name(cls): return "lmm"
[docs] def get_metric_name(self, path): """Splits the path string and returns name of anatomical measure (eg: fa), bundle name eg(AF_L) and bundle name with metric name (eg: AF_L_fa) Parameters ---------- path : string Path to the input metric files. This path may contain wildcards to process multiple inputs at once. """ head_tail = os.path.split(path) name = head_tail[1] count = 0 i = len(name) - 1 while i > 0: if name[i] == ".": count = i break i = i - 1 for j in range(len(name)): if name[j] == "_": if name[j + 1] != "L" and name[j + 1] != "R" and name[j + 1] != "F": return name[j + 1 : count], name[:j], name[:count] return " ", " ", " "
[docs] def save_lmm_plot(self, plot_file, title, bundle_name, x, y): """Saves LMM plot with segment/disk number on x-axis and -log10(pvalues) on y-axis in out_dir folder. Parameters ---------- plot_file : string Path to the plot file. This path may contain wildcards to process multiple inputs at once. title : string Title for the plot. bundle_name : string Bundle name. x : list list containing segment/disk number for x-axis. y : list list containing -log10(pvalues) per segment/disk number for y-axis. """ n = len(x) dotted = np.ones(n) dotted[:] = 2 c1 = np.random.rand(1, 3) y_pos = np.arange(n) (l1,) = plt.plot( y_pos, dotted, color="red", marker=".", linestyle="solid", linewidth=0.6, markersize=0.7, label="p-value < 0.01", ) (l2,) = plt.plot( y_pos, dotted + 1, color="black", marker=".", linestyle="solid", linewidth=0.4, markersize=0.4, label="p-value < 0.001", ) first_legend = plt.legend(handles=[l1, l2], loc="upper right") axes = plt.gca() axes.add_artist(first_legend) axes.set_ylim([0, 6]) l3 = plt.bar(y_pos, y, color=c1, alpha=0.5, label=bundle_name) plt.legend(handles=[l3], loc="upper left") plt.title(title.upper()) plt.xlabel("Segment Number") plt.ylabel("-log10(Pvalues)") plt.savefig(plot_file) plt.clf()
[docs] @warning_for_keywords() def run(self, h5_files, *, no_disks=100, out_dir=""): """Workflow of linear Mixed Models. Applies linear Mixed Models on bundles of subjects and saves the results in a directory specified by ``out_dir``. Parameters ---------- h5_files : string Path to the input metric files. This path may contain wildcards to process multiple inputs at once. no_disks : integer, optional Number of disks used for dividing bundle into disks. out_dir : string, optional Output directory. (default current directory) """ io_it = self.get_io_iterator() for file_path in io_it: logging.info(f"Applying metric {file_path}") file_name, bundle_name, save_name = self.get_metric_name(file_path) logging.info(f" file name = {file_name}") logging.info(f"file path = {file_path}") pvalues = np.zeros(no_disks) warnings.filterwarnings("ignore") # run mixed linear model for every disk for i in range(no_disks): disk_count = i + 1 df = pd.read_hdf(file_path, where="disk=disk_count") logging.info(f"read the dataframe for disk number {disk_count}") # check if data has significant data to perform LMM if len(df) < 10: raise ValueError("Dataset for Linear Mixed Model is too small") criteria = file_name + " ~ group" md = smf.mixedlm(criteria, df, groups=df["subject"]) mdf = md.fit() pvalues[i] = mdf.pvalues[1] x = list(range(1, len(pvalues) + 1)) y = -1 * np.log10(pvalues) save_file = os.path.join(out_dir, save_name + "_pvalues.npy") np.save(save_file, pvalues) save_file = os.path.join(out_dir, save_name + "_pvalues_log.npy") np.save(save_file, y) save_file = os.path.join(out_dir, save_name + ".png") self.save_lmm_plot(save_file, file_name, bundle_name, x, y)
[docs] class BundleShapeAnalysis(Workflow):
[docs] @classmethod def get_short_name(cls): return "BS"
[docs] @warning_for_keywords() def run(self, subject_folder, *, clust_thr=(5, 3, 1.5), threshold=6, out_dir=""): """Workflow of bundle analytics. Applies bundle shape similarity analysis on bundles of subjects and saves the results in a directory specified by ``out_dir``. See :footcite:p:`Chandio2020a` for further details about the method. Parameters ---------- subject_folder : string Path to the input subject folder. This path may contain wildcards to process multiple inputs at once. clust_thr : variable float, optional list of bundle clustering thresholds used in QuickBundlesX. threshold : float, optional Bundle shape similarity threshold. out_dir : string, optional Output directory. (default current directory) References ---------- .. footbibliography:: """ rng = np.random.default_rng() all_subjects = [] if os.path.isdir(subject_folder): groups = os.listdir(subject_folder) groups.sort() else: raise ValueError("Not a directory") for group in groups: if os.path.isdir(os.path.join(subject_folder, group)): subjects = os.listdir(os.path.join(subject_folder, group)) subjects.sort() logging.info( "first " + str(len(subjects)) + " subjects in matrix belong to " + group + " group" ) for sub in subjects: dpath = os.path.join(subject_folder, group, sub) if os.path.isdir(dpath): all_subjects.append(dpath) N = len(all_subjects) bundles = os.listdir(os.path.join(all_subjects[0], "rec_bundles")) for bun in bundles: # bundle shape similarity matrix ba_matrix = np.zeros((N, N)) i = 0 logging.info(bun) for sub in all_subjects: j = 0 bundle1 = load_tractogram( os.path.join(sub, "rec_bundles", bun), reference="same", bbox_valid_check=False, ).streamlines for subi in all_subjects: logging.info(subi) bundle2 = load_tractogram( os.path.join(subi, "rec_bundles", bun), reference="same", bbox_valid_check=False, ).streamlines ba_value = bundle_shape_similarity( bundle1, bundle2, rng, clust_thr=clust_thr, threshold=threshold ) ba_matrix[i][j] = ba_value j += 1 i += 1 logging.info("saving BA score matrix") np.save(os.path.join(out_dir, bun[:-4] + ".npy"), ba_matrix) cmap = matplt.colormaps["Blues"] plt.title(bun[:-4]) plt.imshow(ba_matrix, cmap=cmap) plt.colorbar() plt.clim(0, 1) plt.savefig(os.path.join(out_dir, f"SM_{bun[:-4]}")) plt.clf()