""" Fingerprinting module
See SIHNPY documentation for more information on the functions of the script.
"""
import os
import numpy as np
import pandas as pd
from scipy import stats
def import_fingerprint_ids(id_list):
    """Import the list of participant IDs to analyze.

    The IDs are read from the first column of a ``.csv`` or ``.tsv`` file, or
    from a plain text file with one ID per line. IDs are always read as text,
    so they are returned exactly as written in the file (``"0123"`` stays
    ``"0123"`` and ``"6745"`` does not become ``"6745.0"``), which is required
    for the file-name matching done later in the pipeline.

    Parameters
    ----------
    id_list : str
        Path on the local computer to the file where the IDs are stored.

    Returns
    -------
    list
        Returns a list where each element is a participant ID (as a string).

    Notes
    -----
    ``.csv``/``.tsv`` files are assumed to have a header row. For any other
    file, the first row is treated as a column header (and dropped) only when
    the column cannot be read as numbers; a fully numeric file is assumed to
    have no header and every row is kept. A header-less text file of
    non-numeric IDs therefore loses its first entry, as in the original
    implementation.
    """
    if id_list.endswith(('.csv', '.tsv')):
        separator = ',' if id_list.endswith('.csv') else '\t'
        # dtype=str keeps the IDs verbatim instead of letting pandas infer integers
        id_df = pd.read_csv(id_list, sep=separator, usecols=[0], dtype=str)
        return id_df.iloc[:, 0].astype(str).tolist()

    # ndmin=1 so that a file holding a single ID still yields a 1-D array
    raw_ids = np.loadtxt(id_list, dtype=str, usecols=0, ndmin=1)
    try:
        # Only used to detect a header: the converted values are discarded
        raw_ids.astype(float)
    except ValueError:
        # Non-numeric content: assume the first row is a column header
        raw_ids = raw_ids[1:]
    return raw_ids.tolist()
def _slice_matrix(matrix_file, nodes_index_within, nodes_index_between=None):
    """Internal function slicing matrices and returning "flattened" vectors.

    Parameters
    ----------
    matrix_file : numpy.array
        Array for a given participant comprising all the functional connectivity nodes.
    nodes_index_within : list or numpy.array
        Nodes to include in the fingerprinting calculation (rows of the sub-matrix).
    nodes_index_between : list or numpy.array, optional
        Nodes used as columns of the sub-matrix for between-network
        fingerprinting. If ``None`` or empty, the function falls back to the
        within-network case. Defaults to None.

    Returns
    -------
    numpy.array
        Returns a flattened array of the functional connectivity data.
    """
    selected_rows = matrix_file[nodes_index_within]

    # Explicit None/size test: a bare truthiness test raises for numpy index arrays
    if nodes_index_between is not None and np.size(nodes_index_between) > 0:
        # Between-network: the rectangular block is used in full, flattened to
        # match the 1-D output of the within-network branch.
        return selected_rows[:, nodes_index_between].flatten()

    submatrix = selected_rows[:, nodes_index_within]
    # Within-network: the sub-matrix is symmetric, so only the strict upper
    # triangle is kept (k=1 also drops the diagonal). Including the bottom half
    # would cause over-estimation of the correlation.
    return submatrix[np.triu_indices(len(submatrix), k=1)]
def _norm_data(array_to_norm, norm=True):
    """Internal function normalizing (if necessary) the arrays before fingerprinting.

    When normalizing, a Fisher transformation (``arctanh``) is applied and the
    cells that come out as +/- infinity (inputs of exactly 1 or -1) are
    replaced by 0. Cells that come out as NaN are left untouched.

    Parameters
    ----------
    array_to_norm : numpy.array
        Raw sliced array to normalize.
    norm : bool, optional
        Whether or not the normalization should be applied, by default True

    Returns
    -------
    numpy.array
        Array with the chosen normalization applied to. Returns a copy of the array if no
        normalization is applied.
    """
    if norm is not True:
        return array_to_norm.copy()

    # Silence the divide-by-zero / invalid-value warnings only for this call,
    # instead of changing numpy's error handling for the whole process.
    with np.errstate(all='ignore'):
        fisher_z = np.arctanh(array_to_norm)
    return np.where(np.isinf(fisher_z), 0, fisher_z)
class FingerprintMats:
    """Class object used to store information for the fingerprinting and to output
    the results of the fingerprinting analysis. This object is to be used when the
    input data is folders with 1 matrix per subject.
    """

    #Correlation measures accepted by `fingerprint_mats`
    _SUPPORTED_CORR = ("Pearson",)

    def __init__(self, id_ls, path_m1, path_m2):
        """Creates a FingerprintMats object made up of a list of ids, and the path to the data.

        Parameters
        ----------
        id_ls : list
            List of participants to fingerprint
        path_m1 : str
            Path (string) to the folder containing the matrices of the first modality
        path_m2 : str
            Path (string) to the folder containing the matrices of the second modality
        """
        self.id_ls = id_ls #Final list of IDs to fingerprint
        self.path_m1 = path_m1 #Location of the first set of matrices (first modality)
        self.path_m2 = path_m2 #Location of the second set of matrices (second modality)

        #Empty variables to store further computation (filled by `subject_selection`).
        self.sub_final = None
        self.final_m1 = None
        self.final_m2 = None

    def fetch_matrix_file_names(self):
        """Simple function listing the matrix files available as input for the
        fingerprinting computation. Does not require any argument (will use the path
        variables from the FingerprintMats object). Sub-directories are ignored.

        Returns
        -------
        tuple of list
            Two lists of file names: files found in `path_m1` and files found in `path_m2`.

        Raises
        ------
        OSError
            If one of the two folders cannot be listed (e.g., it does not exist).
        """
        try:
            files_m1 = [filename for filename in os.listdir(self.path_m1)
                        if os.path.isfile(f"{self.path_m1}/{filename}")]
            files_m2 = [filename for filename in os.listdir(self.path_m2)
                        if os.path.isfile(f"{self.path_m2}/{filename}")]
        except OSError as err:
            #Keep the original error chained so the offending path stays visible
            raise OSError("ERROR: Path given as input doesn't exist.") from err

        return files_m1, files_m2

    def subject_selection(self, files_m1, files_m2, verbose=True):
        """Select participant files that are present in both modalities (i.e., intersection).
        An ID matches a file when the ID is a substring of the file name (e.g., ID 6745
        matches `6745.txt`, `part6745_rest.txt` or `6745`, but not `674.txt`). Because the
        match is a substring match, an ID that is contained in another ID (e.g., 674 and
        6745) matches the same file twice, which is reported as a duplication error.

        Parameters
        ----------
        files_m1 : list of str
            List of files for the first modality
        files_m2 : list of str
            List of files for the second modality
        verbose : bool, optional
            Whether or not we want an explicit description of participants included,
            by default True

        Returns
        -------
        tuple of list
            Returns three lists: participant ids included in the end (sorted), and the
            list of their file names in each of the two modalities.

        Raises
        ------
        SystemExit
            If no subject ID is matched to any files, exit.
        SystemExit
            If files of modality 1 are duplicated after matching with subject list, exit.
        SystemExit
            If files of modality 2 are duplicated after matching with subject list, exit.
        """
        if verbose is True:
            print(f'We have {len(self.id_ls)} subjects in the list.')

        #Figure out which participants we have files for
        sub_1 = [subject for subject in self.id_ls
                 for filename in files_m1 if subject in filename]
        sub_2 = [subject for subject in self.id_ls
                 for filename in files_m2 if subject in filename]

        #Figure out which participants intersect and sort them in order
        sub_final = sorted(set(sub_1) & set(sub_2))

        #Figure out which files we have for the participants in both modalities
        final_m1 = [filename for subject in sub_final
                    for filename in files_m1 if subject in filename]
        final_m2 = [filename for subject in sub_final
                    for filename in files_m2 if subject in filename]

        #Check user input to make sure it is ok.
        if not final_m1 or not final_m2:
            raise SystemExit("ERROR: Could not match subject IDs from the list to any file.")
        if len(final_m1) != len(set(final_m1)):
            raise SystemExit("ERROR: Files of modality 1 are duplicated")
        if len(final_m2) != len(set(final_m2)):
            raise SystemExit("ERROR: Files of modality 2 are duplicated")

        if verbose is True:
            print(f"We have in total {len(sub_1)} participants in modality 1 & {len(sub_2)} " +
                  "participants in modality 2.")
            print(f"A total of {len(sub_final)} have both modalities. Only these are used.")
            print(sub_final)

        #Store the final stuff for later computation
        self.sub_final = sub_final
        self.final_m1 = final_m1
        self.final_m2 = final_m2

        return sub_final, final_m1, final_m2

    def _import_matrix(self, mod, i):
        """Internal function importing the matrices of interest from the local computer during
        the fingerprinting operation. The file is first read as whitespace-delimited and,
        if that fails, as comma-delimited.

        Parameters
        ----------
        mod : int
            Integer (1 or 2) indicating which folder to fetch the matrix from
        i : int
            Integer given by the loop in the fingerprint function. It identifies which list
            element we should import.

        Returns
        -------
        numpy.array
            Returns a numpy array containing the matrix of interest

        Raises
        ------
        ValueError
            If `mod` is neither 1 nor 2, or if the file cannot be parsed with either
            delimiter.
        """
        if mod == 1:
            folder, files = self.path_m1, self.final_m1
        elif mod == 2:
            folder, files = self.path_m2, self.final_m2
        else:
            raise ValueError(f"ERROR: mod should be 1 or 2, got {mod!r}.")

        matrix_path = f'{folder}/{files[i]}'
        try:
            return np.loadtxt(matrix_path, dtype=np.double)
        except ValueError:
            return np.loadtxt(matrix_path, delimiter=',', dtype=np.double)

    def fingerprint_mats(self, nodes_index_within, nodes_index_between=None,
                         norm=True, corr_type="Pearson", verbose=True):
        """Core fingerprinting function. Takes every pair of matrices from modality 1 and 2
        and applies the fingerprint methodology between them.

        Parameters
        ----------
        nodes_index_within : list of int
            List of integers representing the nodes to select. If nodes_index_between is
            not given, we assume we want to extract a symmetric sub-matrix (i.e., within-network).
        nodes_index_between : list of int, optional
            If requested, the matrix fed to the fingerprint can be asymmetric, which is the case
            when wanting to do between-network fingerprinting, by default None
        norm : bool, optional
            Whether or not to Fisher normalize the data before fingerprinting, by default True
        corr_type : str, optional
            Which correlation measure to use for generating fingerprinting, by default "Pearson".
            Options include: ["Pearson"]
        verbose : bool, optional
            Whether or not to print a message of which participants we are doing, by default True

        Returns
        -------
        numpy.array
            Returns a similarity matrix of the correlations within and between participants.
            Only the correlations of participant `i` in modality 1 with participants
            `j >= i` in modality 2 are computed; the lower triangle is a mirror of the upper
            triangle.

        Raises
        ------
        SystemExit
            If the `subject_selection` step was skipped, we fail this function.
        ValueError
            If `corr_type` is not one of the supported correlation measures.
        """
        if self.sub_final is None:
            raise SystemExit("ERROR: Did you instantiate the FingerprintMats class and/or "
                             "run the fetch_matrix_file_names and subject_selection "
                             "functions first?")
        #Fail early: an unsupported measure would otherwise leave the matrix unfilled
        if corr_type not in self._SUPPORTED_CORR:
            raise ValueError(f"ERROR: Unsupported corr_type {corr_type!r}. "
                             f"Options include: {list(self._SUPPORTED_CORR)}")

        n_sub = len(self.sub_final)

        #Import, slice and (if necessary) normalize every matrix of the second modality
        # once, rather than re-reading each file for every participant of modality 1.
        z2_all = [
            _norm_data(
                _slice_matrix(self._import_matrix(2, j), nodes_index_within,
                              nodes_index_between),
                norm=norm)
            for j in range(n_sub)
        ]

        similar_matrix = np.zeros((n_sub, n_sub))

        for i, sub in enumerate(self.sub_final):
            if verbose is True:
                print(f"Working on participant {i + 1}: {sub}")

            #Same processing for participant "i" in the first modality. For within-network
            # nodes, the slicing removes the lower triangle and the diagonal as the matrix
            # is symmetric and the diagonal is "1".
            matrix_file_m1 = self._import_matrix(1, i)
            r1_flat = _slice_matrix(matrix_file_m1, nodes_index_within, nodes_index_between)
            z1_data = _norm_data(r1_flat, norm=norm)

            #Only j >= i is needed: the lower triangle is overwritten by the mirror below
            for j in range(i, n_sub):
                z2_data = z2_all[j]

                #In case of missing value because of the normalization in "i" or "j",
                # we remove missing cells, otherwise Scipy will throw an error
                keep = ~np.logical_or(np.isnan(z1_data), np.isnan(z2_data))

                #Correlate the two arrays and extract just the correlation
                similar_matrix[i, j] = stats.pearsonr(z1_data[keep], z2_data[keep])[0]

        #Fill lower triangle of the matrix for symmetry
        similar_matrix = np.triu(similar_matrix, k=0) + np.triu(similar_matrix, k=1).T

        return similar_matrix

    def _fia_calculator(self, similar_matrix):
        """Internal function computing the fingerprint identification accuracy,
        (number of correct identifications).

        Parameters
        ----------
        similar_matrix : numpy.array
            Similarity matrix from `fingerprint_mats` function

        Returns
        -------
        numpy.array
            Binary array for every participant included: a 1 indicates correct identification
            within the cohort and a 0 indicates incorrect identification.
        """
        #For every row in the similarity matrix, if the maximum is achieved at the diagonal,
        # attribute a 1, otherwise a 0.
        return np.array(
            [1.0 if np.argmax(similar_matrix[i, :]) == i else 0.0
             for i in range(len(self.sub_final))],
            dtype=float)

    def _si_calculator(self, similar_matrix):
        """Internal function computing the self-identifiability (within-individual correlation).
        This is defined as the diagonal (within-individual correlations) of the similarity matrix.

        Parameters
        ----------
        similar_matrix : numpy.array
            Similarity matrix from `fingerprint_mats` function

        Returns
        -------
        numpy.array
            Returns an array containing the self-identifiability.
        """
        return np.diag(similar_matrix)

    def _oi_calculator(self, similar_matrix):
        """Internal function computing the others-identifiability (between-individual correlation).
        This is defined as the average of the off-diagonal elements (row-wise) of the similarity
        matrix. The average divides by the number of columns minus one, so it is undefined
        for a matrix with a single participant.

        Parameters
        ----------
        similar_matrix : numpy.array
            Similarity matrix from `fingerprint_mats` function

        Returns
        -------
        numpy.array
            Returns an array containing the others-identifiability.
        """
        off_diag_sum = similar_matrix.sum(1) - np.diag(similar_matrix)
        return off_diag_sum / (similar_matrix.shape[1] - 1)

    def _identif_calculator(self, si_coef, oi_coef):
        """Internal function computing the differential identifiability metric
        from Amico and Goni (2018). This is simply the subtraction of the diagonal and average
        off-diagonal elements from the similarity matrix.

        Parameters
        ----------
        si_coef : numpy.array
            Array containing the self-identifiability.
        oi_coef : numpy.array
            Array containing the others-identifiability.

        Returns
        -------
        numpy.array
            Returns an array containing the differential identifiability.
        """
        return si_coef - oi_coef

    def fp_metrics_calc(self, similar_matrix, name):
        """Method computing the different fingerprint metrics and stores them in a dataframe
        for export. Each metric is computed and stored in a numpy.array which are then used
        to populate the dataframe.

        Parameters
        ----------
        similar_matrix : numpy.array
            Similarity matrix from `fingerprint_mats` function
        name : str
            String to add to the variables. This is so the user can differentiate the different
            runs of the fingerprinting if multiple are used.

        Returns
        -------
        pandas.DataFrame
            Returns a pandas.DataFrame indexed by ID with four columns, one per metric
            (`si_`, `oi_`, `fia_` and `di_`, each suffixed with `name`).

        Raises
        ------
        SystemExit
            If the self-identifiability of at least one participant is missing.
        """
        #Compute the different metrics
        fia_coef = self._fia_calculator(similar_matrix=similar_matrix)
        si_coef = self._si_calculator(similar_matrix=similar_matrix)
        oi_coef = self._oi_calculator(similar_matrix=similar_matrix)
        diff_identif_coef = self._identif_calculator(si_coef, oi_coef)

        #Create a dictionary and store the measures
        coef_data = pd.DataFrame(data={
            'ID': self.sub_final,
            f"si_{name}": si_coef,
            f"oi_{name}": oi_coef,
            f"fia_{name}": fia_coef,
            f"di_{name}": diff_identif_coef,
        }).set_index('ID')

        if coef_data[f"si_{name}"].isnull().sum() != 0:
            raise SystemExit("ERROR: Some participants have missing values from final dataframe")

        return coef_data

    def fp_mat_export(self, output_path, coef_data, similar_matrix, name, out_full=True,
                      dir_struct=True):
        """Export the fingerprinting output to file. What is outputted and how is user
        dependent. By default, exports the similarity matrix, the subject list and the
        computed fingerprint metrics, and creates separate directories for the similarity
        matrix and the subject list.

        Parameters
        ----------
        output_path : str
            Path where all the fingerprinting output should go.
        coef_data : pandas.Dataframe
            Dataframe containing the fingerprinting coefficients calculated before.
        similar_matrix : numpy.array
            Similarity matrix containing the fingerprinting coefficients
        name : str
            String to add to the file names
        out_full : bool, optional
            Whether we want the similarity matrix and subject list to be outputted, by default True
        dir_struct : bool, optional
            Whether we want similarity matrix and subject list to have their own directory, by
            default True
        """
        path_fp_final = f'{output_path}/{name}'
        os.makedirs(path_fp_final, exist_ok=True)

        coef_data.to_csv(f"{path_fp_final}/fp_metrics_{name}.csv")

        #The similarity matrix and the subject list are only written on request
        if out_full is not True:
            return

        if dir_struct is True:
            #Full directory structure: each element goes to its own directory
            dir_sym = f"{path_fp_final}/similarity_matrices"
            dir_sub = f"{path_fp_final}/subject_list"
            os.makedirs(dir_sym, exist_ok=True)
            os.makedirs(dir_sub, exist_ok=True)
        else:
            dir_sym = dir_sub = path_fp_final

        np.savetxt(f"{dir_sym}/similarity_matrix_{name}.csv", similar_matrix,
                   delimiter=",", fmt='%1.3f')
        #NOTE(review): this writes the ID list given at instantiation (`id_ls`), not the
        # participants actually fingerprinted (`sub_final`); confirm this is intended.
        np.savetxt(f"{dir_sub}/subject_list_{name}.csv", self.id_ls,
                   delimiter="\n", fmt="%s")
##########
def import_fingerprint_data(data, var):
    """Function importing the data used for fingerprinting.

    This function assumes two important things: 1) the dataframe has an index
    that comprises the IDs of the participants and 2) the dataframe is in long
    form (i.e., one participant has more than one row), with a column (`var`)
    specifying the visit.

    Participants with fewer than two distinct values of `var` are removed as
    they can't be fingerprinted. For the remaining participants, the first and
    last entries (in row order) are returned.

    Parameters
    ----------
    data : pandas.DataFrame
        Long-format dataframe indexed by participant ID.
    var : str
        Name of the column specifying the visit.

    Returns
    -------
    tuple of pandas.DataFrame or str
        `(data_first, data_last)`, both indexed by participant ID. An error
        message (string starting with "ERROR:") is returned instead when no
        participant is left or when the two indexes differ.

    Notes
    -----
    "First" and "last" follow the row order of `data`, not the sorted values
    of `var`. NOTE(review): pandas' ``GroupBy.first``/``last`` pick the first/
    last non-null value of each column, so a row with missing cells may be
    completed with values from another visit — confirm this is intended.
    """
    #Number of distinct visits for each participant
    visits_per_id = data.groupby(data.index)[var].nunique()
    #IDs of participants we know only have 1 visit
    single_visit_ids = visits_per_id[visits_per_id < 2].index

    #Drop the participants with only one visit
    final_data = data.drop(labels=single_visit_ids, axis=0)

    if len(final_data.index) == 0:
        return "ERROR: Dropping participants with 1 visit resulted in no participants being left. Confirm data is in long format."

    #Take the first and last visit, and store in dataframes.
    by_participant = final_data.groupby(final_data.index)
    data_first = by_participant.first()
    data_last = by_participant.last()

    #Compare the indexes element by element (not their truthiness)
    if not data_first.index.equals(data_last.index):
        return "ERROR: Index of the two datasets do not match. Won't fingerprint."

    return data_first, data_last
def fingerprint_tabs(data1, data2, pref):
    """Main function computing fingerprinting for tabular data.

    The variables used for fingerprinting are the columns whose name contains
    `pref` (e.g., "ctx"). Each row (participant) of `data1` is correlated
    (Pearson) with the rows of `data2`.

    Parameters
    ----------
    data1 : pandas.DataFrame
        Data of the first visit, one row per participant.
    data2 : pandas.DataFrame
        Data of the second visit, with the same index and columns as `data1`.
    pref : str
        String identifying the columns to use (matched with
        ``DataFrame.filter(like=pref)``, i.e., anywhere in the column name).

    Returns
    -------
    numpy.array or str
        Similarity matrix of the correlations within and between participants.
        Only the upper triangle (row `i` of `data1` against rows `j >= i` of
        `data2`) is computed; the lower triangle is its mirror. An error
        message (string starting with "ERROR:") is returned instead when the
        index or the selected columns of the two dataframes differ.
    """
    data1_final = data1.filter(like=pref) #Restrict columns to the ones we need only
    data2_final = data2.filter(like=pref) #Restrict columns to the ones we need only

    #Compare labels element by element (not their truthiness)
    if not data1_final.index.equals(data2_final.index):
        return "ERROR: Index of the two datasets do not match. Can't fingerprint."
    if not data1_final.columns.equals(data2_final.columns):
        return "ERROR: Columns of the two datasets do not match. Can't fingerprint."

    #Convert once to arrays: one row per participant
    values1 = data1_final.to_numpy()
    values2 = data2_final.to_numpy()
    n_sub = len(data1_final)

    #Create similarity matrix to store the data
    similar_matrix = np.zeros((n_sub, n_sub))

    #Fingerprinting
    ## For each participant...
    for i, row1 in enumerate(values1):
        print(f"Participant {i+1} / {n_sub}")
        #Only j >= i is needed: the lower triangle is overwritten by the mirror below
        for j in range(i, n_sub):
            similar_matrix[i, j] = stats.pearsonr(row1, values2[j])[0]

    #Clean the similarity matrix and return
    return np.triu(similar_matrix, k=0) + np.triu(similar_matrix, k=1).T
def tab_metrics_calc(data, similar_matrix, name):
    """Compute the fingerprint metrics of a tabular run and gather them in a dataframe.

    Parameters
    ----------
    data : pandas.DataFrame
        Either first or last visit of fingerprinting used. Only its index is
        read, to label the rows of the output with the participant IDs.
    similar_matrix : numpy.array
        Similarity matrix from `fingerprint_tabs` function
    name : str
        Suffix added to the column names, so the user can differentiate the
        different runs of the fingerprinting if multiple are used.

    Returns
    -------
    pandas.DataFrame
        Dataframe indexed by `participant_id` with four columns, one per
        metric (`si_`, `oi_`, `fia_` and `di_`, each suffixed with `name`).

    Raises
    ------
    SystemExit
        If the self-identifiability of at least one participant is missing.
    """
    #Self- and others-identifiability are reused for the differential identifiability
    self_ident = _si_calculator(similar_matrix=similar_matrix)
    others_ident = _oi_calculator(similar_matrix=similar_matrix)

    #One entry per output column, in the order they appear in the dataframe
    columns = {
        'participant_id': data.index.values,
        f"si_{name}": self_ident,
        f"oi_{name}": others_ident,
        f"fia_{name}": _fia_calculator(similar_matrix=similar_matrix),
        f"di_{name}": _identif_calculator(self_ident, others_ident),
    }
    fp_metrics = pd.DataFrame(data=columns).set_index('participant_id')

    if fp_metrics[f"si_{name}"].isnull().any():
        raise SystemExit("ERROR: Some participants have missing values from final dataframe")

    return fp_metrics
def tab_export(outpath, data1, data2, similar_matrix, fp_metrics, name):
    """Simple wrapper function exporting the output of a tabular fingerprinting run.

    Four ``.csv`` files are written in `outpath`: the data of both visits of
    the participants fingerprinted, the similarity matrix (labelled with the
    index of `data1` on both axes) and the fingerprint metrics.

    Parameters
    ----------
    outpath : str
        Folder where the files are written. It is created if it does not exist.
    data1 : pandas.DataFrame
        Data of the first visit.
    data2 : pandas.DataFrame
        Data of the second visit.
    similar_matrix : numpy.array
        Similarity matrix from `fingerprint_tabs` function
    fp_metrics : pandas.DataFrame
        Fingerprint metrics from `tab_metrics_calc` function
    name : str
        String to add to the file names
    """
    #Create the output folder if needed, as `FingerprintMats.fp_mat_export` does
    os.makedirs(outpath, exist_ok=True)

    data1.to_csv(f"{outpath}/fp_data_first_session_{name}.csv")
    data2.to_csv(f"{outpath}/fp_data_second_session_{name}.csv")

    participant_ids = data1.index.values
    sim_matrix_df = pd.DataFrame(data=similar_matrix,
                                 index=participant_ids, columns=participant_ids)
    sim_matrix_df.to_csv(f"{outpath}/similarity_matrix_{name}.csv")

    fp_metrics.to_csv(f"{outpath}/fp_metrics_{name}.csv")
##### Utility functions
def _fia_calculator(similar_matrix):
    """Internal function computing the fingerprint identification accuracy.

    A participant is correctly identified when the largest value of their row
    in the similarity matrix sits on the diagonal.

    Parameters
    ----------
    similar_matrix : numpy.array
        Similarity matrix

    Returns
    -------
    numpy.array
        Array of floats with one entry per row: 1 indicates correct
        identification within the cohort and 0 incorrect identification.
    """
    n_rows = len(similar_matrix)
    #Position of the maximum of each row, compared to the position of the diagonal
    hits = [
        1.0 if np.argmax(similar_matrix[row, :]) == row else 0.0
        for row in range(n_rows)
    ]
    return np.array(hits, dtype=float)
def _si_calculator(similar_matrix):
    """Internal function computing the self-identifiability.

    The self-identifiability (within-individual correlation) is the diagonal
    of the similarity matrix.

    Parameters
    ----------
    similar_matrix : numpy.array
        Similarity matrix

    Returns
    -------
    numpy.array
        Returns an array containing the self-identifiability.
    """
    return np.diag(similar_matrix)
def _oi_calculator(similar_matrix):
    """Internal function computing the others-identifiability.

    The others-identifiability (between-individual correlation) is the
    row-wise average of the off-diagonal elements of the similarity matrix.

    Parameters
    ----------
    similar_matrix : numpy.array
        Similarity matrix

    Returns
    -------
    numpy.array
        Returns an array containing the others-identifiability.
    """
    #Sum of each row without its diagonal element
    off_diagonal_total = similar_matrix.sum(1) - np.diag(similar_matrix)
    #Each row has one element less once the diagonal is removed
    n_others = similar_matrix.shape[1] - 1
    return off_diagonal_total / n_others
def _identif_calculator(si_coef, oi_coef):
    """Internal function computing the differential identifiability metric
    from Amico and Goni (2018).

    This is the self-identifiability (diagonal of the similarity matrix) minus
    the others-identifiability (average of the off-diagonal elements).

    Parameters
    ----------
    si_coef : numpy.array
        Array containing the self-identifiability.
    oi_coef : numpy.array
        Array containing the others-identifiability.

    Returns
    -------
    numpy.array
        Returns an array containing the differential identifiability.
    """
    return si_coef - oi_coef