# Source code for recommenders.models.deeprec.io.sequential_iterator

# Copyright (c) Recommenders contributors.
# Licensed under the MIT License.

import tensorflow as tf
import numpy as np
import random

from recommenders.models.deeprec.io.iterator import BaseIterator
from recommenders.models.deeprec.deeprec_utils import load_dict


__all__ = ["SequentialIterator"]


class SequentialIterator(BaseIterator):
    """Data iterator for sequential recommendation models.

    Parses tab-separated interaction logs and feeds them to the TF1-style
    graph through `tf.compat.v1.placeholder` tensors.
    """

    def __init__(self, hparams, graph, col_spliter="\t"):
        """Initialize an iterator. Create necessary placeholders for the model.

        Args:
            hparams (object): Global hyper-parameters. Some key settings such as
                #_feature and #_field are there.
            graph (object): The running graph. All created placeholder will be
                added to this graph.
            col_spliter (str): Column splitter in one line.
        """
        self.col_spliter = col_spliter
        # Vocabulary files map raw hashes to integer indexes.
        self.userdict = load_dict(hparams.user_vocab)
        self.itemdict = load_dict(hparams.item_vocab)
        self.catedict = load_dict(hparams.cate_vocab)

        self.max_seq_length = hparams.max_seq_length
        self.batch_size = hparams.batch_size
        # Cache of parsed files, keyed by file path (see load_data_from_file).
        self.iter_data = dict()

        self.graph = graph
        with self.graph.as_default():
            ph = tf.compat.v1.placeholder
            seq_shape = [None, self.max_seq_length]
            self.labels = ph(tf.float32, [None, 1], name="label")
            self.users = ph(tf.int32, [None], name="users")
            self.items = ph(tf.int32, [None], name="items")
            self.cates = ph(tf.int32, [None], name="cates")
            self.item_history = ph(tf.int32, seq_shape, name="item_history")
            self.item_cate_history = ph(tf.int32, seq_shape, name="item_cate_history")
            self.mask = ph(tf.int32, seq_shape, name="mask")
            self.time = ph(tf.float32, [None], name="time")
            self.time_diff = ph(tf.float32, seq_shape, name="time_diff")
            self.time_from_first_action = ph(
                tf.float32, seq_shape, name="time_from_first_action"
            )
            self.time_to_now = ph(tf.float32, seq_shape, name="time_to_now")
[docs] def parse_file(self, input_file): """Parse the file to A list ready to be used for downstream tasks. Args: input_file: One of train, valid or test file which has never been parsed. Returns: list: A list with parsing result. """ with open(input_file, "r") as f: lines = f.readlines() res = [] for line in lines: if not line: continue res.append(self.parser_one_line(line)) return res
[docs] def parser_one_line(self, line): """Parse one string line into feature values. Args: line (str): a string indicating one instance. This string contains tab-separated values including: label, user_hash, item_hash, item_cate, operation_time, item_history_sequence, item_cate_history_sequence, and time_history_sequence. Returns: list: Parsed results including `label`, `user_id`, `item_id`, `item_cate`, `item_history_sequence`, `cate_history_sequence`, `current_time`, `time_diff`, `time_from_first_action`, `time_to_now`. """ words = line.strip().split(self.col_spliter) label = int(words[0]) user_id = self.userdict[words[1]] if words[1] in self.userdict else 0 item_id = self.itemdict[words[2]] if words[2] in self.itemdict else 0 item_cate = self.catedict[words[3]] if words[3] in self.catedict else 0 current_time = float(words[4]) item_history_sequence = [] cate_history_sequence = [] time_history_sequence = [] item_history_words = words[5].strip().split(",") for item in item_history_words: item_history_sequence.append( self.itemdict[item] if item in self.itemdict else 0 ) cate_history_words = words[6].strip().split(",") for cate in cate_history_words: cate_history_sequence.append( self.catedict[cate] if cate in self.catedict else 0 ) time_history_words = words[7].strip().split(",") time_history_sequence = [float(i) for i in time_history_words] time_range = 3600 * 24 time_diff = [] for i in range(len(time_history_sequence) - 1): diff = ( time_history_sequence[i + 1] - time_history_sequence[i] ) / time_range diff = max(diff, 0.5) time_diff.append(diff) last_diff = (current_time - time_history_sequence[-1]) / time_range last_diff = max(last_diff, 0.5) time_diff.append(last_diff) time_diff = np.log(time_diff) time_from_first_action = [] first_time = time_history_sequence[0] time_from_first_action = [ (t - first_time) / time_range for t in time_history_sequence[1:] ] time_from_first_action = [max(t, 0.5) for t in time_from_first_action] last_diff = (current_time - 
first_time) / time_range last_diff = max(last_diff, 0.5) time_from_first_action.append(last_diff) time_from_first_action = np.log(time_from_first_action) time_to_now = [] time_to_now = [(current_time - t) / time_range for t in time_history_sequence] time_to_now = [max(t, 0.5) for t in time_to_now] time_to_now = np.log(time_to_now) return ( label, user_id, item_id, item_cate, item_history_sequence, cate_history_sequence, current_time, time_diff, time_from_first_action, time_to_now, )
[docs] def load_data_from_file(self, infile, batch_num_ngs=0, min_seq_length=1): """Read and parse data from a file. Args: infile (str): Text input file. Each line in this file is an instance. batch_num_ngs (int): The number of negative sampling here in batch. 0 represents that there is no need to do negative sampling here. min_seq_length (int): The minimum number of a sequence length. Sequences with length lower than min_seq_length will be ignored. Yields: object: An iterator that yields parsed results, in the format of graph `feed_dict`. """ label_list = [] user_list = [] item_list = [] item_cate_list = [] item_history_batch = [] item_cate_history_batch = [] time_list = [] time_diff_list = [] time_from_first_action_list = [] time_to_now_list = [] cnt = 0 if infile not in self.iter_data: lines = self.parse_file(infile) self.iter_data[infile] = lines else: lines = self.iter_data[infile] if batch_num_ngs > 0: random.shuffle(lines) for line in lines: if not line: continue ( label, user_id, item_id, item_cate, item_history_sequence, item_cate_history_sequence, current_time, time_diff, time_from_first_action, time_to_now, ) = line if len(item_history_sequence) < min_seq_length: continue label_list.append(label) user_list.append(user_id) item_list.append(item_id) item_cate_list.append(item_cate) item_history_batch.append(item_history_sequence) item_cate_history_batch.append(item_cate_history_sequence) time_list.append(current_time) time_diff_list.append(time_diff) time_from_first_action_list.append(time_from_first_action) time_to_now_list.append(time_to_now) cnt += 1 if cnt == self.batch_size: res = self._convert_data( label_list, user_list, item_list, item_cate_list, item_history_batch, item_cate_history_batch, time_list, time_diff_list, time_from_first_action_list, time_to_now_list, batch_num_ngs, ) batch_input = self.gen_feed_dict(res) yield batch_input if batch_input else None label_list = [] user_list = [] item_list = [] item_cate_list = [] item_history_batch = [] 
item_cate_history_batch = [] time_list = [] time_diff_list = [] time_from_first_action_list = [] time_to_now_list = [] cnt = 0 if cnt > 0: res = self._convert_data( label_list, user_list, item_list, item_cate_list, item_history_batch, item_cate_history_batch, time_list, time_diff_list, time_from_first_action_list, time_to_now_list, batch_num_ngs, ) batch_input = self.gen_feed_dict(res) yield batch_input if batch_input else None
def _convert_data( self, label_list, user_list, item_list, item_cate_list, item_history_batch, item_cate_history_batch, time_list, time_diff_list, time_from_first_action_list, time_to_now_list, batch_num_ngs, ): """Convert data into numpy arrays that are good for further model operation. Args: label_list (list): A list of ground-truth labels. user_list (list): A list of user indexes. item_list (list): A list of item indexes. item_cate_list (list): A list of category indexes. item_history_batch (list): A list of item history indexes. item_cate_history_batch (list): A list of category history indexes. time_list (list): A list of current timestamp. time_diff_list (list): A list of timestamp between each sequential operations. time_from_first_action_list (list): A list of timestamp from the first operation. time_to_now_list (list): A list of timestamp to the current time. batch_num_ngs (int): The number of negative sampling while training in mini-batch. Returns: dict: A dictionary, containing multiple numpy arrays that are convenient for further operation. 
""" if batch_num_ngs: instance_cnt = len(label_list) if instance_cnt < 5: return label_list_all = [] item_list_all = [] item_cate_list_all = [] user_list_all = np.asarray( [[user] * (batch_num_ngs + 1) for user in user_list], dtype=np.int32 ).flatten() time_list_all = np.asarray( [[t] * (batch_num_ngs + 1) for t in time_list], dtype=np.float32 ).flatten() history_lengths = [len(item_history_batch[i]) for i in range(instance_cnt)] max_seq_length_batch = self.max_seq_length item_history_batch_all = np.zeros( (instance_cnt * (batch_num_ngs + 1), max_seq_length_batch) ).astype("int32") item_cate_history_batch_all = np.zeros( (instance_cnt * (batch_num_ngs + 1), max_seq_length_batch) ).astype("int32") time_diff_batch = np.zeros( (instance_cnt * (batch_num_ngs + 1), max_seq_length_batch) ).astype("float32") time_from_first_action_batch = np.zeros( (instance_cnt * (batch_num_ngs + 1), max_seq_length_batch) ).astype("float32") time_to_now_batch = np.zeros( (instance_cnt * (batch_num_ngs + 1), max_seq_length_batch) ).astype("float32") mask = np.zeros( (instance_cnt * (1 + batch_num_ngs), max_seq_length_batch) ).astype("float32") for i in range(instance_cnt): this_length = min(history_lengths[i], max_seq_length_batch) for index in range(batch_num_ngs + 1): item_history_batch_all[ i * (batch_num_ngs + 1) + index, :this_length ] = np.asarray(item_history_batch[i][-this_length:], dtype=np.int32) item_cate_history_batch_all[ i * (batch_num_ngs + 1) + index, :this_length ] = np.asarray( item_cate_history_batch[i][-this_length:], dtype=np.int32 ) mask[i * (batch_num_ngs + 1) + index, :this_length] = 1.0 time_diff_batch[ i * (batch_num_ngs + 1) + index, :this_length ] = np.asarray(time_diff_list[i][-this_length:], dtype=np.float32) time_from_first_action_batch[ i * (batch_num_ngs + 1) + index, :this_length ] = np.asarray( time_from_first_action_list[i][-this_length:], dtype=np.float32 ) time_to_now_batch[ i * (batch_num_ngs + 1) + index, :this_length ] = 
np.asarray(time_to_now_list[i][-this_length:], dtype=np.float32) for i in range(instance_cnt): positive_item = item_list[i] label_list_all.append(1) item_list_all.append(positive_item) item_cate_list_all.append(item_cate_list[i]) count = 0 while batch_num_ngs: random_value = random.randint(0, instance_cnt - 1) negative_item = item_list[random_value] if negative_item == positive_item: continue label_list_all.append(0) item_list_all.append(negative_item) item_cate_list_all.append(item_cate_list[random_value]) count += 1 if count == batch_num_ngs: break res = {} res["labels"] = np.asarray(label_list_all, dtype=np.float32).reshape(-1, 1) res["users"] = user_list_all res["items"] = np.asarray(item_list_all, dtype=np.int32) res["cates"] = np.asarray(item_cate_list_all, dtype=np.int32) res["item_history"] = item_history_batch_all res["item_cate_history"] = item_cate_history_batch_all res["mask"] = mask res["time"] = time_list_all res["time_diff"] = time_diff_batch res["time_from_first_action"] = time_from_first_action_batch res["time_to_now"] = time_to_now_batch return res else: instance_cnt = len(label_list) history_lengths = [len(item_history_batch[i]) for i in range(instance_cnt)] max_seq_length_batch = self.max_seq_length item_history_batch_all = np.zeros( (instance_cnt, max_seq_length_batch) ).astype("int32") item_cate_history_batch_all = np.zeros( (instance_cnt, max_seq_length_batch) ).astype("int32") time_diff_batch = np.zeros((instance_cnt, max_seq_length_batch)).astype( "float32" ) time_from_first_action_batch = np.zeros( (instance_cnt, max_seq_length_batch) ).astype("float32") time_to_now_batch = np.zeros((instance_cnt, max_seq_length_batch)).astype( "float32" ) mask = np.zeros((instance_cnt, max_seq_length_batch)).astype("float32") for i in range(instance_cnt): this_length = min(history_lengths[i], max_seq_length_batch) item_history_batch_all[i, :this_length] = item_history_batch[i][ -this_length: ] item_cate_history_batch_all[i, :this_length] = 
item_cate_history_batch[ i ][-this_length:] mask[i, :this_length] = 1.0 time_diff_batch[i, :this_length] = time_diff_list[i][-this_length:] time_from_first_action_batch[ i, :this_length ] = time_from_first_action_list[i][-this_length:] time_to_now_batch[i, :this_length] = time_to_now_list[i][-this_length:] res = {} res["labels"] = np.asarray(label_list, dtype=np.float32).reshape(-1, 1) res["users"] = np.asarray(user_list, dtype=np.float32) res["items"] = np.asarray(item_list, dtype=np.int32) res["cates"] = np.asarray(item_cate_list, dtype=np.int32) res["item_history"] = item_history_batch_all res["item_cate_history"] = item_cate_history_batch_all res["mask"] = mask res["time"] = np.asarray(time_list, dtype=np.float32) res["time_diff"] = time_diff_batch res["time_from_first_action"] = time_from_first_action_batch res["time_to_now"] = time_to_now_batch return res
[docs] def gen_feed_dict(self, data_dict): """Construct a dictionary that maps graph elements to values. Args: data_dict (dict): A dictionary that maps string name to numpy arrays. Returns: dict: A dictionary that maps graph elements to numpy arrays. """ if not data_dict: return dict() feed_dict = { self.labels: data_dict["labels"], self.users: data_dict["users"], self.items: data_dict["items"], self.cates: data_dict["cates"], self.item_history: data_dict["item_history"], self.item_cate_history: data_dict["item_cate_history"], self.mask: data_dict["mask"], self.time: data_dict["time"], self.time_diff: data_dict["time_diff"], self.time_from_first_action: data_dict["time_from_first_action"], self.time_to_now: data_dict["time_to_now"], } return feed_dict