# Source code for recommenders.datasets.split_utils

# Copyright (c) Recommenders contributors.
# Licensed under the MIT License.

import numpy as np
import math
import logging

from recommenders.utils.constants import DEFAULT_ITEM_COL, DEFAULT_USER_COL

logger = logging.getLogger(__name__)

try:
    from pyspark.sql import functions as F, Window
except ImportError:
    pass  # so the environment without spark doesn't break


def process_split_ratio(ratio):
    """Validate a split ratio and normalize it when it is a list.

    Args:
        ratio (float or list): a float number that indicates split ratio or a
            list of float numbers that indicate split ratios (if it is a
            multi-split).

    Returns:
        tuple:
        - bool: A boolean variable multi that indicates if the splitting is
          multi or single.
        - list: A list of normalized split ratios (the float itself is
          returned unchanged for a single split).

    Raises:
        ValueError: if a float ratio is not strictly between 0 and 1, or if
            any entry of a ratio list is not larger than 0.
        TypeError: if ratio is neither a float nor a list.
    """
    if isinstance(ratio, float):
        if ratio <= 0 or ratio >= 1:
            raise ValueError("Split ratio has to be between 0 and 1")

        multi = False
    elif isinstance(ratio, list):
        if any(x <= 0 for x in ratio):
            raise ValueError(
                "All split ratios in the ratio list should be larger than 0."
            )

        # Normalize split ratios if they are not summed to 1. The total is
        # computed once instead of once per element; a new list is built so
        # the caller's list is never modified.
        total = math.fsum(ratio)
        if total != 1.0:
            ratio = [x / total for x in ratio]

        multi = True
    else:
        raise TypeError("Split ratio should be either float or a list of floats.")

    return multi, ratio
def min_rating_filter_pandas(
    data,
    min_rating=1,
    filter_by="user",
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
):
    """Keep only the rows whose user (or item) has enough ratings.

    Rows are grouped by the user or item column, and every group with fewer
    than min_rating rows is discarded. This is typically used to build a data
    frame of "warm" users/items, e.g. users who rated at least 4 items.

    Args:
        data (pandas.DataFrame): DataFrame of user-item tuples. Columns of user
            and item should be present in the DataFrame while other columns
            like rating, timestamp, etc. can be optional.
        min_rating (int): minimum number of ratings for user or item.
        filter_by (str): either "user" or "item", depending on which of the two
            is to filter with min_rating.
        col_user (str): column name of user ID.
        col_item (str): column name of item ID.

    Returns:
        pandas.DataFrame: DataFrame with at least columns of user and item that
        has been filtered by the given specifications.

    Raises:
        ValueError: if filter_by is not "user" or "item", or if min_rating is
            smaller than 1.
    """
    # The column name is resolved first so an invalid filter_by is reported
    # before an invalid min_rating.
    group_key = _get_column_name(filter_by, col_user, col_item)

    if min_rating < 1:
        raise ValueError("min_rating should be integer and larger than or equal to 1.")

    def _has_enough_ratings(group):
        return len(group) >= min_rating

    grouped = data.groupby(group_key)
    return grouped.filter(_has_enough_ratings)
def min_rating_filter_spark(
    data,
    min_rating=1,
    filter_by="user",
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
):
    """Keep only the rows whose user (or item) has enough ratings.

    A per-user (or per-item) row count is attached through a window function,
    rows whose count is below min_rating are removed, and the helper count
    column is dropped again. This is typically used to build a data frame of
    "warm" users/items, e.g. users who rated at least 4 items.

    Args:
        data (pyspark.sql.DataFrame): DataFrame of user-item tuples. Columns of
            user and item should be present in the DataFrame while other
            columns like rating, timestamp, etc. can be optional.
        min_rating (int): minimum number of ratings for user or item.
        filter_by (str): either "user" or "item", depending on which of the two
            is to filter with min_rating.
        col_user (str): column name of user ID.
        col_item (str): column name of item ID.

    Returns:
        pyspark.sql.DataFrame: DataFrame with at least columns of user and item
        that has been filtered by the given specifications.

    Raises:
        ValueError: if filter_by is not "user" or "item", or if min_rating is
            smaller than 1.
    """
    # The column name is resolved first so an invalid filter_by is reported
    # before an invalid min_rating.
    group_key = _get_column_name(filter_by, col_user, col_item)

    if min_rating < 1:
        raise ValueError("min_rating should be integer and larger than or equal to 1.")

    # With a threshold of 1 the input is returned as is, without any counting.
    if min_rating > 1:
        per_key = Window.partitionBy(group_key)
        counted = data.withColumn("_count", F.count(group_key).over(per_key))
        kept = counted.where(F.col("_count") >= min_rating)
        data = kept.drop("_count")

    return data
def _get_column_name(name, col_user, col_item):
    """Map the selector "user" / "item" to the matching column name.

    Args:
        name (str): either "user" or "item".
        col_user (str): column name of user ID.
        col_item (str): column name of item ID.

    Returns:
        str: col_user when name is "user", col_item when name is "item".

    Raises:
        ValueError: if name is neither "user" nor "item".
    """
    for label, column in (("user", col_user), ("item", col_item)):
        if name == label:
            return column

    raise ValueError("name should be either 'user' or 'item'.")
def split_pandas_data_with_ratios(data, ratios, seed=42, shuffle=False):
    """Helper function to split pandas DataFrame with given ratios

    .. note::

        Implementation referenced from `this source
        <https://stackoverflow.com/questions/38250710/how-to-split-data-into-3-sets-train-validation-and-test>`_.

    Args:
        data (pandas.DataFrame): Pandas data frame to be split.
        ratios (list of floats): list of ratios for split. The ratios have to
            sum to 1 (up to floating point rounding).
        seed (int): random seed.
        shuffle (bool): whether data will be shuffled when being split.

    Returns:
        list: List of pd.DataFrame split by the given specifications. Each
        frame is an independent copy carrying an extra "split_index" column
        holding the position of the split it belongs to.

    Raises:
        ValueError: if the ratios do not sum to 1.
    """
    # Compare with a tolerance: ratios that were normalized by division can
    # miss 1.0 by a rounding error and must not be rejected for that.
    if not math.isclose(math.fsum(ratios), 1.0):
        raise ValueError("The ratios have to sum to 1")

    split_index = np.cumsum(ratios).tolist()[:-1]

    if shuffle:
        data = data.sample(frac=1, random_state=seed)

    # Row positions at which one split ends and the next one starts.
    n_rows = len(data)
    bounds = [0] + [round(x * n_rows) for x in split_index] + [n_rows]

    # Slice positionally instead of calling np.split on the DataFrame, and
    # copy each slice so that adding a column below neither touches the input
    # frame nor triggers chained-assignment warnings.
    splits = [
        data.iloc[start:stop].copy() for start, stop in zip(bounds[:-1], bounds[1:])
    ]

    # Add split index (this makes splitting by group more efficient).
    for i, split in enumerate(splits):
        split["split_index"] = i

    return splits
def filter_k_core(data, core_num=0, col_user="userID", col_item="itemID"):
    """Filter rating dataframe for minimum number of users and items by
    repeatedly applying min_rating_filter until the condition is satisfied.

    Args:
        data (pandas.DataFrame): DataFrame of user-item tuples. The input is
            copied and never modified.
        core_num (int): minimum number of rows every remaining user and every
            remaining item must have. No filtering is done when it is not
            larger than 0.
        col_user (str): column name of user ID.
        col_item (str): column name of item ID.

    Returns:
        pandas.DataFrame: the filtered DataFrame, sorted by the user column.
    """
    num_users, num_items = len(data[col_user].unique()), len(data[col_item].unique())
    logger.info("Original: %d users and %d items", num_users, num_items)
    df_inp = data.copy()

    if core_num > 0:
        while True:
            # Forward the column names so the filters group on the columns
            # this function was asked to use, not on the filter's defaults.
            df_inp = min_rating_filter_pandas(
                df_inp,
                min_rating=core_num,
                filter_by="item",
                col_user=col_user,
                col_item=col_item,
            )
            df_inp = min_rating_filter_pandas(
                df_inp,
                min_rating=core_num,
                filter_by="user",
                col_user=col_user,
                col_item=col_item,
            )
            # Dropping users can push items below the threshold again, so
            # re-check both sides and repeat until nothing falls short.
            count_u = df_inp.groupby(col_user)[col_item].count()
            count_i = df_inp.groupby(col_item)[col_user].count()
            if (
                len(count_i[count_i < core_num]) == 0
                and len(count_u[count_u < core_num]) == 0
            ):
                break

    df_inp = df_inp.sort_values(by=[col_user])
    num_users = len(df_inp[col_user].unique())
    num_items = len(df_inp[col_item].unique())
    logger.info("Final: %d users and %d items", num_users, num_items)

    return df_inp