Module langbrainscore.encoder.ann
Expand source code
import typing
from enum import unique
import os
import numpy as np
import torch
from tqdm import tqdm
import xarray as xr
from langbrainscore.dataset import Dataset
from langbrainscore.interface import EncoderRepresentations, _ModelEncoder
from langbrainscore.utils.encoder import (
    aggregate_layers,
    cos_sim_matrix,
    count_zero_threshold_values,
    encode_stimuli_in_context,
    flatten_activations_per_sample,
    get_context_groups,
    get_torch_device,
    pick_matching_token_ixs,
    postprocess_activations,
    repackage_flattened_activations,
)
from langbrainscore.utils.logging import log
from langbrainscore.utils.xarray import copy_metadata, fix_xr_dtypes
from langbrainscore.utils.resources import model_classes, config_name_mappings
os.environ["TOKENIZERS_PARALLELISM"] = "true"
class HuggingFaceEncoder(_ModelEncoder):
    def __init__(
        self,
        model_id,
        emb_aggregation: typing.Union[str, None, typing.Callable],
        device=get_torch_device(),
        context_dimension: str = None,
        bidirectional: bool = False,
        emb_preproc: typing.Tuple[str] = (),
        include_special_tokens: bool = True,
    ):
        """
        Args:
            model_id (str): the model id
            emb_aggregation (typing.Union[str, None, typing.Callable], optional): how to aggregate the hidden states
                of the encoder representations for each sampleid (stimulus) [default: "last"]
            device (torch.device, optional): the torch device to run the model on [default: get_torch_device()]
            context_dimension (str, optional): the dimension to use for extracting strings using context.
                If None, each sampleid (stimulus) is treated as a single context group.
                If a string is specified, it must refer to the name of a dimension in the xarray-like dataset
                object (langbrainscore.dataset.Dataset) that provides groupings of sampleids (stimuli) to be
                used as context when generating encoder representations [default: None].
            bidirectional (bool): whether to use a bidirectional encoder, i.e., access both forward and backward
                context [default: False]
            emb_preproc (tuple): a tuple of strings specifying preprocessing functions to apply to the aggregated
                embeddings. Processing is performed layer-wise.
            include_special_tokens (bool): whether to include special tokens in the encoder representations.
        """
        super().__init__(
            model_id,
            _context_dimension=context_dimension,
            _bidirectional=bidirectional,
            _emb_aggregation=emb_aggregation,
            _emb_preproc=emb_preproc,
            _include_special_tokens=include_special_tokens,
        )

        from transformers import AutoConfig, AutoModel, AutoTokenizer
        from transformers import logging as transformers_logging

        transformers_logging.set_verbosity_error()

        self.device = device or get_torch_device()
        self.config = AutoConfig.from_pretrained(self._model_id)
        self.tokenizer = AutoTokenizer.from_pretrained(
            self._model_id, multiprocessing=True
        )
        self.model = AutoModel.from_pretrained(self._model_id, config=self.config)
        try:
            self.model = self.model.to(self.device)
        except RuntimeError:
            # fall back to CPU if the model cannot be moved to the requested device
            self.device = "cpu"
            self.model = self.model.to(self.device)
    def get_encoder_representations_template(
        self, dataset=None, representations=xr.DataArray()
    ) -> EncoderRepresentations:
        """
        Returns an empty `EncoderRepresentations` object with all the appropriate
        attributes but the `dataset` and `representations` missing, to be filled in
        later.
        """
        return EncoderRepresentations(
            dataset=dataset,
            representations=representations,
            model_id=self._model_id,
            context_dimension=self._context_dimension,
            bidirectional=self._bidirectional,
            emb_aggregation=self._emb_aggregation,
            emb_preproc=self._emb_preproc,
            include_special_tokens=self._include_special_tokens,
        )
    def encode(
        self,
        dataset: Dataset,
        read_cache: bool = True,  # avoid recomputing if a cached `EncoderRepresentations` exists; recompute if not
        write_cache: bool = True,  # dump the result of this computation to cache?
    ) -> EncoderRepresentations:
        """
        Input a langbrainscore Dataset, encode the stimuli according to the parameters specified in init, and
        return an xarray DataArray of aggregated representations for each stimulus.

        Args:
            dataset (langbrainscore.dataset.Dataset): the dataset whose stimuli to encode
            read_cache (bool): avoid recomputing if a cached `EncoderRepresentations` exists; recompute if not
            write_cache (bool): dump and write the result of the computed encoder representations to cache

        Raises:
            NotImplementedError: [description]
            ValueError: [description]

        Returns:
            EncoderRepresentations: the computed (or cache-loaded) representations together with the dataset
                and the encoding parameters
        """
        # before computing the representations from scratch, first check whether
        # cached representations already exist
        if read_cache:
            to_check_in_cache: EncoderRepresentations = (
                self.get_encoder_representations_template(dataset=dataset)
            )
            try:
                to_check_in_cache.load_cache()
                return to_check_in_cache
            except FileNotFoundError:
                log(
                    f"couldn't load cached reprs for {to_check_in_cache.identifier_string}; recomputing.",
                    cmap="WARN",
                    type="WARN",
                )

        self.model.eval()
        stimuli = dataset.stimuli.values

        # Initialize the context group coordinate (obtain embeddings with context)
        context_groups = get_context_groups(dataset, self._context_dimension)

        # list for storing activations for each stimulus with all layers flattened
        # list for storing layer ids ([0 0 0 0 ... 1 1 1 ...]) indicating which layer each
        # neuroid (representation dimension) came from
        flattened_activations, layer_ids = [], []

        ###############################################################################
        # ALL SAMPLES LOOP
        ###############################################################################
        _, unique_ixs = np.unique(context_groups, return_index=True)
        # Make sure context group order is preserved
        for group in tqdm(context_groups[np.sort(unique_ixs)], desc="Encoding stimuli"):
            # Mask based on the context group
            mask_context = context_groups == group
            stimuli_in_context = stimuli[mask_context]

            # store model states for each stimulus in this context group
            encoded_stimuli = []

            ###############################################################################
            # CONTEXT LOOP
            ###############################################################################
            for encoded_stim in encode_stimuli_in_context(
                stimuli_in_context=stimuli_in_context,
                tokenizer=self.tokenizer,
                model=self.model,
                bidirectional=self._bidirectional,
                include_special_tokens=self._include_special_tokens,
                emb_aggregation=self._emb_aggregation,
                device=self.device,
            ):
                encoded_stimuli += [encoded_stim]
            ###############################################################################
            # END CONTEXT LOOP
            ###############################################################################

            # Flatten activations across layers and package as xarray
            flattened_activations_and_layer_ids = [
                *map(flatten_activations_per_sample, encoded_stimuli)
            ]
            for f_as, l_ids in flattened_activations_and_layer_ids:
                flattened_activations += [f_as]
                layer_ids += [l_ids]
                assert len(f_as) == len(l_ids)  # every flattened neuroid has a layer id
        ###############################################################################
        # END ALL SAMPLES LOOP
        ###############################################################################

        # Stack flattened activations and layer ids to obtain [n_samples, emb_dim * n_layers]
        activations_2d = np.vstack(flattened_activations)
        layer_ids_1d = np.squeeze(np.unique(np.vstack(layer_ids), axis=0))

        # Post-process activations after obtaining them (or "pre-process" them before computing brainscore)
        if len(self._emb_preproc) > 0:
            for mode in self._emb_preproc:
                activations_2d, layer_ids_1d = postprocess_activations(
                    activations_2d=activations_2d,
                    layer_ids_1d=layer_ids_1d,
                    emb_preproc_mode=mode,
                )

        assert activations_2d.shape[1] == len(layer_ids_1d)
        assert activations_2d.shape[0] == len(stimuli)

        # Package activations as xarray and reapply metadata
        encoded_dataset: xr.DataArray = repackage_flattened_activations(
            activations_2d=activations_2d,
            layer_ids_1d=layer_ids_1d,
            dataset=dataset,
        )
        encoded_dataset: xr.DataArray = copy_metadata(
            encoded_dataset,
            dataset.contents,
            "sampleid",
        )

        to_return: EncoderRepresentations = self.get_encoder_representations_template()
        to_return.dataset = dataset
        to_return.representations = fix_xr_dtypes(encoded_dataset)

        if write_cache:
            to_return.to_cache(overwrite=True)

        return to_return
    def get_modelcard(self):
        """
        Returns the model card of the model (model-wise, and not layer-wise)
        """
        # continuously update based on new model classes supported
        model_classes = [
            "gpt",
            "bert",
        ]
        # based on the model_id, figure out which model class it is
        model_class = next((x for x in model_classes if x in self._model_id), None)
        assert model_class is not None, f"model_id {self._model_id} not supported"

        config_specs_of_interest = config_name_mappings[model_class]

        model_specs = {}
        # key: the name we want to use in the model card; value: the name in the config
        for k_spec, v_spec in config_specs_of_interest.items():
            if v_spec is not None:
                model_specs[k_spec] = getattr(self.config, v_spec)
            else:
                model_specs[k_spec] = None

        self.model_specs = model_specs
        return model_specs
class PTEncoder(_ModelEncoder):
    def __init__(self, model_id: str) -> None:
        super().__init__(model_id)

    def encode(self, dataset: "langbrainscore.dataset.Dataset") -> xr.DataArray:
        # TODO
        ...
class EncoderCheck:
    """
    Class for checking whether embeddings obtained from the Encoder class are correct
    and similar to those of other encoder objects.
    """

    def __init__(self):
        pass

    def _load_cached_activations(self, encoded_ann_identifier: str):
        raise NotImplementedError

    def similiarity_metric_across_layers(
        self,
        sim_metric: str = "tol",
        enc1: xr.DataArray = None,
        enc2: xr.DataArray = None,
        tol: float = 1e-8,
        threshold: float = 1e-4,
    ) -> typing.Tuple[bool, set]:
        """
        Given two activations, iterate across layers and check whether they match within
        an iteratively increased tolerance (in the spirit of np.allclose).

        Parameters:
            sim_metric (str): similarity metric to use ("tol"/"diff" for absolute
                difference, or a metric containing "cos" for cosine distance).
            enc1 (xr.DataArray): first encoder activations.
            enc2 (xr.DataArray): second encoder activations.
            tol (float): tolerance level to start at (iteratively increased). Default: 1e-8.
            threshold (float): tolerance above which stimuli are flagged as dissimilar. Default: 1e-4.

        Returns:
            bool: whether the tolerance level was met (True) or not (False)
            bad_stim: set of stimulus ids that did not meet tolerance level `threshold` (if any)
        """
        # First check whether the number of layers / shapes match
        assert enc1.shape == enc2.shape
        assert (
            enc1.sampleid.values == enc2.sampleid.values
        ).all()  # ensure that we are looking at the same stimuli

        layer_ids = enc1.layer.values
        _, unique_ixs = np.unique(layer_ids, return_index=True)

        print(f"\n\nChecking similarity across layers using sim_metric: {sim_metric}")

        all_good = True
        bad_stim = set()  # store ids of stimuli that are not similar

        # Iterate across layers
        for layer_id in tqdm(layer_ids[np.sort(unique_ixs)]):
            enc1_layer = enc1.isel(neuroid=(enc1.layer == layer_id))  # .squeeze()
            enc2_layer = enc2.isel(neuroid=(enc2.layer == layer_id))  # .squeeze()

            # Check whether values match. If not, iteratively increase tolerance until values match
            if sim_metric in ("tol", "diff"):
                abs_diff = np.abs(enc1_layer - enc2_layer)
                # Obtain the biggest difference across neuroids (units) per stimulus
                abs_diff_per_stim = np.max(abs_diff, axis=1)
                while (abs_diff_per_stim > tol).all():
                    tol *= 10
            elif "cos" in sim_metric:
                # Check cosine distance between each row, e.g., sentence vector.
                # 0 means identical, 1 means orthogonal, 2 means opposite; we want
                # this as close to zero as possible for similar vectors.
                cos_sim = cos_sim_matrix(enc1_layer, enc2_layer)
                cos_dist = 1 - cos_sim
                cos_dist_abs = np.abs(cos_dist)
                abs_diff_per_stim = cos_dist_abs
                # Check how close the cosine distance is to 0
                while (cos_dist_abs > tol).all():
                    tol *= 10
            else:
                raise NotImplementedError(f"Invalid `sim_metric`: {sim_metric}")

            print(f"Layer {layer_id}: Similarity at tolerance: {tol:.3e}")
            if tol > threshold:
                print("WARNING: Low tolerance level")
                all_good = False
                # get sampleids of stimuli that are not similar
                bad_stim.update(enc1.sampleid[np.where(abs_diff_per_stim > tol)[0]])

        return all_good, bad_stim
Classes
class EncoderCheck
-
Class for checking whether embeddings obtained from the Encoder class are correct and similar to those of other encoder objects.
Methods
def similiarity_metric_across_layers(self, sim_metric: str = 'tol', enc1: xarray.core.dataarray.DataArray = None, enc2: xarray.core.dataarray.DataArray = None, tol: float = 1e-08, threshold: float = 0.0001) ‑> Tuple[bool, set]
-
Given two activations, iterate across layers and check whether they match within an iteratively increased tolerance (in the spirit of np.allclose).

Parameters
sim_metric : str
    similarity metric to use ("tol"/"diff" for absolute difference, or a metric containing "cos" for cosine distance).
enc1 : xr.DataArray
    first encoder activations.
enc2 : xr.DataArray
    second encoder activations.
tol : float
    tolerance level to start at (iteratively increased). Default: 1e-8.
threshold : float
    tolerance above which stimuli are flagged as dissimilar. Default: 1e-4.

Returns
bool
    whether the tolerance level was met (True) or not (False)
bad_stim
    set of stimulus ids that did not meet tolerance level threshold (if any)
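For concreteness, a usage sketch follows; enc1 and enc2 are assumed to be representation DataArrays (e.g., the .representations attribute of two independently computed EncoderRepresentations for the same model and dataset):

# Hypothetical usage: enc1/enc2 are xr.DataArrays with a `sampleid` dimension and a
# `layer` coordinate on `neuroid`, e.g., `.representations` from two encoder runs.
checker = EncoderCheck()
all_good, bad_stim = checker.similiarity_metric_across_layers(
    sim_metric="tol", enc1=enc1, enc2=enc2, tol=1e-8, threshold=1e-4
)
if not all_good:
    print(f"{len(bad_stim)} stimuli exceeded the similarity threshold")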
class HuggingFaceEncoder (model_id, emb_aggregation: Union[str, None, Callable], device=device(type='cpu'), context_dimension: str = None, bidirectional: bool = False, emb_preproc: Tuple[str] = (), include_special_tokens: bool = True)
-
Interface for *Encoder classes. Must implement an encode method that operates on a Dataset object.

Args
model_id : str
    the model id
emb_aggregation : typing.Union[str, None, typing.Callable], optional
    how to aggregate the hidden states of the encoder representations for each sampleid (stimulus) [default: "last"]
device : torch.device, optional
    the torch device to run the model on [default: get_torch_device()]
context_dimension : str, optional
    the dimension to use for extracting strings using context. If None, each sampleid (stimulus) is treated as a single context group. If a string is specified, it must refer to the name of a dimension in the xarray-like dataset object (langbrainscore.dataset.Dataset) that provides groupings of sampleids (stimuli) to be used as context when generating encoder representations [default: None].
bidirectional : bool
    whether to use a bidirectional encoder, i.e., access both forward and backward context [default: False]
emb_preproc : tuple
    a tuple of strings specifying preprocessing functions to apply to the aggregated embeddings. Processing is performed layer-wise.
include_special_tokens : bool
    whether to include special tokens in the encoder representations.
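A minimal instantiation sketch (the model id "distilgpt2" is just an illustrative Hugging Face hub id, not one the library specifically endorses):

from langbrainscore.encoder.ann import HuggingFaceEncoder

# "distilgpt2" is an assumed example id; any Hugging Face hub model id should work
encoder = HuggingFaceEncoder(
    model_id="distilgpt2",
    emb_aggregation="last",   # aggregate token-level hidden states via the last token
    context_dimension=None,   # each sampleid forms its own context group
    bidirectional=False,
    emb_preproc=(),           # no layer-wise pre/post-processing
    include_special_tokens=True,
)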
Ancestors
- langbrainscore.interface.encoder._ModelEncoder
- langbrainscore.interface.encoder._Encoder
- langbrainscore.interface.cacheable._Cacheable
- typing.Protocol
- typing.Generic
- abc.ABC
Methods
def encode(self, dataset: Dataset, read_cache: bool = True, write_cache: bool = True) ‑> EncoderRepresentations
-
Input a langbrainscore Dataset, encode the stimuli according to the parameters specified in init, and return an xarray DataArray of aggregated representations for each stimulus.

Args
dataset : langbrainscore.dataset.Dataset
    the dataset whose stimuli to encode
read_cache : bool
    avoid recomputing if a cached EncoderRepresentations exists; recompute if not
write_cache : bool
    dump and write the result of the computed encoder representations to cache

Raises
NotImplementedError
    [description]
ValueError
    [description]

Returns
EncoderRepresentations
    the computed (or cache-loaded) representations together with the dataset and the encoding parameters
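A sketch of the intended call pattern, assuming dataset is a langbrainscore.dataset.Dataset built elsewhere:

# `dataset` is assumed to be a langbrainscore.dataset.Dataset constructed elsewhere
reprs = encoder.encode(dataset, read_cache=True, write_cache=True)

xr_reprs = reprs.representations  # xr.DataArray: [sampleid, neuroid]
# neuroids carry a `layer` coordinate, so a single layer can be pulled out like:
layer_0 = xr_reprs.isel(neuroid=(xr_reprs.layer == 0))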
def get_encoder_representations_template(self, dataset=None, representations=<xarray.DataArray ()> array(nan)) ‑> EncoderRepresentations
-
Returns an empty EncoderRepresentations object with all the appropriate attributes but the dataset and representations missing, to be filled in later.
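As the read_cache branch of encode shows, the template doubles as a cache key; a minimal sketch mirroring that logic:

# Mirrors the read_cache logic in `encode`: the template identifies a cache entry.
template = encoder.get_encoder_representations_template(dataset=dataset)
try:
    template.load_cache()            # fills in representations from disk, if cached
    reprs = template
except FileNotFoundError:
    reprs = encoder.encode(dataset)  # recompute (and, by default, write to cache)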
def get_modelcard(self)
-
Returns the model card of the model (model-wise, and not layer-wise)
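A short usage sketch; the returned keys are defined by config_name_mappings for the inferred model class ("gpt" or "bert" must appear in the model id):

# "gpt2" contains the substring "gpt", so the "gpt" config mapping is used
encoder = HuggingFaceEncoder(model_id="gpt2", emb_aggregation="last")
specs = encoder.get_modelcard()  # dict: model-card name -> value from the HF config
print(specs)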
class PTEncoder (model_id: str)
-
Interface for *Encoder classes. Must implement an encode method that operates on a Dataset object.

This class is intended to be an interface for all ANN subclasses, including HuggingFaceEncoder and, in the future, other kinds of ANN encoders.

Args
model_id : str
    [description]

Returns
_ModelEncoder
    [description]
Ancestors
- langbrainscore.interface.encoder._ModelEncoder
- langbrainscore.interface.encoder._Encoder
- langbrainscore.interface.cacheable._Cacheable
- typing.Protocol
- typing.Generic
- abc.ABC
Methods
def encode(self, dataset: langbrainscore.dataset.Dataset) ‑> xarray.core.dataarray.DataArray
-
Returns computed representations for stimuli passed in as a Dataset object.

Args
langbrainscore.dataset.Dataset
    a Dataset object with a member xarray.DataArray instance (Dataset._xr_obj) containing stimuli

Returns
xr.DataArray
    model representations of each stimulus in brain dataset