Source code for collie.model.cold_start_matrix_factorization

from functools import partial
from typing import Callable, Dict, Iterable, Optional, Union

import pandas as pd
import torch
from torch import nn
from torch.optim.lr_scheduler import ReduceLROnPlateau

from collie.interactions import (ApproximateNegativeSamplingInteractionsDataLoader,
                                 Interactions,
                                 InteractionsDataLoader)
from collie.model import MultiStagePipeline, ScaledEmbedding, ZeroEmbedding
from collie.utils import get_init_arguments, merge_docstrings


INTERACTIONS_LIKE_INPUT = Union[ApproximateNegativeSamplingInteractionsDataLoader,
                                Interactions,
                                InteractionsDataLoader]


class ColdStartModel(MultiStagePipeline):
    # NOTE: the full docstring is merged in with ``MultiStagePipeline``'s using
    # ``merge_docstrings``. Only the descriptions of new or changed parameters are included in
    # this docstring
    """
    Training pipeline for a matrix factorization model optimized for the cold-start problem.

    Many recommendation models suffer from the cold-start problem, in which a model is unable to
    provide adequate recommendations for a new item until enough users have interacted with it.
    But if users only interact with recommended items, a new item will never be recommended, and
    thus the model will never improve its recommendations for it. The ``ColdStartModel``
    attempts to bypass this by limiting the item space down to "item buckets", training a model
    with this as the item space, then expanding out to all items. During this expansion, the
    learned embeddings of each bucket are copied over to each corresponding item, providing a
    smarter initialization than a random one for both existing and new items. Now, when we have
    a new item, we can use its bucket embedding as an initialization in the model.

    The stages in a ``ColdStartModel`` are, in order:

    1. ``item_buckets``
        Matrix factorization with item embeddings and bias terms bucketed by the
        ``item_buckets`` argument. Unlike in the next stage, many items may map onto a single
        bucket, and these items will share the same embedding and bias representation. The
        model should learn user preferences for buckets in this stage.

    2. ``no_buckets``
        Standard matrix factorization, as in ``MatrixFactorizationModel``. However, upon
        advancing to this stage, the item embeddings are initialized with their bucketed
        embedding values (and the same for biases). Not only does this provide a better
        initialization than a random one, it also allows new items to be incorporated into the
        model without training by using their item bucket embedding and bias terms at
        prediction time.

    Note that the cold-start problem exists for new users as well, but this functionality will
    be added to this model in a future version.

    All ``ColdStartModel`` instances are subclasses of the ``LightningModule`` class provided
    by PyTorch Lightning. This means that to train a model, you will need a
    ``collie.model.CollieTrainer`` object, but the model can be saved and loaded without this
    ``Trainer`` instance. Example usage may look like:

    .. code-block:: python

        from collie.model import ColdStartModel, CollieTrainer

        # instantiate and fit a ``ColdStartModel`` as expected
        model = ColdStartModel(train=train, item_buckets=item_buckets)
        trainer = CollieTrainer(model)
        trainer.fit(model)

        # train for X more epochs on the next stage, ``no_buckets``
        trainer.max_epochs += X
        model.advance_stage()
        trainer.fit(model)

        model.eval()

        # do evaluation as normal with ``model``

        # get item-item recommendations for a new item by using the bucket ID, Z
        similar_items = model.item_bucket_item_similarity(item_bucket_id=Z)

        model.save_model(filename='model.pth')
        new_model = ColdStartModel(load_model_path='model.pth')

        # do evaluation as normal with ``new_model``

    Note
    ----
    The ``forward`` calculation will be different depending on the stage that is set. Keep this
    in mind when evaluating, saving, and loading models.

    Parameters
    ----------
    item_buckets: torch.tensor, 1-d
        An ordered iterable containing the bucket ID for each item ID.
        For example, if you have five films and are going to bucket by primary genre, and your
        data looks like:

        * Item ID: 0, Genre ID: 1

        * Item ID: 1, Genre ID: 0

        * Item ID: 2, Genre ID: 2

        * Item ID: 3, Genre ID: 2

        * Item ID: 4, Genre ID: 1

        Then ``item_buckets`` would be: ``[1, 0, 2, 2, 1]``
    embedding_dim: int
        Number of latent factors to use for user and item embeddings
    dropout_p: float
        Probability of dropout
    item_buckets_stage_lr: float
        Learning rate for user parameters and item bucket parameters optimized during the
        ``item_buckets`` stage
    no_buckets_stage_lr: float
        Learning rate for user parameters and item parameters optimized during the
        ``no_buckets`` stage
    item_buckets_stage_optimizer: str or torch.optim.Optimizer
        Optimizer used for user parameters and item bucket parameters optimized during the
        ``item_buckets`` stage. If a string, one of the following supported optimizers:

        * ``'sgd'`` (for ``torch.optim.SGD``)

        * ``'adam'`` (for ``torch.optim.Adam``)

    no_buckets_stage_optimizer: str or torch.optim.Optimizer
        Optimizer used for user parameters and item parameters optimized during the
        ``no_buckets`` stage. If a string, one of the following supported optimizers:

        * ``'sgd'`` (for ``torch.optim.SGD``)

        * ``'adam'`` (for ``torch.optim.Adam``)

    """
    def __init__(self,
                 train: INTERACTIONS_LIKE_INPUT = None,
                 val: INTERACTIONS_LIKE_INPUT = None,
                 item_buckets: Iterable[int] = None,
                 embedding_dim: int = 30,
                 dropout_p: float = 0.0,
                 sparse: bool = False,
                 item_buckets_stage_lr: float = 1e-3,
                 no_buckets_stage_lr: float = 1e-3,
                 lr_scheduler_func: Optional[torch.optim.lr_scheduler._LRScheduler] = partial(
                     ReduceLROnPlateau,
                     patience=1,
                     verbose=False,
                 ),
                 weight_decay: float = 0.0,
                 item_buckets_stage_optimizer: Union[str, torch.optim.Optimizer] = 'adam',
                 no_buckets_stage_optimizer: Union[str, torch.optim.Optimizer] = 'adam',
                 loss: Union[str, Callable[..., torch.tensor]] = 'hinge',
                 metadata_for_loss: Optional[Dict[str, torch.tensor]] = None,
                 metadata_for_loss_weights: Optional[Dict[str, float]] = None,
                 load_model_path: Optional[str] = None,
                 map_location: Optional[str] = None):
        optimizer_config_list = None
        num_item_buckets = None

        if load_model_path is None:
            # TODO: separate out optimizer and bias optimizer somehow
            optimizer_config_list = [
                {
                    'lr': item_buckets_stage_lr,
                    'optimizer': item_buckets_stage_optimizer,
                    'parameter_prefix_list': [
                        'user_embed',
                        'user_bias',
                        'item_bucket_embed',
                        'item_bucket_bias',
                    ],
                    'stage': 'item_buckets',
                },
                {
                    'lr': no_buckets_stage_lr,
                    'optimizer': no_buckets_stage_optimizer,
                    'parameter_prefix_list': [
                        'user_embed',
                        'user_bias',
                        'item_embed',
                        'item_bias',
                    ],
                    'stage': 'no_buckets',
                },
            ]

            if not isinstance(item_buckets, torch.Tensor):
                item_buckets = torch.tensor(item_buckets)

            # data quality checks for ``item_buckets``
            assert item_buckets.dim() == 1, (
                f'``item_buckets`` must be 1-dimensional, not {item_buckets.dim()}-dimensional!'
            )

            if len(item_buckets) != train.num_items:
                raise ValueError(
                    'Length of ``item_buckets`` must be equal to the number of items in the '
                    f'dataset: {len(item_buckets)} != {train.num_items}.'
                )

            if min(item_buckets) != 0:
                raise ValueError(
                    f'``item_buckets`` IDs must start at 0, not {min(item_buckets)}!'
                )

            num_item_buckets = item_buckets.max().item() + 1

        super().__init__(optimizer_config_list=optimizer_config_list,
                         num_item_buckets=num_item_buckets,
                         **get_init_arguments())

    __doc__ = merge_docstrings(MultiStagePipeline, __doc__, __init__)

    def _move_any_external_data_to_device(self):
        """Move the item buckets to the device before training."""
        super()._move_any_external_data_to_device()

        self.hparams.item_buckets = self.hparams.item_buckets.to(self.device)

    def _copy_weights(self, old: nn.Embedding, new: nn.Embedding, buckets: torch.tensor) -> None:
        new.weight.data.copy_(old.weight.data[buckets.to(new.weight.device)])
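    # ``_copy_weights`` indexes the bucket table's rows by the per-item bucket IDs, so
    # every item starts from its bucket's learned vector. A minimal standalone sketch of
    # the same operation (toy sizes and the docstring's example buckets; not part of the
    # model itself):
    #
    #     import torch
    #     from torch import nn
    #
    #     buckets = torch.tensor([1, 0, 2, 2, 1])                # bucket ID per item
    #     old = nn.Embedding(num_embeddings=3, embedding_dim=4)  # one row per bucket
    #     new = nn.Embedding(num_embeddings=5, embedding_dim=4)  # one row per item
    #
    #     new.weight.data.copy_(old.weight.data[buckets])
    #
    #     # items 2 and 3 shared bucket 2, so they now share a starting vector
    #     assert torch.equal(new.weight.data[2], new.weight.data[3])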
    def set_stage(self, stage: str) -> None:
        """Set the stage for the model."""
        current_stage = self.hparams.stage

        if stage in self.hparams.stage_list:
            if current_stage == 'item_buckets' and stage == 'no_buckets':
                print('Copying over item embeddings...')
                self._copy_weights(self.item_bucket_biases,
                                   self.item_biases,
                                   self.hparams.item_buckets)
                self._copy_weights(self.item_bucket_embeddings,
                                   self.item_embeddings,
                                   self.hparams.item_buckets)
        else:
            raise ValueError(
                f'"{stage}" is not a valid stage, please choose one of {self.hparams.stage_list}'
            )

        self.hparams.stage = stage
        print(f'Set ``self.hparams.stage`` to "{stage}"')
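    # In practice, ``set_stage`` is usually reached through ``advance_stage()`` (inherited
    # from ``MultiStagePipeline``), which moves to the next stage in ``stage_list``. A
    # sketch of the two-stage loop, mirroring the class docstring (the extra epoch count
    # here is hypothetical):
    #
    #     trainer.fit(model)        # stage 1, ``item_buckets``: learn bucket preferences
    #     trainer.max_epochs += 10
    #     model.advance_stage()     # triggers the weight copy in ``set_stage`` above
    #     trainer.fit(model)        # stage 2, ``no_buckets``: fine-tune per-item weights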
    def _setup_model(self, **kwargs) -> None:
        """
        Method for building model internals that rely on the data passed in.

        This method will be called after ``prepare_data``.

        """
        # define initial embedding groups
        self.item_bucket_biases = ZeroEmbedding(
            num_embeddings=self.hparams.num_item_buckets,
            embedding_dim=1,
            sparse=self.hparams.sparse,
        )
        self.item_bucket_embeddings = ScaledEmbedding(
            num_embeddings=self.hparams.num_item_buckets,
            embedding_dim=self.hparams.embedding_dim,
            sparse=self.hparams.sparse,
        )

        # define fine-tuned embedding groups
        self.user_biases = ZeroEmbedding(
            num_embeddings=self.hparams.num_users,
            embedding_dim=1,
            sparse=self.hparams.sparse,
        )
        self.item_biases = ZeroEmbedding(
            num_embeddings=self.hparams.num_items,
            embedding_dim=1,
            sparse=self.hparams.sparse,
        )
        self.user_embeddings = ScaledEmbedding(
            num_embeddings=self.hparams.num_users,
            embedding_dim=self.hparams.embedding_dim,
            sparse=self.hparams.sparse,
        )
        self.item_embeddings = ScaledEmbedding(
            num_embeddings=self.hparams.num_items,
            embedding_dim=self.hparams.embedding_dim,
            sparse=self.hparams.sparse,
        )

        self.dropout = nn.Dropout(p=self.hparams.dropout_p)
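    # The two bucket tables are sized by ``num_item_buckets`` while the per-item tables
    # are sized by ``num_items``; the user tables are shared across both stages. A rough
    # plain-PyTorch equivalent of one bucket table pair, assuming ``ScaledEmbedding``
    # scales its normal initialization by the embedding dimension and ``ZeroEmbedding``
    # zero-initializes (a common pattern for bias embeddings):
    #
    #     bucket_embeddings = nn.Embedding(num_item_buckets, embedding_dim)
    #     bucket_embeddings.weight.data.normal_(0, 1.0 / embedding_dim)
    #     bucket_biases = nn.Embedding(num_item_buckets, 1)
    #     bucket_biases.weight.data.zero_()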
    def forward(self, users: torch.tensor, items: torch.tensor) -> torch.tensor:
        """
        Forward pass through the model.

        Parameters
        ----------
        users: tensor, 1-d
            Array of user indices
        items: tensor, 1-d
            Array of item indices

        Returns
        -------
        preds: tensor, 1-d
            Predicted ratings or rankings

        """
        user_embeddings = self.user_embeddings(users)
        user_biases = self.user_biases(users)

        if self.hparams.stage == 'item_buckets':
            # transform item IDs to item bucket IDs
            items = self.hparams.item_buckets[items]
            item_embeddings = self.item_bucket_embeddings(items)
            item_biases = self.item_bucket_biases(items)
        elif self.hparams.stage == 'no_buckets':
            item_embeddings = self.item_embeddings(items)
            item_biases = self.item_biases(items)

        pred_scores = (
            torch.mul(self.dropout(user_embeddings), self.dropout(item_embeddings)).sum(axis=1)
            + user_biases.squeeze(1)
            + item_biases.squeeze(1)
        )

        return pred_scores.squeeze()
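    # During the ``item_buckets`` stage, ``items = self.hparams.item_buckets[items]``
    # collapses the item space: with the docstring's buckets ``[1, 0, 2, 2, 1]``, a batch
    # of item IDs ``[0, 3]`` is looked up as bucket IDs ``[1, 2]``. The score itself is a
    # biased dot product; a minimal sketch with toy tensors (dropout omitted):
    #
    #     user_vec, item_vec = torch.randn(2, 8), torch.randn(2, 8)
    #     user_bias, item_bias = torch.randn(2), torch.randn(2)
    #     preds = (user_vec * item_vec).sum(dim=1) + user_bias + item_bias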
    def item_bucket_item_similarity(self, item_bucket_id: int) -> pd.Series:
        """
        Get the most similar item indices to an item bucket by cosine similarity.

        Cosine similarity is computed with item and item bucket embeddings from a trained model.

        Parameters
        ----------
        item_bucket_id: int

        Returns
        -------
        sim_score_idxs: pd.Series
            Sorted values as cosine similarity for each item in the dataset, with the index
            being the item ID

        """
        item_bucket_embeddings = self.item_bucket_embeddings.weight.data
        item_bucket_embeddings = (
            item_bucket_embeddings / item_bucket_embeddings.norm(dim=1)[:, None]
        )

        item_embeddings = self._get_item_embeddings()
        item_embeddings = item_embeddings / item_embeddings.norm(dim=1)[:, None]

        sim_score_idxs = (
            torch.matmul(item_bucket_embeddings[[item_bucket_id], :],
                         item_embeddings.transpose(1, 0))
            .detach()
            .cpu()
            .numpy()
            .squeeze()
        )

        sim_score_idxs_series = pd.Series(sim_score_idxs)
        sim_score_idxs_series = sim_score_idxs_series.sort_values(ascending=False)

        return sim_score_idxs_series
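    # Example use of the similarity lookup above, with a hypothetical bucket ID of 2 (the
    # "Genre ID: 2" bucket from the docstring example):
    #
    #     similar_items = model.item_bucket_item_similarity(item_bucket_id=2)
    #     top_ten = similar_items.head(10)  # item IDs (index) most aligned with bucket 2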
    def _get_item_embeddings(self) -> torch.tensor:
        """Get item embeddings on device."""
        return self.item_embeddings.weight.data

    def _get_user_embeddings(self) -> torch.tensor:
        """Get user embeddings on device."""
        return self.user_embeddings.weight.data
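# A hedged sketch of building the ``item_buckets`` argument from raw item metadata, as
# in the genre example in the class docstring. ``item_metadata`` and its ``genre``
# column are hypothetical; the only requirements are integer bucket IDs starting at 0,
# ordered by item ID:
#
#     import pandas as pd
#     import torch
#
#     item_metadata = pd.DataFrame({
#         'item_id': [0, 1, 2, 3, 4],
#         'genre': ['comedy', 'action', 'drama', 'drama', 'comedy'],
#     })
#     codes, _ = pd.factorize(item_metadata.sort_values('item_id')['genre'])
#     item_buckets = torch.tensor(codes)  # e.g. tensor([0, 1, 2, 2, 0])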