Source code for src.rumboost.utils

import numpy as np
from rumboost.metrics import cross_entropy
from scipy.special import softmax

def optimise_asc(asc, raw_preds, labels):
    """
    Evaluate the cross-entropy obtained after shifting raw predictions by ASCs.

    The function does not update ``asc`` itself: it returns the loss value
    for the given ``asc``, which makes it usable as the objective function
    of an external optimiser.

    Parameters
    ----------
    asc : np.array
        The array of ASC parameters, added to ``raw_preds``.
    raw_preds : np.array
        The raw predictions of the model. The softmax is taken along axis 1.
    labels : np.array
        The labels of the dataset.

    Returns
    -------
    new_ce
        The value returned by ``cross_entropy`` for the ASC-shifted
        probabilities and ``labels``.
    """
    # probabilities once the constants are added to the raw utilities
    shifted_probs = softmax(raw_preds + asc, axis=1)
    return cross_entropy(shifted_probs, labels)
def process_parent(parent, pairs):
    """
    Dig into the biogeme expression to retrieve name of variable and beta parameter.

    Work only with simple utility specification (beta * variable).

    Parameters
    ----------
    parent : biogeme expression
        The expression node to explore. It must expose ``getClassName()``;
        nodes without both ``left`` and ``right`` attributes are treated as
        leaves.
    pairs : list
        The list collecting the (beta, variable) tuples. It is extended in
        place.

    Returns
    -------
    pairs : list
        The same list object, with one tuple appended per "Times" node found.

    Raises
    ------
    ValueError
        Propagated from ``get_pair`` when a "Times" node does not hold a
        beta and a variable.
    """
    # final expression to be stored
    if parent.getClassName() == "Times":
        pairs.append(get_pair(parent))
        return pairs

    # if not final, try to dig into the expression
    try:
        left = parent.left
        right = parent.right
    except AttributeError:
        # no left and right children: nothing more to explore. Only the
        # missing-attribute case is caught so that unrelated errors (and
        # KeyboardInterrupt) are no longer silently swallowed.
        return pairs

    # dig further left and right
    process_parent(left, pairs)
    process_parent(right, pairs)
    return pairs
def get_pair(parent):
    """
    Return the beta and variable names of a parent expression as a tuple.

    Parameters
    ----------
    parent : biogeme expression
        A node with ``left`` and ``right`` children, each exposing
        ``getClassName()`` and ``name``.

    Returns
    -------
    pair : tuple
        The tuple ``(beta_name, variable_name)``.

    Raises
    ------
    ValueError
        If the two children do not provide both a "Beta" and a "Variable"
        with non-empty names.
    """
    names = {"Beta": None, "Variable": None}
    for child in (parent.left, parent.right):
        kind = child.getClassName()
        if kind in names:
            names[kind] = child.name

    beta, variable = names["Beta"], names["Variable"]
    if not (beta and variable):
        raise ValueError("Parent does not contain beta and variable")
    return (beta, variable)
def _empty_rum_structure():
    """Return a fresh, empty structure dictionary for one set of features."""
    return {
        "columns": [],
        "monotone_constraints": [],
        "interaction_constraints": [],
        "betas": [],
        "categorical_feature": [],
    }


def _monotone_constraint_from_bounds(bounds):
    """Translate the (lower, upper) bounds of a beta into 1, -1 or 0."""
    lower, upper = bounds[0], bounds[1]
    if (lower is not None) and (upper is not None):
        raise ValueError("Only one bound can be not None")
    if lower is not None and lower >= 0:
        return 1  # positive monotonic constraint
    if upper is not None and upper <= 0:
        return -1  # negative monotonic constraint
    # unbounded, or a bound that does not fix the sign of the beta
    return 0


def bio_to_rumboost(
    model,
    all_columns=False,
    monotonic_constraints=True,
    interaction_contraints=True,
    fct_effect_variables=None,
):
    """
    Converts a biogeme model to a rumboost structure.

    Parameters
    ----------
    model : a BIOGEME object
        The model used to create the rumboost structure. Its utilities are
        read from ``model.loglike.util`` and the bounds of each beta from
        ``model.getBoundsOnBeta``.
    all_columns : bool, optional (default = False)
        If True, do not consider alternative-specific features: the columns
        of each utility are replaced by all columns of
        ``model.database.data`` except "choice".
    monotonic_constraints : bool, optional (default = True)
        If False, do not consider monotonic constraints.
    interaction_contraints : bool, optional (default = True)
        If False, do not consider feature interactions constraints.
    fct_effect_variables : list, optional (default = None)
        The list of variables in the functional effect part of the model.
        None is treated as an empty list.

    Returns
    -------
    rum_structure : list of dict
        One dictionary per utility, each followed by a second dictionary
        holding the functional effect variables of that utility when
        ``fct_effect_variables`` is not empty.

    Raises
    ------
    ValueError
        If a beta has both a lower and an upper bound while
        ``monotonic_constraints`` is True.
    """
    if fct_effect_variables is None:
        fct_effect_variables = []
    has_fct_effects = len(fct_effect_variables) > 0

    utilities = model.loglike.util  # biogeme expression
    rum_structure = []

    # for all utilities
    for utility in utilities.values():
        fixed = _empty_rum_structure()
        rum_structure.append(fixed)
        functional = _empty_rum_structure() if has_fct_effects else None

        # get all the (beta, variable) pairs of the utility
        for beta, variable in process_parent(utility, []):
            if variable in fct_effect_variables:
                functional["columns"].append(variable)  # append variable name
                functional["betas"].append(beta)  # append beta name
                if interaction_contraints:
                    # NOTE(review): a bare index is stored here, whereas the
                    # other branch stores a one-element list. Kept as in the
                    # original; confirm against the consumer of this entry.
                    functional["interaction_constraints"].append(
                        len(functional["interaction_constraints"])
                    )
                if monotonic_constraints:
                    # always append exactly one value so that the constraints
                    # stay aligned with the columns
                    functional["monotone_constraints"].append(
                        _monotone_constraint_from_bounds(
                            model.getBoundsOnBeta(beta)
                        )
                    )
            else:
                fixed["columns"].append(variable)  # append variable name
                fixed["betas"].append(beta)  # append beta name
                if interaction_contraints:
                    # no interaction between features: each feature gets its
                    # own group, numbered by its position in this structure
                    fixed["interaction_constraints"].append(
                        [len(fixed["interaction_constraints"])]
                    )
                if monotonic_constraints:
                    fixed["monotone_constraints"].append(
                        _monotone_constraint_from_bounds(
                            model.getBoundsOnBeta(beta)
                        )
                    )

        if all_columns:
            fixed["columns"] = (
                model.database.data.drop(["choice"], axis=1)
                .columns.values.tolist()
            )

        if has_fct_effects:
            rum_structure.append(functional)

    return rum_structure
def get_mid_pos(data, split_points, end="data"):
    """
    Return the mid point in-between two split points for a specific feature
    (used in pw linear predict).

    Parameters
    ----------
    data : pandas Series
        The column of the dataframe associated with the feature.
    split_points : list
        The list of split points for that feature.
    end : str
        How to compute the first and last positions, it can be:
            -'data': add min and max values of data
            -'split point': add first and last split points
            -'mean_data': add the mean of data before the first split point,
                          and after the last split point
        Any other value leaves the list without end points.

    Returns
    -------
    mid_pos : list
        A list of points in the middle of every consecutive split points,
        surrounded by the two end points selected with ``end``.
    """
    # positions in the middle of the splitting points intervals
    mid_pos = []
    if len(split_points) > 1:
        for lower, upper in zip(split_points[:-1], split_points[1:]):
            mid_pos.append((lower + upper) / 2)

    # end points, according to the requested strategy
    if end == "data":
        first, last = min(data), max(data)
    elif end == "split point":
        first, last = min(split_points), max(split_points)
    elif end == "mean_data":
        first = data[data < split_points[0]].mean()
        last = data[data > split_points[-1]].mean()
    else:
        return mid_pos

    return [first, *mid_pos, last]
def get_mean_pos(data, split_points):
    """
    Return the mean of the data in-between two split points for a specific
    feature (used in smoothing).

    At end points, it is the mean of data before the first split point, and
    after the last split point. Observations exactly equal to a split point
    are not counted in any interval.

    Parameters
    ----------
    data : pandas.Series
        The column of the dataframe associated with the feature.
    split_points : list
        The list of split points for that feature.

    Returns
    -------
    mean_data : list
        The mean of the data below the first split point, in every interval
        between consecutive split points, and above the last split point.
    """
    # mean of the data inside each interval between consecutive split points
    interior = []
    for lower, upper in zip(split_points[:-1], split_points[1:]):
        in_interval = (data < upper) & (data > lower)
        interior.append(np.mean(data[in_interval]))

    below_first = np.mean(data[data < split_points[0]])  # first point
    above_last = np.mean(data[data > split_points[-1]])  # last point
    return [below_first, *interior, above_last]
def data_leaf_value(data, weights_feature, technique="data_weighted"):
    """
    Computes the utility values of given data, according to the prespecified
    technique.

    Parameters
    ----------
    data : pandas.Series
        The column of the dataframe associated with the feature.
    weights_feature : dict
        The dictionary corresponding to the feature leaf values. It is read
        through its "Splitting points" and "Histogram values" keys.
    technique : str, optional (default = data_weighted)
        The technique used to compute data values. It can be:
            data_weighted : feature data and its utility values.
            mid_point : the mid point in between all splitting points.
            mean_data : the mean of data in between all splitting points.
            mid_point_weighted : the mid points in between all splitting
                                 points, weighted by the number of data
                                 points in the interval.
            mean_data_weighted : the mean of data in between all splitting
                                 points, weighted by the number of data
                                 points in the interval.
        Any other value returns the sorted data together with the leaf
        values repeated by interval count.

    Returns
    -------
    data_ordered : numpy array
        X coordinates of the data, or feature data point values.
    data_values : numpy array
        Y coordinates of the data, or utility values
    """
    split_points = weights_feature["Splitting points"]
    leaf_values = weights_feature["Histogram values"]

    if technique == "data_weighted":
        # one leaf value per observation, looked up from the split points
        sorted_data = np.sort(data)
        leaf_idx = np.searchsorted(np.array(split_points), sorted_data)
        return np.array(sorted_data), np.array(leaf_values)[leaf_idx]

    # one point per leaf, no weighting
    if technique == "mid_point":
        return np.array(get_mid_pos(data, split_points)), np.array(leaf_values)
    if technique == "mean_data":
        return np.array(get_mean_pos(data, split_points)), np.array(leaf_values)

    sorted_data = data.copy().sort_values()

    # x positions to be repeated, if the technique asks for some
    if technique == "mid_point_weighted":
        positions = get_mid_pos(data, split_points)
    elif technique == "mean_data_weighted":
        positions = get_mean_pos(data, split_points)
    else:
        positions = None

    # (leaf index, number of observations) for every interval; observations
    # equal to a split point are not counted (strict inequalities)
    counts = [(0, sum(sorted_data < split_points[0]))]
    for i, (lower, upper) in enumerate(zip(split_points[:-1], split_points[1:])):
        counts.append((i + 1, sum((sorted_data < upper) & (sorted_data > lower))))
    counts.append((-1, sum(sorted_data > split_points[-1])))

    data_values = []
    weighted_positions = []
    for leaf, n_obs in counts:
        data_values += [leaf_values[leaf]] * n_obs
        if positions is not None:
            weighted_positions += [positions[leaf]] * n_obs

    if positions is None:
        return np.array(sorted_data), np.array(data_values)
    return np.array(weighted_positions), np.array(data_values)
def map_x_knots(x_knots, num_splines_range, x_first=None, x_last=None):
    """
    Map the 1d array of x_knots into a dictionary with utility and attributes
    as keys.

    Parameters
    ----------
    x_knots : 1d np.array
        The positions of knots in a 1d array, following this structure:
        np.array([x_att1_1, x_att1_2, ... x_att1_m, x_att2_1, ... x_attn_m])
        where m is the number of knots and n the number of attributes that
        are interpolated with splines.
    num_splines_range : dict
        A dictionary in the form of {utility: {attribute: number of splines}}.
        Attributes are processed in sorted order within each utility.
    x_first : list, optional (default=None)
        A list of all first knots, in the order in which the attributes are
        processed. When given, ``x_last`` is required as well.
    x_last : list, optional (default=None)
        A list of all last knots, in the order in which the attributes are
        processed.

    Returns
    -------
    x_knots_dict : dict
        A dictionary in the form of {utility: {attribute: x_knots}} where
        x_knots are the spline knots for the corresponding utility and
        attributes.
    """
    x_knots_dict = {}
    cursor = 0  # position of the next unread knot in x_knots
    att_idx = 0  # position of the current attribute in x_first / x_last
    endpoints_given = x_first is not None

    for utility, splines_per_att in num_splines_range.items():
        knots_per_att = {}
        x_knots_dict[utility] = knots_per_att
        for att in sorted(splines_per_att.keys()):
            n_splines = splines_per_att[att]
            if endpoints_given:
                # only the interior knots are stored in x_knots
                stop = cursor + n_splines - 1
                knots_per_att[att] = np.array(
                    [x_first[att_idx], *x_knots[cursor:stop], x_last[att_idx]]
                )
                att_idx += 1
            else:
                # all knots, end points included, are stored in x_knots
                stop = cursor + n_splines + 1
                knots_per_att[att] = x_knots[cursor:stop]
            cursor = stop

    return x_knots_dict
def sort_dict(dict_to_sort):
    """
    Sort a dictionary by its keys.

    Parameters
    ----------
    dict_to_sort : dict
        A dictionary to sort.

    Returns
    -------
    dict_sorted : dict
        A new dictionary with the same items, inserted in sorted key order.
    """
    return {key: dict_to_sort[key] for key in sorted(dict_to_sort)}
def _check_rum_structure(rum_structure): """ Check that rum_structure, a list of dictionaries, is of the correct format. """ if not isinstance(rum_structure, list): raise ValueError("rum_structure must be a list") for i, rum_struct in enumerate(rum_structure): if "utility" not in rum_struct: raise ValueError( f"rum_structure {i} must contain utility key with the list of alternatives" ) if "variables" not in rum_struct: raise ValueError( f"rum_structure {i} must contain variables key with the list of variables" ) if "boosting_params" not in rum_struct: raise ValueError( f"rum_structure {i} must contain boosting_params key with the boosting parameters" ) if "shared" not in rum_struct: raise ValueError( f"rum_structure {i} must contain shared key with a boolean value" ) if len(rum_struct["utility"]) > 1 and not rum_struct["shared"]: raise ValueError( f"rum_structure {i} must be shared if the parameter is used in more than one utility function" ) if rum_struct["shared"] and len(rum_struct["utility"]) != len(rum_struct["variables"]): raise ValueError( f"rum_structure {i} must have the same number of variables and utility functions if shared is True" )