Module langbrainscore.mapping.mapping
Expand source code
import typing
from functools import partial
from tqdm.auto import tqdm
import numpy as np
from joblib import Parallel, delayed
import xarray as xr
from sklearn.cross_decomposition import PLSRegression
from sklearn.linear_model import LinearRegression, RidgeCV
from langbrainscore.interface import _Mapping
from langbrainscore.utils import logging
from langbrainscore.utils.xarray import collapse_multidim_coord
mapping_classes_params = {
"linreg": (LinearRegression, {}),
"linridge_cv": (RidgeCV, {"alphas": np.logspace(-3, 3, 13)}),
"linpls": (PLSRegression, {"n_components": 20}),
}
class IdentityMap(_Mapping):
"""
Identity mapping for use with metrics that operate
on non column-aligned matrices, e.g., RSA, CKA
Imputes NaNs for downstream metrics.
"""
def __init__(self, nan_strategy: str = "drop") -> "IdentityMap":
self._nan_strategy = nan_strategy
def fit_transform(
self,
X: xr.DataArray,
Y: xr.DataArray,
ceiling: bool = False,
):
if ceiling:
logging.log("ceiling not supported for IdentityMap yet")
# TODO: figure out how to handle NaNs better...
if self._nan_strategy == "drop":
X_clean = X.copy(deep=True).dropna(dim="neuroid")
Y_clean = Y.copy(deep=True).dropna(dim="neuroid")
elif self._nan_strategy == "impute":
X_clean = X.copy(deep=True).fillna(0)
Y_clean = Y.copy(deep=True).fillna(0)
else:
raise NotImplementedError("unsupported nan strategy.")
return X_clean, Y_clean
class LearnedMap(_Mapping):
def __init__(
self,
mapping_class: typing.Union[
str, typing.Tuple[typing.Callable, typing.Mapping[str, typing.Any]]
],
random_seed: int = 42,
k_fold: int = 5,
strat_coord: str = None,
num_split_groups_out: int = None, # (p, the # of groups in the test split)
split_coord: str = None, # (grouping coord)
# TODO
# handle predict held-out subject # but then we have to do mean over ROIs
# because individual neuroids do not correspond
# we kind of already have this along the `sampleid` coordinate, but we
# need to implement this in the neuroid coordinate
**kwargs,
) -> "LearnedMap":
"""
Initializes a Mapping object that describes a mapping between two encoder representations.
Args:
mapping_class (typing.Union[str, typing.Any], required): [description].
This Class will be instatiated to get a mapping model. E.g. LinearRegression, Ridge,
from the sklearn family. Must implement <?classifier> interface
random_seed (int, optional): [description]. Defaults to 42.
k_fold (int, optional): [description]. Defaults to 5.
strat_coord (str, optional): [description]. Defaults to None.
num_split_groups_out (int, optional): [description]. Defaults to None.
split_coord (str, optional): [description]. Defaults to None.
"""
self.random_seed = random_seed
self.k_fold = k_fold
self.strat_coord = strat_coord
self.num_split_groups_out = num_split_groups_out
self.split_coord = split_coord
self.mapping_class_name = mapping_class
self.mapping_params = kwargs
if type(mapping_class) == str:
_mapping_class, _kwargs = mapping_classes_params[self.mapping_class_name]
self.mapping_params.update(_kwargs)
# in the spirit of duck-typing, we don't need any of these checks. we will automatically
# fail if we're missing any of these attributes
# else:
# assert callable(mapping_class)
# assert hasattr(mapping_class(), "fit")
# assert hasattr(mapping_class(), "predict")
# TODO: what is the difference between these two (model; full_model)? let's make this less
# confusing
self.full_model = _mapping_class(**self.mapping_params)
self.model = _mapping_class(**self.mapping_params)
logging.log(f"initialized Mapping with {type(self.model)}!")
@staticmethod
def _construct_splits(
xr_dataset: xr.Dataset,
strat_coord: str,
k_folds: int,
split_coord: str,
num_split_groups_out: int,
random_seed: int,
):
from sklearn.model_selection import (
GroupKFold,
KFold,
StratifiedGroupKFold,
StratifiedKFold,
)
sampleid = xr_dataset.sampleid.values
if strat_coord and split_coord:
kf = StratifiedGroupKFold(
n_splits=k_folds, shuffle=True, random_state=random_seed
)
split = partial(
kf.split,
sampleid,
y=xr_dataset[split_coord].values,
groups=xr_dataset[strat_coord].values,
)
elif split_coord:
kf = GroupKFold(n_splits=k_folds)
split = partial(kf.split, sampleid, groups=xr_dataset[split_coord].values)
elif strat_coord:
kf = StratifiedKFold(
n_splits=k_folds, shuffle=True, random_state=random_seed
)
split = partial(kf.split, sampleid, y=xr_dataset[strat_coord].values)
else:
kf = KFold(n_splits=k_folds, shuffle=True, random_state=random_seed)
split = partial(kf.split, sampleid)
logging.log(f"running {type(kf)}!", verbosity_check=True)
return split()
def construct_splits(self, A):
return self._construct_splits(
A,
self.strat_coord,
self.k_fold,
self.split_coord,
self.num_split_groups_out,
random_seed=self.random_seed,
)
def fit_full(self, X, Y):
# TODO
self.fit(X, Y, k_folds=1)
raise NotImplemented
def _check_sampleids(
self,
X: xr.DataArray,
Y: xr.DataArray,
):
"""
checks that the sampleids in X and Y are the same
"""
if X.sampleid.values.shape != Y.sampleid.values.shape:
raise ValueError("X and Y sampleid shapes do not match!")
if not np.all(X.sampleid.values == Y.sampleid.values):
raise ValueError("X and Y sampleids do not match!")
logging.log(
f"Passed sampleid check for neuroid {Y.neuroid.values}",
verbosity_check=True,
)
def _drop_na(
self, X: xr.DataArray, Y: xr.DataArray, dim: str = "sampleid", **kwargs
):
"""
drop samples with missing values (based on Y) in X or Y along specified dimension
Make sure that X and Y now have the same sampleids
"""
# limit data to current neuroid, and then drop the samples that are missing data for this neuroid
Y_slice = Y.dropna(dim=dim, **kwargs)
Y_filtered_ids = Y_slice[dim].values
assert set(Y_filtered_ids).issubset(set(X[dim].values))
logging.log(
f"for neuroid {Y_slice.neuroid.values}, we used {(num_retained := len(Y_filtered_ids))}"
f" samples; dropped {len(Y[dim]) - num_retained}",
verbosity_check=True,
)
# use only the samples that are in Y
X_slice = X.sel(sampleid=Y_filtered_ids)
return X_slice, Y_slice
# def _permute_X(
# self,
# X: xr.DataArray,
# method: str = "shuffle_X_rows",
# random_state: int = 42,
# ):
# """Permute the features of X.
#
# Parameters
# ----------
# X : xr.DataArray
# The embeddings to be permuted
# method : str
# The method to use for permutation.
# 'shuffle_X_rows' : Shuffle the rows of X (=shuffle the sentences and create a mismatch between the sentence embeddings and target)
# 'shuffle_each_X_col': For each column (=feature/unit) of X, permute that feature's values across all sentences.
# Retains the statistics of the original features (e.g., mean per feature) but the values of the features are shuffled for each sentence.
# random_state : int
# The seed for the random number generator.
#
# Returns
# -------
# xr.DataArray
# The permuted dataarray
# """
#
# X_orig = X.copy(deep=True)
#
# if logging.get_verbosity():
# logging.log(f"OBS: permuting X with method {method}")
#
# if method == "shuffle_X_rows":
# X = X.sample(
# n=X.shape[1], random_state=random_state
# ) # check whether X_shape is correct
#
# elif method == "shuffle_each_X_col":
# np.random.seed(random_state)
# for feat in X.data.shape[0]: # per neuroid
# np.random.shuffle(X.data[feat, :])
#
# else:
# raise ValueError(f"Invalid method: {method}")
#
# assert X.shape == X_orig.shape
# assert np.all(X.data != X_orig.data)
#
# return X
def fit_transform(
self,
X: xr.DataArray,
Y: xr.DataArray,
# permute_X: typing.Union[bool, str] = False,
ceiling: bool = False,
ceiling_coord: str = "subject",
) -> typing.Tuple[xr.DataArray, xr.DataArray]:
"""creates a mapping model using k-fold cross-validation
-> uses params from the class initialization, uses strat_coord
and split_coord to stratify and split across group boundaries
Returns:
[type]: [description]
"""
from sklearn.random_projection import GaussianRandomProjection
if ceiling:
n_neuroids = X.neuroid.values.size
X = Y.copy()
logging.log(f"X shape: {X.data.shape}", verbosity_check=True)
logging.log(f"Y shape: {Y.data.shape}", verbosity_check=True)
if self.strat_coord:
try:
assert (X[self.strat_coord].values == Y[self.strat_coord].values).all()
except AssertionError as e:
raise ValueError(
f"{self.strat_coord} coordinate does not align across X and Y"
)
if self.split_coord:
try:
assert (X[self.split_coord].values == Y[self.split_coord].values).all()
except AssertionError as e:
raise ValueError(
f"{self.split_coord} coordinate does not align across X and Y"
)
def fit_per_neuroid(neuroid):
Y_neuroid = Y.sel(neuroid=neuroid)
# limit data to current neuroid, and then drop the samples that are missing data for this neuroid
X_slice, Y_slice = self._drop_na(X, Y_neuroid, dim="sampleid")
# Assert that X and Y have the same sampleids
self._check_sampleids(X_slice, Y_slice)
# select relevant ceiling split
if ceiling:
X_slice = X_slice.isel(
neuroid=X_slice[ceiling_coord] != Y_slice[ceiling_coord]
).dropna(dim="neuroid")
# We can perform various sanity checks by 'permuting' the source, X
# NOTE this is a test! do not use under normal workflow!
# if permute_X:
# logging.log(
# f"`permute_X` flag is enabled. only do this in an adversarial setting.",
# cmap="WARN",
# type="WARN",
# verbosity_check=True,
# )
# X_slice = self._permute_X(X_slice, method=permute_X)
# these collections store each split for our records later
# TODO we aren't saving this to the object instance yet
train_indices = []
test_indices = []
# only used in case of ridge_cv or any duck type that uses an alpha hparam
splits = self.construct_splits(Y_slice)
# X_test_collection = []
Y_test_collection = []
Y_pred_collection = []
for cvfoldid, (train_index, test_index) in enumerate(splits):
train_indices.append(train_index)
test_indices.append(test_index)
# !! NOTE the _nan_removed variants instead of X and Y
X_train, X_test = (
X_slice.sel(sampleid=Y_slice.sampleid.values[train_index]),
X_slice.sel(sampleid=Y_slice.sampleid.values[test_index]),
)
y_train, y_test = (
Y_slice.sel(sampleid=Y_slice.sampleid.values[train_index]),
Y_slice.sel(sampleid=Y_slice.sampleid.values[test_index]),
)
# empty list to house the y_predictions per timeid
y_pred_over_time = []
for timeid in y_train.timeid:
# TODO: change this code for models that also have a non-singleton timeid
# i.e., output evolves in time (RNN?)
x_model_train = X_train.sel(timeid=0).values
y_model_train = y_train.sel(timeid=timeid).values.reshape(-1, 1)
if ceiling and x_model_train.shape[1] > n_neuroids:
projection = GaussianRandomProjection(
n_components=n_neuroids, random_state=0
)
x_model_train = projection.fit_transform(x_model_train)
self.model.fit(
x_model_train,
y_model_train,
)
# store the hparam values related to the fitted models
alpha = getattr(self.model, "alpha_", np.nan)
# deepcopy `y_test` as `y_pred` to inherit some of the metadata and dims
# and then populate it with our new predicted values
y_pred = (
y_test.sel(timeid=timeid)
.copy(deep=True)
.expand_dims("timeid", 1)
)
x_model_test = X_test.sel(timeid=0)
if ceiling and x_model_train.shape[1] > n_neuroids:
x_model_test = projection.transform(x_model_test)
y_pred.data = self.model.predict(x_model_test) # y_pred
y_pred = y_pred.assign_coords(timeid=("timeid", [timeid]))
y_pred = y_pred.assign_coords(alpha=("timeid", [alpha]))
y_pred = y_pred.assign_coords(cvfoldid=("timeid", [cvfoldid]))
y_pred_over_time.append(y_pred)
y_pred_over_time = xr.concat(y_pred_over_time, dim="timeid")
Y_pred_collection.append(y_pred_over_time)
Y_test_collection.append(y_test)
Y_test = xr.concat(Y_test_collection, dim="sampleid").sortby("sampleid")
Y_pred = xr.concat(Y_pred_collection, dim="sampleid").sortby("sampleid")
# test.append(Y_test)
# pred.append(Y_pred)
return Y_test, Y_pred
# Loop across each Y neuroid (target)
test = []
pred = []
# TODO: parallelize using order-preserving joblib-mapping
# for neuroid in tqdm(Y.neuroid.values, desc="fitting a model per neuroid"):
for t, p in Parallel(n_jobs=-2)(
delayed(fit_per_neuroid)(neuroid)
for neuroid in tqdm(Y.neuroid.values, desc="fitting a model per neuroid")
):
test += [t]
pred += [p]
test_xr = xr.concat(test, dim="neuroid").transpose(
"sampleid", "neuroid", "timeid"
)
pred_xr = xr.concat(pred, dim="neuroid").transpose(
"sampleid", "neuroid", "timeid"
)
if test_xr.stimulus.ndim > 1:
test_xr = collapse_multidim_coord(test_xr, "stimulus", "sampleid")
if pred_xr.stimulus.ndim > 1:
pred_xr = collapse_multidim_coord(pred_xr, "stimulus", "sampleid")
return pred_xr, test_xr
# def map(self, source, target) -> None:
# '''
# the works: constructs splits, fits models for each split, then evaluates the fit
# of each split and returns the result (also for each split)
# '''
# pass
def save_model(self) -> None:
"""TODO: stuff that needs to be saved eventually
- model weights
- CV stuff (if using CV); but all arguments needed for initializing, in general
- n_splits
- random_state
- split indices (based on random seed)
- params per split (alpha, lambda)
- validation score, etc. for each CV split?
"""
pass
def predict(self, source) -> None:
pass
Classes
class IdentityMap (nan_strategy: str = 'drop')
-
Identity mapping for use with metrics that operate on non column-aligned matrices, e.g., RSA, CKA
Imputes NaNs for downstream metrics.
Expand source code
class IdentityMap(_Mapping): """ Identity mapping for use with metrics that operate on non column-aligned matrices, e.g., RSA, CKA Imputes NaNs for downstream metrics. """ def __init__(self, nan_strategy: str = "drop") -> "IdentityMap": self._nan_strategy = nan_strategy def fit_transform( self, X: xr.DataArray, Y: xr.DataArray, ceiling: bool = False, ): if ceiling: logging.log("ceiling not supported for IdentityMap yet") # TODO: figure out how to handle NaNs better... if self._nan_strategy == "drop": X_clean = X.copy(deep=True).dropna(dim="neuroid") Y_clean = Y.copy(deep=True).dropna(dim="neuroid") elif self._nan_strategy == "impute": X_clean = X.copy(deep=True).fillna(0) Y_clean = Y.copy(deep=True).fillna(0) else: raise NotImplementedError("unsupported nan strategy.") return X_clean, Y_clean
Ancestors
- langbrainscore.interface.mapping._Mapping
- langbrainscore.interface.cacheable._Cacheable
- typing.Protocol
- typing.Generic
- abc.ABC
Methods
def fit_transform(self, X: xarray.core.dataarray.DataArray, Y: xarray.core.dataarray.DataArray, ceiling: bool = False)
-
takes in two xarrays with a shared set of samples and returns a new pair of xarrays (Y_pred, Y_true) to be compared with a metric.T
Y_pred is either derived from a learned mapping on X or can be X itself when the downstream metric supports comparison of matrices with different dimensions, e.g, RSA, CKA
args: xr.DataArray: X xr.DataArray: Y
returns: xr.DataArray: Y_pred xr.DataArray: Y_true
Expand source code
def fit_transform( self, X: xr.DataArray, Y: xr.DataArray, ceiling: bool = False, ): if ceiling: logging.log("ceiling not supported for IdentityMap yet") # TODO: figure out how to handle NaNs better... if self._nan_strategy == "drop": X_clean = X.copy(deep=True).dropna(dim="neuroid") Y_clean = Y.copy(deep=True).dropna(dim="neuroid") elif self._nan_strategy == "impute": X_clean = X.copy(deep=True).fillna(0) Y_clean = Y.copy(deep=True).fillna(0) else: raise NotImplementedError("unsupported nan strategy.") return X_clean, Y_clean
class LearnedMap (mapping_class: Union[str, Tuple[Callable, Mapping[str, Any]]], random_seed: int = 42, k_fold: int = 5, strat_coord: str = None, num_split_groups_out: int = None, split_coord: str = None, **kwargs)
-
object that defines and applies map between two xarrays with the same number of samples
Initializes a Mapping object that describes a mapping between two encoder representations.
Args
mapping_class
:typing.Union[str, typing.Any], required
- [description]. This Class will be instatiated to get a mapping model. E.g. LinearRegression, Ridge, from the sklearn family. Must implement <?classifier> interface
random_seed
:int
, optional- [description]. Defaults to 42.
k_fold
:int
, optional- [description]. Defaults to 5.
strat_coord
:str
, optional- [description]. Defaults to None.
num_split_groups_out
:int
, optional- [description]. Defaults to None.
split_coord
:str
, optional- [description]. Defaults to None.
Expand source code
class LearnedMap(_Mapping): def __init__( self, mapping_class: typing.Union[ str, typing.Tuple[typing.Callable, typing.Mapping[str, typing.Any]] ], random_seed: int = 42, k_fold: int = 5, strat_coord: str = None, num_split_groups_out: int = None, # (p, the # of groups in the test split) split_coord: str = None, # (grouping coord) # TODO # handle predict held-out subject # but then we have to do mean over ROIs # because individual neuroids do not correspond # we kind of already have this along the `sampleid` coordinate, but we # need to implement this in the neuroid coordinate **kwargs, ) -> "LearnedMap": """ Initializes a Mapping object that describes a mapping between two encoder representations. Args: mapping_class (typing.Union[str, typing.Any], required): [description]. This Class will be instatiated to get a mapping model. E.g. LinearRegression, Ridge, from the sklearn family. Must implement <?classifier> interface random_seed (int, optional): [description]. Defaults to 42. k_fold (int, optional): [description]. Defaults to 5. strat_coord (str, optional): [description]. Defaults to None. num_split_groups_out (int, optional): [description]. Defaults to None. split_coord (str, optional): [description]. Defaults to None. """ self.random_seed = random_seed self.k_fold = k_fold self.strat_coord = strat_coord self.num_split_groups_out = num_split_groups_out self.split_coord = split_coord self.mapping_class_name = mapping_class self.mapping_params = kwargs if type(mapping_class) == str: _mapping_class, _kwargs = mapping_classes_params[self.mapping_class_name] self.mapping_params.update(_kwargs) # in the spirit of duck-typing, we don't need any of these checks. we will automatically # fail if we're missing any of these attributes # else: # assert callable(mapping_class) # assert hasattr(mapping_class(), "fit") # assert hasattr(mapping_class(), "predict") # TODO: what is the difference between these two (model; full_model)? let's make this less # confusing self.full_model = _mapping_class(**self.mapping_params) self.model = _mapping_class(**self.mapping_params) logging.log(f"initialized Mapping with {type(self.model)}!") @staticmethod def _construct_splits( xr_dataset: xr.Dataset, strat_coord: str, k_folds: int, split_coord: str, num_split_groups_out: int, random_seed: int, ): from sklearn.model_selection import ( GroupKFold, KFold, StratifiedGroupKFold, StratifiedKFold, ) sampleid = xr_dataset.sampleid.values if strat_coord and split_coord: kf = StratifiedGroupKFold( n_splits=k_folds, shuffle=True, random_state=random_seed ) split = partial( kf.split, sampleid, y=xr_dataset[split_coord].values, groups=xr_dataset[strat_coord].values, ) elif split_coord: kf = GroupKFold(n_splits=k_folds) split = partial(kf.split, sampleid, groups=xr_dataset[split_coord].values) elif strat_coord: kf = StratifiedKFold( n_splits=k_folds, shuffle=True, random_state=random_seed ) split = partial(kf.split, sampleid, y=xr_dataset[strat_coord].values) else: kf = KFold(n_splits=k_folds, shuffle=True, random_state=random_seed) split = partial(kf.split, sampleid) logging.log(f"running {type(kf)}!", verbosity_check=True) return split() def construct_splits(self, A): return self._construct_splits( A, self.strat_coord, self.k_fold, self.split_coord, self.num_split_groups_out, random_seed=self.random_seed, ) def fit_full(self, X, Y): # TODO self.fit(X, Y, k_folds=1) raise NotImplemented def _check_sampleids( self, X: xr.DataArray, Y: xr.DataArray, ): """ checks that the sampleids in X and Y are the same """ if X.sampleid.values.shape != Y.sampleid.values.shape: raise ValueError("X and Y sampleid shapes do not match!") if not np.all(X.sampleid.values == Y.sampleid.values): raise ValueError("X and Y sampleids do not match!") logging.log( f"Passed sampleid check for neuroid {Y.neuroid.values}", verbosity_check=True, ) def _drop_na( self, X: xr.DataArray, Y: xr.DataArray, dim: str = "sampleid", **kwargs ): """ drop samples with missing values (based on Y) in X or Y along specified dimension Make sure that X and Y now have the same sampleids """ # limit data to current neuroid, and then drop the samples that are missing data for this neuroid Y_slice = Y.dropna(dim=dim, **kwargs) Y_filtered_ids = Y_slice[dim].values assert set(Y_filtered_ids).issubset(set(X[dim].values)) logging.log( f"for neuroid {Y_slice.neuroid.values}, we used {(num_retained := len(Y_filtered_ids))}" f" samples; dropped {len(Y[dim]) - num_retained}", verbosity_check=True, ) # use only the samples that are in Y X_slice = X.sel(sampleid=Y_filtered_ids) return X_slice, Y_slice # def _permute_X( # self, # X: xr.DataArray, # method: str = "shuffle_X_rows", # random_state: int = 42, # ): # """Permute the features of X. # # Parameters # ---------- # X : xr.DataArray # The embeddings to be permuted # method : str # The method to use for permutation. # 'shuffle_X_rows' : Shuffle the rows of X (=shuffle the sentences and create a mismatch between the sentence embeddings and target) # 'shuffle_each_X_col': For each column (=feature/unit) of X, permute that feature's values across all sentences. # Retains the statistics of the original features (e.g., mean per feature) but the values of the features are shuffled for each sentence. # random_state : int # The seed for the random number generator. # # Returns # ------- # xr.DataArray # The permuted dataarray # """ # # X_orig = X.copy(deep=True) # # if logging.get_verbosity(): # logging.log(f"OBS: permuting X with method {method}") # # if method == "shuffle_X_rows": # X = X.sample( # n=X.shape[1], random_state=random_state # ) # check whether X_shape is correct # # elif method == "shuffle_each_X_col": # np.random.seed(random_state) # for feat in X.data.shape[0]: # per neuroid # np.random.shuffle(X.data[feat, :]) # # else: # raise ValueError(f"Invalid method: {method}") # # assert X.shape == X_orig.shape # assert np.all(X.data != X_orig.data) # # return X def fit_transform( self, X: xr.DataArray, Y: xr.DataArray, # permute_X: typing.Union[bool, str] = False, ceiling: bool = False, ceiling_coord: str = "subject", ) -> typing.Tuple[xr.DataArray, xr.DataArray]: """creates a mapping model using k-fold cross-validation -> uses params from the class initialization, uses strat_coord and split_coord to stratify and split across group boundaries Returns: [type]: [description] """ from sklearn.random_projection import GaussianRandomProjection if ceiling: n_neuroids = X.neuroid.values.size X = Y.copy() logging.log(f"X shape: {X.data.shape}", verbosity_check=True) logging.log(f"Y shape: {Y.data.shape}", verbosity_check=True) if self.strat_coord: try: assert (X[self.strat_coord].values == Y[self.strat_coord].values).all() except AssertionError as e: raise ValueError( f"{self.strat_coord} coordinate does not align across X and Y" ) if self.split_coord: try: assert (X[self.split_coord].values == Y[self.split_coord].values).all() except AssertionError as e: raise ValueError( f"{self.split_coord} coordinate does not align across X and Y" ) def fit_per_neuroid(neuroid): Y_neuroid = Y.sel(neuroid=neuroid) # limit data to current neuroid, and then drop the samples that are missing data for this neuroid X_slice, Y_slice = self._drop_na(X, Y_neuroid, dim="sampleid") # Assert that X and Y have the same sampleids self._check_sampleids(X_slice, Y_slice) # select relevant ceiling split if ceiling: X_slice = X_slice.isel( neuroid=X_slice[ceiling_coord] != Y_slice[ceiling_coord] ).dropna(dim="neuroid") # We can perform various sanity checks by 'permuting' the source, X # NOTE this is a test! do not use under normal workflow! # if permute_X: # logging.log( # f"`permute_X` flag is enabled. only do this in an adversarial setting.", # cmap="WARN", # type="WARN", # verbosity_check=True, # ) # X_slice = self._permute_X(X_slice, method=permute_X) # these collections store each split for our records later # TODO we aren't saving this to the object instance yet train_indices = [] test_indices = [] # only used in case of ridge_cv or any duck type that uses an alpha hparam splits = self.construct_splits(Y_slice) # X_test_collection = [] Y_test_collection = [] Y_pred_collection = [] for cvfoldid, (train_index, test_index) in enumerate(splits): train_indices.append(train_index) test_indices.append(test_index) # !! NOTE the _nan_removed variants instead of X and Y X_train, X_test = ( X_slice.sel(sampleid=Y_slice.sampleid.values[train_index]), X_slice.sel(sampleid=Y_slice.sampleid.values[test_index]), ) y_train, y_test = ( Y_slice.sel(sampleid=Y_slice.sampleid.values[train_index]), Y_slice.sel(sampleid=Y_slice.sampleid.values[test_index]), ) # empty list to house the y_predictions per timeid y_pred_over_time = [] for timeid in y_train.timeid: # TODO: change this code for models that also have a non-singleton timeid # i.e., output evolves in time (RNN?) x_model_train = X_train.sel(timeid=0).values y_model_train = y_train.sel(timeid=timeid).values.reshape(-1, 1) if ceiling and x_model_train.shape[1] > n_neuroids: projection = GaussianRandomProjection( n_components=n_neuroids, random_state=0 ) x_model_train = projection.fit_transform(x_model_train) self.model.fit( x_model_train, y_model_train, ) # store the hparam values related to the fitted models alpha = getattr(self.model, "alpha_", np.nan) # deepcopy `y_test` as `y_pred` to inherit some of the metadata and dims # and then populate it with our new predicted values y_pred = ( y_test.sel(timeid=timeid) .copy(deep=True) .expand_dims("timeid", 1) ) x_model_test = X_test.sel(timeid=0) if ceiling and x_model_train.shape[1] > n_neuroids: x_model_test = projection.transform(x_model_test) y_pred.data = self.model.predict(x_model_test) # y_pred y_pred = y_pred.assign_coords(timeid=("timeid", [timeid])) y_pred = y_pred.assign_coords(alpha=("timeid", [alpha])) y_pred = y_pred.assign_coords(cvfoldid=("timeid", [cvfoldid])) y_pred_over_time.append(y_pred) y_pred_over_time = xr.concat(y_pred_over_time, dim="timeid") Y_pred_collection.append(y_pred_over_time) Y_test_collection.append(y_test) Y_test = xr.concat(Y_test_collection, dim="sampleid").sortby("sampleid") Y_pred = xr.concat(Y_pred_collection, dim="sampleid").sortby("sampleid") # test.append(Y_test) # pred.append(Y_pred) return Y_test, Y_pred # Loop across each Y neuroid (target) test = [] pred = [] # TODO: parallelize using order-preserving joblib-mapping # for neuroid in tqdm(Y.neuroid.values, desc="fitting a model per neuroid"): for t, p in Parallel(n_jobs=-2)( delayed(fit_per_neuroid)(neuroid) for neuroid in tqdm(Y.neuroid.values, desc="fitting a model per neuroid") ): test += [t] pred += [p] test_xr = xr.concat(test, dim="neuroid").transpose( "sampleid", "neuroid", "timeid" ) pred_xr = xr.concat(pred, dim="neuroid").transpose( "sampleid", "neuroid", "timeid" ) if test_xr.stimulus.ndim > 1: test_xr = collapse_multidim_coord(test_xr, "stimulus", "sampleid") if pred_xr.stimulus.ndim > 1: pred_xr = collapse_multidim_coord(pred_xr, "stimulus", "sampleid") return pred_xr, test_xr # def map(self, source, target) -> None: # ''' # the works: constructs splits, fits models for each split, then evaluates the fit # of each split and returns the result (also for each split) # ''' # pass def save_model(self) -> None: """TODO: stuff that needs to be saved eventually - model weights - CV stuff (if using CV); but all arguments needed for initializing, in general - n_splits - random_state - split indices (based on random seed) - params per split (alpha, lambda) - validation score, etc. for each CV split? """ pass def predict(self, source) -> None: pass
Ancestors
- langbrainscore.interface.mapping._Mapping
- langbrainscore.interface.cacheable._Cacheable
- typing.Protocol
- typing.Generic
- abc.ABC
Methods
def construct_splits(self, A)
-
Expand source code
def construct_splits(self, A): return self._construct_splits( A, self.strat_coord, self.k_fold, self.split_coord, self.num_split_groups_out, random_seed=self.random_seed, )
def fit_full(self, X, Y)
-
Expand source code
def fit_full(self, X, Y): # TODO self.fit(X, Y, k_folds=1) raise NotImplemented
def fit_transform(self, X: xarray.core.dataarray.DataArray, Y: xarray.core.dataarray.DataArray, ceiling: bool = False, ceiling_coord: str = 'subject') ‑> Tuple[xarray.core.dataarray.DataArray, xarray.core.dataarray.DataArray]
-
creates a mapping model using k-fold cross-validation -> uses params from the class initialization, uses strat_coord and split_coord to stratify and split across group boundaries
Returns
[type]
- [description]
Expand source code
def fit_transform( self, X: xr.DataArray, Y: xr.DataArray, # permute_X: typing.Union[bool, str] = False, ceiling: bool = False, ceiling_coord: str = "subject", ) -> typing.Tuple[xr.DataArray, xr.DataArray]: """creates a mapping model using k-fold cross-validation -> uses params from the class initialization, uses strat_coord and split_coord to stratify and split across group boundaries Returns: [type]: [description] """ from sklearn.random_projection import GaussianRandomProjection if ceiling: n_neuroids = X.neuroid.values.size X = Y.copy() logging.log(f"X shape: {X.data.shape}", verbosity_check=True) logging.log(f"Y shape: {Y.data.shape}", verbosity_check=True) if self.strat_coord: try: assert (X[self.strat_coord].values == Y[self.strat_coord].values).all() except AssertionError as e: raise ValueError( f"{self.strat_coord} coordinate does not align across X and Y" ) if self.split_coord: try: assert (X[self.split_coord].values == Y[self.split_coord].values).all() except AssertionError as e: raise ValueError( f"{self.split_coord} coordinate does not align across X and Y" ) def fit_per_neuroid(neuroid): Y_neuroid = Y.sel(neuroid=neuroid) # limit data to current neuroid, and then drop the samples that are missing data for this neuroid X_slice, Y_slice = self._drop_na(X, Y_neuroid, dim="sampleid") # Assert that X and Y have the same sampleids self._check_sampleids(X_slice, Y_slice) # select relevant ceiling split if ceiling: X_slice = X_slice.isel( neuroid=X_slice[ceiling_coord] != Y_slice[ceiling_coord] ).dropna(dim="neuroid") # We can perform various sanity checks by 'permuting' the source, X # NOTE this is a test! do not use under normal workflow! # if permute_X: # logging.log( # f"`permute_X` flag is enabled. only do this in an adversarial setting.", # cmap="WARN", # type="WARN", # verbosity_check=True, # ) # X_slice = self._permute_X(X_slice, method=permute_X) # these collections store each split for our records later # TODO we aren't saving this to the object instance yet train_indices = [] test_indices = [] # only used in case of ridge_cv or any duck type that uses an alpha hparam splits = self.construct_splits(Y_slice) # X_test_collection = [] Y_test_collection = [] Y_pred_collection = [] for cvfoldid, (train_index, test_index) in enumerate(splits): train_indices.append(train_index) test_indices.append(test_index) # !! NOTE the _nan_removed variants instead of X and Y X_train, X_test = ( X_slice.sel(sampleid=Y_slice.sampleid.values[train_index]), X_slice.sel(sampleid=Y_slice.sampleid.values[test_index]), ) y_train, y_test = ( Y_slice.sel(sampleid=Y_slice.sampleid.values[train_index]), Y_slice.sel(sampleid=Y_slice.sampleid.values[test_index]), ) # empty list to house the y_predictions per timeid y_pred_over_time = [] for timeid in y_train.timeid: # TODO: change this code for models that also have a non-singleton timeid # i.e., output evolves in time (RNN?) x_model_train = X_train.sel(timeid=0).values y_model_train = y_train.sel(timeid=timeid).values.reshape(-1, 1) if ceiling and x_model_train.shape[1] > n_neuroids: projection = GaussianRandomProjection( n_components=n_neuroids, random_state=0 ) x_model_train = projection.fit_transform(x_model_train) self.model.fit( x_model_train, y_model_train, ) # store the hparam values related to the fitted models alpha = getattr(self.model, "alpha_", np.nan) # deepcopy `y_test` as `y_pred` to inherit some of the metadata and dims # and then populate it with our new predicted values y_pred = ( y_test.sel(timeid=timeid) .copy(deep=True) .expand_dims("timeid", 1) ) x_model_test = X_test.sel(timeid=0) if ceiling and x_model_train.shape[1] > n_neuroids: x_model_test = projection.transform(x_model_test) y_pred.data = self.model.predict(x_model_test) # y_pred y_pred = y_pred.assign_coords(timeid=("timeid", [timeid])) y_pred = y_pred.assign_coords(alpha=("timeid", [alpha])) y_pred = y_pred.assign_coords(cvfoldid=("timeid", [cvfoldid])) y_pred_over_time.append(y_pred) y_pred_over_time = xr.concat(y_pred_over_time, dim="timeid") Y_pred_collection.append(y_pred_over_time) Y_test_collection.append(y_test) Y_test = xr.concat(Y_test_collection, dim="sampleid").sortby("sampleid") Y_pred = xr.concat(Y_pred_collection, dim="sampleid").sortby("sampleid") # test.append(Y_test) # pred.append(Y_pred) return Y_test, Y_pred # Loop across each Y neuroid (target) test = [] pred = [] # TODO: parallelize using order-preserving joblib-mapping # for neuroid in tqdm(Y.neuroid.values, desc="fitting a model per neuroid"): for t, p in Parallel(n_jobs=-2)( delayed(fit_per_neuroid)(neuroid) for neuroid in tqdm(Y.neuroid.values, desc="fitting a model per neuroid") ): test += [t] pred += [p] test_xr = xr.concat(test, dim="neuroid").transpose( "sampleid", "neuroid", "timeid" ) pred_xr = xr.concat(pred, dim="neuroid").transpose( "sampleid", "neuroid", "timeid" ) if test_xr.stimulus.ndim > 1: test_xr = collapse_multidim_coord(test_xr, "stimulus", "sampleid") if pred_xr.stimulus.ndim > 1: pred_xr = collapse_multidim_coord(pred_xr, "stimulus", "sampleid") return pred_xr, test_xr
def predict(self, source) ‑> None
-
Expand source code
def predict(self, source) -> None: pass
def save_model(self) ‑> None
-
TODO: stuff that needs to be saved eventually
- model weights
- CV stuff (if using CV); but all arguments needed for initializing, in general
- n_splits
- random_state
- split indices (based on random seed)
- params per split (alpha, lambda)
- validation score, etc. for each CV split?
Expand source code
def save_model(self) -> None: """TODO: stuff that needs to be saved eventually - model weights - CV stuff (if using CV); but all arguments needed for initializing, in general - n_splits - random_state - split indices (based on random seed) - params per split (alpha, lambda) - validation score, etc. for each CV split? """ pass