# Copyright (c) Recommenders contributors.
# Licensed under the MIT License.
import random
import numpy as np
import pandas as pd
import scipy.sparse as sp
from recommenders.utils.constants import (
DEFAULT_ITEM_COL,
DEFAULT_USER_COL,
DEFAULT_RATING_COL,
DEFAULT_PREDICTION_COL,
)


class ImplicitCF(object):
"""Data processing class for GCN models which use implicit feedback.
Initialize train and test set, create normalized adjacency matrix and sample data for training epochs.
"""

    def __init__(
self,
train,
test=None,
adj_dir=None,
col_user=DEFAULT_USER_COL,
col_item=DEFAULT_ITEM_COL,
col_rating=DEFAULT_RATING_COL,
col_prediction=DEFAULT_PREDICTION_COL,
seed=None,
):
"""Constructor

        Args:
            train (pandas.DataFrame): Training data with at least columns (col_user, col_item, col_rating).
            test (pandas.DataFrame): Test data with at least columns (col_user, col_item, col_rating).
                Can be None, in which case only the training data is processed.
            adj_dir (str): Directory to save / load adjacency matrices. If it is None, adjacency
                matrices will be created but not saved.
            col_user (str): User column name.
            col_item (str): Item column name.
            col_rating (str): Rating column name.
            col_prediction (str): Prediction column name.
            seed (int): Random seed.
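
        Example:
            A minimal illustrative sketch; the toy dataframe below assumes the
            default column names ("userID", "itemID", "rating"):

            >>> import pandas as pd
            >>> train = pd.DataFrame(
            ...     {"userID": [1, 1, 2], "itemID": [10, 11, 10], "rating": [1.0, 1.0, 1.0]}
            ... )
            >>> data = ImplicitCF(train=train, seed=42)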
"""
self.user_idx = None
self.item_idx = None
self.adj_dir = adj_dir
self.col_user = col_user
self.col_item = col_item
self.col_rating = col_rating
self.col_prediction = col_prediction
self.train, self.test = self._data_processing(train, test)
self._init_train_data()
random.seed(seed)

    def _data_processing(self, train, test):
        """Reindex userID and itemID and keep only records with ratings greater than 0.

        Args:
            train (pandas.DataFrame): Training data with at least columns (col_user, col_item, col_rating).
            test (pandas.DataFrame): Test data with at least columns (col_user, col_item, col_rating).
                Can be None, in which case only the training data is processed.

        Returns:
            tuple: Reindexed and filtered train and test pandas.DataFrame (the second
            element is None when no test set is given).
        """
df = (
train
if test is None
else pd.concat([train, test], axis=0, ignore_index=True)
)
if self.user_idx is None:
            user_idx = df[[self.col_user]].drop_duplicates()
user_idx[self.col_user + "_idx"] = np.arange(len(user_idx))
self.n_users = len(user_idx)
self.user_idx = user_idx
self.user2id = dict(
zip(user_idx[self.col_user], user_idx[self.col_user + "_idx"])
)
self.id2user = dict(
zip(user_idx[self.col_user + "_idx"], user_idx[self.col_user])
)
if self.item_idx is None:
item_idx = df[[self.col_item]].drop_duplicates()
item_idx[self.col_item + "_idx"] = np.arange(len(item_idx))
self.n_items = len(item_idx)
self.item_idx = item_idx
self.item2id = dict(
zip(item_idx[self.col_item], item_idx[self.col_item + "_idx"])
)
self.id2item = dict(
zip(item_idx[self.col_item + "_idx"], item_idx[self.col_item])
)
return self._reindex(train), self._reindex(test)

    def _reindex(self, df):
        """Reindex userID and itemID in a dataframe and keep only records with ratings greater than 0.

        Args:
            df (pandas.DataFrame): Dataframe with at least columns (col_user, col_item, col_rating).

        Returns:
            pandas.DataFrame: Reindexed and filtered dataframe, or None if ``df`` is None.
        """
if df is None:
return None
df = pd.merge(df, self.user_idx, on=self.col_user, how="left")
df = pd.merge(df, self.item_idx, on=self.col_item, how="left")
df = df[df[self.col_rating] > 0]
df_reindex = df[
[self.col_user + "_idx", self.col_item + "_idx", self.col_rating]
]
df_reindex.columns = [self.col_user, self.col_item, self.col_rating]
return df_reindex

    def _init_train_data(self):
        """Record the items each user has interacted with in a dataframe ``self.interact_status``,
        and create the user-item interaction matrix ``self.R``.
        """
self.interact_status = (
self.train.groupby(self.col_user)[self.col_item]
.apply(set)
.reset_index()
.rename(columns={self.col_item: self.col_item + "_interacted"})
)
self.R = sp.dok_matrix((self.n_users, self.n_items), dtype=np.float32)
self.R[self.train[self.col_user], self.train[self.col_item]] = 1.0

    def get_norm_adj_mat(self):
        """Load the normalized adjacency matrix from ``adj_dir`` if it exists there,
        otherwise create it (and save it when ``adj_dir`` is not None).

Returns:
scipy.sparse.csr_matrix: Normalized adjacency matrix.
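
        Example:
            An illustrative sketch, assuming ``data`` is an ImplicitCF instance:

            >>> norm_adj = data.get_norm_adj_mat()
            >>> # norm_adj has shape (n_users + n_items, n_users + n_items)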
"""
try:
if self.adj_dir is None:
raise FileNotFoundError
norm_adj_mat = sp.load_npz(self.adj_dir + "/norm_adj_mat.npz")
print("Already load norm adj matrix.")
except FileNotFoundError:
norm_adj_mat = self.create_norm_adj_mat()
if self.adj_dir is not None:
sp.save_npz(self.adj_dir + "/norm_adj_mat.npz", norm_adj_mat)
return norm_adj_mat

    def create_norm_adj_mat(self):
"""Create normalized adjacency matrix.
Returns:
scipy.sparse.csr_matrix: Normalized adjacency matrix.
"""
adj_mat = sp.dok_matrix(
(self.n_users + self.n_items, self.n_users + self.n_items), dtype=np.float32
)
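        # Assemble the bipartite user-item graph A = [[0, R], [R^T, 0]];
        # LIL format makes the block assignments below efficient.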
adj_mat = adj_mat.tolil()
R = self.R.tolil()
adj_mat[: self.n_users, self.n_users :] = R
adj_mat[self.n_users :, : self.n_users] = R.T
adj_mat = adj_mat.todok()
print("Already create adjacency matrix.")
rowsum = np.array(adj_mat.sum(1))
d_inv = np.power(rowsum + 1e-9, -0.5).flatten()
d_inv[np.isinf(d_inv)] = 0.0
d_mat_inv = sp.diags(d_inv)
norm_adj_mat = d_mat_inv.dot(adj_mat)
norm_adj_mat = norm_adj_mat.dot(d_mat_inv)
print("Already normalize adjacency matrix.")
return norm_adj_mat.tocsr()

    def train_loader(self, batch_size):
        """Sample a batch of training data: one positive item and one negative item
        are sampled for each user.

Args:
batch_size (int): Batch size of users.
Returns:
numpy.ndarray, numpy.ndarray, numpy.ndarray:
- Sampled users.
- Sampled positive items.
- Sampled negative items.
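
        Example:
            An illustrative sketch, assuming ``data`` is an ImplicitCF instance:

            >>> users, pos_items, neg_items = data.train_loader(batch_size=256)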
"""
def sample_neg(x):
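            # Rejection sampling: draw random item ids until one the user has
            # not interacted with is found.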
while True:
neg_id = random.randint(0, self.n_items - 1)
if neg_id not in x:
return neg_id
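
        # Sample batch_size users: with replacement when there are fewer users
        # than the batch size, without replacement otherwise.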
indices = range(self.n_users)
if self.n_users < batch_size:
users = [random.choice(indices) for _ in range(batch_size)]
else:
users = random.sample(indices, batch_size)
interact = self.interact_status.iloc[users]
pos_items = interact[self.col_item + "_interacted"].apply(
lambda x: random.choice(list(x))
)
        neg_items = interact[self.col_item + "_interacted"].apply(sample_neg)
return np.array(users), np.array(pos_items), np.array(neg_items)