# Copyright (c) Recommenders contributors.
# Licensed under the MIT License.

import itertools
import numpy as np
import tensorflow as tf
from tensorflow_estimator.python.estimator.export.export import (
    build_supervised_input_receiver_fn_from_input_fn,
)

MODEL_DIR = "model_checkpoints"

OPTIMIZERS = dict(
    adadelta=tf.compat.v1.train.AdadeltaOptimizer,
    adagrad=tf.compat.v1.train.AdagradOptimizer,
    adam=tf.compat.v1.train.AdamOptimizer,
    ftrl=tf.compat.v1.train.FtrlOptimizer,
    momentum=tf.compat.v1.train.MomentumOptimizer,
    rmsprop=tf.compat.v1.train.RMSPropOptimizer,
    sgd=tf.compat.v1.train.GradientDescentOptimizer,
)
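
# Example (illustrative, not from the original source): each entry maps a
# lowercase name to a tf.compat.v1 optimizer class, so a lookup plus a call
# builds an optimizer directly:
#
#   OPTIMIZERS["adam"](learning_rate=0.001)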


def _dataset(x, y=None, batch_size=128, num_epochs=1, shuffle=False, seed=None):
    """Build a tf.data.Dataset from in-memory features `x` and optional labels `y`."""
    if y is None:
        dataset = tf.data.Dataset.from_tensor_slices(x)
    else:
        dataset = tf.data.Dataset.from_tensor_slices((x, y))

    if shuffle:
        dataset = dataset.shuffle(
            1000, seed=seed, reshuffle_each_iteration=True  # buffer size = 1000
        )
    elif seed is not None:
        import warnings

        warnings.warn("Seed was set but `shuffle=False`. Seed will be ignored.")

    return dataset.repeat(num_epochs).batch(batch_size)
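
# Example (illustrative, not from the original source): batch a small
# feature/label set; in TF2 eager mode the dataset can be iterated directly.
#
#   ds = _dataset({"a": [1.0, 2.0, 3.0, 4.0]}, y=[0, 1, 0, 1], batch_size=2)
#   for features, labels in ds:
#       print(features["a"], labels)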


def build_optimizer(name, lr=0.001, **kwargs):
    """Get an optimizer for TensorFlow high-level API Estimator.

    Available options are: `adadelta`, `adagrad`, `adam`, `ftrl`, `momentum`,
    `rmsprop`, or `sgd`.

    Args:
        name (str): Optimizer name.
        lr (float): Learning rate.
        kwargs: Optimizer arguments as key-value pairs.

    Returns:
        tf.train.Optimizer: TensorFlow optimizer.
    """
    name = name.lower()

    try:
        optimizer_class = OPTIMIZERS[name]
    except KeyError:
        raise KeyError("Optimizer name should be one of: {}".format(list(OPTIMIZERS)))

    # Set optimizer-specific parameters, falling back to TensorFlow's defaults.
    params = {}
    if name == "ftrl":
        params["l1_regularization_strength"] = kwargs.get(
            "l1_regularization_strength", 0.0
        )
        params["l2_regularization_strength"] = kwargs.get(
            "l2_regularization_strength", 0.0
        )
    elif name in ("momentum", "rmsprop"):
        params["momentum"] = kwargs.get("momentum", 0.0)

    return optimizer_class(learning_rate=lr, **params)
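
# Example (illustrative, not from the original source): build a momentum
# optimizer; only the kwargs recognized for the chosen optimizer are forwarded.
#
#   opt = build_optimizer("momentum", lr=0.01, momentum=0.9)
#   assert isinstance(opt, tf.compat.v1.train.MomentumOptimizer)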


def export_model(model, train_input_fn, eval_input_fn, tf_feat_cols, base_dir):
    """Export a TensorFlow estimator (model).

    Args:
        model (tf.estimator.Estimator): Model to export.
        train_input_fn (function): Training input function used to create the data receiver spec.
        eval_input_fn (function): Evaluation input function used to create the data receiver spec.
        tf_feat_cols (list(tf.feature_column)): Feature columns.
        base_dir (str): Base directory to export the model to.

    Returns:
        str: Exported model path.
    """
    tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

    # Build a receiver function per mode: supervised receivers for train/eval
    # and a parsing receiver (serialized tf.Example) for serving.
    train_rcvr_fn = build_supervised_input_receiver_fn_from_input_fn(train_input_fn)
    eval_rcvr_fn = build_supervised_input_receiver_fn_from_input_fn(eval_input_fn)
    serve_rcvr_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
        tf.feature_column.make_parse_example_spec(tf_feat_cols)
    )
    rcvr_fn_map = {
        tf.estimator.ModeKeys.TRAIN: train_rcvr_fn,
        tf.estimator.ModeKeys.EVAL: eval_rcvr_fn,
        tf.estimator.ModeKeys.PREDICT: serve_rcvr_fn,
    }
    exported_path = model.experimental_export_all_saved_models(
        export_dir_base=base_dir, input_receiver_fn_map=rcvr_fn_map
    )

    return exported_path.decode("utf-8")
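
# Example (hedged sketch; the estimator, feature columns, and input functions
# below are hypothetical placeholders, not part of the original source):
#
#   feat_cols = [tf.feature_column.numeric_column("a")]
#   model = tf.estimator.LinearRegressor(feature_columns=feat_cols)
#   model.train(input_fn=train_input_fn, steps=100)
#   saved_path = export_model(
#       model, train_input_fn, eval_input_fn, feat_cols, base_dir="./exports"
#   )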


def evaluation_log_hook(
    estimator,
    logger,
    true_df,
    y_col,
    eval_df,
    every_n_iter=10000,
    model_dir=None,
    batch_size=256,
    eval_fns=None,
    **eval_kwargs
):
    """Evaluation log hook for TensorFlow high-level API Estimator.

    .. note::
        A TensorFlow Estimator model uses the last checkpoint weights for evaluation or prediction.
        In order to get the most up-to-date evaluation results while training,
        set the model's `save_checkpoints_steps` to be equal to or greater than the hook's `every_n_iter`.

    Args:
        estimator (tf.estimator.Estimator): Model to evaluate.
        logger (Logger): Custom logger to log the results.
            E.g., define a subclass of Logger for AzureML logging.
        true_df (pd.DataFrame): Ground-truth data.
        y_col (str): Label column name in true_df.
        eval_df (pd.DataFrame): Evaluation data without the label column.
        every_n_iter (int): Evaluation frequency (steps).
        model_dir (str): Model directory to save the summaries to. If None, does not record.
        batch_size (int): Number of samples fed into the model at a time.
            Note that the batch size does not affect the evaluation results.
        eval_fns (iterable of functions): List of evaluation functions that have the signature
            (true_df, prediction_df, **eval_kwargs) -> float. If None, the loss is calculated on true_df.
        eval_kwargs: Evaluation functions' keyword arguments.
            Note that the prediction column name should be 'prediction'.

    Returns:
        tf.train.SessionRunHook: Session run hook to evaluate the model while training.
    """
    return _TrainLogHook(
        estimator,
        logger,
        true_df,
        y_col,
        eval_df,
        every_n_iter,
        model_dir,
        batch_size,
        eval_fns,
        **eval_kwargs
    )
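
# Example (hedged sketch; `model`, `train_input_fn`, and the DataFrames are
# hypothetical placeholders): log the validation loss every 1000 steps.
#
#   hook = evaluation_log_hook(
#       model,
#       MetricsLogger(),
#       true_df=valid_df,
#       y_col="rating",
#       eval_df=valid_df.drop(columns="rating"),
#       every_n_iter=1000,
#   )
#   model.train(input_fn=train_input_fn, steps=10000, hooks=[hook])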


class _TrainLogHook(tf.estimator.SessionRunHook):
    def __init__(
        self,
        estimator,
        logger,
        true_df,
        y_col,
        eval_df,
        every_n_iter=10000,
        model_dir=None,
        batch_size=256,
        eval_fns=None,
        **eval_kwargs
    ):
        """Evaluation log hook class"""
        self.model = estimator
        self.logger = logger
        self.true_df = true_df
        self.y_col = y_col
        self.eval_df = eval_df
        self.every_n_iter = every_n_iter
        self.model_dir = model_dir
        self.batch_size = batch_size
        self.eval_fns = eval_fns
        self.eval_kwargs = eval_kwargs
        self.summary_writer = None
        self.global_step_tensor = None
        self.step = 0

    def begin(self):
        if self.model_dir is not None:
            self.summary_writer = tf.compat.v1.summary.FileWriterCache.get(
                self.model_dir
            )
            self.global_step_tensor = tf.compat.v1.train.get_or_create_global_step()
        else:
            self.step = 0

    def before_run(self, run_context):
        if self.global_step_tensor is not None:
            requests = {"global_step": self.global_step_tensor}
            return tf.estimator.SessionRunArgs(requests)
        else:
            return None

    def after_run(self, run_context, run_values):
        if self.global_step_tensor is not None:
            self.step = run_values.results["global_step"]
        else:
            self.step += 1

        if self.step % self.every_n_iter == 0:
            # Temporarily silence TensorFlow logging while evaluating.
            _prev_log_level = tf.compat.v1.logging.get_verbosity()
            tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

            if self.eval_fns is None:
                # No custom metrics: log the estimator's average loss on true_df.
                # `pandas_input_fn` builds the Estimator input function
                # (defined elsewhere in this module).
                result = self.model.evaluate(
                    input_fn=pandas_input_fn(
                        df=self.true_df, y_col=self.y_col, batch_size=self.batch_size
                    )
                )["average_loss"]
                self._log("validation_loss", result)
            else:
                # Predict on eval_df and score the predictions with each metric.
                predictions = list(
                    itertools.islice(
                        self.model.predict(
                            input_fn=pandas_input_fn(
                                df=self.eval_df, batch_size=self.batch_size
                            )
                        ),
                        len(self.eval_df),
                    )
                )
                prediction_df = self.eval_df.copy()
                prediction_df["prediction"] = [
                    p["predictions"][0] for p in predictions
                ]
                for fn in self.eval_fns:
                    result = fn(self.true_df, prediction_df, **self.eval_kwargs)
                    self._log(fn.__name__, result)

            tf.compat.v1.logging.set_verbosity(_prev_log_level)

    def end(self, session):
        if self.summary_writer is not None:
            self.summary_writer.flush()

    def _log(self, tag, value):
        self.logger.log(tag, value)
        if self.summary_writer is not None:
            summary = tf.compat.v1.Summary(
                value=[tf.compat.v1.Summary.Value(tag=tag, simple_value=value)]
            )
            self.summary_writer.add_summary(summary, self.step)


class MetricsLogger:
    """Metrics logger"""

    def __init__(self):
        """Initializer"""
        self._log = {}

    def log(self, metric, value):
        """Log metrics. Each metric's log will be stored in the corresponding list.

        Args:
            metric (str): Metric name.
            value (float): Value.
        """
        if metric not in self._log:
            self._log[metric] = []
        self._log[metric].append(value)

    def get_log(self):
        """Getter.

        Returns:
            dict: Log metrics.
        """
        return self._log
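
# Example (illustrative, not from the original source): metrics accumulate in
# per-name lists, which pairs naturally with evaluation_log_hook above.
#
#   logger = MetricsLogger()
#   logger.log("rmse", 0.931)
#   logger.log("rmse", 0.928)
#   logger.get_log()  # {'rmse': [0.931, 0.928]}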