# recommenders/models/newsrec/models/base_model.py

# Copyright (c) Recommenders contributors.
# Licensed under the MIT License.

import abc
import time
import numpy as np
from tqdm import tqdm
import tensorflow as tf
from tensorflow.compat.v1 import keras

from recommenders.models.deeprec.deeprec_utils import cal_metric

tf.compat.v1.disable_eager_execution()
tf.compat.v1.experimental.output_all_intermediates(True)
__all__ = ["BaseModel"]


class BaseModel:
    """Basic class of models

    Attributes:
        hparams (HParams): A HParams object, holds the entire set of hyperparameters.
        train_iterator (object): An iterator to load the data in training steps.
        test_iterator (object): An iterator to load the data in testing steps.
        seed (int): Random seed.
    """
    def __init__(
        self,
        hparams,
        iterator_creator,
        seed=None,
    ):
        """Initialize the model. Create the common logic needed by all newsrec
        models, such as the loss function and the optimizer.

        Args:
            hparams (HParams): A HParams object, holds the entire set of hyperparameters.
            iterator_creator (object): An iterator to load the data.
            seed (int): Random seed.
        """
        self.seed = seed
        tf.compat.v1.set_random_seed(seed)
        np.random.seed(seed)

        self.train_iterator = iterator_creator(
            hparams,
            hparams.npratio,
            col_spliter="\t",
        )
        self.test_iterator = iterator_creator(
            hparams,
            col_spliter="\t",
        )

        self.hparams = hparams
        self.support_quick_scoring = hparams.support_quick_scoring

        # Set GPU use with on-demand memory growth.
        gpu_options = tf.compat.v1.GPUOptions(allow_growth=True)
        sess = tf.compat.v1.Session(
            config=tf.compat.v1.ConfigProto(gpu_options=gpu_options)
        )

        # Set this TensorFlow session as the default session for Keras.
        tf.compat.v1.keras.backend.set_session(sess)

        # IMPORTANT: models have to be built AFTER setting the session for Keras!
        # Otherwise, their weights will be unavailable in the threads after the
        # session has been set.
        self.model, self.scorer = self._build_graph()

        self.loss = self._get_loss()
        self.train_optimizer = self._get_opt()

        self.model.compile(loss=self.loss, optimizer=self.train_optimizer)
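    # Illustrative sketch (not part of the class): a concrete subclass such as
    # NRMSModel is typically constructed with hyperparameters from
    # prepare_hparams and an iterator class such as MINDIterator. The file
    # paths below are placeholders, not values shipped with the library:
    #
    #     from recommenders.models.newsrec.newsrec_utils import prepare_hparams
    #     from recommenders.models.newsrec.models.nrms import NRMSModel
    #     from recommenders.models.newsrec.io.mind_iterator import MINDIterator
    #
    #     hparams = prepare_hparams(
    #         "nrms.yaml",
    #         wordEmb_file="embedding.npy",
    #         wordDict_file="word_dict.pkl",
    #         userDict_file="uid2index.pkl",
    #         epochs=5,
    #     )
    #     model = NRMSModel(hparams, MINDIterator, seed=42)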
    def _init_embedding(self, file_path):
        """Load pre-trained embeddings as a constant numpy array.

        Args:
            file_path (str): the pre-trained GloVe embeddings file path.

        Returns:
            numpy.ndarray: A constant numpy array.
        """
        return np.load(file_path)

    @abc.abstractmethod
    def _build_graph(self):
        """Subclass will implement this."""
        pass

    @abc.abstractmethod
    def _get_input_label_from_iter(self, batch_data):
        """Subclass will implement this."""
        pass

    def _get_loss(self):
        """Make the loss function according to the configuration.

        Returns:
            object: Loss function or loss function name.
        """
        if self.hparams.loss == "cross_entropy_loss":
            data_loss = "categorical_crossentropy"
        elif self.hparams.loss == "log_loss":
            data_loss = "binary_crossentropy"
        else:
            raise ValueError("Loss {0} is not defined.".format(self.hparams.loss))
        return data_loss

    def _get_opt(self):
        """Get the optimizer according to the configuration. Usually we will use Adam.

        Returns:
            object: An optimizer.
        """
        lr = self.hparams.learning_rate
        optimizer = self.hparams.optimizer

        if optimizer == "adam":
            train_opt = keras.optimizers.Adam(lr=lr)
        else:
            raise ValueError("Optimizer {0} is not defined.".format(optimizer))

        return train_opt

    def _get_pred(self, logit, task):
        """Make the final output a prediction score, according to the task.

        Args:
            logit (object): Base prediction value.
            task (str): A task (values: regression/classification).

        Returns:
            object: Transformed score.
        """
        if task == "regression":
            pred = tf.identity(logit)
        elif task == "classification":
            pred = tf.sigmoid(logit)
        else:
            raise ValueError(
                "method must be regression or classification, but now is {0}".format(
                    task
                )
            )
        return pred
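    # For reference, the hparams-to-Keras mapping implemented above, covering
    # the only settings these two helpers support:
    #
    #     hparams.loss = "cross_entropy_loss"  ->  "categorical_crossentropy"
    #     hparams.loss = "log_loss"            ->  "binary_crossentropy"
    #     hparams.optimizer = "adam"           ->  keras.optimizers.Adam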
    def train(self, train_batch_data):
        """Go through the optimization step once with one batch of training data.

        Args:
            train_batch_data (dict): One batch of training data, as produced by
                the training iterator.

        Returns:
            object: The loss value returned by ``train_on_batch``.
        """
        train_input, train_label = self._get_input_label_from_iter(train_batch_data)
        rslt = self.model.train_on_batch(train_input, train_label)
        return rslt
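    # Illustrative sketch: a single manual training step outside fit(), using
    # one batch from the training iterator (file paths are placeholders):
    #
    #     batches = model.train_iterator.load_data_from_file(
    #         "train/news.tsv", "train/behaviors.tsv"
    #     )
    #     loss = model.train(next(iter(batches)))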
    def eval(self, eval_batch_data):
        """Evaluate one batch of data with the current model.

        Args:
            eval_batch_data (dict): One batch of evaluation data, as produced by
                the test iterator.

        Returns:
            tuple: Predicted scores, ground-truth labels, and impression indexes.
        """
        eval_input, eval_label = self._get_input_label_from_iter(eval_batch_data)
        imp_index = eval_batch_data["impression_index_batch"]

        pred_rslt = self.scorer.predict_on_batch(eval_input)

        return pred_rslt, eval_label, imp_index
    def fit(
        self,
        train_news_file,
        train_behaviors_file,
        valid_news_file,
        valid_behaviors_file,
        test_news_file=None,
        test_behaviors_file=None,
    ):
        """Fit the model with the training files. Evaluate the model on the
        validation files per epoch to observe the training status. If
        test_news_file is not None, evaluate it too.

        Args:
            train_news_file (str): News file for training.
            train_behaviors_file (str): Behaviors file for training.
            valid_news_file (str): News file for validation.
            valid_behaviors_file (str): Behaviors file for validation.
            test_news_file (str): News file for testing.
            test_behaviors_file (str): Behaviors file for testing.

        Returns:
            object: An instance of self.
        """
        for epoch in range(1, self.hparams.epochs + 1):
            step = 0
            self.hparams.current_epoch = epoch
            epoch_loss = 0
            train_start = time.time()

            tqdm_util = tqdm(
                self.train_iterator.load_data_from_file(
                    train_news_file, train_behaviors_file
                )
            )

            for batch_data_input in tqdm_util:
                step_data_loss = self.train(batch_data_input)

                epoch_loss += step_data_loss
                step += 1
                if step % self.hparams.show_step == 0:
                    tqdm_util.set_description(
                        "step {0:d} , total_loss: {1:.4f}, data_loss: {2:.4f}".format(
                            step, epoch_loss / step, step_data_loss
                        )
                    )

            train_end = time.time()
            train_time = train_end - train_start

            eval_start = time.time()

            train_info = ",".join(
                [
                    str(item[0]) + ":" + str(item[1])
                    for item in [("logloss loss", epoch_loss / step)]
                ]
            )

            eval_res = self.run_eval(valid_news_file, valid_behaviors_file)
            eval_info = ", ".join(
                [
                    str(item[0]) + ":" + str(item[1])
                    for item in sorted(eval_res.items(), key=lambda x: x[0])
                ]
            )
            if test_news_file is not None:
                test_res = self.run_eval(test_news_file, test_behaviors_file)
                test_info = ", ".join(
                    [
                        str(item[0]) + ":" + str(item[1])
                        for item in sorted(test_res.items(), key=lambda x: x[0])
                    ]
                )
            eval_end = time.time()
            eval_time = eval_end - eval_start

            if test_news_file is not None:
                print(
                    "at epoch {0:d}".format(epoch)
                    + "\ntrain info: "
                    + train_info
                    + "\neval info: "
                    + eval_info
                    + "\ntest info: "
                    + test_info
                )
            else:
                print(
                    "at epoch {0:d}".format(epoch)
                    + "\ntrain info: "
                    + train_info
                    + "\neval info: "
                    + eval_info
                )
            print(
                "at epoch {0:d} , train time: {1:.1f} eval time: {2:.1f}".format(
                    epoch, train_time, eval_time
                )
            )

        return self
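    # Illustrative usage of fit() with MIND-style news/behaviors files; the
    # paths are placeholders for files produced by the MIND download utilities:
    #
    #     model.fit(
    #         "train/news.tsv", "train/behaviors.tsv",
    #         "valid/news.tsv", "valid/behaviors.tsv",
    #     )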
    def group_labels(self, labels, preds, group_keys):
        """Divide labels and preds into several groups according to values in group keys.

        Args:
            labels (list): ground truth label list.
            preds (list): prediction score list.
            group_keys (list): group key list.

        Returns:
            list, list, list:
            - Keys after grouping.
            - Labels after grouping.
            - Preds after grouping.
        """
        all_keys = list(set(group_keys))
        all_keys.sort()
        group_labels = {k: [] for k in all_keys}
        group_preds = {k: [] for k in all_keys}

        for label, p, k in zip(labels, preds, group_keys):
            group_labels[k].append(label)
            group_preds[k].append(p)

        all_labels = []
        all_preds = []
        for k in all_keys:
            all_labels.append(group_labels[k])
            all_preds.append(group_preds[k])

        return all_keys, all_labels, all_preds
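    # Toy example of group_labels(): labels and predictions belonging to
    # impressions 1 and 2 end up in their own per-impression lists, ordered by key:
    #
    #     keys, lbls, prds = model.group_labels(
    #         labels=[1, 0, 0, 1],
    #         preds=[0.9, 0.2, 0.4, 0.7],
    #         group_keys=[1, 1, 2, 2],
    #     )
    #     # keys == [1, 2]
    #     # lbls == [[1, 0], [0, 1]]
    #     # prds == [[0.9, 0.2], [0.4, 0.7]]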
    def run_eval(self, news_filename, behaviors_file):
        """Evaluate the given files and return the evaluation metrics.

        Args:
            news_filename (str): A file containing the news to be evaluated.
            behaviors_file (str): A file containing the impressions to be evaluated.

        Returns:
            dict: A dictionary that contains evaluation metrics.
        """
        if self.support_quick_scoring:
            _, group_labels, group_preds = self.run_fast_eval(
                news_filename, behaviors_file
            )
        else:
            _, group_labels, group_preds = self.run_slow_eval(
                news_filename, behaviors_file
            )
        res = cal_metric(group_labels, group_preds, self.hparams.metrics)
        return res
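    # Illustrative shape of a run_eval() result; the metric names come from
    # hparams.metrics and are computed by cal_metric, and the numbers below
    # are made up for illustration:
    #
    #     res = model.run_eval("valid/news.tsv", "valid/behaviors.tsv")
    #     # e.g. {"group_auc": 0.58, "mean_mrr": 0.26, "ndcg@5": 0.28, "ndcg@10": 0.35}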
    def user(self, batch_user_input):
        """Encode one batch of user histories into user vectors."""
        user_input = self._get_user_feature_from_iter(batch_user_input)
        user_vec = self.userencoder.predict_on_batch(user_input)
        user_index = batch_user_input["impr_index_batch"]
        return user_index, user_vec

    def news(self, batch_news_input):
        """Encode one batch of news items into news vectors."""
        news_input = self._get_news_feature_from_iter(batch_news_input)
        news_vec = self.newsencoder.predict_on_batch(news_input)
        news_index = batch_news_input["news_index_batch"]
        return news_index, news_vec

    def run_user(self, news_filename, behaviors_file):
        """Compute a user vector for every impression in behaviors_file.

        Returns:
            dict: A mapping from impression index to user vector.
        """
        if not hasattr(self, "userencoder"):
            raise ValueError("model must have attribute userencoder")

        user_indexes = []
        user_vecs = []
        for batch_data_input in tqdm(
            self.test_iterator.load_user_from_file(news_filename, behaviors_file)
        ):
            user_index, user_vec = self.user(batch_data_input)
            user_indexes.extend(np.reshape(user_index, -1))
            user_vecs.extend(user_vec)

        return dict(zip(user_indexes, user_vecs))

    def run_news(self, news_filename):
        """Compute a news vector for every news item in news_filename.

        Returns:
            dict: A mapping from news index to news vector.
        """
        if not hasattr(self, "newsencoder"):
            raise ValueError("model must have attribute newsencoder")

        news_indexes = []
        news_vecs = []
        for batch_data_input in tqdm(
            self.test_iterator.load_news_from_file(news_filename)
        ):
            news_index, news_vec = self.news(batch_data_input)
            news_indexes.extend(np.reshape(news_index, -1))
            news_vecs.extend(news_vec)

        return dict(zip(news_indexes, news_vecs))

    def run_slow_eval(self, news_filename, behaviors_file):
        """Score every impression by running the full scorer model batch by batch."""
        preds = []
        labels = []
        imp_indexes = []

        for batch_data_input in tqdm(
            self.test_iterator.load_data_from_file(news_filename, behaviors_file)
        ):
            step_pred, step_labels, step_imp_index = self.eval(batch_data_input)
            preds.extend(np.reshape(step_pred, -1))
            labels.extend(np.reshape(step_labels, -1))
            imp_indexes.extend(np.reshape(step_imp_index, -1))

        group_impr_indexes, group_labels, group_preds = self.group_labels(
            labels, preds, imp_indexes
        )
        return group_impr_indexes, group_labels, group_preds

    def run_fast_eval(self, news_filename, behaviors_file):
        """Score every impression with a dot product of precomputed news and user vectors."""
        news_vecs = self.run_news(news_filename)
        user_vecs = self.run_user(news_filename, behaviors_file)

        self.news_vecs = news_vecs
        self.user_vecs = user_vecs

        group_impr_indexes = []
        group_labels = []
        group_preds = []

        for (
            impr_index,
            news_index,
            user_index,
            label,
        ) in tqdm(self.test_iterator.load_impression_from_file(behaviors_file)):
            pred = np.dot(
                np.stack([news_vecs[i] for i in news_index], axis=0),
                user_vecs[impr_index],
            )
            group_impr_indexes.append(impr_index)
            group_labels.append(label)
            group_preds.append(pred)

        return group_impr_indexes, group_labels, group_preds
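# Note on run_fast_eval(): because the scorer factorizes into a news encoder
# and a user encoder, the click score of each candidate reduces to an inner
# product, so a single matrix-vector product scores a whole impression at
# once. A minimal numpy sketch of that scoring step, with made-up vector sizes:
#
#     import numpy as np
#
#     news_vecs = np.random.rand(5, 16)   # 5 candidate news items, 16-dim vectors
#     user_vec = np.random.rand(16)       # one user vector
#     pred = np.dot(news_vecs, user_vec)  # one score per candidate, shape (5,)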