# Copyright (c) Recommenders contributors.
# Licensed under the MIT License.
import numpy as np
import math
import logging
from recommenders.utils.constants import DEFAULT_ITEM_COL, DEFAULT_USER_COL
logger = logging.getLogger(__name__)
try:
from pyspark.sql import functions as F, Window
except ImportError:
pass # so the environment without spark doesn't break
def process_split_ratio(ratio):
    """Validate a split ratio and report whether it describes a multi-split.

    Args:
        ratio (float or list): a float number that indicates split ratio or a list of float
            numbers that indicate split ratios (if it is a multi-split).

    Returns:
        tuple:
        - bool: A boolean variable multi that indicates if the splitting is multi or single.
        - float or list: The input float unchanged for a single split, or the list of
          split ratios rescaled to sum to 1 for a multi-split.

    Raises:
        ValueError: If a float ratio is not strictly between 0 and 1, or if any
            entry of a ratio list is not strictly positive.
        TypeError: If ratio is neither a float nor a list.
    """
    if isinstance(ratio, float):
        if ratio <= 0 or ratio >= 1:
            raise ValueError("Split ratio has to be between 0 and 1")
        return False, ratio

    if isinstance(ratio, list):
        if any(x <= 0 for x in ratio):
            raise ValueError(
                "All split ratios in the ratio list should be larger than 0."
            )

        # Normalize split ratios if they are not summed to 1; fsum is evaluated
        # once and reused as the divisor.
        total = math.fsum(ratio)
        if total != 1.0:
            ratio = [x / total for x in ratio]
        return True, ratio

    raise TypeError("Split ratio should be either float or a list of floats.")
def min_rating_filter_pandas(
    data,
    min_rating=1,
    filter_by="user",
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
):
    """Filter rating DataFrame for each user with minimum rating.

    Filter rating data frame with minimum number of ratings for user/item is usually useful to
    generate a new data frame with warm user/item. The warmth is defined by min_rating argument. For
    example, a user is called warm if they have rated at least 4 items.

    Args:
        data (pandas.DataFrame): DataFrame of user-item tuples. Columns of user and item
            should be present in the DataFrame while other columns like rating,
            timestamp, etc. can be optional.
        min_rating (int): minimum number of ratings for user or item.
        filter_by (str): either "user" or "item", depending on which of the two is to
            filter with min_rating.
        col_user (str): column name of user ID.
        col_item (str): column name of item ID.

    Returns:
        pandas.DataFrame: DataFrame with at least columns of user and item that has been filtered by the given specifications.

    Raises:
        ValueError: If filter_by is neither "user" nor "item", or if min_rating
            is smaller than 1.
    """
    # filter_by is validated first (inside the helper), then min_rating.
    split_by_column = _get_column_name(filter_by, col_user, col_item)

    if min_rating < 1:
        raise ValueError("min_rating should be integer and larger than or equal to 1.")

    # Keep every row whose group (user or item) has at least min_rating rows.
    return data.groupby(split_by_column).filter(lambda x: len(x) >= min_rating)
def min_rating_filter_spark(
    data,
    min_rating=1,
    filter_by="user",
    col_user=DEFAULT_USER_COL,
    col_item=DEFAULT_ITEM_COL,
):
    """Filter rating DataFrame for each user with minimum rating.

    Filter rating data frame with minimum number of ratings for user/item is usually useful to
    generate a new data frame with warm user/item. The warmth is defined by min_rating argument. For
    example, a user is called warm if they have rated at least 4 items.

    Args:
        data (pyspark.sql.DataFrame): DataFrame of user-item tuples. Columns of user and item
            should be present in the DataFrame while other columns like rating,
            timestamp, etc. can be optional.
        min_rating (int): minimum number of ratings for user or item.
        filter_by (str): either "user" or "item", depending on which of the two is to
            filter with min_rating.
        col_user (str): column name of user ID.
        col_item (str): column name of item ID.

    Returns:
        pyspark.sql.DataFrame: DataFrame with at least columns of user and item that has been filtered by the given specifications.

    Raises:
        ValueError: If filter_by is neither "user" nor "item", or if min_rating
            is smaller than 1.
    """
    # filter_by is validated first (inside the helper), then min_rating.
    split_by_column = _get_column_name(filter_by, col_user, col_item)

    if min_rating < 1:
        raise ValueError("min_rating should be integer and larger than or equal to 1.")

    # With min_rating == 1 the input is returned as-is; the window count is
    # only computed when a threshold above 1 is requested.
    if min_rating > 1:
        window = Window.partitionBy(split_by_column)
        # "_count" is a temporary per-partition count column, dropped again
        # before the DataFrame is returned.
        data = (
            data.withColumn("_count", F.count(split_by_column).over(window))
            .where(F.col("_count") >= min_rating)
            .drop("_count")
        )

    return data
def _get_column_name(name, col_user, col_item):
    """Map the selector "user" / "item" to the corresponding column name.

    Args:
        name (str): either "user" or "item".
        col_user (str): column name returned when name is "user".
        col_item (str): column name returned when name is "item".

    Returns:
        str: col_user or col_item, depending on name.

    Raises:
        ValueError: If name is neither "user" nor "item".
    """
    if name == "user":
        return col_user
    if name == "item":
        return col_item
    raise ValueError("name should be either 'user' or 'item'.")
def split_pandas_data_with_ratios(data, ratios, seed=42, shuffle=False):
    """Helper function to split pandas DataFrame with given ratios

    .. note::

        Implementation referenced from `this source <https://stackoverflow.com/questions/38250710/how-to-split-data-into-3-sets-train-validation-and-test>`_.

    Args:
        data (pandas.DataFrame): Pandas data frame to be split.
        ratios (list of floats): list of ratios for split. The ratios have to sum to 1
            (up to floating-point rounding).
        seed (int): random seed.
        shuffle (bool): whether data will be shuffled when being split.

    Returns:
        list: List of pd.DataFrame split by the given specifications. Each frame
        is an independent copy carrying an extra "split_index" column holding
        its position in the list.

    Raises:
        ValueError: If the ratios do not sum to 1.
    """
    # Compare with a tolerance: ratios produced by normalization may miss 1.0
    # by a rounding error and should not be rejected for that.
    if not math.isclose(math.fsum(ratios), 1.0):
        raise ValueError("The ratios have to sum to 1")

    if shuffle:
        data = data.sample(frac=1, random_state=seed)

    # The last cumulative ratio is dropped; the final split simply runs to the
    # end of the frame.
    n_rows = len(data)
    cut_points = [round(c * n_rows) for c in np.cumsum(ratios).tolist()[:-1]]
    bounds = [0, *cut_points, n_rows]

    splits = []
    for i, (start, stop) in enumerate(zip(bounds, bounds[1:])):
        # Positional slicing replaces np.split on a DataFrame; copying keeps
        # the column assignment below from touching a slice of the input.
        part = data.iloc[start:stop].copy()
        # Add split index (this makes splitting by group more efficient).
        part["split_index"] = i
        splits.append(part)

    return splits
def filter_k_core(data, core_num=0, col_user="userID", col_item="itemID"):
    """Filter rating dataframe for minimum number of users and items by
    repeatedly applying min_rating_filter until the condition is satisfied.

    Args:
        data (pandas.DataFrame): DataFrame of user-item tuples containing at
            least the col_user and col_item columns.
        core_num (int): minimum number of rows every remaining user and every
            remaining item must have. No filtering is applied when it is not
            larger than 0.
        col_user (str): column name of user ID.
        col_item (str): column name of item ID.

    Returns:
        pandas.DataFrame: A filtered copy of data, sorted by col_user. The
        input frame is not modified.
    """
    num_users, num_items = len(data[col_user].unique()), len(data[col_item].unique())
    logger.info("Original: %d users and %d items", num_users, num_items)
    df_inp = data.copy()

    if core_num > 0:
        # Removing sparse items can push users below the threshold and vice
        # versa, so both filters are re-applied until nothing violates it.
        while True:
            # The column names are forwarded explicitly so that custom
            # col_user / col_item values are honored by the filter as well.
            df_inp = min_rating_filter_pandas(
                df_inp,
                min_rating=core_num,
                filter_by="item",
                col_user=col_user,
                col_item=col_item,
            )
            df_inp = min_rating_filter_pandas(
                df_inp,
                min_rating=core_num,
                filter_by="user",
                col_user=col_user,
                col_item=col_item,
            )
            count_u = df_inp.groupby(col_user)[col_item].count()
            count_i = df_inp.groupby(col_item)[col_user].count()
            if (
                len(count_i[count_i < core_num]) == 0
                and len(count_u[count_u < core_num]) == 0
            ):
                break

    # Sorting and the final log line run whether or not filtering was applied.
    df_inp = df_inp.sort_values(by=[col_user])
    num_users = len(df_inp[col_user].unique())
    num_items = len(df_inp[col_item].unique())
    logger.info("Final: %d users and %d items", num_users, num_items)

    return df_inp