Source code for recommenders.models.ncf.dataset

# Copyright (c) Recommenders contributors.
# Licensed under the MIT License.

import os
from collections import OrderedDict
import random
import numpy as np
import pandas as pd
import csv
import logging
from tqdm import tqdm

from recommenders.utils.constants import (
    DEFAULT_ITEM_COL,
    DEFAULT_USER_COL,
    DEFAULT_RATING_COL,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


[docs]class EmptyFileException(Exception): """Exception raised if file is empty"""
[docs]class MissingFieldsException(Exception): """Exception raised if file is missing expected fields"""
[docs]class FileNotSortedException(Exception): """Exception raised if file is not sorted correctly"""
[docs]class MissingUserException(Exception): """Exception raised if user is not in file"""
[docs]class DataFile: """ DataFile class for NCF. Iterator to read data from a csv file. Data must be sorted by user. Includes utilities for loading user data from file, formatting it and returning a Pandas dataframe. """
[docs] def __init__( self, filename, col_user, col_item, col_rating, col_test_batch=None, binary=True ): """Constructor Args: filename (str): Path to file to be processed. col_user (str): User column name. col_item (str): Item column name. col_rating (str): Rating column name. col_test_batch (str): Test batch column name. binary (bool): If true, set rating > 0 to rating = 1. """ self.filename = filename self.col_user = col_user self.col_item = col_item self.col_rating = col_rating self.col_test_batch = col_test_batch self.expected_fields = [self.col_user, self.col_item, self.col_rating] if self.col_test_batch is not None: self.expected_fields.append(self.col_test_batch) self.binary = binary self._init_data() self.id2user = {self.user2id[k]: k for k in self.user2id} self.id2item = {self.item2id[k]: k for k in self.item2id}
@property def users(self): return self.user2id.keys() @property def items(self): return self.item2id.keys() @property def end_of_file(self): return (self.line_num > 0) and self.next_row is None def __iter__(self): return self def __enter__(self, *args): self.file = open(self.filename, "r", encoding="UTF8") self.reader = csv.DictReader(self.file) self._check_for_missing_fields(self.expected_fields) self.line_num = 0 self.row, self.next_row = None, None return self def __exit__(self, *args): self.file.close() self.reader = None self.line_num = 0 self.row, self.next_row = None, None def __next__(self): if self.next_row: self.row = self.next_row elif self.line_num == 0: self.row = self._extract_row_data(next(self.reader, None)) if self.row is None: raise EmptyFileException("{} is empty.".format(self.filename)) else: raise StopIteration # end of file self.next_row = self._extract_row_data(next(self.reader, None)) self.line_num += 1 return self.row def _check_for_missing_fields(self, fields_to_check): missing_fields = set(fields_to_check).difference(set(self.reader.fieldnames)) if len(missing_fields): raise MissingFieldsException( "Columns {} not in header of file {}".format( missing_fields, self.filename ) ) def _extract_row_data(self, row): if row is None: return row user = int(row[self.col_user]) item = int(row[self.col_item]) rating = float(row[self.col_rating]) if self.binary: rating = float(rating > 0) test_batch = None if self.col_test_batch: test_batch = int(row[self.col_test_batch]) return { self.col_user: user, self.col_item: item, self.col_rating: rating, self.col_test_batch: test_batch, } def _init_data(self): # Compile lists of unique users and items, assign IDs to users and items, # and ensure file is sorted by user (and batch index if test set) logger.info("Indexing {} ...".format(self.filename)) with self: user_items = [] self.item2id, self.user2id = OrderedDict(), OrderedDict() batch_index = 0 for _ in self: item = self.row[self.col_item] user = self.row[self.col_user] test_batch = self.row[self.col_test_batch] if not self.end_of_file: next_user = self.next_row[self.col_user] next_test_batch = self.next_row[self.col_test_batch] if item not in self.items: self.item2id[item] = len(self.item2id) user_items.append(item) if (next_user != user) or self.next_row is None: if not self.end_of_file: if next_user in self.users: raise FileNotSortedException( "File {} is not sorted by user".format(self.filename) ) self.user2id[user] = len(self.user2id) if self.col_test_batch: if (next_test_batch != test_batch) or self.next_row is None: if not self.end_of_file: if next_test_batch < batch_index: raise FileNotSortedException( "File {} is not sorted by {}".format( self.filename, self.col_test_batch ) ) batch_index += 1 self.batch_indices_range = range(0, batch_index) self.data_len = self.line_num
[docs] def load_data(self, key, by_user=True): """Load data for a specified user or test batch Args: key (int): user or test batch index by_user (bool): load data by usr if True, else by test batch Returns: pandas.DataFrame """ records = [] key_col = self.col_user if by_user else self.col_test_batch # fast forward in file to user/test batch while (self.line_num == 0) or (self.row[key_col] != key): if self.end_of_file: raise MissingUserException( "User {} not in file {}".format(key, self.filename) ) next(self) # collect user/test batch data while self.row[key_col] == key: row = self.row if self.col_test_batch in row: del row[self.col_test_batch] records.append(row) if not self.end_of_file: next(self) else: break return pd.DataFrame.from_records(records)
[docs]class NegativeSampler: """NegativeSampler class for NCF. Samples a subset of negative items from a given population of items."""
[docs] def __init__( self, user, n_samples, user_positive_item_pool, item_pool, sample_with_replacement, print_warnings=True, training=True, ): """Constructor Args: user (str or int): User to be sampled for. n_samples (int): Number of required samples. user_positive_item_pool (set): Set of items with which user has previously interacted. item_pool (set): Set of all items in population. sample_with_replacement (bool): If true, sample negative examples with replacement, otherwise without replacement. print_warnings (bool): If true, prints warnings if sampling without replacement and there are not enough items to sample from to satisfy n_neg or n_neg_test. training (bool): Set to true if sampling for the training set or false if for the test set. """ self.user = user self.n_samples = n_samples self.user_positive_item_pool = user_positive_item_pool self.item_pool = item_pool self.sample_with_replacement = sample_with_replacement self.print_warnings = print_warnings self.training = training self.user_negative_item_pool = self._get_user_negatives_pool() self.population_size = len(self.user_negative_item_pool) self._sample = ( self._sample_negatives_with_replacement if self.sample_with_replacement else self._sample_negatives_without_replacement ) if not self.sample_with_replacement: self._check_sample_size()
[docs] def sample(self): """Method for sampling uniformly from a population of negative items Returns: list """ return self._sample()
def _get_user_negatives_pool(self): # get list of items user has not interacted with return list(set(self.item_pool) - self.user_positive_item_pool) def _sample_negatives_with_replacement(self): return random.choices(self.user_negative_item_pool, k=self.n_samples) def _sample_negatives_without_replacement(self): return random.sample(self.user_negative_item_pool, k=self.n_samples) def _check_sample_size(self): # if sampling without replacement, check sample population is sufficient and reduce # n_samples if not. n_neg_var = "n_neg" if self.training else "n_neg_test" dataset_name = "training" if self.training else "test" k = min(self.n_samples, self.population_size) if k < self.n_samples and self.print_warnings: warning_string = ( "The population of negative items to sample from is too small for user {}. " "Samples needed = {}, negative items = {}. " "Reducing samples to {} for this user." "If an equal number of negative samples for each user is required in the {} set, sample with replacement or reduce {}. " "This warning can be turned off by setting print_warnings=False".format( self.user, self.n_samples, self.population_size, self.population_size, dataset_name, n_neg_var, ) ) logging.warning(warning_string) self.n_samples = k
[docs]class Dataset(object): """Dataset class for NCF"""
[docs] def __init__( self, train_file, test_file=None, test_file_full=None, overwrite_test_file_full=False, n_neg=4, n_neg_test=100, col_user=DEFAULT_USER_COL, col_item=DEFAULT_ITEM_COL, col_rating=DEFAULT_RATING_COL, binary=True, seed=None, sample_with_replacement=False, print_warnings=False, ): """Constructor Args: train_file (str): Path to training dataset file. test_file (str): Path to test dataset file for leave-one-out evaluation. test_file_full (str): Path to full test dataset file including negative samples. overwrite_test_file_full (bool): If true, recreate and overwrite test_file_full. n_neg (int): Number of negative samples per positive example for training set. n_neg_test (int): Number of negative samples per positive example for test set. col_user (str): User column name. col_item (str): Item column name. col_rating (str): Rating column name. binary (bool): If true, set rating > 0 to rating = 1. seed (int): Seed. sample_with_replacement (bool): If true, sample negative examples with replacement, otherwise without replacement. print_warnings (bool): If true, prints warnings if sampling without replacement and there are not enough items to sample from to satisfy n_neg or n_neg_test. """ self.train_file = train_file self.test_file = test_file self.test_file_full = test_file_full self.overwrite_test_file_full = overwrite_test_file_full self.n_neg = n_neg self.n_neg_test = n_neg_test self.col_user = col_user self.col_item = col_item self.col_rating = col_rating self.binary = binary self.sample_with_replacement = sample_with_replacement self.print_warnings = print_warnings self.col_test_batch = "test_batch" self.train_datafile = DataFile( filename=self.train_file, col_user=self.col_user, col_item=self.col_item, col_rating=self.col_rating, binary=self.binary, ) self.n_users = len(self.train_datafile.users) self.n_items = len(self.train_datafile.items) self.user2id = self.train_datafile.user2id self.item2id = self.train_datafile.item2id self.id2user = self.train_datafile.id2user self.id2item = self.train_datafile.id2item self.train_len = self.train_datafile.data_len if self.test_file is not None: self.test_datafile = DataFile( filename=self.test_file, col_user=self.col_user, col_item=self.col_item, col_rating=self.col_rating, binary=self.binary, ) if self.test_file_full is None: self.test_file_full = os.path.splitext(self.test_file)[0] + "_full.csv" if self.overwrite_test_file_full or not os.path.isfile(self.test_file_full): self._create_test_file() self.test_full_datafile = DataFile( filename=self.test_file_full, col_user=self.col_user, col_item=self.col_item, col_rating=self.col_rating, col_test_batch=self.col_test_batch, binary=self.binary, ) # set random seed random.seed(seed)
def _create_negative_examples_df(self, user, user_negative_samples): # create dataframe containing negative examples for user assigned zero rating n_samples = len(user_negative_samples) return pd.DataFrame( { self.col_user: [user] * n_samples, self.col_item: user_negative_samples, self.col_rating: [0.0] * n_samples, } ) def _create_test_file(self): logger.info( "Creating full leave-one-out test file {} ...".format(self.test_file_full) ) # create empty csv pd.DataFrame( columns=[self.col_user, self.col_item, self.col_rating, self.col_test_batch] ).to_csv(self.test_file_full, index=False) batch_idx = 0 with self.train_datafile as train_datafile: with self.test_datafile as test_datafile: for user in tqdm(test_datafile.users): if user in train_datafile.users: user_test_data = test_datafile.load_data(user) user_train_data = train_datafile.load_data(user) # for leave-one-out evaluation, exclude items seen in both training and test sets # when sampling negatives user_positive_item_pool = set( user_test_data[self.col_item].unique() ).union(user_train_data[self.col_item].unique()) sampler = NegativeSampler( user, self.n_neg_test, user_positive_item_pool, self.train_datafile.items, self.sample_with_replacement, self.print_warnings, training=False, ) user_examples_dfs = [] # sample n_neg_test negatives for each positive example and assign a batch index for positive_example in np.array_split( user_test_data, user_test_data.shape[0] ): negative_examples = self._create_negative_examples_df( user, sampler.sample() ) examples = pd.concat([positive_example, negative_examples]) examples[self.col_test_batch] = batch_idx user_examples_dfs.append(examples) batch_idx += 1 # append user test data to file user_examples = pd.concat(user_examples_dfs) user_examples.to_csv( self.test_file_full, mode="a", index=False, header=False ) def _split_into_batches(self, shuffle_buffer, batch_size): for i in range(0, len(shuffle_buffer), batch_size): yield shuffle_buffer[i : i + batch_size] def _prepare_batch_with_id(self, batch): return [ [self.user2id[user] for user in batch[self.col_user].values], [self.item2id[item] for item in batch[self.col_item].values], batch[self.col_rating].values.tolist(), ] def _prepare_batch_without_id(self, batch): return [ batch[self.col_user].values.tolist(), batch[self.col_item].values.tolist(), batch[self.col_rating].values.tolist(), ] def _release_shuffle_buffer( self, shuffle_buffer, batch_size, yield_id, write_to=None ): prepare_batch = ( self._prepare_batch_with_id if yield_id else self._prepare_batch_without_id ) shuffle_buffer_df = pd.concat(shuffle_buffer) shuffle_buffer_df = shuffle_buffer_df.sample( shuffle_buffer_df.shape[0] ) # shuffle the buffer for batch in self._split_into_batches(shuffle_buffer_df, batch_size): if batch.shape[0] == batch_size: if write_to: batch.to_csv(write_to, mode="a", header=False, index=False) yield prepare_batch(batch) else: return batch
[docs] def train_loader( self, batch_size, shuffle_size=None, yield_id=False, write_to=None ): """ Generator for serving batches of training data. Positive examples are loaded from the original training file, to which negative samples are added. Data is loaded in memory into a shuffle buffer up to a maximum of shuffle_size rows, before the data is shuffled and released. If out-of-memory errors are encountered, try reducing shuffle_size. Args: batch_size (int): Number of examples in each batch. shuffle_size (int): Maximum number of examples in shuffle buffer. yield_id (bool): If true, return assigned user and item IDs, else return original values. write_to (str): Path of file to write full dataset (including negative examples). Returns: list """ # if shuffle_size not supplied, use (estimated) full data size i.e. complete in-memory shuffle if shuffle_size is None: shuffle_size = self.train_len * (self.n_neg + 1) if write_to: pd.DataFrame( columns=[self.col_user, self.col_item, self.col_rating] ).to_csv(write_to, header=True, index=False) shuffle_buffer = [] with self.train_datafile as train_datafile: for user in train_datafile.users: user_positive_examples = train_datafile.load_data(user) user_positive_item_pool = set( user_positive_examples[self.col_item].unique() ) n_samples = self.n_neg * user_positive_examples.shape[0] sampler = NegativeSampler( user, n_samples, user_positive_item_pool, self.train_datafile.items, self.sample_with_replacement, self.print_warnings, ) user_negative_examples = self._create_negative_examples_df( user, sampler.sample() ) user_examples = pd.concat( [user_positive_examples, user_negative_examples] ) shuffle_buffer.append(user_examples) shuffle_buffer_len = sum([df.shape[0] for df in shuffle_buffer]) if shuffle_buffer_len >= shuffle_size: buffer_remainder = yield from self._release_shuffle_buffer( shuffle_buffer, batch_size, yield_id, write_to ) shuffle_buffer = ( [buffer_remainder] if buffer_remainder is not None else [] ) # yield remaining buffer yield from self._release_shuffle_buffer( shuffle_buffer, batch_size, yield_id, write_to )
[docs] def test_loader(self, yield_id=False): """Generator for serving batches of test data for leave-one-out evaluation. Data is loaded from test_file_full. Args: yield_id (bool): If true, return assigned user and item IDs, else return original values. Returns: list """ prepare_batch = ( self._prepare_batch_with_id if yield_id else self._prepare_batch_without_id ) with self.test_full_datafile as test_full_datafile: for test_batch_idx in test_full_datafile.batch_indices_range: test_batch_data = test_full_datafile.load_data( test_batch_idx, by_user=False ) yield prepare_batch(test_batch_data)