import numpy as np
from scipy.optimize import minimize
from scipy.interpolate import interp1d, PchipInterpolator
from scipy.special import softmax
from lightgbm import Dataset
from rumboost.metrics import cross_entropy
from rumboost.nested_cross_nested import nest_probs, cross_nested_probs
from rumboost.utils import (
data_leaf_value,
map_x_knots,
)
[docs]
def monotone_spline(
x_spline,
weights,
num_splines=5,
x_knots=None,
y_knots=None,
linear_extrapolation=False,
):
"""
A function that apply monotonic spline interpolation on a given feature.
Parameters
----------
x_spline : numpy array
Data from the interpolated feature.
weights : dict
The dictionary corresponding to the feature leaf values.
num_splines : int, optional (default=5)
The number of splines used for interpolation.
x_knots : numpy array, optional (default=None)
The positions of knots. If None, linearly spaced.
y_knots : numpy array, optional (default=None)
The value of the utility at knots. Need to be specified if x_knots is passed.
linear_extrapolation : bool, optional (default=False)
If True, the splines are linearly extrapolated before and after the first and last knots.
Returns
-------
x_spline : numpy array
A vector of x values used to plot the splines.
y_spline : numpy array
A vector of the spline values at x_spline.
pchip : scipy.interpolate.PchipInterpolator
The scipy interpolator object from the monotonic splines.
x_knots : numpy array
The positions of knots. If None, linearly spaced.
y_knots : numpy array
The value of the utility at knots.
"""
# create knots if None
if x_knots is None:
x_knots = np.linspace(np.min(x_spline), np.max(x_spline), num_splines + 1)
x_knots, y_knots = data_leaf_value(x_knots, weights)
# sort knots in ascending order
is_sorted = lambda a: np.all(a[:-1] <= a[1:])
if not is_sorted(x_knots):
x_knots = np.sort(x_knots)
# untangle knots that have the same values
is_equal = lambda a: np.any(a[:-1] == a[1:])
if is_equal(x_knots):
first_point = x_knots[0]
last_point = x_knots[-1]
x_knots = [
x_ii + 1e-9 if x_i == x_ii else x_ii
for x_i, x_ii in zip(x_knots[:-1], x_knots[1:])
]
if last_point != x_knots[-1]:
x_knots[-1] = last_point
x_knots[-2] = last_point - 1e-10
x_knots.insert(0, first_point)
# create spline object
pchip = PchipInterpolator(x_knots, y_knots, extrapolate=True)
if linear_extrapolation:
pchip = linear_extrapolator_wrapper(pchip)
# compute utility values
y_spline = pchip(x_spline)
return x_spline, y_spline, pchip, x_knots, y_knots
[docs]
def mean_monotone_spline(x_data, x_mean, y_data, y_mean, num_splines=15):
"""
A function that apply monotonic spline interpolation on a given feature.
The difference with monotone_spline, is that the knots are on the closest stairs mean.
Parameters
----------
x_data : numpy array
Data from the interpolated feature.
x_mean : numpy array
The x coordinate of the vector of mean points at each stairs
y_data : numpy array
V(x_value), the values of the utility at x.
y_mean : numpy array
The y coordinate of the vector of mean points at each stairs
Returns
-------
x_spline : numpy array
A vector of x values used to plot the splines.
y_spline : numpy array
A vector of the spline values at x_spline.
pchip : scipy.interpolate.PchipInterpolator
The scipy interpolator object from the monotonic splines.
"""
# case where there are more splines than mean data points
if num_splines + 1 >= len(x_mean):
x_knots = x_mean
y_knots = y_mean
# adding first and last point for extrapolation
if x_knots[0] != x_data[0]:
x_knots = np.insert(x_knots, 0, x_data[0])
y_knots = np.insert(y_knots, 0, y_data[0])
if x_knots[-1] != x_data[-1]:
x_knots = np.append(x_knots, x_data[-1])
y_knots = np.append(y_knots, y_data[-1])
# create interpolator
pchip = PchipInterpolator(x_knots, y_knots, extrapolate=True)
# for plot
x_spline = np.linspace(0, np.max(x_data) * 1.05, 10000)
y_spline = pchip(x_spline)
return x_spline, y_spline, pchip, x_knots, y_knots
# candidate for knots
x_candidates = np.linspace(
np.min(x_mean) + 1e-10, np.max(x_mean) + 1e-10, num_splines + 1
)
# find closest mean point
idx = np.unique(np.searchsorted(x_mean, x_candidates, side="left") - 1)
x_knots = x_mean[idx]
y_knots = y_mean[idx]
# adding first and last point for extrapolation
if x_knots[0] != x_data[0]:
x_knots = np.insert(x_knots, 0, x_data[0])
y_knots = np.insert(y_knots, 0, y_data[0])
if x_knots[-1] != x_data[-1]:
x_knots = np.append(x_knots, x_data[-1])
y_knots = np.append(y_knots, y_data[-1])
# create interpolator
pchip = PchipInterpolator(x_knots, y_knots, extrapolate=True)
# for plot
x_spline = np.linspace(0, np.max(x_data) * 1.05, 10000)
y_spline = pchip(x_spline)
return x_spline, y_spline, pchip, x_knots, y_knots
[docs]
def updated_utility_collection(
weights,
data,
num_splines_feat,
spline_utilities,
mean_splines=False,
x_knots=None,
linear_extrapolation=False,
):
"""
Create a dictionary that stores what type of utility (smoothed or not) should be used for smooth_predict.
Parameters
----------
weights : dict
A dictionary containing all leaf values for all utilities and all features.
data : pandas DataFrame
The pandas DataFrame used for training.
num_splines_feat : dict
A dictionary of the same format than weights of features names for each utility that are interpolated with monotonic splines.
The key is a spline interpolated feature name, and the value is the number of splines used for interpolation as an int.
There should be a key for all features where splines are used.
spline_utilities : dict
A dictionary containing attributes where splines are applied. Must be in the form ]
{utility_indx: [attributes1, attributes2, ...], ...}.
mean_splines : bool, optional (default = False)
If True, the splines are computed at the mean distribution of data for stairs.
x_knots : dict
A dictionary in the form of {utility: {attribute: x_knots}} where x_knots are the spline knots for the corresponding
utility and attributes
linear_extrapolation : bool, optional (default=False)
If True, the splines are linearly extrapolated before and after the first and last knots.
Returns
-------
util_collection : dict
A dictionary containing the type of utility to use for all features in all utilities.
"""
# initialise utility collection
util_collection = {}
# for all utilities and features that have leaf values
for u in weights:
util_collection[u] = {}
for f in weights[u]:
# data points and their utilities
x_dat, y_dat = data_leaf_value(data[f], weights[u][f])
# if using splines
if f in spline_utilities[u]:
# if mean technique
if mean_splines:
x_mean, y_mean = data_leaf_value(
data[f], weights[u][f], technique="mean_data"
)
_, _, func, _, _ = mean_monotone_spline(
x_dat, x_mean, y_dat, y_mean, num_splines=num_splines_feat[u][f]
)
# else, i.e. linearly sampled points
else:
x_spline = np.linspace(np.min(data[f]), np.max(data[f]), num=10000)
x_knots_temp, y_knots = data_leaf_value(
x_knots[u][f], weights[u][f]
)
_, _, func, _, _ = monotone_spline(
x_spline,
weights,
num_splines=num_splines_feat[u][f],
x_knots=x_knots_temp,
y_knots=y_knots,
linear_extrapolation=linear_extrapolation,
)
# stairs functions
else:
func = interp1d(
x_dat,
y_dat,
kind="previous",
bounds_error=False,
fill_value=(y_dat[0], y_dat[-1]),
)
# save the utility function
util_collection[u][f] = func
return util_collection
[docs]
def smooth_predict(
data_test,
util_collection,
utilities=False,
mu=None,
nests=None,
alphas=None,
fe_model=None,
target="choice",
):
"""
A prediction function that used monotonic spline interpolation on some features to predict their utilities.
The function should be used with a trained model only.
Parameters
----------
data_test : pandas DataFrame
A pandas DataFrame containing the observations that will be predicted.
util_collection : dict
A dictionary containing the type of utility to use for all features in all utilities.
utilities : bool, optional (default = False)
if True, return the raw utilities.
mu : list, optional (default=None)
Only used, and required, if nests is True. It is the list of mu values for each nest.
The first value correspond to the first nest and so on.
nests : dict, optional (default=False)
If not none, compute predictions with the nested probability function. The dictionary keys are alternatives number and their values are
their nest number. By example {0:0, 1:1, 2:0} means that alt 0 and 2 are in nest 0 and alt 1 is in nest 1.
alphas : numpy.ndarray, optional (default=None)
An array of J (alternatives) by M (nests).
alpha_jn represents the degree of membership of alternative j to nest n.
fe_model : RUMBoost, optional (default=None)
The socio-economic characteristics part of the functional effect model.
Returns
-------
preds : numpy array
A numpy array containing the predictions for each class for each observation. Predictions are computed through the softmax function,
unless the raw utilities are requested. A prediction for class j for observation n will be U[n, j].
"""
raw_preds = np.array(np.zeros((data_test.shape[0], len(util_collection))))
for u in util_collection:
for f in util_collection[u]:
raw_preds[:, int(u)] += util_collection[u][f](data_test[f])
# adding the socio-economic constant
if fe_model is not None:
raw_preds += fe_model.predict(
Dataset(data_test, label=data_test[target], free_raw_data=False),
utilities=True,
)
# probabilities
if alphas is not None:
preds, _, _ = cross_nested_probs(raw_preds, mu, alphas)
return preds
if mu is not None:
preds, _, _ = nest_probs(raw_preds, mu, nests)
return preds
if not utilities:
preds = softmax(raw_preds, axis=1)
return preds
return raw_preds
[docs]
def optimise_splines(
x_knots,
weights,
data_train,
data_test,
label_test,
spline_utilities,
num_spline_range,
x_first=None,
x_last=None,
deg_freedom=None,
mu=None,
nests=None,
alphas=None,
fe_model=None,
linear_extrapolation=False,
):
"""
Function wrapper to find the optimal position of knots for each feature. The optimal position is the one
who minimises the CE loss.
Parameters
----------
x_knots ; 1d np.array
The positions of knots in a 1d array, following this structure:
np.array([x_att1_1, x_att1_2, ... x_att1_m, x_att2_1, ... x_attn_m]) where m is the number of knots
and n the number of attributes that are interpolated with splines.
weights : dict
A dictionary containing all leaf values for all utilities and all features.
data_train : pandas DataFrame
The pandas DataFrame used for training.
data_test : pandas DataFrame
The pandas DataFrame used for testing.
label_test : pandas Series or numpy array
The labels of the dataset used for testing.
spline_utilities : dict
A dictionary containing attributes where splines are applied. Must be in the form ]
{utility_indx: [attributes1, attributes2, ...], ...}.
num_splines_range : dict
A dictionary of the same format than weights of features names for each utility that are interpolated with monotonic splines.
The key is a spline interpolated feature name, and the value is the number of splines used for interpolation as an int.
There should be a key for all features where splines are used.
x_first : list, optional (default=None)
A list of all first knots in the order of the attributes from spline_utilities and num_splines_range.
x_last : list, optional (default=None)
A list of all last knots in the order of the attributes from spline_utilities and num_splines_range.
mu : list, optional (default=None)
Only used, and required, if nests is True. It is the list of mu values for each nest.
The first value correspond to the first nest and so on.
nests : dict, optional (default=False)
If not none, compute predictions with the nested probability function. The dictionary keys are alternatives number and their values are
their nest number. By example {0:0, 1:1, 2:0} means that alt 0 and 2 are in nest 0 and alt 1 is in nest 1.
alphas : numpy.ndarray, optional (default=None)
An array of J (alternatives) by M (nests).
alpha_jn represents the degree of membership of alternative j to nest n.
fe_model : RUMBoost, optional (default=None)
The socio-economic characteristics part of the functional effect model.
linear_extrapolation : bool, optional (default=False)
If True, the splines are linearly extrapolated before and after the first and last knots.
Returns
-------
loss: float
The final cross entropy or BIC on the test set.
"""
x_knots_dict = map_x_knots(x_knots, num_spline_range, x_first, x_last)
# compute new CE
utility_collection = updated_utility_collection(
weights,
data_train,
num_splines_feat=num_spline_range,
spline_utilities=spline_utilities,
x_knots=x_knots_dict,
linear_extrapolation=linear_extrapolation,
)
smooth_preds_final = smooth_predict(
data_test,
utility_collection,
mu=mu,
nests=nests,
alphas=alphas,
fe_model=fe_model,
)
loss = cross_entropy(smooth_preds_final, label_test)
# BIC
if deg_freedom:
N = len(label_test)
loss = 2 * N * loss + np.log(N) * deg_freedom
return loss
[docs]
def optimal_knots_position(
weights,
dataset_train,
dataset_test,
labels_test,
spline_utilities,
num_spline_range,
max_iter=100,
optimise=True,
bic=True,
n_iter=1,
first_last_knot_fixed=True,
mu=None,
nests=None,
alphas=None,
fe_model=None,
linear_extrapolation=False,
):
"""
Find the optimal position of knots for a given number of knots for given attributes.
Smoothing is not implemented yet for shared ensembles.
Parameters
----------
weights : dict
A dictionary containing all leaf values for all utilities and all features.
dataset_train : pandas DataFrame
The pandas DataFrame used for training.
dataset_test : pandas DataFrame
The pandas DataFrame used for testing.
labels_test : pandas Series or numpy array
The labels of the dataset used for testing.
spline_utilities : dict
A dictionary containing attributes where splines are applied. Must be in the form ]
{utility_indx: [attributes1, attributes2, ...], ...}.
num_splines_range : dict
A dictionary of the same format than weights of features names for each utility that are interpolated with monotonic splines.
The key is a spline interpolated feature name, and the value is the number of splines used for interpolation as an int.
There should be a key for all features where splines are used.
max_iter : int, optional (default=100)
The maximum number of iterations from the solver
optimize : bool, optional (default=True)
If True, optimize the knots position with scipy.minimize
bic : bool, optional (default=True)
If True, compute the BIC instead of the cross-entropy.
n_iter : int, optional (default=None)
The number of iteration, to leverage the randomness induced by the local minimizer.
first_last_knot_fixed : bool, optional (default=True)
If True, the first and last knots are fixed to the first and last data points.
mu : numpy.ndarray, optional (default=None)
Only used, and required, if nests is True. It is the array of mu values for each nest.
The first value correspond to the first nest and so on.
nests : dict, optional (default=None)
If not None, compute predictions with the nested probability function. The dictionary keys are nests id and their values are
the alternatives belonging to the nest. By example {0:[0, 2], 1:[1]} means that alt 0 and 2 are in nest 0 and alt 1 is in nest 1.
alphas : numpy.ndarray, optional (default=None)
An array of J (alternatives) by N (nests).
alpha_jn represents the degree of membership of alternative j to nest n.
fe_model : RUMBoost, optional (default=None)
The socio-economic characteristics part of the functional effect model.
linear_extrapolation : bool, optional (default=False)
If True, the splines are linearly extrapolated before and after the first and last knots.
Returns
-------
x_opt : OptimizeResult
The result of scipy.minimize.
"""
ce = 1e10
for n in range(n_iter):
x_0 = []
x_first = [] if first_last_knot_fixed else None
x_last = [] if first_last_knot_fixed else None
all_cons = []
all_bounds = []
starter = 0
for u in num_spline_range:
for f in num_spline_range[u]:
# last split points
last_split_point = weights[u][f]["Splitting points"][-1]
# get first and last data points
first_point = np.min(dataset_train[f])
last_point = np.max(dataset_train[f])
if first_last_knot_fixed:
# initial knots position q/Nth quantile
x_0.extend(
[
np.quantile(
dataset_train[f].unique(), q / (num_spline_range[u][f])
)
for q in range(1, num_spline_range[u][f])
]
)
# knots must be greater than the previous one
cons = [
{
"type": "ineq",
"fun": lambda x, i_plus=starter + j + 1, i_minus=starter + j: x[
i_plus
]
- x[i_minus]
- 1e-6,
"keep_feasible": True,
}
for j in range(0, num_spline_range[u][f] - 2)
]
# last knots must be greater than last split point
cons.append(
{
"type": "ineq",
"fun": lambda x, i_knot=starter + num_spline_range[u][
f
] - 2, lsp=last_split_point: x[i_knot]
- lsp
- 1e-6,
"keep_feasible": True,
}
)
# knots must be within the range of data points
bounds = [
(
first_point + q * 1e-7,
last_point + (-num_spline_range[u][f] + 1 + q) * 1e-7,
)
for q in range(1, num_spline_range[u][f])
]
x_first.append(first_point)
x_last.append(last_point)
# count the number of knots until now
starter += num_spline_range[u][f] - 1
else:
x_0.extend(
[
np.quantile(
dataset_train[f].unique(),
0.95 * q / (num_spline_range[u][f]),
)
for q in range(0, num_spline_range[u][f] + 1)
]
)
cons = [
{
"type": "ineq",
"fun": lambda x, i_plus=starter + j + 1, i_minus=starter + j: x[
i_plus
]
- x[i_minus]
- 1e-6,
"keep_feasible": True,
}
for j in range(0, num_spline_range[u][f])
]
bounds = [
(
first_point + q * 1e-7,
last_point + (-num_spline_range[u][f] - 1 + q) * 1e-7,
)
for q in range(0, num_spline_range[u][f] + 1)
]
starter += num_spline_range[u][f] + 1
# store all constraints and first and last points
all_cons.extend(cons)
all_bounds.extend(bounds)
deg_freedom = starter if bic else None
if optimise:
# optimise knot positions
x_opt = minimize(
optimise_splines,
np.array(x_0),
args=(
weights,
dataset_train,
dataset_test,
labels_test,
spline_utilities,
num_spline_range,
x_first,
x_last,
deg_freedom,
mu,
nests,
alphas,
fe_model,
linear_extrapolation,
),
bounds=all_bounds,
constraints=all_cons,
method="SLSQP",
options={"maxiter": max_iter, "disp": True},
)
# compute final negative cross-entropy with optimised knots
ce_final = optimise_splines(
x_opt.x,
weights,
dataset_train,
dataset_test,
labels_test,
spline_utilities,
num_spline_range,
x_first,
x_last,
deg_freedom,
mu=mu,
nests=nests,
alphas=alphas,
fe_model=fe_model,
linear_extrapolation=linear_extrapolation,
)
# store best value
if ce_final < ce:
ce = ce_final
x_opt_best = x_opt
x_first_best = x_first
x_last_best = x_last
print(f"{n+1}/{n_iter}:{ce_final} with knots at: {x_opt.x}")
else:
# without optimisation
final_loss = optimise_splines(
np.array(x_0),
weights,
dataset_train,
dataset_test,
labels_test,
spline_utilities,
num_spline_range,
x_first,
x_last,
deg_freedom,
mu=mu,
nests=nests,
alphas=alphas,
fe_model=fe_model,
linear_extrapolation=linear_extrapolation,
)
if final_loss < ce:
ce = final_loss
x_opt_best = x_0
print(f"{n+1}/{n_iter}:{final_loss}")
# return best x_opt and first and last points + score
if optimise:
return x_opt_best, x_first_best, x_last_best, ce
return x_opt_best, x_first, x_last, ce