Source code for src.utils

import random
import pickle
from collections import defaultdict, Counter
from typing import List, Dict, Any, Optional, Union

import lightgbm as lgb
import pandas as pd
import numpy as np

# type hints for dataset split function
TrainTestSplit = Union[
    tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame],  # train and test
    tuple[
        pd.DataFrame,
        pd.DataFrame,
        pd.DataFrame,
        pd.DataFrame,
        pd.DataFrame,
        pd.DataFrame,
    ],  # train, val and test
]


[docs] def generate_rum_structure( alt_spec_features: Optional[Dict[str, List[str]]] = None, socio_demo_chars: Optional[List[str]] = None, functional_intercept: Optional[bool] = False, functional_params: Optional[bool] = False, ) -> List[Dict[str, Any]]: """ Generate the rum structure for the given dataset. Note that this code is written for a single alternative (i.e. regression or ordinal regression problem). Parameters ---------- alt_spec_features: Optional[Dict[str, List[str]]] The alternative-specific features to be used in the rum structure. The dictionary keys are the utility indices and the values are the features to be used in the rum structure. socio_demo_chars: Optional[List[str]] The socio-demographic characteristics to be used in the rum structure. They will represent the individual-specific constant learnt from the data. functional_intercept: Optional[bool] Whether to use the functional intercept or not. The default is False. functional_params: Optional[bool] Whether to use the functional parameters or not. The default is False. Returns ------- rum_structure: List[Dict[Any]] The rum structure for the dataset. """ # initialise rum_structure rum_structure = [] # alternative-specific features, one per ensemble if not functional_params: for key, value in alt_spec_features.items(): # monotone constraints for SwissMetro dataset if "TRAIN_TT" in value: monotone_constraints = [-1, -1, -1] md = 1 elif "SM_TT" in value: monotone_constraints = [-1, -1, -1, 0] md = 1 elif "CAR_TT" in value: monotone_constraints = [-1, -1] md = 1 elif "dur_walking" in value: monotone_constraints = [-1, -1, 0, 0] md = 1 elif "dur_cycling" in value: monotone_constraints = [-1, -1, 0, 0] md = 1 elif "dur_pt_access" in value: monotone_constraints = [-1, -1, -1, -1, -1, -1, -1, -1, 0, 0] md = 1 elif "dur_driving" in value: monotone_constraints = [-1, -1, -1, -1, -1, 0, 0] md = 1 elif value in [["f4"], ["f5"], ["f6"], ["f7"]]: monotone_constraints = [-1] md = 1 else: monotone_constraints = [0] * len(value) md = 1 interaction_constraints = [list(range(len(value)))] rum_structure_as = [ { "variables": value, "utility": [key], "boosting_params": { "monotone_constraints_method": "advanced", "max_depth": md, "n_jobs": -1, "learning_rate": 0.1, "verbose": -1, "monotone_constraints": monotone_constraints, "interaction_constraints": interaction_constraints, }, "shared": False, } ] # add the alternative-specific features to the rum_structure rum_structure.extend(rum_structure_as) else: # if functional parameters are used, add them to the rum_structure for key, value in alt_spec_features.items(): if "TRAIN_TT" in value: monotone_constraints = [-1, -1, -1] elif "SM_TT" in value: monotone_constraints = [-1, -1, -1, 0] elif "CAR_TT" in value: monotone_constraints = [-1, -1] elif "dur_walking" in value: monotone_constraints = [-1, -1, 0, 0] elif "dur_cycling" in value: monotone_constraints = [-1, -1, 0, 0] elif "dur_pt_access" in value: monotone_constraints = [-1, -1, -1, -1, -1, -1, -1, -1, 0, 0] elif "dur_driving" in value: monotone_constraints = [-1, -1, -1, -1, 0, 0, -1] elif value in [["f4"], ["f5"], ["f6"], ["f7"]]: monotone_constraints = [-1] else: monotone_constraints = [0] * len(value) rum_structure_params = [ { "variables": socio_demo_chars, "utility": [key], "boosting_params": { "monotone_constraints_method": "advanced", "n_jobs": -1, "learning_rate": 0.1, "monotone_constraints": [monotone_constraints[i]], "verbose": -1, }, "shared": False, "endogenous_variable": f, } for i, f in enumerate(value) ] # add the functional parameters to the rum_structure rum_structure.extend(rum_structure_params) # socio-demographic characteristics, all in one ensemble if functional_intercept: for key, _ in alt_spec_features.items(): rum_structure_sd = [ { "variables": socio_demo_chars, "utility": [key], "boosting_params": { "monotone_constraints_method": "advanced", "n_jobs": -1, "learning_rate": 0.1, "verbose": -1, }, "shared": False, } ] # add the socio-demographic characteristics to the rum_structure rum_structure.extend(rum_structure_sd) return rum_structure
[docs] def add_hyperparameters( rum_struct: List[Dict[str, Any]], hyperparameters: Dict[str, Any], ) -> Dict[str, Any]: """ Add hyperparameters to a specific dict of rum structure. Parameters ---------- rum_struct: List[Dict[str, Any]] The rum structure to be modified. hyperparameters: Dict[str, Any] The hyperparameters to be added to the rum structure. Returns ------- rum_structure: List[Dict[Any]] The modified rum structure with the hyperparameters added. """ for struct in rum_struct: # add the hyperparameters to the rum structure struct["boosting_params"].update(hyperparameters) return rum_struct
[docs] def generate_general_params(num_classes: int, **kwargs) -> Dict[str, Any]: """ " Generate the general parameters for the rumboost model. Parameters ---------- num_classes: int The number of classes in the dataset. kwargs: Dict[str, Any] The additional parameters to be added to the general parameters. These parameters will be used to update the general parameters. It has to be parameters that are accepted by rumboost. See the rumboost documentation for more details. Returns ------- general_params: Dict[str, Any] The general parameters for the rumboost model. """ # general parameters general_params = { "num_classes": num_classes, "max_booster_to_update": 1, } # update the general parameters with the kwargs general_params.update(kwargs) return general_params
[docs] def generate_ordinal_spec( model_type: Optional[str] = "proportional_odds", optim_interval: Optional[int] = 20 ) -> Dict[str, Any]: """ Generate the ordinal specification for the rumboost model. Parameters ---------- model_type: str The type of the model. It can be either 'proportional_odds', 'coral' or 'corn'. The default is 'proportional_odds'. optim_interval: int The optimisation interval at which thresholds are updated with scipy. The default is 20. Returns ------- ordinal_spec: Dict[str, Any] The ordinal specification for the rumboost model. """ ordinal_spec = { "model": model_type, "optim_interval": optim_interval, } return ordinal_spec
[docs] def build_lgb_dataset(X: pd.DataFrame, y: pd.Series) -> lgb.Dataset: """ Build the LightGBM dataset from the dataframe. Parameters ---------- X: pd.DataFrame The dataframe to be used. y: pd.Series The target variable. Returns ------- lgb_dataset: Any The LightGBM dataset. """ # create the LightGBM dataset lgb_dataset = lgb.Dataset(X, label=y, free_raw_data=False) return lgb_dataset
[docs] def split_dataset( df: pd.DataFrame, target: str, features: List[str], train_size: float = 0.8, val_size: Optional[float] = None, random_state: int = 42, groups: Optional[pd.Series] = None, save_path: Optional[str] = None, ) -> TrainTestSplit: """ Split the dataset into train and test sets. Parameters ---------- df: pd.DataFrame The dataframe to be used. target: str The target variable. features: List[str] The features to be used. train_size: float The size of the training set. The default is 0.7. This is the fraction of the total dataset. val_size: Optional[float] The size of the validation set. The default is 0.1. This is the fraction of the total dataset. random_state: int The random state to be used. The default is 42. groups: Optional[pd.Series] Whether to use stratified sampling or not. The default is None. If None, the data will be split randomly. If not None, the data will be split using the groups provided. save_path: Optional[str] The path to save the train and test sets. The default is None. Returns ------- TrainTestSplit The train and test sets. If val_size is provided, the train set will be split into train and validation sets. If groups is provided, the train and test sets will be split using stratified sampling. If groups is not provided, the train and test sets will be split randomly. """ assert ( train_size + (val_size if val_size else 0) <= 0.95 ), "The sum of train and val size must be less than 0.95." assert ( train_size + (val_size if val_size else 0) >= 0.5 ), "The sum of train and val size must be greater than 0.5" test_size = 1 - train_size - (val_size if val_size else 0) if groups is None: # if no groups are provided, use random sampling train_df = df.sample(frac=train_size, random_state=random_state).reset_index( drop=True ) test_df = df.drop(train_df.index).reset_index(drop=True) if save_path: train_df.to_csv(save_path + "train.csv", index=False) test_df.to_csv(save_path + "test.csv", index=False) if val_size: val_size = val_size / (1 - test_size) train_df = train_df.sample( frac=val_size, random_state=random_state ).reset_index(drop=True) val_df = train_df.drop(train_df.index).reset_index(drop=True) if save_path: val_df.to_csv(save_path + "val.csv", index=False) return ( train_df[features], train_df[target], val_df[features], val_df[target], test_df[features], test_df[target], ) return train_df[features], train_df[target], test_df[features], test_df[target] # split the data into train and test sets folds = stratified_group_k_fold( df[features], df[target], groups, k=int(1 / test_size), seed=random_state, ) for train_idx, test_idx in folds: train_df = df.iloc[train_idx].reset_index(drop=True) test_df = df.iloc[test_idx].reset_index(drop=True) groups_train = groups.iloc[train_idx].reset_index(drop=True) break if save_path: train_df.to_csv(save_path + "train.csv", index=False) test_df.to_csv(save_path + "test.csv", index=False) if val_size: val_size = val_size / (1 - test_size) val_folds = stratified_group_k_fold( train_df[features], train_df[target], groups_train, k=int(1 / val_size), seed=random_state, ) if save_path: pickle.dump( list(val_folds), open( save_path + "folds.pickle", "wb", ), ) for train_idx, val_idx in val_folds: val_df = train_df.iloc[val_idx].reset_index(drop=True) train_df = train_df.iloc[train_idx].reset_index(drop=True) break return ( train_df[features], train_df[target], val_df[features], val_df[target], test_df[features], test_df[target], ) return train_df[features], train_df[target], test_df[features], test_df[target]
# Sample a dataset grouped by `groups` and stratified by `y` # Source: https://www.kaggle.com/jakubwasikowski/stratified-group-k-fold-cross-validation
[docs] def stratified_group_k_fold(X, y, groups, k, seed=None): """ Stratified Group K-Fold cross-validator Provides train/test indices to split data in train/test sets. Parameters ---------- X : array-like of shape (n_samples, n_features) The input samples. y : array-like of shape (n_samples,) The target values. groups : array-like of shape (n_samples,) Group labels for the samples used while splitting the dataset into train/test set. k : int Number of folds. Must be at least 2. seed : int, optional Random seed for shuffling the data. Yields ------ train : ndarray The training set indices for that split. test : ndarray The testing set indices for that split. """ labels_num = np.max(y) + 1 y_counts_per_group = defaultdict(lambda: np.zeros(labels_num)) y_distr = Counter() for label, g in zip(y, groups): y_counts_per_group[g][label] += 1 y_distr[label] += 1 y_counts_per_fold = defaultdict(lambda: np.zeros(labels_num)) groups_per_fold = defaultdict(set) def eval_y_counts_per_fold(y_counts, fold): y_counts_per_fold[fold] += y_counts std_per_label = [] for label in range(labels_num): label_std = np.std( [y_counts_per_fold[i][label] / y_distr[label] for i in range(k)] ) std_per_label.append(label_std) y_counts_per_fold[fold] -= y_counts return np.mean(std_per_label) groups_and_y_counts = list(y_counts_per_group.items()) random.Random(seed).shuffle(groups_and_y_counts) for g, y_counts in sorted(groups_and_y_counts, key=lambda x: -np.std(x[1])): best_fold = None min_eval = None for i in range(k): fold_eval = eval_y_counts_per_fold(y_counts, i) if min_eval is None or fold_eval < min_eval: min_eval = fold_eval best_fold = i y_counts_per_fold[best_fold] += y_counts groups_per_fold[best_fold].add(g) all_groups = set(groups) for i in range(k): train_groups = all_groups - groups_per_fold[i] test_groups = groups_per_fold[i] train_indices = [i for i, g in enumerate(groups) if g in train_groups] test_indices = [i for i, g in enumerate(groups) if g in test_groups] yield train_indices, test_indices
[docs] def compute_metrics( preds: np.ndarray, binary_preds: np.ndarray, labels: np.ndarray, y_test: pd.Series, ) -> tuple[float, float, float]: """ Compute the metrics for the model. Parameters ---------- preds: np.ndarray The predictions of the model. binary_preds: np.ndarray The binary predictions of the model. labels: np.ndarray The labels of the model. y_test: pd.Series The test set. Returns ------- mae_test: float The mean absolute error of the model. loss_test: float The loss of the model. emae_test: float The expected mean absolute error of the model. """ mae_test = np.mean(np.abs(labels - y_test.values)) safe_binary_preds = np.clip(binary_preds, 1e-15, 1 - 1e-7) ranks = np.arange(binary_preds.shape[1]) levels = y_test.values[:, None] > ranks[None, :] loss_test = -np.mean( levels * np.log(safe_binary_preds) + (1 - levels) * np.log(1 - safe_binary_preds), axis=1, ).mean() all_labels = np.arange(preds.shape[1]) distances = np.abs(all_labels[None, :] - y_test.values[:, None]) emae_test = np.mean(preds * distances, axis=1).mean() return mae_test, loss_test, emae_test
[docs] def cross_entropy( y_true: np.ndarray, y_pred: np.ndarray, ) -> float: """ Compute the cross entropy loss. Parameters ---------- y_true: np.ndarray The true labels. y_pred: np.ndarray The predicted labels. Returns ------- loss: float The cross entropy loss. """ indices = range(len(y_true)) return -np.mean(np.log(y_pred[indices, y_true]))
[docs] def pkl_to_df(pkl_path: str) -> pd.DataFrame: """ Convert a pickle file to a pandas dataframe. Parameters ---------- pkl_path: str The path to the pickle file. Returns ------- df: pd.DataFrame The dataframe containing the data from the pickle file. """ data = pd.read_pickle(pkl_path) data_df = pd.DataFrame(data["x"], columns=data["x_names"]) data_df[data["z_names"]] = data["z"] data_df["CHOICE"] = data["y"] - 1 data_df["CHOICE"] = data_df["CHOICE"].astype(int) data = data_df.copy() return data