Source code for collie.utils

from datetime import datetime
import inspect
from pathlib import Path
import re
import time
from typing import Any, Dict, Iterable, List, Optional, Union

import docstring_parser
import numpy as np
import pandas as pd
import pytorch_lightning
from scipy.sparse import coo_matrix
import torch

from collie.interactions import Interactions


# Named string fragments used when assembling docstrings in ``merge_docstrings``; spelling them
# out keeps the long concatenations there readable.
NEWLINE_CHARACTER = '\n'
FOUR_SPACES = ' ' * 4
NEWLINE_CHARACTER_FOUR_SPACES = f'{NEWLINE_CHARACTER}{FOUR_SPACES}'


def get_random_seed() -> int:
    """Return the current local datetime, truncated to whole seconds, as an integer timestamp."""
    now_to_the_second = datetime.now().replace(microsecond=0)
    seed = int(now_to_the_second.timestamp())

    return seed


def create_ratings_matrix(df: pd.DataFrame,
                          user_col: str = 'user_id',
                          item_col: str = 'item_id',
                          ratings_col: str = 'rating',
                          sparse: bool = False) -> Union[coo_matrix, np.ndarray]:
    """
    Helper function to convert a Pandas DataFrame to 2-dimensional matrix.

    Parameters
    ----------
    df: pd.DataFrame
        Dataframe with columns for user IDs, item IDs, and ratings
    user_col: str
        Column name for the user IDs
    item_col: str
        Column name for the item IDs
    ratings_col: str
        Column name for the ratings column
    sparse: bool
        Whether to return data as a sparse ``coo_matrix`` (True) or np.array (False)

    Returns
    -------
    ratings_matrix: np.array or scipy.sparse.coo_matrix, 2-d
        Data with users as rows, items as columns, and ratings as values. A single object is
        returned (never a tuple); which type depends on ``sparse``

    Raises
    ------
    ValueError
        If ``sparse`` is ``False`` and the minimum of ``df[user_col]`` or ``df[item_col]`` is
        not 0. This check is not applied on the sparse path

    Notes
    -----
    The dense matrix is built with ``DataFrame.pivot`` and missing user/item combinations are
    filled with 0. The pivot only creates rows and columns for IDs that actually appear in
    ``df``, so gaps in the IDs make the dense matrix smaller than ``max ID + 1``.

    """
    if sparse:
        # the sparse path delegates entirely to the module's sparse helper
        return _create_sparse_ratings_matrix_helper(users=df[user_col],
                                                    items=df[item_col],
                                                    ratings=df[ratings_col])

    if df[user_col].min() != 0 or df[item_col].min() != 0:
        raise ValueError(
            'Minimum values of ``df[user_col]`` and ``df[item_col]`` must both be 0.'
        )

    ratings_df = df.pivot(index=user_col, columns=item_col, values=ratings_col).fillna(0)

    return ratings_df.to_numpy()
def _create_sparse_ratings_matrix_helper(users: Iterable[int],
                                         items: Iterable[int],
                                         ratings: Optional[Iterable[int]] = None,
                                         num_users: Union[int, str] = 'infer',
                                         num_items: Union[int, str] = 'infer') -> coo_matrix:
    """
    Create a sparse matrix from a series of arrays.

    Parameters
    ----------
    users: Iterable[int]
        Row index of each entry
    items: Iterable[int]
        Column index of each entry
    ratings: Iterable[int]
        Value of each entry. If ``None``, every entry is given the value 1
    num_users: int or str
        Number of rows in the matrix. If ``'infer'``, ``max(users) + 1`` is used
    num_items: int or str
        Number of columns in the matrix. If ``'infer'``, ``max(items) + 1`` is used

    Returns
    -------
    sparse_matrix: scipy.sparse.coo_matrix
        Matrix of shape ``(num_users, num_items)``

    """
    num_users = _infer_num_if_needed_for_1d_array(num_users, users)
    num_items = _infer_num_if_needed_for_1d_array(num_items, items)

    if ratings is None:
        ratings = np.ones_like(users)

    return coo_matrix(
        (np.array(ratings), (np.array(users), np.array(items))),
        shape=(num_users, num_items),
    )


def _infer_num_if_needed_for_1d_array(num: Union[int, str], array: Iterable[int]) -> int:
    """Return ``num`` or, if it is the string ``'infer'``, the maximum value of ``array`` + 1."""
    # check the type first so a NumPy integer is never compared against a string, which older
    # NumPy versions answer with an elementwise-comparison warning
    if isinstance(num, str) and num == 'infer':
        num = max(array) + 1

    return num
def df_to_interactions(df: pd.DataFrame,
                       user_col: str = 'user_id',
                       item_col: str = 'item_id',
                       ratings_col: Optional[str] = 'rating',
                       **kwargs) -> Interactions:
    """
    Build an ``Interactions`` object from the columns of a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        Dataframe with columns for user IDs, item IDs, and (optionally) ratings
    user_col: str
        Column name for the user IDs
    item_col: str
        Column name for the item IDs
    ratings_col: str
        Column name for the ratings column. If ``None``, no ratings column is read and
        ``ratings=None`` is handed to ``Interactions``
    **kwargs
        Keyword arguments to pass to ``Interactions``

    Returns
    -------
    interactions: collie.interactions.Interactions

    """
    if ratings_col is None:
        ratings = None
    else:
        ratings = df[ratings_col]

    return Interactions(
        users=df[user_col],
        items=df[item_col],
        ratings=ratings,
        **kwargs,
    )
def convert_to_implicit(explicit_df: pd.DataFrame,
                        min_rating_to_keep: Optional[float] = 4,
                        user_col: str = 'user_id',
                        item_col: str = 'item_id',
                        ratings_col: str = 'rating') -> pd.DataFrame:
    """
    Convert explicit interactions data to implicit data.

    Duplicate user ID and item ID pairs will be dropped (the row with the highest rating is the
    one kept), as well as all scores that are ``< min_rating_to_keep``. All remaining
    interactions will have a rating of ``1``.

    Parameters
    ----------
    explicit_df: pd.DataFrame
        Dataframe with explicit ratings in the rating column. It is not modified
    min_rating_to_keep: float
        Minimum rating to be considered a valid interaction. If ``None``, no rating threshold
        is applied and every de-duplicated interaction is kept
    user_col: str
        Column name for the user IDs
    item_col: str
        Column name for the item IDs
    ratings_col: str
        Column name for the ratings column

    Returns
    -------
    implicit_df: pd.DataFrame
        Dataframe that converts all ``ratings >= min_rating_to_keep`` to 1 and drops the rest
        with a reset index. Note that the order of ``implicit_df`` will not be equal to
        ``explicit_df``

    """
    # ``sort_values`` returns a new frame, so the caller's data is never touched. Sorting by
    # rating first means ``keep='last'`` retains the highest rating of each duplicated pair.
    implicit_df = (
        explicit_df
        .sort_values(by=ratings_col)
        .drop_duplicates(subset=[user_col, item_col], keep='last', ignore_index=True)
    )

    if min_rating_to_keep is not None:
        # ``~(x < t)`` rather than ``x >= t`` so rows with a missing rating are treated exactly
        # as before: they are not below the threshold, hence not dropped
        below_threshold = implicit_df[ratings_col] < min_rating_to_keep
        # explicit copy so the assignment below never writes into a view of another frame
        implicit_df = implicit_df.loc[~below_threshold].copy()

    implicit_df[ratings_col] = 1

    return implicit_df.reset_index(drop=True)
def remove_users_with_fewer_than_n_interactions(df: pd.DataFrame,
                                                min_num_of_interactions: int = 3,
                                                user_col: str = 'user_id') -> pd.DataFrame:
    """
    Drop every row belonging to a user who appears fewer than ``min_num_of_interactions`` times.

    Parameters
    ----------
    df: pd.DataFrame
    min_num_of_interactions: int
        Minimum number of interactions a user can have while remaining in ``filtered_df``
    user_col: str
        Column name for the user IDs

    Returns
    -------
    filtered_df: pd.DataFrame
        Remaining rows of ``df``, in their original order, with a reset index

    """
    interactions_per_user = df[user_col].value_counts()
    too_few_interactions = interactions_per_user < min_num_of_interactions
    users_to_remove = interactions_per_user[too_few_interactions].index

    belongs_to_removed_user = df[user_col].isin(users_to_remove)
    filtered_df = df[~belongs_to_removed_user]

    return filtered_df.reset_index(drop=True)
def trunc_normal(embedding_weight: torch.Tensor,
                 mean: float = 0.0,
                 std: float = 1.0) -> torch.Tensor:
    """
    Truncated normal initialization (approximation).

    Taken from FastAI: https://github.com/fastai/fastai/blob/master/fastai/layers.py

    Parameters
    ----------
    embedding_weight: torch.Tensor
        Tensor to initialize. It is overwritten in place
    mean: float
        Value added to every element after scaling
    std: float
        Value every element is multiplied by before ``mean`` is added

    Returns
    -------
    embedding_weight: torch.Tensor
        The result of the chained in-place operations on the input tensor; every value lies
        within ``2 * std`` of ``mean``

    """
    # From https://discuss.pytorch.org/t/implementing-truncated-normal-initializer/4778/12
    # Draw standard-normal samples, fold them into (-2, 2) with ``fmod``, then scale and shift.
    # All four operations are the in-place (trailing underscore) variants.
    return embedding_weight.normal_().fmod_(2).mul_(std).add_(mean)
def get_init_arguments(exclude: Optional[Iterable[str]] = None,
                       verbose: bool = False) -> Dict[str, Any]:
    """
    Get all input arguments (*args and **kwargs) sent to the most-recently called method, given it
    is an ``__init__`` of a class.

    Parameters
    ----------
    exclude: list
        Arguments to exclude from ``init_args``. If an argument is not found in ``init_args``, it
        will be ignored. ``None`` (the default) or an empty list excludes nothing
    verbose: bool
        Print keys in ``exclude`` not found in ``init_args``

    Returns
    -------
    init_args: dict
        Argument dictionary with keys being argument names and values being arguments

    Note
    ----
    If the most-recently called method is not an ``__init__`` of a class, this function will
    return an empty dictionary.

    """
    # ``f_back`` is the frame of whoever called this function; its arguments are the ones
    # collected by PyTorch Lightning's parsing utility
    frame = inspect.currentframe().f_back
    init_args = pytorch_lightning.utilities.parsing.get_init_args(frame)

    # ``None`` instead of a mutable ``[]`` default; both mean "exclude nothing"
    for exclude_arg in exclude or ():
        try:
            del init_args[exclude_arg]
        except KeyError:
            if verbose:
                print(f'Key {exclude_arg} not found in ``init_args`` and will be ignored.')

    return init_args
def pandas_df_to_hdf5(df: pd.DataFrame,
                      out_path: Union[str, Path],
                      key: str = 'interactions') -> None:
    """
    Append a Pandas DataFrame to HDF5 using a ``table`` format and ``blosc`` compression.

    Parameters
    ----------
    df: pd.DataFrame
        Data to write
    out_path: str or Path
        Location of the HDF5 file; converted to ``str`` before being handed to Pandas
    key: str
        Identifier for the group in the HDF5 store

    """
    hdf_options = {
        'key': key,
        'mode': 'a',
        'append': True,
        'format': 'table',
        'complib': 'blosc',
    }

    df.to_hdf(str(out_path), **hdf_options)
def df_to_html(df: pd.DataFrame,
               image_cols: Optional[Union[str, List[str]]] = None,
               hyperlink_cols: Optional[Union[str, List[str]]] = None,
               html_tags: Optional[Dict[str, Union[str, List[str]]]] = None,
               transpose: bool = False,
               image_width: Optional[int] = None,
               max_num_rows: Optional[int] = 200,
               **kwargs) -> str:
    """
    Convert a Pandas DataFrame to HTML.

    Parameters
    ----------
    df: DataFrame
        DataFrame to convert to HTML. It is copied, never edited
    image_cols: str or list
        Column names that contain image urls or file paths. Columns specified as images will make
        all other transformations to those columns be ignored. Local files will display correctly
        in Jupyter if specified using relative paths but not if specified using absolute paths
        (see https://github.com/jupyter/notebook/issues/3810).
    hyperlink_cols: str or list
        Column names that contain hyperlinks to open in a new tab
    html_tags: dictionary
        A transformation to be inserted directly into the HTML tag.

        Ex: ``{'col_name_1': 'strong'}`` becomes ``<strong>col_name_1</strong>``

        Ex: ``{'col_name_2': 'mark'}`` becomes ``<mark>col_name_2</mark>``

        Ex: ``{'col_name_3': 'h2'}`` becomes ``<h2>col_name_3</h2>``

        Ex: ``{'col_name_4': ['em', 'strong']}`` becomes ``<em><strong>col_name_4</strong></em>``

    transpose: bool
        Transpose the DataFrame before converting to HTML
    image_width: int
        Set image width for each image generated
    max_num_rows: int
        Maximum number of rows to display. If ``None``, all rows are displayed
    **kwargs: keyword arguments
        Additional arguments sent to ``pandas.DataFrame.to_html``, as listed in:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_html.html

    Returns
    -------
    df_html: HTML
        DataFrame converted to a HTML string, ready for displaying

    Raises
    ------
    ValueError
        If a column named in ``image_cols``, ``hyperlink_cols``, or ``html_tags`` is not in ``df``

    Examples
    --------
    In a Jupyter notebook:

    .. code-block:: python

        from IPython.core.display import display, HTML
        import pandas as pd

        df = pd.DataFrame({
            'item': ['Beefy Fritos® Burrito'],
            'price': ['1.00'],
            'image_url': ['https://www.tacobell.com/images/22480_beefy_fritos_burrito_269x269.jpg'],
        })
        display(
            HTML(
                df_to_html(
                    df,
                    image_cols='image_url',
                    html_tags={'item': 'strong', 'price': 'em'},
                    image_width=200,
                )
            )
        )

    Note
    ----
    Converted table will have CSS class 'dataframe', unless otherwise specified.

    Cell values are inserted into the markup as-is and the table is rendered with
    ``escape=False``, so ``df`` should only contain content that is safe to display as HTML.

    """
    def _wrap_cols_if_needed(cols: Any) -> Any:
        """Wrap a lone column name in a list. Necessary for columns named with integers."""
        if cols is None:
            return []
        if isinstance(cols, str):
            return [cols]

        try:
            iter(cols)
        except TypeError:
            # a single, non-iterable column name (e.g. an integer)
            return [cols]

        return cols

    # copy the dataframe so we don't edit the original! The explicit copy after ``head`` also
    # eliminates a warning we don't need
    if max_num_rows is None or len(df) <= max_num_rows:
        df = df.copy()
    else:
        df = df.head(max_num_rows).copy()

    image_cols = _wrap_cols_if_needed(image_cols)
    for image_col in image_cols:
        if image_col not in df.columns:
            raise ValueError('{} not a column in df!'.format(image_col))

        if not image_width:
            df[image_col] = df[image_col].map(lambda x: f'<img src="{x}">')
        else:
            df[image_col] = df[image_col].map(lambda x: f'<img src="{x}" width={image_width}>')

    hyperlink_cols = _wrap_cols_if_needed(hyperlink_cols)
    for hyperlink_col in hyperlink_cols:
        if hyperlink_col not in df.columns:
            raise ValueError('{} not a column in df!'.format(hyperlink_col))

        if hyperlink_col in image_cols:
            # image columns ignore every other transformation
            continue

        df[hyperlink_col] = (
            df[hyperlink_col].map(lambda x: f'<a target="_blank" href="{x}">{x}</a>')
        )

    for col, transformations in (html_tags or {}).items():
        if col not in df.columns:
            raise ValueError(f'{col} not a column in df!')

        if col in image_cols:
            continue

        if isinstance(transformations, str):
            transformations = [transformations]

        # tags open in the order given and close in the reverse order
        opening_tag = ''.join(f'<{extra}>' for extra in transformations)
        closing_tag = ''.join(f'</{extra}>' for extra in transformations[::-1])

        df[col] = df[col].map(lambda x: f'{opening_tag}{x}{closing_tag}')

    # Pandas 1.0 and later spell "no column width limit" as ``None``; earlier versions only
    # accept ``-1``. Compare the parsed major version: the raw version string never equals '0'.
    try:
        pandas_major_version = int(pd.__version__.split('.')[0])
    except ValueError:
        # unparsable version string: assume a modern Pandas
        pandas_major_version = 1

    unlimited_colwidth = None if pandas_major_version >= 1 else -1

    if transpose:
        df = df.T

    # ``option_context`` restores the caller's ``display.max_colwidth`` even if ``to_html`` raises
    with pd.option_context('display.max_colwidth', unlimited_colwidth):
        df_html = df.to_html(escape=False, **kwargs)

    return df_html
class Timer:
    """
    Class to manage timing different sections of a job.

    Attributes
    ----------
    start_time: float
        ``time.time()`` reading taken when the timer was instantiated
    current_time: float
        ``time.time()`` reading taken at instantiation or at the latest ``timecheck`` call

    """
    def __init__(self):
        now = time.time()

        self.start_time = now
        self.current_time = now

    def timecheck(self, message: str = 'Finished') -> float:
        """
        Print and return the minutes elapsed since the last timecheck (or since instantiation if
        this is the first one), then restart the interval.

        """
        now = time.time()
        minutes_elapsed = (now - self.current_time) / 60.0

        print(f'{message} ({minutes_elapsed:.2f} min)')

        self.current_time = now

        return minutes_elapsed

    def time_since_start(self, message: str = 'Total time') -> float:
        """Print and return the minutes elapsed since the timer was instantiated."""
        minutes_total = (time.time() - self.start_time) / 60.0

        print(f'{message}: {minutes_total:.2f} min')

        return minutes_total
def merge_docstrings(parent_class: type, child_docstring: str, child_class__init__: Any) -> str:
    """
    Merge docstrings for Collie models to reduce the amount of repeated, shared docstrings.

    This method notes the arguments of the ``child_class``'s ``__init__`` function and searches the
    docstrings of both the child and parent (in order) to construct the docstring for the child
    class.

    Specifically, the final docstring returned will be, in order:

    ```
    CHILD SHORT DESCRIPTION

    CHILD LONG DESCRIPTION

    Parameters
    ----------
    for each ``arg`` in CHILD ``__init__`` ARGUMENTS:
        CHILD ARGUMENT DOCSTRING (if it exists in child docstring), else
        PARENT ARGUMENT DOCSTRING
    ...

    POST ``Parameters`` CHILD DOCSTRING, SIGNIFIED BY A REPEATED HYPHEN SEPARATOR
    ```

    Parameters
    ----------
    parent_class: class
        Class whose ``__doc__`` supplies argument documentation missing from the child docstring
    child_docstring: str
        Numpydoc-style docstring of the child class
    child_class__init__: function
        The child class's ``__init__`` method, inspected for its argument names

    Returns
    -------
    final_docstring: str
        The merged docstring, or ``child_docstring`` unchanged if no line of it ends with
        ``Parameters``

    Notes
    -----
    * The docstring returned will be ordered with a description immediately followed by the
      ``Parameters`` section.
    * ``Returns``, ``Raises``, and ``Deprecated`` sections are currently not supported and will be
      filtered out in the returned docstring.
    * Additional sections will be returned following the ``Parameters`` section if they are noted
      with a line of `-` the length of the title. If not, it will be filtered out.

    """
    # get parent class documentation
    parent_docstring = parent_class.__doc__

    # find the line where the ``Parameters`` section begins (only used below to check that such
    # a section exists at all)
    child_docstring_list = child_docstring.split(NEWLINE_CHARACTER)
    child_parameters_idx = [
        idx for idx, arg in enumerate(child_docstring_list)
        if re.search('\\sParameters\\s?$', arg)
    ]

    if len(child_parameters_idx) == 0:
        # no ``Parameters`` section is bad, fail early
        return child_docstring

    # parse both parent and child docstrings
    parent_parse = docstring_parser.numpydoc.NumpydocParser().parse(parent_docstring)
    child_parse = docstring_parser.numpydoc.NumpydocParser().parse(child_docstring)

    # map each documented argument name to its position in the parsed parameter list
    parent_arg_name_idx_dict = {
        param.arg_name: idx for idx, param in enumerate(parent_parse.params)
    }
    child_arg_name_idx_dict = {
        param.arg_name: idx for idx, param in enumerate(child_parse.params)
    }

    # list all arguments the child class's ``__init__`` method defines
    child_class_inspect_result = inspect.getfullargspec(child_class__init__)

    child_class_has_args = child_class_inspect_result.varargs is not None
    child_class_has_kwargs = child_class_inspect_result.varkw is not None

    child_class__init__args = child_class_inspect_result.args

    # variadic arguments are looked up under their starred names, e.g. ``*args``, ``**kwargs``
    if child_class_has_args:
        child_class__init__args.append('*' + child_class_inspect_result.varargs)
    if child_class_has_kwargs:
        child_class__init__args.append('**' + child_class_inspect_result.varkw)

    # format the description of the class prior to the ``Parameters`` section
    short_description = child_parse.short_description if child_parse.short_description else ''
    long_description = child_parse.long_description if child_parse.long_description else ''

    if short_description:
        short_description = (
            NEWLINE_CHARACTER_FOUR_SPACES
            + short_description.replace(NEWLINE_CHARACTER, NEWLINE_CHARACTER_FOUR_SPACES)
            + NEWLINE_CHARACTER
        )

    if long_description:
        long_description = (
            NEWLINE_CHARACTER_FOUR_SPACES
            + long_description.replace(NEWLINE_CHARACTER, NEWLINE_CHARACTER_FOUR_SPACES)
            + NEWLINE_CHARACTER * 2
        )
    else:
        long_description = NEWLINE_CHARACTER

    final_docstring = short_description + long_description

    # loop through each expected argument, check if the docstring exists in the child (preferred)
    # or the parent documentation, then add that to the final docstring
    if len(child_class__init__args) > 0:
        final_docstring += (
            FOUR_SPACES
            + 'Parameters'
            + NEWLINE_CHARACTER_FOUR_SPACES
            + '----------'
            + NEWLINE_CHARACTER
        )

    for arg in child_class__init__args:
        if arg in child_arg_name_idx_dict:
            param_idx = child_arg_name_idx_dict[arg]
            param = child_parse.params[param_idx]
        elif arg in parent_arg_name_idx_dict:
            param_idx = parent_arg_name_idx_dict[arg]
            param = parent_parse.params[param_idx]
        else:
            # argument isn't in the docstring, we can skip it
            continue

        arg_name = param.arg_name if param.arg_name else ''
        type_name = ': ' + param.type_name if param.type_name else ''
        description = param.description if param.description else ''

        # first line of the entry: ``name: type`` (or just ``name`` when no type is documented)
        final_docstring += (
            f'{FOUR_SPACES}{arg_name.strip()}{type_name.strip()}'
            f'{NEWLINE_CHARACTER}'
        )

        # the description is indented one level deeper than the argument name
        if description != '':
            final_docstring += FOUR_SPACES + FOUR_SPACES + (
                description
                .strip()
                .replace(NEWLINE_CHARACTER, NEWLINE_CHARACTER_FOUR_SPACES + FOUR_SPACES)
            ) + NEWLINE_CHARACTER

    # add in the rest of the docstring post-``Parameters`` section
    for x in child_parse.meta:
        # NOTE(review): exact type comparison rather than ``isinstance`` - presumably deliberate,
        # so that more specific meta entries (parameters, returns, raises, ...) are skipped and
        # only generic titled sections are re-emitted. Confirm before "modernizing" this check.
        if type(x) == docstring_parser.DocstringMeta:
            # section title followed by an underline of hyphens of the same length
            final_docstring += (
                NEWLINE_CHARACTER_FOUR_SPACES
                + x.args[0].title()
                + NEWLINE_CHARACTER_FOUR_SPACES
                + '-' * len(x.args[0])
                + NEWLINE_CHARACTER_FOUR_SPACES
            )

            if x.description != '':
                final_docstring += (
                    x.description
                    .strip()
                    .replace(NEWLINE_CHARACTER, NEWLINE_CHARACTER_FOUR_SPACES)
                ) + NEWLINE_CHARACTER

    # replace lines that are just spaces with a newline character only
    final_docstring = re.sub(r'\n(\s)*\n', NEWLINE_CHARACTER + NEWLINE_CHARACTER, final_docstring)

    # end with a newline and four spaces, ready for the closing quotes of the docstring
    final_docstring += NEWLINE_CHARACTER_FOUR_SPACES

    return final_docstring