Module langbrainscore.utils.encoder
Expand source code
import typing
import numpy as np
import torch
import xarray as xr
from tqdm.auto import tqdm
import random
from langbrainscore.utils.preprocessing import preprocessor_classes
from langbrainscore.utils.logging import log
def count_zero_threshold_values(
A: np.ndarray,
zero_threshold: float = 0.001,
):
"""Given matrix A, count how many values are below the zero_threshold"""
return np.sum(A < zero_threshold)
def flatten_activations_per_sample(activations: dict):
"""
Convert activations into dataframe format
Args:
Input (dict): key = layer, value = array of emb_dim
Returns:
arr_flat (np.ndarray): 1D ndarray of flattened activations across all layers
layers_arr (np.ndarray): 1D ndarray of layer indices, corresponding to arr_flat
"""
layers_arr = []
arr_flat = []
for layer, arr in activations.items(): # Iterate over layers
arr = np.array(arr)
arr_flat.append(arr)
for i in range(arr.shape[0]): # Iterate across units
layers_arr.append(layer)
arr_flat = np.concatenate(
arr_flat, axis=0
) # concatenated activations across layers
return arr_flat, np.array(layers_arr)
def aggregate_layers(
hidden_states: dict, mode: typing.Union[str, typing.Callable]
) -> np.ndarray:
"""Input a hidden states dictionary (key = layer, value = 2D array of n_tokens x emb_dim)
Args:
hidden_states (dict): key = layer (int), value = 2D PyTorch tensor of shape (n_tokens, emb_dim)
Raises:
NotImplementedError
Returns:
dict: key = layer, value = array of emb_dim
"""
states_layers = dict()
emb_aggregation = mode
# iterate over layers
for i in hidden_states.keys():
if emb_aggregation == "last":
state = hidden_states[i][-1, :] # get last token
elif emb_aggregation == "first":
state = hidden_states[i][0, :] # get first token
elif emb_aggregation == "mean":
state = torch.mean(hidden_states[i], dim=0) # mean over tokens
elif emb_aggregation == "median":
state = torch.median(hidden_states[i], dim=0) # median over tokens
elif emb_aggregation == "sum":
state = torch.sum(hidden_states[i], dim=0) # sum over tokens
elif emb_aggregation == "all" or emb_aggregation == None:
state = hidden_states
elif callable(emb_aggregation):
state = emb_aggregation(hidden_states[i])
else:
raise NotImplementedError(
f"Sentence embedding method [{emb_aggregation}] not implemented"
)
states_layers[i] = state.detach().cpu().numpy()
return states_layers
def get_torch_device():
"""
get torch device based on whether cuda is available or not
"""
import torch
# Set device to GPU if cuda is available.
if torch.cuda.is_available():
device = torch.device("cuda")
torch.set_default_tensor_type(torch.cuda.FloatTensor)
else:
device = torch.device("cpu")
return device
def set_case(sample: str, emb_case: typing.Union[str, None] = None):
if emb_case == "lower":
return sample.lower()
elif emb_case == "upper":
return sample.upper()
return sample
def get_context_groups(dataset, context_dimension):
if context_dimension is None:
context_groups = np.arange(0, dataset.stimuli.size, 1)
else:
context_groups = dataset.stimuli.coords[context_dimension].values
return context_groups
def preprocess_activations(*args, **kwargs):
return postprocess_activations(*args, **kwargs)
def postprocess_activations(
activations_2d: np.ndarray = None,
layer_ids_1d: np.ndarray = None,
emb_preproc_mode: str = None, # "demean",
):
activations_processed = []
layer_ids_processed = []
# log(f"Preprocessing activations with {p_id}")
for l_id in np.sort(np.unique(layer_ids_1d)): # For each layer
preprocessor = preprocessor_classes[emb_preproc_mode]
# Get the activations for this layer and retain 2d shape: [n_samples, emb_dim]
activations_2d_layer = activations_2d[:, layer_ids_1d == l_id]
preprocessor.fit(
activations_2d_layer
) # obtain a scaling per unit (in emb space)
# Apply the scaling to the activations and reassamble the activations (might have different shape than original)
activations_2d_layer_processed = preprocessor.transform(activations_2d_layer)
activations_processed += [activations_2d_layer_processed]
layer_ids_processed += [np.full(activations_2d_layer_processed.shape[1], l_id)]
# Concatenate to obtain [n_samples, emb_dim across layers], i.e., flattened activations
activations_2d_layer_processed = np.hstack(activations_processed)
layer_ids_1d_processed = np.hstack(layer_ids_processed)
return activations_2d_layer_processed, layer_ids_1d_processed
def repackage_flattened_activations(
activations_2d: np.ndarray = None,
layer_ids_1d: np.ndarray = None,
dataset: xr.Dataset = None,
):
return xr.DataArray(
np.expand_dims(activations_2d, axis=2), # add in time dimension
dims=("sampleid", "neuroid", "timeid"),
coords={
"sampleid": dataset.contents.sampleid.values,
"neuroid": np.arange(len(layer_ids_1d)),
"timeid": np.arange(1),
"layer": ("neuroid", np.array(layer_ids_1d, dtype="int64")),
},
)
def cos_sim_matrix(A, B):
"""Compute the cosine similarity matrix between two matrices A and B.
1 means the two vectors are identical. 0 means they are orthogonal.
-1 means they are opposite."""
return (A * B).sum(axis=1) / (A * A).sum(axis=1) ** 0.5 / (B * B).sum(axis=1) ** 0.5
def pick_matching_token_ixs(
batchencoding: "transformers.tokenization_utils_base.BatchEncoding",
char_span_of_interest: slice,
) -> slice:
"""Picks token indices in a tokenized encoded sequence that best correspond to
a substring of interest in the original sequence, given by a char span (slice)
Args:
batchencoding (transformers.tokenization_utils_base.BatchEncoding): the output of a
`tokenizer(text)` call on a single text instance (not a batch, i.e. `tokenizer([text])`).
char_span_of_interest (slice): a `slice` object denoting the character indices in the
original `text` string we want to extract the corresponding tokens for
Returns:
slice: the start and stop indices within an encoded sequence that
best match the `char_span_of_interest`
"""
from transformers import tokenization_utils_base
start_token = 0
end_token = batchencoding.input_ids.shape[-1]
for i, _ in enumerate(batchencoding.input_ids.reshape(-1)):
span = batchencoding[0].token_to_chars(
i
) # batchencoding 0 gives access to the encoded string
if span is None: # for [CLS], no span is returned
log(
f'No span returned for token at {i}: "{batchencoding.tokens()[i]}"',
type="WARN",
cmap="WARN",
verbosity_check=True,
)
continue
else:
span = tokenization_utils_base.CharSpan(*span)
if span.start <= char_span_of_interest.start:
start_token = i
if span.end >= char_span_of_interest.stop:
end_token = i + 1
break
assert (
end_token - start_token <= batchencoding.input_ids.shape[-1]
), f"Extracted span is larger than original span"
return slice(start_token, end_token)
def encode_stimuli_in_context(
stimuli_in_context,
tokenizer: "transformers.AutoTokenizer",
model: "transformers.AutoModel",
bidirectional: bool,
include_special_tokens: bool,
emb_aggregation,
device=get_torch_device(),
):
""" """
# CONTEXT LOOP
for i, stimulus in enumerate(stimuli_in_context):
# extract stim to encode based on the uni/bi-directional nature of models
if not bidirectional:
stimuli_directional = stimuli_in_context[: i + 1]
else:
stimuli_directional = stimuli_in_context
# join the stimuli together within a context group using just a single space
stimuli_directional = " ".join(stimuli_directional)
tokenized_directional_context = tokenizer(
stimuli_directional,
padding=False,
return_tensors="pt",
add_special_tokens=True,
).to(device)
# Get the hidden states
result_model = model(
tokenized_directional_context.input_ids,
output_hidden_states=True,
return_dict=True,
)
# dict with key=layer, value=3D tensor of dims: [batch, tokens, emb size]
hidden_states = result_model["hidden_states"]
layer_wise_activations = dict()
# Find which indices match the current stimulus in the given context group
start_of_interest = stimuli_directional.find(stimulus)
char_span_of_interest = slice(
start_of_interest, start_of_interest + len(stimulus)
)
token_span_of_interest = pick_matching_token_ixs(
tokenized_directional_context, char_span_of_interest
)
log(
f"Interested in the following stimulus:\n{stimuli_directional[char_span_of_interest]}\n"
f"Recovered:\n{tokenized_directional_context.tokens()[token_span_of_interest]}",
cmap="INFO",
type="INFO",
verbosity_check=True,
)
all_special_ids = set(tokenizer.all_special_ids)
# Look for special tokens in the beginning and end of the sequence
insert_first_upto = 0
insert_last_from = tokenized_directional_context.input_ids.shape[-1]
# loop through input ids
for i, tid in enumerate(tokenized_directional_context.input_ids[0, :]):
if tid.item() in all_special_ids:
insert_first_upto = i + 1
else:
break
for i in range(1, tokenized_directional_context.input_ids.shape[-1] + 1):
tid = tokenized_directional_context.input_ids[0, -i]
if tid.item() in all_special_ids:
insert_last_from -= 1
else:
break
for idx_layer, layer in enumerate(hidden_states): # Iterate over layers
# b (1), n (tokens), h (768, ...)
# collapse batch dim to obtain shape (n_tokens, emb_dim)
this_extracted = layer[
:,
token_span_of_interest,
:,
].squeeze(0)
if include_special_tokens:
# get the embeddings for the first special tokens
this_extracted = torch.cat(
[
layer[:, :insert_first_upto, :].squeeze(0),
this_extracted,
],
axis=0,
)
# get the embeddings for the last special tokens
this_extracted = torch.cat(
[
this_extracted,
layer[:, insert_last_from:, :].squeeze(0),
],
axis=0,
)
layer_wise_activations[idx_layer] = this_extracted.detach()
# Aggregate hidden states within a sample
# aggregated_layerwise_sentence_encodings is a dict with key = layer, value = array of emb dimension
aggregated_layerwise_sentence_encodings = aggregate_layers(
layer_wise_activations, mode=emb_aggregation
)
yield aggregated_layerwise_sentence_encodings
# END CONTEXT LOOP
def dataset_from_stimuli(stimuli: "pd.DataFrame"):
pass
###############################################################################
# ANALYSIS UTILS: these act upon encoded data, rather than encoders
###############################################################################
def get_decomposition_method(method: str = "pca", n_comp: int = 10, **kwargs):
"""
Return the sklearn method to use for decomposition.
Args:
method (str): Method to use for decomposition (default: "pca", other options: "mds", "tsne")
n_comp (int): Number of components to keep (default: 10)
Returns:
sklearn method
"""
if method == "pca":
from sklearn.decomposition import PCA
decomp_method = PCA(n_components=n_comp)
elif method == "mds":
from sklearn.manifold import MDS
decomp_method = MDS(n_components=n_comp)
elif method == "tsne":
from sklearn.manifold import TSNE
decomp_method = TSNE(n_components=n_comp)
else:
raise ValueError(f"Unknown method: {method}")
return decomp_method
def get_explainable_variance(
ann_encoded_dataset,
method: str = "pca",
variance_threshold: float = 0.80,
**kwargs,
) -> xr.Dataset:
"""
Returns how many components are needed to explain the variance threshold (default 80%) per layer.
Args:
ann_encoded_dataset (xr.Dataset): ANN encoded dataset
method (str): Method to use for decomposition (default: "pca", other options: "mds", "tsne")
variance_threshold (float): Variance threshold to use for determining how many components are needed to
explain explained a certain threshold of variance (default: 0.80)
**kwargs: Additional keyword arguments to pass to the underlying method
Returns:
variance_across_layers (dict): Nested dict with value of interest as key (e.g., explained variance) and
layer id as key (e.g., 0, 1, 2, ...) with corresponding values.
"""
ks = [
f"n_comp-{method}_needed-{variance_threshold}",
f"first_comp-{method}_explained_variance",
]
variance_across_layers = {k: {} for k in ks}
# Get the PCA explained variance per layer
layer_ids = ann_encoded_dataset.layer.values
_, unique_ixs = np.unique(layer_ids, return_index=True)
# Make sure that layer order is preserved
for layer_id in tqdm(layer_ids[np.sort(unique_ixs)]):
layer_dataset = (
ann_encoded_dataset.isel(neuroid=(ann_encoded_dataset.layer == layer_id))
.drop("timeid")
.squeeze()
)
# Figure out how many PCs we attempt to fit
n_comp = np.min([layer_dataset.shape[1], layer_dataset.shape[0]])
# Get explained variance
decomp_method = get_decomposition_method(method=method, n_comp=n_comp, **kwargs)
decomp_method.fit(layer_dataset.values)
explained_variance = decomp_method.explained_variance_ratio_
# Get the number of PCs needed to explain the variance threshold
explained_variance_cum = np.cumsum(explained_variance)
n_pc_needed = np.argmax(explained_variance_cum >= variance_threshold) + 1
# Store per layer
layer_id = str(layer_id)
print(
f"Layer {layer_id}: {n_pc_needed} PCs needed to explain {variance_threshold} variance "
f"with the 1st PC explaining {explained_variance[0]:.2f}% of the total variance"
)
variance_across_layers[f"n_comp-{method}_needed-{variance_threshold}"][
layer_id
] = n_pc_needed
variance_across_layers[f"first_comp-{method}_explained_variance"][
layer_id
] = explained_variance[0]
return variance_across_layers
def get_layer_sparsity(
ann_encoded_dataset, zero_threshold: float = 0.0001, **kwargs
) -> xr.Dataset:
"""
Check how sparse activations within a given layer are.
Sparsity is defined as 1 - values below the zero_threshold / total number of values.
Args:
ann_encoded_dataset (xr.Dataset): ANN encoded dataset
zero_threshold (float): Threshold to use for determining sparsity (default: 0.0001)
**kwargs: Additional keyword arguments to pass to the underlying method
Returns:
sparsity_across_layers (dict): Nested dict with value of interest as key (e.g., sparsity) and
layer id as key (e.g., 0, 1, 2, ...) with corresponding values.
"""
# Obtain embedding dimension (for sanity checks)
# if self.model_specs["hidden_emb_dim"]:
# hidden_emb_dim = self.model_specs["hidden_emb_dim"]
# else:
# hidden_emb_dim = None
# log(
# f"Hidden embedding dimension not specified yet",
# cmap="WARN",
# type="WARN",
# )
ks = [f"sparsity-{zero_threshold}"]
sparsity_across_layers = {k: {} for k in ks}
# Get the PCA explained variance per layer
layer_ids = ann_encoded_dataset.layer.values
_, unique_ixs = np.unique(layer_ids, return_index=True)
# Make sure that layer order is preserved
for layer_id in tqdm(layer_ids[np.sort(unique_ixs)]):
layer_dataset = (
ann_encoded_dataset.isel(neuroid=(ann_encoded_dataset.layer == layer_id))
.drop("timeid")
.squeeze()
)
# if hidden_emb_dim is not None:
# assert layer_dataset.shape[1] == hidden_emb_dim
#
# Get sparsity
zero_values = count_zero_threshold_values(layer_dataset.values, zero_threshold)
sparsity = 1 - (zero_values / layer_dataset.size)
# Store per layer
layer_id = str(layer_id)
print(f"Layer {layer_id}: {sparsity:.3f} sparsity")
sparsity_across_layers[f"sparsity-{zero_threshold}"][layer_id] = sparsity
return sparsity_across_layers
def cos_contrib(
emb1: np.ndarray,
emb2: np.ndarray,
):
"""
Cosine contribution function defined in eq. 3 by Timkey & van Schijndel (2021): https://arxiv.org/abs/2109.04404
Args:
emb1 (np.ndarray): Embedding vector 1
emb2 (np.ndarray): Embedding vector 2
Returns:
cos_contrib (float): Cosine contribution
"""
numerator_terms = emb1 * emb2
denom = np.linalg.norm(emb1) * np.linalg.norm(emb2)
return numerator_terms / denom
def get_anisotropy(
ann_encoded_dataset: "EncoderRepresentations", num_random_samples: int = 1000
):
"""
Calculate the anisotropy of the embedding vectors as Timkey & van Schijndel (2021): https://arxiv.org/abs/2109.04404
(base function from their GitHub repo: https://github.com/wtimkey/rogue-dimensions/blob/main/replication.ipynb,
but modified to work within the Language Brain-Score project)
"""
rogue_dist = []
num_toks = len(ann_encoded_dataset.sampleid) # Number of stimuli
# randomly sample embedding pairs to compute avg. cosine similiarity contribution
random_pairs = [
random.sample(range(num_toks), 2) for i in range(num_random_samples)
]
cos_contribs_by_layer = []
layer_ids = ann_encoded_dataset.layer.values
_, unique_ixs = np.unique(layer_ids, return_index=True)
for layer_id in tqdm(layer_ids[np.sort(unique_ixs)]):
layer_dataset = (
ann_encoded_dataset.isel(neuroid=(ann_encoded_dataset.layer == layer_id))
.drop("timeid")
.squeeze()
)
layer_cosine_contribs = []
layer_rogue_cos_contribs = []
for pair in random_pairs:
emb1 = sample_data[layer, pair[0], :] # fix
emb2 = sample_data[layer, pair[1], :]
layer_cosine_contribs.append(cos_contrib(emb1, emb2))
layer_cosine_contribs = np.array(layer_cosine_contribs)
layer_cosine_sims = layer_cosine_contribs.sum(axis=1)
layer_cosine_contribs_mean = layer_cosine_contribs.mean(axis=0)
cos_contribs_by_layer.append(layer_cosine_contribs_mean)
cos_contribs_by_layer = np.array(cos_contribs_by_layer)
aniso = cos_contribs_by_layer.sum(
axis=1
) # total anisotropy, measured as avg. cosine sim between random emb. pairs
for layer in range(num_layers[model_name]):
top_3_dims = np.argsort(cos_contribs_by_layer[layer])[-3:]
top = cos_contribs_by_layer[layer, top_3_dims[2]] / aniso[layer]
second = cos_contribs_by_layer[layer, top_3_dims[1]] / aniso[layer]
third = cos_contribs_by_layer[layer, top_3_dims[0]] / aniso[layer]
print(
"& {} & {:.3f} & {:.3f} & {:.3f} & {:.3f} \\\\".format(
layer, top, second, third, aniso[layer]
)
)
Functions
def aggregate_layers(hidden_states: dict, mode: Union[str, Callable]) ‑> numpy.ndarray
-
Input a hidden states dictionary (key = layer, value = 2D array of n_tokens x emb_dim)
Args
hidden_states
:dict
- key = layer (int), value = 2D PyTorch tensor of shape (n_tokens, emb_dim)
Raises
NotImplementedError
Returns
dict
- key = layer, value = array of emb_dim
Expand source code
def aggregate_layers( hidden_states: dict, mode: typing.Union[str, typing.Callable] ) -> np.ndarray: """Input a hidden states dictionary (key = layer, value = 2D array of n_tokens x emb_dim) Args: hidden_states (dict): key = layer (int), value = 2D PyTorch tensor of shape (n_tokens, emb_dim) Raises: NotImplementedError Returns: dict: key = layer, value = array of emb_dim """ states_layers = dict() emb_aggregation = mode # iterate over layers for i in hidden_states.keys(): if emb_aggregation == "last": state = hidden_states[i][-1, :] # get last token elif emb_aggregation == "first": state = hidden_states[i][0, :] # get first token elif emb_aggregation == "mean": state = torch.mean(hidden_states[i], dim=0) # mean over tokens elif emb_aggregation == "median": state = torch.median(hidden_states[i], dim=0) # median over tokens elif emb_aggregation == "sum": state = torch.sum(hidden_states[i], dim=0) # sum over tokens elif emb_aggregation == "all" or emb_aggregation == None: state = hidden_states elif callable(emb_aggregation): state = emb_aggregation(hidden_states[i]) else: raise NotImplementedError( f"Sentence embedding method [{emb_aggregation}] not implemented" ) states_layers[i] = state.detach().cpu().numpy() return states_layers
def cos_contrib(emb1: numpy.ndarray, emb2: numpy.ndarray)
-
Cosine contribution function defined in eq. 3 by Timkey & van Schijndel (2021): https://arxiv.org/abs/2109.04404
Args
emb1
:np.ndarray
- Embedding vector 1
emb2
:np.ndarray
- Embedding vector 2
Returns
cos_contrib (float): Cosine contribution
Expand source code
def cos_contrib( emb1: np.ndarray, emb2: np.ndarray, ): """ Cosine contribution function defined in eq. 3 by Timkey & van Schijndel (2021): https://arxiv.org/abs/2109.04404 Args: emb1 (np.ndarray): Embedding vector 1 emb2 (np.ndarray): Embedding vector 2 Returns: cos_contrib (float): Cosine contribution """ numerator_terms = emb1 * emb2 denom = np.linalg.norm(emb1) * np.linalg.norm(emb2) return numerator_terms / denom
def cos_sim_matrix(A, B)
-
Compute the cosine similarity matrix between two matrices A and B. 1 means the two vectors are identical. 0 means they are orthogonal. -1 means they are opposite.
Expand source code
def cos_sim_matrix(A, B): """Compute the cosine similarity matrix between two matrices A and B. 1 means the two vectors are identical. 0 means they are orthogonal. -1 means they are opposite.""" return (A * B).sum(axis=1) / (A * A).sum(axis=1) ** 0.5 / (B * B).sum(axis=1) ** 0.5
def count_zero_threshold_values(A: numpy.ndarray, zero_threshold: float = 0.001)
-
Given matrix A, count how many values are below the zero_threshold
Expand source code
def count_zero_threshold_values( A: np.ndarray, zero_threshold: float = 0.001, ): """Given matrix A, count how many values are below the zero_threshold""" return np.sum(A < zero_threshold)
def dataset_from_stimuli(stimuli: pd.DataFrame)
-
Expand source code
def dataset_from_stimuli(stimuli: "pd.DataFrame"): pass
def encode_stimuli_in_context(stimuli_in_context, tokenizer: transformers.AutoTokenizer, model: transformers.AutoModel, bidirectional: bool, include_special_tokens: bool, emb_aggregation, device=device(type='cpu'))
-
Expand source code
def encode_stimuli_in_context( stimuli_in_context, tokenizer: "transformers.AutoTokenizer", model: "transformers.AutoModel", bidirectional: bool, include_special_tokens: bool, emb_aggregation, device=get_torch_device(), ): """ """ # CONTEXT LOOP for i, stimulus in enumerate(stimuli_in_context): # extract stim to encode based on the uni/bi-directional nature of models if not bidirectional: stimuli_directional = stimuli_in_context[: i + 1] else: stimuli_directional = stimuli_in_context # join the stimuli together within a context group using just a single space stimuli_directional = " ".join(stimuli_directional) tokenized_directional_context = tokenizer( stimuli_directional, padding=False, return_tensors="pt", add_special_tokens=True, ).to(device) # Get the hidden states result_model = model( tokenized_directional_context.input_ids, output_hidden_states=True, return_dict=True, ) # dict with key=layer, value=3D tensor of dims: [batch, tokens, emb size] hidden_states = result_model["hidden_states"] layer_wise_activations = dict() # Find which indices match the current stimulus in the given context group start_of_interest = stimuli_directional.find(stimulus) char_span_of_interest = slice( start_of_interest, start_of_interest + len(stimulus) ) token_span_of_interest = pick_matching_token_ixs( tokenized_directional_context, char_span_of_interest ) log( f"Interested in the following stimulus:\n{stimuli_directional[char_span_of_interest]}\n" f"Recovered:\n{tokenized_directional_context.tokens()[token_span_of_interest]}", cmap="INFO", type="INFO", verbosity_check=True, ) all_special_ids = set(tokenizer.all_special_ids) # Look for special tokens in the beginning and end of the sequence insert_first_upto = 0 insert_last_from = tokenized_directional_context.input_ids.shape[-1] # loop through input ids for i, tid in enumerate(tokenized_directional_context.input_ids[0, :]): if tid.item() in all_special_ids: insert_first_upto = i + 1 else: break for i in range(1, tokenized_directional_context.input_ids.shape[-1] + 1): tid = tokenized_directional_context.input_ids[0, -i] if tid.item() in all_special_ids: insert_last_from -= 1 else: break for idx_layer, layer in enumerate(hidden_states): # Iterate over layers # b (1), n (tokens), h (768, ...) # collapse batch dim to obtain shape (n_tokens, emb_dim) this_extracted = layer[ :, token_span_of_interest, :, ].squeeze(0) if include_special_tokens: # get the embeddings for the first special tokens this_extracted = torch.cat( [ layer[:, :insert_first_upto, :].squeeze(0), this_extracted, ], axis=0, ) # get the embeddings for the last special tokens this_extracted = torch.cat( [ this_extracted, layer[:, insert_last_from:, :].squeeze(0), ], axis=0, ) layer_wise_activations[idx_layer] = this_extracted.detach() # Aggregate hidden states within a sample # aggregated_layerwise_sentence_encodings is a dict with key = layer, value = array of emb dimension aggregated_layerwise_sentence_encodings = aggregate_layers( layer_wise_activations, mode=emb_aggregation ) yield aggregated_layerwise_sentence_encodings # END CONTEXT LOOP
def flatten_activations_per_sample(activations: dict)
-
Convert activations into dataframe format
Args
Input
:dict
- key = layer, value = array of emb_dim
Returns
arr_flat (np.ndarray): 1D ndarray of flattened activations across all layers layers_arr (np.ndarray): 1D ndarray of layer indices, corresponding to arr_flat
Expand source code
def flatten_activations_per_sample(activations: dict): """ Convert activations into dataframe format Args: Input (dict): key = layer, value = array of emb_dim Returns: arr_flat (np.ndarray): 1D ndarray of flattened activations across all layers layers_arr (np.ndarray): 1D ndarray of layer indices, corresponding to arr_flat """ layers_arr = [] arr_flat = [] for layer, arr in activations.items(): # Iterate over layers arr = np.array(arr) arr_flat.append(arr) for i in range(arr.shape[0]): # Iterate across units layers_arr.append(layer) arr_flat = np.concatenate( arr_flat, axis=0 ) # concatenated activations across layers return arr_flat, np.array(layers_arr)
def get_anisotropy(ann_encoded_dataset: EncoderRepresentations, num_random_samples: int = 1000)
-
Calculate the anisotropy of the embedding vectors as Timkey & van Schijndel (2021): https://arxiv.org/abs/2109.04404 (base function from their GitHub repo: https://github.com/wtimkey/rogue-dimensions/blob/main/replication.ipynb, but modified to work within the Language Brain-Score project)
Expand source code
def get_anisotropy( ann_encoded_dataset: "EncoderRepresentations", num_random_samples: int = 1000 ): """ Calculate the anisotropy of the embedding vectors as Timkey & van Schijndel (2021): https://arxiv.org/abs/2109.04404 (base function from their GitHub repo: https://github.com/wtimkey/rogue-dimensions/blob/main/replication.ipynb, but modified to work within the Language Brain-Score project) """ rogue_dist = [] num_toks = len(ann_encoded_dataset.sampleid) # Number of stimuli # randomly sample embedding pairs to compute avg. cosine similiarity contribution random_pairs = [ random.sample(range(num_toks), 2) for i in range(num_random_samples) ] cos_contribs_by_layer = [] layer_ids = ann_encoded_dataset.layer.values _, unique_ixs = np.unique(layer_ids, return_index=True) for layer_id in tqdm(layer_ids[np.sort(unique_ixs)]): layer_dataset = ( ann_encoded_dataset.isel(neuroid=(ann_encoded_dataset.layer == layer_id)) .drop("timeid") .squeeze() ) layer_cosine_contribs = [] layer_rogue_cos_contribs = [] for pair in random_pairs: emb1 = sample_data[layer, pair[0], :] # fix emb2 = sample_data[layer, pair[1], :] layer_cosine_contribs.append(cos_contrib(emb1, emb2)) layer_cosine_contribs = np.array(layer_cosine_contribs) layer_cosine_sims = layer_cosine_contribs.sum(axis=1) layer_cosine_contribs_mean = layer_cosine_contribs.mean(axis=0) cos_contribs_by_layer.append(layer_cosine_contribs_mean) cos_contribs_by_layer = np.array(cos_contribs_by_layer) aniso = cos_contribs_by_layer.sum( axis=1 ) # total anisotropy, measured as avg. cosine sim between random emb. pairs for layer in range(num_layers[model_name]): top_3_dims = np.argsort(cos_contribs_by_layer[layer])[-3:] top = cos_contribs_by_layer[layer, top_3_dims[2]] / aniso[layer] second = cos_contribs_by_layer[layer, top_3_dims[1]] / aniso[layer] third = cos_contribs_by_layer[layer, top_3_dims[0]] / aniso[layer] print( "& {} & {:.3f} & {:.3f} & {:.3f} & {:.3f} \\\\".format( layer, top, second, third, aniso[layer] ) )
def get_context_groups(dataset, context_dimension)
-
Expand source code
def get_context_groups(dataset, context_dimension): if context_dimension is None: context_groups = np.arange(0, dataset.stimuli.size, 1) else: context_groups = dataset.stimuli.coords[context_dimension].values return context_groups
def get_decomposition_method(method: str = 'pca', n_comp: int = 10, **kwargs)
-
Return the sklearn method to use for decomposition.
Args
method
:str
- Method to use for decomposition (default: "pca", other options: "mds", "tsne")
n_comp
:int
- Number of components to keep (default: 10)
Returns
sklearn method
Expand source code
def get_decomposition_method(method: str = "pca", n_comp: int = 10, **kwargs): """ Return the sklearn method to use for decomposition. Args: method (str): Method to use for decomposition (default: "pca", other options: "mds", "tsne") n_comp (int): Number of components to keep (default: 10) Returns: sklearn method """ if method == "pca": from sklearn.decomposition import PCA decomp_method = PCA(n_components=n_comp) elif method == "mds": from sklearn.manifold import MDS decomp_method = MDS(n_components=n_comp) elif method == "tsne": from sklearn.manifold import TSNE decomp_method = TSNE(n_components=n_comp) else: raise ValueError(f"Unknown method: {method}") return decomp_method
def get_explainable_variance(ann_encoded_dataset, method: str = 'pca', variance_threshold: float = 0.8, **kwargs) ‑> xarray.core.dataset.Dataset
-
Returns how many components are needed to explain the variance threshold (default 80%) per layer.
Args
ann_encoded_dataset
:xr.Dataset
- ANN encoded dataset
method
:str
- Method to use for decomposition (default: "pca", other options: "mds", "tsne")
variance_threshold
:float
- Variance threshold to use for determining how many components are needed to explain explained a certain threshold of variance (default: 0.80)
**kwargs
- Additional keyword arguments to pass to the underlying method
Returns
variance_across_layers (dict): Nested dict with value of interest as key (e.g., explained variance) and layer id as key (e.g., 0, 1, 2, …) with corresponding values.
Expand source code
def get_explainable_variance( ann_encoded_dataset, method: str = "pca", variance_threshold: float = 0.80, **kwargs, ) -> xr.Dataset: """ Returns how many components are needed to explain the variance threshold (default 80%) per layer. Args: ann_encoded_dataset (xr.Dataset): ANN encoded dataset method (str): Method to use for decomposition (default: "pca", other options: "mds", "tsne") variance_threshold (float): Variance threshold to use for determining how many components are needed to explain explained a certain threshold of variance (default: 0.80) **kwargs: Additional keyword arguments to pass to the underlying method Returns: variance_across_layers (dict): Nested dict with value of interest as key (e.g., explained variance) and layer id as key (e.g., 0, 1, 2, ...) with corresponding values. """ ks = [ f"n_comp-{method}_needed-{variance_threshold}", f"first_comp-{method}_explained_variance", ] variance_across_layers = {k: {} for k in ks} # Get the PCA explained variance per layer layer_ids = ann_encoded_dataset.layer.values _, unique_ixs = np.unique(layer_ids, return_index=True) # Make sure that layer order is preserved for layer_id in tqdm(layer_ids[np.sort(unique_ixs)]): layer_dataset = ( ann_encoded_dataset.isel(neuroid=(ann_encoded_dataset.layer == layer_id)) .drop("timeid") .squeeze() ) # Figure out how many PCs we attempt to fit n_comp = np.min([layer_dataset.shape[1], layer_dataset.shape[0]]) # Get explained variance decomp_method = get_decomposition_method(method=method, n_comp=n_comp, **kwargs) decomp_method.fit(layer_dataset.values) explained_variance = decomp_method.explained_variance_ratio_ # Get the number of PCs needed to explain the variance threshold explained_variance_cum = np.cumsum(explained_variance) n_pc_needed = np.argmax(explained_variance_cum >= variance_threshold) + 1 # Store per layer layer_id = str(layer_id) print( f"Layer {layer_id}: {n_pc_needed} PCs needed to explain {variance_threshold} variance " f"with the 1st PC explaining {explained_variance[0]:.2f}% of the total variance" ) variance_across_layers[f"n_comp-{method}_needed-{variance_threshold}"][ layer_id ] = n_pc_needed variance_across_layers[f"first_comp-{method}_explained_variance"][ layer_id ] = explained_variance[0] return variance_across_layers
def get_layer_sparsity(ann_encoded_dataset, zero_threshold: float = 0.0001, **kwargs) ‑> xarray.core.dataset.Dataset
-
Check how sparse activations within a given layer are.
Sparsity is defined as 1 - values below the zero_threshold / total number of values.
Args
ann_encoded_dataset
:xr.Dataset
- ANN encoded dataset
zero_threshold
:float
- Threshold to use for determining sparsity (default: 0.0001)
**kwargs
- Additional keyword arguments to pass to the underlying method
Returns
sparsity_across_layers (dict): Nested dict with value of interest as key (e.g., sparsity) and layer id as key (e.g., 0, 1, 2, …) with corresponding values.
Expand source code
def get_layer_sparsity( ann_encoded_dataset, zero_threshold: float = 0.0001, **kwargs ) -> xr.Dataset: """ Check how sparse activations within a given layer are. Sparsity is defined as 1 - values below the zero_threshold / total number of values. Args: ann_encoded_dataset (xr.Dataset): ANN encoded dataset zero_threshold (float): Threshold to use for determining sparsity (default: 0.0001) **kwargs: Additional keyword arguments to pass to the underlying method Returns: sparsity_across_layers (dict): Nested dict with value of interest as key (e.g., sparsity) and layer id as key (e.g., 0, 1, 2, ...) with corresponding values. """ # Obtain embedding dimension (for sanity checks) # if self.model_specs["hidden_emb_dim"]: # hidden_emb_dim = self.model_specs["hidden_emb_dim"] # else: # hidden_emb_dim = None # log( # f"Hidden embedding dimension not specified yet", # cmap="WARN", # type="WARN", # ) ks = [f"sparsity-{zero_threshold}"] sparsity_across_layers = {k: {} for k in ks} # Get the PCA explained variance per layer layer_ids = ann_encoded_dataset.layer.values _, unique_ixs = np.unique(layer_ids, return_index=True) # Make sure that layer order is preserved for layer_id in tqdm(layer_ids[np.sort(unique_ixs)]): layer_dataset = ( ann_encoded_dataset.isel(neuroid=(ann_encoded_dataset.layer == layer_id)) .drop("timeid") .squeeze() ) # if hidden_emb_dim is not None: # assert layer_dataset.shape[1] == hidden_emb_dim # # Get sparsity zero_values = count_zero_threshold_values(layer_dataset.values, zero_threshold) sparsity = 1 - (zero_values / layer_dataset.size) # Store per layer layer_id = str(layer_id) print(f"Layer {layer_id}: {sparsity:.3f} sparsity") sparsity_across_layers[f"sparsity-{zero_threshold}"][layer_id] = sparsity return sparsity_across_layers
def get_torch_device()
-
get torch device based on whether cuda is available or not
Expand source code
def get_torch_device(): """ get torch device based on whether cuda is available or not """ import torch # Set device to GPU if cuda is available. if torch.cuda.is_available(): device = torch.device("cuda") torch.set_default_tensor_type(torch.cuda.FloatTensor) else: device = torch.device("cpu") return device
def pick_matching_token_ixs(batchencoding: transformers.tokenization_utils_base.BatchEncoding, char_span_of_interest: slice) ‑> slice
-
Picks token indices in a tokenized encoded sequence that best correspond to a substring of interest in the original sequence, given by a char span (slice)
Args
batchencoding
:transformers.tokenization_utils_base.BatchEncoding
- the output of a
tokenizer(text)
call on a single text instance (not a batch, i.e.tokenizer([text])
). char_span_of_interest
:slice
- a
slice
object denoting the character indices in the originaltext
string we want to extract the corresponding tokens for
Returns
slice
- the start and stop indices within an encoded sequence that
best match the
char_span_of_interest
Expand source code
def pick_matching_token_ixs( batchencoding: "transformers.tokenization_utils_base.BatchEncoding", char_span_of_interest: slice, ) -> slice: """Picks token indices in a tokenized encoded sequence that best correspond to a substring of interest in the original sequence, given by a char span (slice) Args: batchencoding (transformers.tokenization_utils_base.BatchEncoding): the output of a `tokenizer(text)` call on a single text instance (not a batch, i.e. `tokenizer([text])`). char_span_of_interest (slice): a `slice` object denoting the character indices in the original `text` string we want to extract the corresponding tokens for Returns: slice: the start and stop indices within an encoded sequence that best match the `char_span_of_interest` """ from transformers import tokenization_utils_base start_token = 0 end_token = batchencoding.input_ids.shape[-1] for i, _ in enumerate(batchencoding.input_ids.reshape(-1)): span = batchencoding[0].token_to_chars( i ) # batchencoding 0 gives access to the encoded string if span is None: # for [CLS], no span is returned log( f'No span returned for token at {i}: "{batchencoding.tokens()[i]}"', type="WARN", cmap="WARN", verbosity_check=True, ) continue else: span = tokenization_utils_base.CharSpan(*span) if span.start <= char_span_of_interest.start: start_token = i if span.end >= char_span_of_interest.stop: end_token = i + 1 break assert ( end_token - start_token <= batchencoding.input_ids.shape[-1] ), f"Extracted span is larger than original span" return slice(start_token, end_token)
def postprocess_activations(activations_2d: numpy.ndarray = None, layer_ids_1d: numpy.ndarray = None, emb_preproc_mode: str = None)
-
Expand source code
def postprocess_activations( activations_2d: np.ndarray = None, layer_ids_1d: np.ndarray = None, emb_preproc_mode: str = None, # "demean", ): activations_processed = [] layer_ids_processed = [] # log(f"Preprocessing activations with {p_id}") for l_id in np.sort(np.unique(layer_ids_1d)): # For each layer preprocessor = preprocessor_classes[emb_preproc_mode] # Get the activations for this layer and retain 2d shape: [n_samples, emb_dim] activations_2d_layer = activations_2d[:, layer_ids_1d == l_id] preprocessor.fit( activations_2d_layer ) # obtain a scaling per unit (in emb space) # Apply the scaling to the activations and reassamble the activations (might have different shape than original) activations_2d_layer_processed = preprocessor.transform(activations_2d_layer) activations_processed += [activations_2d_layer_processed] layer_ids_processed += [np.full(activations_2d_layer_processed.shape[1], l_id)] # Concatenate to obtain [n_samples, emb_dim across layers], i.e., flattened activations activations_2d_layer_processed = np.hstack(activations_processed) layer_ids_1d_processed = np.hstack(layer_ids_processed) return activations_2d_layer_processed, layer_ids_1d_processed
def preprocess_activations(*args, **kwargs)
-
Expand source code
def preprocess_activations(*args, **kwargs): return postprocess_activations(*args, **kwargs)
def repackage_flattened_activations(activations_2d: numpy.ndarray = None, layer_ids_1d: numpy.ndarray = None, dataset: xarray.core.dataset.Dataset = None)
-
Expand source code
def repackage_flattened_activations( activations_2d: np.ndarray = None, layer_ids_1d: np.ndarray = None, dataset: xr.Dataset = None, ): return xr.DataArray( np.expand_dims(activations_2d, axis=2), # add in time dimension dims=("sampleid", "neuroid", "timeid"), coords={ "sampleid": dataset.contents.sampleid.values, "neuroid": np.arange(len(layer_ids_1d)), "timeid": np.arange(1), "layer": ("neuroid", np.array(layer_ids_1d, dtype="int64")), }, )
def set_case(sample: str, emb_case: Optional[str] = None)
-
Expand source code
def set_case(sample: str, emb_case: typing.Union[str, None] = None): if emb_case == "lower": return sample.lower() elif emb_case == "upper": return sample.upper() return sample