Source code for collie.metrics

from typing import Any, Callable, Iterable, List, Optional, Tuple, Union
import warnings

import numpy as np
try:
    from pytorch_lightning.loggers.logger import Logger as LightningLoggerBase
except ImportError:  # compatible with old ``LightningLoggerBase`` used in versions prior to ``1.6``
    from pytorch_lightning.loggers.base import LightningLoggerBase
from scipy.sparse import csr_matrix
import torch
from torchmetrics import Metric
from torchmetrics.functional import auroc
from tqdm.auto import tqdm

import collie
from collie.interactions import ExplicitInteractions, Interactions, InteractionsDataLoader
from collie.model import BasePipeline


def _get_user_item_pairs(user_ids: Union[np.array, torch.tensor],
                         n_items: int,
                         device: Union[str, torch.device]) -> Tuple[torch.tensor, torch.tensor]:
    """
    Create tensors pairing each input user ID with each item ID.

    Parameters
    ----------
    user_ids: np.array or torch.tensor, 1-d
        Iterable[int] of users to score
    n_items: int
        Number of items in the training data
    device: string
        Device to store tensors on

    Returns
    -------
    users: torch.tensor, 1-d
        Tensor with ``n_items`` copies of each user ID
    items: torch.tensor, 1-d
        Tensor with ``len(user_ids)`` copies of each item ID

    Example
    -------
    .. code-block:: python

        >>> user_ids = np.array([10, 11, 12])
        >>> n_items = 4
        >>> user, item = _get_user_item_pairs(user_ids=user_ids, n_items=n_items, device='cpu')
        >>> user
        tensor([10, 10, 10, 10, 11, 11, 11, 11, 12, 12, 12, 12])
        >>> item
        tensor([0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3])

    """
    # Added because sometimes we call this function with ``n_items`` as ``np.int64`` type which
    # breaks ``repeat_interleave``.
    if isinstance(n_items, np.int64):
        n_items = n_items.item()

    users = torch.tensor(
        user_ids,
        dtype=torch.int64,
        requires_grad=False,
        device=device,
    ).repeat_interleave(n_items)

    items = torch.arange(
        start=0,
        end=n_items,
        requires_grad=False,
        device=device,
    ).repeat(len(user_ids))

    return users, items


def get_preds(model: BasePipeline,
              user_ids: Union[np.array, torch.tensor],
              n_items: int,
              device: Union[str, torch.device]) -> torch.tensor:
    """
    Returns a ``n_users x n_items`` tensor with the predicted score of every item for each input
    user ID.

    Parameters
    ----------
    model: collie.model.BasePipeline
        Model that can take a (user_id, item_id) pair as input and return a recommendation score
    user_ids: np.array or torch.tensor
        Iterable[int] of users to score
    n_items: int
        Number of items in the training data
    device: string
        Device torch should use

    Returns
    -------
    predicted_scores: torch.tensor
        Tensor of shape ``n_users x n_items``

    """
    user, item = _get_user_item_pairs(user_ids, n_items, device)

    with torch.no_grad():
        predicted_scores = model(user, item)

    return predicted_scores.view(-1, n_items)

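# Illustrative usage sketch (not part of the library): the ``n_users x n_items`` score matrix
# returned by ``get_preds`` is typically turned into top-k recommendations with ``torch.topk``.
# The scores below are made up for demonstration.
_example_scores = torch.tensor([[0.1, 0.9, 0.4, 0.2],
                                [0.7, 0.1, 0.3, 0.8]])
_example_scores.topk(k=2, dim=1).indices
# tensor([[1, 2],
#         [3, 0]])
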

def _get_labels(targets: csr_matrix,
                user_ids: Union[np.array, torch.tensor],
                preds: Union[np.array, torch.tensor],
                device: str) -> torch.tensor:
    """
    Returns a binary array indicating which of the recommended products are in each user's target
    set.

    Parameters
    ----------
    targets: scipy.sparse.csr_matrix
        Interaction matrix containing user and item IDs
    user_ids: np.array or torch.tensor
        Users corresponding to the recommendations in the top k predictions
    preds: torch.tensor
        Top ``k`` item IDs to recommend to each user of shape (n_users x k)
    device: string
        Device torch should use

    Returns
    -------
    labels: torch.tensor
        Tensor with the same dimensions as input ``preds``

    """
    return torch.tensor(
        (targets[user_ids[:, None], np.array(preds.detach().cpu())] > 0)
        .astype('double')
        .toarray(),
        requires_grad=False,
        device=device,
    )

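# Illustrative sketch (not part of the library): with a toy interaction matrix in which user 0
# interacted with items {1, 3} and user 1 with item {0}, recommending items [1, 0] to user 0 and
# items [2, 0] to user 1 labels each hit as 1 and each miss as 0.
_example_targets = csr_matrix(np.array([[0, 1, 0, 1],
                                        [1, 0, 0, 0]]))
_example_user_ids = np.array([0, 1])
_example_preds = torch.tensor([[1, 0],
                               [2, 0]])
_get_labels(_example_targets, _example_user_ids, _example_preds, device='cpu')
# tensor([[1., 0.],
#         [0., 1.]], dtype=torch.float64)
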

def mapk(targets: csr_matrix,
         user_ids: Union[np.array, torch.tensor],
         preds: Union[np.array, torch.tensor],
         k: int = 10) -> float:
    """
    Calculate the mean average precision at K (MAP@K) score for each user.

    Parameters
    ----------
    targets: scipy.sparse.csr_matrix
        Interaction matrix containing user and item IDs
    user_ids: np.array or torch.tensor
        Users corresponding to the recommendations in the top k predictions
    preds: torch.tensor
        Tensor of shape (n_users x n_items) with each user's scores for each item
    k: int
        Number of recommendations to consider per user

    Returns
    -------
    mapk_score: float

    """
    device = preds.device
    n_users = preds.shape[0]

    try:
        predicted_items = preds.topk(k, dim=1).indices
    except RuntimeError as e:
        raise ValueError(
            f'Ensure ``k`` ({k}) is less than the number of items ({preds.shape[1]}):', str(e)
        )

    topk_labeled = _get_labels(targets, user_ids, predicted_items, device)
    accuracy = topk_labeled.int()

    weights = (
        1.0 / torch.arange(
            start=1,
            end=k + 1,
            dtype=torch.float64,
            requires_grad=False,
            device=device
        )
    ).repeat(n_users, 1)

    denominator = torch.min(
        torch.tensor(k, device=device, dtype=torch.int).repeat(len(user_ids)),
        torch.tensor(targets[user_ids].getnnz(axis=1), device=device)
    )

    res = ((accuracy * accuracy.cumsum(axis=1) * weights).sum(axis=1)) / denominator
    res[torch.isnan(res)] = 0

    return res.mean().item()

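# Worked example (illustrative sketch, not part of the library): with ``k = 2``, a user whose
# target set is items {0, 2} out of 3 items, and made-up scores that rank item 0 first and item 1
# second, only the first recommendation is a hit, so the average precision at 2 is
# (1 / 1) / min(2, 2) = 0.5.
_example_targets = csr_matrix(np.array([[1, 0, 1]]))
_example_user_ids = np.array([0])
_example_preds = torch.tensor([[0.9, 0.5, 0.1]])
mapk(targets=_example_targets, user_ids=_example_user_ids, preds=_example_preds, k=2)  # 0.5
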
def mrr(targets: csr_matrix,
        user_ids: Union[np.array, torch.tensor],
        preds: Union[np.array, torch.tensor],
        k: Optional[Any] = None) -> float:
    """
    Calculate the mean reciprocal rank (MRR) of the input predictions.

    Parameters
    ----------
    targets: scipy.sparse.csr_matrix
        Interaction matrix containing user and item IDs
    user_ids: np.array or torch.tensor
        Users corresponding to the recommendations in the top k predictions
    preds: torch.tensor
        Tensor of shape (n_users x n_items) with each user's scores for each item
    k: Any
        Ignored, included only for compatibility with ``mapk``

    Returns
    -------
    mrr_score: float

    """
    predicted_items = preds.topk(preds.shape[1], dim=1).indices
    labeled = _get_labels(targets, user_ids, predicted_items, device=preds.device)

    # weight each 0/1 by position so that ``topk`` returns the index of the *first* positive result
    position_weight = 1.0 / (
        torch.arange(1, targets.shape[1] + 1, device=preds.device)
        .repeat(len(user_ids), 1)
        .float()
    )
    labeled_weighted = (labeled.float() * position_weight)

    highest_score, rank = labeled_weighted.topk(k=1)

    reciprocal_rank = 1.0 / (rank.float() + 1)
    reciprocal_rank[highest_score == 0] = 0

    return reciprocal_rank.mean().item()

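# Worked example (illustrative sketch, not part of the library): with made-up scores, user 0's
# only relevant item is ranked second, giving a reciprocal rank of 1/2, while user 1's relevant
# item is ranked first, giving a reciprocal rank of 1. The mean reciprocal rank is 0.75.
_example_targets = csr_matrix(np.array([[0, 1, 0],
                                        [1, 0, 0]]))
_example_user_ids = np.array([0, 1])
_example_preds = torch.tensor([[0.9, 0.5, 0.1],
                               [0.8, 0.2, 0.4]])
mrr(targets=_example_targets, user_ids=_example_user_ids, preds=_example_preds)  # 0.75
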
def auc(targets: csr_matrix,
        user_ids: Union[np.array, torch.tensor],
        preds: Union[np.array, torch.tensor],
        k: Optional[Any] = None) -> float:
    """
    Calculate the area under the ROC curve (AUC) for each user and average the results.

    Parameters
    ----------
    targets: scipy.sparse.csr_matrix
        Interaction matrix containing user and item IDs
    user_ids: np.array or torch.tensor
        Users corresponding to the recommendations in the top k predictions
    preds: torch.tensor
        Tensor of shape (n_users x n_items) with each user's scores for each item
    k: Any
        Ignored, included only for compatibility with ``mapk``

    Returns
    -------
    auc_score: float

    """
    agg = 0
    for i, user_id in enumerate(user_ids):
        target_tensor = torch.tensor(
            targets[user_id].toarray(),
            device=preds.device,
            dtype=torch.long
        ).view(-1)

        # many models' ``preds`` may be unbounded if a final activation layer is not applied, so
        # we normalize ``preds`` here to avoid a ``ValueError`` stating that ``preds`` should be
        # probabilities, but values were detected outside of the [0, 1] range
        try:
            auc = auroc(torch.sigmoid(preds[i, :]), target=target_tensor, pos_label=1)
        except TypeError:
            # newer versions of ``torchmetrics`` replace ``pos_label`` with a required ``task``
            # argument
            auc = auroc(torch.sigmoid(preds[i, :]), target=target_tensor, task='binary')

        agg += auc

    return (agg / len(user_ids)).item()

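# Worked example (illustrative sketch, not part of the library): with made-up scores, user 0's
# single relevant item is scored above one of the two irrelevant items and below the other,
# giving an AUC of 0.5.
_example_targets = csr_matrix(np.array([[0, 1, 0]]))
_example_user_ids = np.array([0])
_example_preds = torch.tensor([[0.9, 0.5, 0.1]])
auc(targets=_example_targets, user_ids=_example_user_ids, preds=_example_preds)  # 0.5
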
def evaluate_in_batches(
    metric_list: Iterable[Callable[
        [csr_matrix, Union[np.array, torch.tensor], Union[np.array, torch.tensor], Optional[int]],
        float
    ]],
    test_interactions: collie.interactions.Interactions,
    model: collie.model.BasePipeline,
    k: int = 10,
    batch_size: int = 20,
    logger: LightningLoggerBase = None,
    verbose: bool = True,
) -> List[float]:
    """
    Evaluate a model with potentially several different metrics.

    Memory constraints require that most test sets will need to be evaluated in batches. This
    function handles the looping and batching boilerplate needed to properly evaluate the model
    without running out of memory.

    Parameters
    ----------
    metric_list: list of functions
        List of evaluation functions to apply. Each function must accept keyword arguments:

        * ``targets``

        * ``user_ids``

        * ``preds``

        * ``k``

    test_interactions: collie.interactions.Interactions
        Interactions to use as labels
    model: collie.model.BasePipeline
        Model that can take a (user_id, item_id) pair as input and return a recommendation score
    k: int
        Number of recommendations to consider per user. This is ignored by some metrics
    batch_size: int
        Number of users to score in a single batch. For best efficiency, this number should be as
        high as possible without running out of memory
    logger: pytorch_lightning.loggers.base.LightningLoggerBase
        If provided, will log outputted metrics dictionary using the ``log_metrics`` method with
        keys being the string representation of ``metric_list`` and values being
        ``evaluation_results``. Additionally, if ``model.hparams.num_epochs_completed`` exists,
        this will be logged as well, making it possible to track metrics progress over the course
        of model training
    verbose: bool
        Display progress bar and print statements during function execution

    Returns
    -------
    evaluation_results: list
        List of floats, with each metric value corresponding to the respective function passed in
        ``metric_list``

    Examples
    --------
    .. code-block:: python

        from collie.metrics import auc, evaluate_in_batches, mapk, mrr


        map_10_score, mrr_score, auc_score = evaluate_in_batches(
            metric_list=[mapk, mrr, auc],
            test_interactions=test,
            model=model,
        )

        print(map_10_score, mrr_score, auc_score)

    """
    if not isinstance(test_interactions, Interactions):
        raise ValueError(
            '``test_interactions`` must be of type ``Interactions``, not '
            f'{type(test_interactions)}. Try using ``explicit_evaluate_in_batches`` instead.'
        )

    device = _get_evaluate_in_batches_device(model=model)
    model.to(device)
    model._move_any_external_data_to_device()

    test_users = np.unique(test_interactions.mat.row)
    targets = test_interactions.mat.tocsr()

    if len(test_users) < batch_size:
        batch_size = len(test_users)

    accumulators = [0] * len(metric_list)

    data_to_iterate_over = range(int(np.ceil(len(test_users) / batch_size)))
    if verbose:
        data_to_iterate_over = tqdm(data_to_iterate_over)

    for i in data_to_iterate_over:
        user_range = test_users[i * batch_size:(i + 1) * batch_size]
        preds = get_preds(model, user_range, test_interactions.num_items, device)
        for metric_ind, metric in enumerate(metric_list):
            score = metric(targets=targets, user_ids=user_range, preds=preds, k=k)
            accumulators[metric_ind] += (score * len(user_range))

    all_scores = [acc_score / len(test_users) for acc_score in accumulators]

    if logger is not None:
        _log_metrics(model=model,
                     logger=logger,
                     metric_list=metric_list,
                     all_scores=all_scores,
                     verbose=verbose)

    return all_scores[0] if len(all_scores) == 1 else all_scores

def explicit_evaluate_in_batches(
    metric_list: Iterable[Metric],
    test_interactions: collie.interactions.ExplicitInteractions,
    model: collie.model.BasePipeline,
    logger: LightningLoggerBase = None,
    verbose: bool = True,
    **kwargs,
) -> List[float]:
    """
    Evaluate a model with potentially several different metrics.

    Memory constraints require that most test sets will need to be evaluated in batches. This
    function handles the looping and batching boilerplate needed to properly evaluate the model
    without running out of memory.

    Parameters
    ----------
    metric_list: list of ``torchmetrics.Metric``
        List of evaluation functions to apply. Each function must accept arguments for predictions
        and targets, in order
    test_interactions: collie.interactions.ExplicitInteractions
        Interactions to use as labels
    model: collie.model.BasePipeline
        Model that can take a (user_id, item_id) pair as input and return a recommendation score
    batch_size: int
        Number of users to score in a single batch. For best efficiency, this number should be as
        high as possible without running out of memory
    logger: pytorch_lightning.loggers.base.LightningLoggerBase
        If provided, will log outputted metrics dictionary using the ``log_metrics`` method with
        keys being the string representation of ``metric_list`` and values being
        ``evaluation_results``. Additionally, if ``model.hparams.num_epochs_completed`` exists,
        this will be logged as well, making it possible to track metrics progress over the course
        of model training
    verbose: bool
        Display progress bar and print statements during function execution
    **kwargs: keyword arguments
        Additional arguments sent to the ``InteractionsDataLoader``

    Returns
    -------
    evaluation_results: list
        List of floats, with each metric value corresponding to the respective function passed in
        ``metric_list``

    Examples
    --------
    .. code-block:: python

        import torchmetrics

        from collie.metrics import explicit_evaluate_in_batches


        mse_score, mae_score = explicit_evaluate_in_batches(
            metric_list=[torchmetrics.MeanSquaredError(), torchmetrics.MeanAbsoluteError()],
            test_interactions=test,
            model=model,
        )

        print(mse_score, mae_score)

    """
    if not isinstance(test_interactions, ExplicitInteractions):
        raise ValueError(
            '``test_interactions`` must be of type ``ExplicitInteractions``, not '
            f'{type(test_interactions)}. Try using ``evaluate_in_batches`` instead.'
        )

    try:
        device = _get_evaluate_in_batches_device(model=model)
        model.to(device)
        model._move_any_external_data_to_device()

        test_loader = InteractionsDataLoader(interactions=test_interactions, **kwargs)

        data_to_iterate_over = test_loader
        if verbose:
            data_to_iterate_over = tqdm(test_loader)

        for batch in data_to_iterate_over:
            users, items, ratings = batch

            # move data to the model's device before scoring
            users = users.to(device)
            items = items.to(device)
            ratings = ratings.cpu()

            preds = model(users, items)

            for metric in metric_list:
                metric(preds.cpu(), ratings)

        all_scores = [metric.compute() for metric in metric_list]

        if logger is not None:
            _log_metrics(model=model,
                         logger=logger,
                         metric_list=metric_list,
                         all_scores=all_scores,
                         verbose=verbose)

        return all_scores[0] if len(all_scores) == 1 else all_scores
    finally:
        for metric in metric_list:
            metric.reset()

def _get_evaluate_in_batches_device(model: BasePipeline):
    device = getattr(model, 'device', None)

    if torch.cuda.is_available() and str(device) == 'cpu':
        warnings.warn('CUDA available but model device is set to CPU - is this desired?')

    if device is None:
        if torch.cuda.is_available():
            warnings.warn(
                '``model.device`` attribute is ``None``. Since GPU is available, putting model on '
                'GPU.'
            )
            device = 'cuda:0'
        else:
            device = 'cpu'

    return device


def _log_metrics(model: BasePipeline,
                 logger: LightningLoggerBase,
                 metric_list: List[Union[Callable[..., Any], Metric]],
                 all_scores: List[float],
                 verbose: bool):
    try:
        step = model.hparams.get('num_epochs_completed')
    except torch.nn.modules.module.ModuleAttributeError:
        # if, somehow, there is no ``model.hparams`` attribute, this shouldn't fail
        step = None

    try:
        metrics_dict = dict(zip([x.__name__ for x in metric_list], all_scores))
    except AttributeError:
        metrics_dict = dict(zip([type(x).__name__ for x in metric_list], all_scores))

    if verbose:
        print(f'Logging metrics {metrics_dict} to ``logger``...')

    logger.log_metrics(metrics=metrics_dict, step=step)