from collections import defaultdict
import functools
import operator
from typing import Any, Iterable, Optional, Tuple, Union
from joblib import delayed, Parallel
import numpy as np
from scipy.sparse import coo_matrix
from sklearn.model_selection import train_test_split
from collie.interactions import (BaseInteractions,
ExplicitInteractions,
HDF5Interactions,
Interactions)
from collie.utils import get_random_seed
def _subset_interactions(interactions: BaseInteractions,
                         idxs: Iterable[int]) -> Union[ExplicitInteractions, Interactions]:
    """Return a new interactions object containing only the data points at ``idxs``."""
    selected = np.array(idxs)

    subset_mat = coo_matrix(
        (
            interactions.mat.data[selected],
            (interactions.mat.row[selected], interactions.mat.col[selected]),
        ),
        shape=(interactions.num_users, interactions.num_items),
    )

    # disable all ``Interactions`` checks for the data splits, since we assume the initial
    # ``Interactions`` object would have these checks already applied prior to the data split
    shared_kwargs = dict(
        mat=subset_mat,
        allow_missing_ids=True,
        remove_duplicate_user_item_pairs=False,
        num_users=interactions.num_users,
        num_items=interactions.num_items,
    )

    if not isinstance(interactions, Interactions):
        return ExplicitInteractions(**shared_kwargs)

    return Interactions(
        num_negative_samples=interactions.num_negative_samples,
        check_num_negative_samples_is_valid=False,
        max_number_of_samples_to_consider=interactions.max_number_of_samples_to_consider,
        seed=interactions.seed,
        **shared_kwargs,
    )
def random_split(interactions: BaseInteractions,
                 val_p: float = 0.0,
                 test_p: float = 0.2,
                 processes: Optional[Any] = None,
                 seed: Optional[int] = None) -> Tuple[BaseInteractions, ...]:
    """
    Randomly split interactions into training, validation, and testing sets.

    This split does NOT guarantee that every user will be represented in both the training and
    testing datasets. While much faster than ``stratified_split``, it is not the most representative
    data split because of this.

    Note that this function is not supported for ``HDF5Interactions`` objects, since this data split
    implementation requires all data to fit in memory. A data split for large datasets should be
    done using a big data processing technology, like Spark.

    Parameters
    ----------
    interactions: collie.interactions.BaseInteractions
    val_p: float
        Proportion of data used for validation
    test_p: float
        Proportion of data used for testing
    processes: Any
        Ignored, included only for compatability with ``stratified_split`` API
    seed: int
        Random seed for splits

    Returns
    -------
    train_interactions: collie.interactions.BaseInteractions
        Training data of size proportional to ``1 - val_p - test_p``
    validate_interactions: collie.interactions.BaseInteractions
        Validation data of size proportional to ``val_p``, returned only if ``val_p > 0``
    test_interactions: collie.interactions.BaseInteractions
        Testing data of size proportional to ``test_p``

    Examples
    --------
    .. code-block:: python

        >>> interactions = Interactions(...)
        >>> len(interactions)
        100000
        >>> train, test = random_split(interactions)
        >>> len(train), len(test)
        (80000, 20000)

    """
    assert not isinstance(interactions, HDF5Interactions), (
        '``HDF5Interactions`` data type not supported in cross validation splits!'
    )

    _validate_val_p_and_test_p(val_p=val_p, test_p=test_p)

    if seed is None:
        seed = get_random_seed()

    # shuffle with a local ``RandomState`` rather than seeding NumPy's global RNG - the legacy
    # ``RandomState`` stream is stability-guaranteed, so this produces the exact same permutation
    # as ``np.random.seed(seed)`` + ``np.random.shuffle`` without mutating global random state
    shuffle_indices = np.arange(len(interactions))
    np.random.RandomState(seed).shuffle(shuffle_indices)

    interactions = _subset_interactions(interactions=interactions,
                                        idxs=shuffle_indices)

    # data layout after shuffling: [ train | validate | test ]
    validate_cutoff = int((1.0 - (val_p + test_p)) * len(interactions))
    test_cutoff = int((1.0 - test_p) * len(interactions))

    train_idxs = np.arange(validate_cutoff)
    validate_idxs = np.arange(validate_cutoff, test_cutoff)
    test_idxs = np.arange(test_cutoff, len(interactions))

    train_interactions = _subset_interactions(interactions=interactions,
                                              idxs=train_idxs)
    test_interactions = _subset_interactions(interactions=interactions,
                                             idxs=test_idxs)

    if val_p > 0:
        validate_interactions = _subset_interactions(interactions=interactions,
                                                     idxs=validate_idxs)

        return train_interactions, validate_interactions, test_interactions
    else:
        return train_interactions, test_interactions
def stratified_split(interactions: BaseInteractions,
                     val_p: float = 0.0,
                     test_p: float = 0.2,
                     processes: int = -1,
                     seed: Optional[int] = None,
                     force_split: bool = False) -> Tuple[BaseInteractions, ...]:
    """
    Split an ``Interactions`` instance into train, validate, and test datasets in a stratified
    manner such that each user appears at least once in each of the datasets.

    This split guarantees that every user will be represented in the training, validation, and
    testing datasets given they appear in ``interactions`` at least three times. If ``val_p ==
    0``, they will appear in the training and testing datasets given they appear at least two times.
    If a user appears fewer than this number of times, a ``ValueError`` will
    be raised. To filter users with fewer than ``n`` points out, use
    ``collie.utils.remove_users_with_fewer_than_n_interactions``.

    This is computationally more complex than ``random_split``, but produces a more representative
    data split. Note that when ``val_p > 0``, the algorithm will perform the data split twice,
    once to create the test set and another to create the validation set, essentially doubling the
    computational time.

    Note that this function is not supported for ``HDF5Interactions`` objects, since this data split
    implementation requires all data to fit in memory. A data split for large datasets should be
    done using a big data processing technology, like Spark.

    Parameters
    ----------
    interactions: collie.interactions.BaseInteractions
        ``Interactions`` instance containing the data to split
    val_p: float
        Proportion of data used for validation
    test_p: float
        Proportion of data used for testing
    processes: int
        Number of CPUs to use for parallelization. If ``processes == 0``, this will be run
        sequentially in a single list comprehension, else this function uses ``joblib.delayed`` and
        ``joblib.Parallel`` for parallelization. A value of ``-1`` means that all available cores
        will be used
    seed: int
        Random seed for splits
    force_split: bool
        Ignore error raised when a user in the dataset has only a single interaction. Normally,
        a ``ValueError`` is raised when this occurs. When ``force_split=True``, however,
        users with a single interaction will be placed in the training set and an error will NOT be
        raised
    Returns
    -------
    train_interactions: collie.interactions.BaseInteractions
        Training data of size proportional to ``1 - val_p - test_p``
    validate_interactions: collie.interactions.BaseInteractions
        Validation data of size proportional to ``val_p``, returned only if ``val_p > 0``
    test_interactions: collie.interactions.BaseInteractions
        Testing data of size proportional to ``test_p``

    Examples
    --------
    .. code-block:: python

        >>> interactions = Interactions(...)
        >>> len(interactions)
        100000
        >>> train, test = stratified_split(interactions)
        >>> len(train), len(test)
        (80000, 20000)

    """
    assert not isinstance(interactions, HDF5Interactions), (
        '``HDF5Interactions`` data types not supported in cross validation splits!'
    )

    _validate_val_p_and_test_p(val_p=val_p, test_p=test_p)

    if seed is None:
        seed = get_random_seed()

    train, test = _stratified_split(interactions=interactions,
                                    test_p=test_p,
                                    processes=processes,
                                    seed=seed,
                                    force_split=force_split)

    if val_p > 0:
        # split the remaining train data again; rescale ``val_p`` so it stays a
        # proportion of the ORIGINAL dataset size, not of the reduced train set
        train, validate = _stratified_split(interactions=train,
                                            test_p=val_p / (1 - test_p),
                                            processes=processes,
                                            seed=seed,
                                            force_split=force_split)

        return train, validate, test
    else:
        return train, test
def _stratified_split(interactions: BaseInteractions,
                      test_p: float,
                      processes: int,
                      seed: int,
                      force_split: bool) -> Tuple[Interactions, Interactions]:
    """Split ``interactions`` into train and test sets with every user in both (when possible)."""
    user_ids = interactions.mat.row

    # while we should be able to run ``np.where(user_ids == user)[0]`` to find all items each user
    # interacted with, by building up a dictionary to get these values instead, we can achieve the
    # same result in O(N) complexity rather than O(M * N), a nice timesave to have when working with
    # larger datasets
    idxs_by_user = defaultdict(list)
    for idx, user in enumerate(user_ids):
        idxs_by_user[user].append(idx)

    unique_users = set(user_ids)

    # by setting the seed to ``seed + user``, we get a balance between reproducability and
    # actual randomness so users with the same number of interactions are not split the exact
    # same way
    if processes == 0:
        per_user_test_idxs = [
            _stratified_split_parallel_worker(idxs_to_split=idxs_by_user[user],
                                              test_p=test_p,
                                              seed=(seed + user),
                                              force_split=force_split)
            for user in unique_users
        ]
    else:
        # run the worker in parallel across users
        per_user_test_idxs = Parallel(n_jobs=processes)(
            delayed(_stratified_split_parallel_worker)(idxs_by_user[user],
                                                       test_p,
                                                       seed + user,
                                                       force_split)
            for user in unique_users
        )

    # flatten the per-user lists of lists down to a single 1-d list
    test_idxs = [idx for user_test_idxs in per_user_test_idxs for idx in user_test_idxs]

    # every index not chosen for the test set belongs to the train set
    train_idxs = list(set(range(len(user_ids))) - set(test_idxs))

    train_interactions = _subset_interactions(interactions=interactions,
                                              idxs=train_idxs)
    test_interactions = _subset_interactions(interactions=interactions,
                                             idxs=test_idxs)

    return train_interactions, test_interactions
def _stratified_split_parallel_worker(idxs_to_split: Iterable[Any],
                                      test_p: float,
                                      seed: int,
                                      force_split: bool) -> Iterable[Any]:
    """
    Select the test-set indices for a single user.

    Parameters
    ----------
    idxs_to_split: Iterable
        All interaction indices belonging to one user
    test_p: float
        Proportion of ``idxs_to_split`` to place in the test set
    seed: int
        Random seed for ``train_test_split``
    force_split: bool
        If ``True``, users with a single interaction contribute no test indices rather
        than raising a ``ValueError``

    Returns
    -------
    test_idxs: Iterable
        Indices chosen for the test set (empty when ``force_split`` swallows a
        single-interaction user). NOTE: the previous annotation was ``np.array``, which is a
        function, not a type - ``train_test_split`` on a list input returns a list here

    Raises
    ------
    ValueError
        If the user has too few interactions to split and ``force_split`` is ``False``

    """
    try:
        # ``stratify`` over all-ones is a no-op strata-wise, but keeps sklearn's
        # stratified code path (and its error for too-small groups) engaged
        _, test_idxs = train_test_split(idxs_to_split,
                                        test_size=test_p,
                                        random_state=seed,
                                        shuffle=True,
                                        stratify=np.ones_like(idxs_to_split))
    except ValueError as ve:
        # sklearn raises this exact phrase when a user has too few interactions to
        # leave anything in the train set - match on it to distinguish from other errors
        if 'the resulting train set will be empty' in str(ve):
            if force_split is False:
                raise ValueError(
                    'Unable to stratify split on users - the ``interactions`` object contains users'
                    ' with a single interaction. Either set ``force_split = True`` to put all users'
                    ' with a single interaction in the training set or run'
                    ' ``collie.utils.remove_users_with_fewer_than_n_interactions`` first.'
                )
            else:
                test_idxs = []

    return test_idxs
def _validate_val_p_and_test_p(val_p: float, test_p: float) -> None:
validate_and_test_p = val_p + test_p
if val_p >= 1 or val_p < 0:
raise ValueError('``val_p`` must be in the range [0, 1).')
if test_p >= 1 or test_p < 0:
raise ValueError('``test_p`` must be in the range [0, 1).')
if validate_and_test_p >= 1 or validate_and_test_p <= 0:
raise ValueError('The sum of ``val_p`` and ``test_p`` must be in the range (0, 1).')