# Source code for statinf.regressions.glm

import numpy as np
import pandas as pd
import math
import warnings
import scipy.stats as scp
import matplotlib.pyplot as plt

from ..ml.initializations import init_params
from ..ml.activations import logit

from ..data.ProcessData import parse_formula
from ..misc import ConvergenceWarning, summary, get_significance
# from .glm_test import GLM

# TODO: HC covariance
# TODO: check probit

class GLM:

    def __init__(self, formula, data, family='binomial', fit_intercept=False, initializer='zeros'):
        """Generalized Linear Model implemented with the Newton-Raphson method.

        :param formula: Regression formula to be run of the form :obj:`y ~ x1 + x2`. See :py:meth:`statinf.data.ProcessData.parse_formula`.
        :type formula: :obj:`str`
        :param data: Input data with Pandas format.
        :type data: :obj:`pandas.DataFrame`
        :param family: Family distribution of the dependent variable, defaults to 'binomial'.
        :type family: :obj:`str`, optional
        :param fit_intercept: Used for adding an intercept to the regression formula, defaults to False.
        :type fit_intercept: :obj:`bool`, optional
        :param initializer: Method for initializing the first parameters (see :py:meth:`statinf.ml.initializations`), defaults to 'zeros'.
        :type initializer: :obj:`str`, optional

        .. note:: The module allows Binomial (for Logit) and Gaussian (for Probit) distributions.
            It will soon be extended to other distributions.
        """
        # Parse the formula
        self.df, self.X_col, self.Y_col = parse_formula(data=data, formula=formula, check_values=True, return_all=True)
        self.formula        = formula
        self.fit_intercept  = fit_intercept
        self.n              = self.df.shape[0]
        self.p              = len(self.X_col) + 1 if fit_intercept else len(self.X_col)
        self.X              = self.df[self.X_col].to_numpy()
        self.Y              = np.array(self.df[self.Y_col].values).reshape(self.n, 1)
        self.cov_type       = 'nonrobust'
        self.family         = family
        self.log_likelihood = np.inf
        self.log_likelihood_hist = []
        self.variance_hist  = []
        self.coefs_hist     = []
        self.hessian_hist   = []

        # Initialize parameters
        self.beta = init_params(rows=self.p, cols=1, method=initializer)
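
    # Usage sketch (hypothetical data; the column names y, x1 and x2 are
    # illustrative, not part of the library):
    #
    #     import numpy as np
    #     import pandas as pd
    #     from statinf.regressions.glm import GLM
    #
    #     rng = np.random.default_rng(0)
    #     df = pd.DataFrame({'x1': rng.normal(size=100), 'x2': rng.normal(size=100)})
    #     df['y'] = (df['x1'] + df['x2'] + rng.normal(size=100) > 0).astype(int)
    #
    #     model = GLM(formula='y ~ x1 + x2', data=df)
    #     model.fit()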

    def _get_X(self, new_data=None):
        """Process data (and append a unit variable if :obj:`fit_intercept` is used).

        :param new_data: Data to transform, defaults to None (uses the train data provided in __init__).
        :type new_data: :obj:`pandas.DataFrame`, optional

        :return: Formatted data.
        :rtype: :obj:`numpy.ndarray`
        """
        if new_data is None:
            X = self.X
        else:
            # If new_data, parse the formula to get transformations
            df = parse_formula(data=new_data, formula=self.formula, check_values=True)
            X = df[self.X_col].to_numpy()

        # If fit_intercept, append a unit variable (use the local X so new_data is handled too)
        if self.fit_intercept:
            X = np.hstack((np.ones((X.shape[0], 1), dtype=X.dtype), X))

        return X

    def _prob(self, X):
        """Compute the fitted probabilities by applying the link function to :math:`X \\beta`.
        """
        if self.family == 'binomial':
            prob = logit(X, weights=self.beta)  # sigmoid of X.dot(self.beta)
        elif self.family == 'gaussian':
            prob = scp.norm.cdf(X.dot(self.beta), loc=0, scale=1)
        else:
            raise ValueError('Family distribution is not valid.')

        return prob
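
    # For the binomial family, the `logit` activation corresponds to the sigmoid of
    # the linear index (cf. the formula documented in `predict`). Assuming that is
    # exactly what `statinf.ml.activations.logit` computes, a minimal NumPy
    # equivalent would be:
    #
    #     prob = 1.0 / (1.0 + np.exp(-X.dot(self.beta)))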

    def _log_likelihood(self, X=None, Y=None):
        """Compute the log-likelihood.
        """
        X_data = self._get_X() if X is None else X
        Y_data = self.Y if Y is None else Y

        # Individual log-likelihoods
        log_likelihood_id = np.array(Y_data * np.log(self._prob(X_data)) + (1 - Y_data) * np.log(1 - self._prob(X_data)))
        # Overall log-likelihood
        log_likelihood = log_likelihood_id.sum()
        return log_likelihood
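
    # Quick sanity check: with the default 'zeros' initializer, X.dot(beta) = 0, so
    # the binomial fitted probability is 0.5 for every observation and the initial
    # log-likelihood reduces to n * log(0.5). With a hypothetical DataFrame `df`:
    #
    #     model = GLM(formula='y ~ x1 + x2', data=df)
    #     assert np.isclose(model._log_likelihood(), model.n * np.log(0.5))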

"""
Gradient (Overall) not the score function
"""
X = self._get_X()
gradient = X.T.dot((self._prob(X) - self.Y)) / self.n  # gradient of the likelihood

    def _hessian(self):
        """Hessian - Fisher information matrix.
        """
        X = self._get_X()
        pi = self._prob(X)
        S = np.multiply(pi, (1 - pi)) * np.identity(self.n)
        hessian = (1 / self.n) * X.T.dot(S).dot(X)
        return hessian
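
    # Note: the n x n matrix S is convenient notation but costs O(n^2) memory. An
    # equivalent computation with broadcasting, using the same pi as above:
    #
    #     w = pi * (1 - pi)                        # shape (n, 1)
    #     hessian = (X * w).T.dot(X) / self.n      # X'SX / n without building S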

    def _sandwich(self):
        """Sandwich (robust) covariance estimator; not yet exposed through :py:meth:`variance`.
        """
        X = self._get_X()
        pi = self._prob(X)
        res_2 = (self.Y - pi)**2 * np.identity(self.n)
        XX_1 = np.linalg.inv(X.T.dot(X))
        Xres_2X = X.T.dot(res_2.dot(X))
        _sand = XX_1.dot(Xres_2X).dot(XX_1)
        return _sand

    def variance(self, cov_type='nonrobust'):
        """Compute the covariance matrix for the fitted model.

        :param cov_type: Type of the covariance matrix, defaults to 'nonrobust'.
        :type cov_type: :obj:`str`, optional

        :formulae: * **Non-robust covariance matrix**:

            .. math:: \\sigma_{\\beta} = {(X'SX)}^{-1}

            * **Sandwich covariance matrix**:

            .. math:: \\sigma_{\\beta} = {(X'X)}^{-1} (X'\\hat{S}X) {(X'X)}^{-1}

            with :math:`S = \\text{diag}(\\hat{p}_i(1 - \\hat{p}_i))` and
            :math:`\\hat{S} = \\text{diag} \\left( (Y_i - \\hat{p}_i)^{2} \\right)`

        .. note:: Only the non-robust covariance matrix is currently available.
            The sandwich estimator and :math:`HC0`, :math:`HC1`, :math:`HC2`, :math:`HC3` will soon be implemented.

        :return: Fisher information matrix.
        :rtype: :obj:`numpy.array`
        """
        self.cov_type = cov_type

        if self.cov_type == 'nonrobust':
            # Uses the Fisher information matrix
            _variance = np.linalg.inv(self._hessian())
        # elif self.cov_type == 'sandwich':
        #     _variance = self._sandwich()
        else:
            raise ValueError(f"Value for cov_type is not valid. Got '{self.cov_type}'.")
        return np.array(_variance)

    def _std_errors(self):
        """Compute the standard errors from the covariance matrix.
        """
        _variance = self.variance()
        _std = np.sqrt((1 / self.n) * np.diag(_variance))
        return np.array(_std).reshape(self.p, 1)

    def fit(self, maxit=15, cov_type='nonrobust', improvement_threshold=0.0005, keep_hist=True, plot=False):
        """Fits the GLM regression model using the Newton-Raphson method.

        :param maxit: Maximum number of iterations, defaults to 15.
        :type maxit: :obj:`int`, optional
        :param cov_type: Type of the covariance matrix (currently only non-robust, see :py:meth:`variance`), defaults to 'nonrobust'.
        :type cov_type: :obj:`str`, optional
        :param improvement_threshold: Threshold below which we consider the likelihood converged, defaults to 0.0005.
        :type improvement_threshold: :obj:`float`, optional
        :param keep_hist: Keeps training history (log-likelihoods, variances, etc.), can be turned off to save memory, defaults to True.
        :type keep_hist: :obj:`bool`, optional
        :param plot: Plots the evolution of the log-likelihood across iterations (requires :obj:`keep_hist = True`), defaults to False.
        :type plot: :obj:`bool`, optional

        :formulae: * **Log-likelihood**:

            .. math:: l(\\beta) = y_{i} \\log \\left[ G(\\mathbf{x_i} \\beta) \\right] + (1 - y_{i}) \\log \\left[1 - G(\\mathbf{x_i} \\beta) \\right]

            * **Newton's method**:

            .. math:: \\hat{\\beta}_{s+1} = \\hat{\\beta}_{s} - H^{-1}_{s} G_{s}

            with

            .. math:: G_{s} = \\dfrac{\\partial}{\\partial \\beta_{s}} \\left[ -l(\\beta) \\right] = \\sum_{i=1}^{N} x_i (G(x_{i} \\beta) - y_{i})

            and

            .. math:: H_{s} = \\dfrac{\\partial^2}{\\partial \\beta_{s}^2} \\left[ -l(\\beta) \\right] = X'S_{s}X

            where :math:`S = \\text{diag} \\left( \\hat{p}_i (1 - \\hat{p}_i) \\right)` and
            :math:`G` denotes the link function (:py:meth:`statinf.ml.activations.sigmoid` for logit or the gaussian c.d.f. for probit).

        :references: * Wooldridge, J. M. (2010). Econometric analysis of cross section and panel data. MIT press.
            * Cameron, A. C., & Trivedi, P. K. (2005). Microeconometrics: methods and applications. Cambridge University Press.
            * McCullagh, P. (2018). Generalized linear models. Routledge.
        """
        self.cov_type = cov_type.lower()
        self.it = 0
        diff = np.inf
        old_log_like = np.inf

        while (self.it < maxit) and (np.abs(diff) >= improvement_threshold):
            self.it      += 1
            log_like      = self._log_likelihood()
            variance      = self.variance(cov_type=cov_type.lower())
            gradient      = self._gradient()
            # Newton-Raphson update: beta_{s+1} = beta_s - H^{-1} G
            self.beta     = self.beta - variance.dot(gradient)
            diff          = old_log_like - log_like
            old_log_like  = log_like

            if keep_hist:
                self.log_likelihood = log_like
                self.log_likelihood_hist.append(log_like)
                self.variance_hist.append(variance)

        # Check whether convergence was reached
        self.converged = bool(np.abs(diff) < improvement_threshold)
        if not self.converged or math.isnan(log_like):
            warnings.warn('Model did not converge.', ConvergenceWarning)

        if plot:
            # Plot the training history
            plt.title('GLM training history', loc='center')
            plt.plot(self.log_likelihood_hist, label='Log-likelihood')
            plt.legend()
            plt.show()
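
    # Fitting sketch (df is the hypothetical DataFrame from above); plot=True
    # requires keep_hist=True so the log-likelihood history is recorded:
    #
    #     model = GLM(formula='y ~ x1 + x2', data=df)
    #     model.fit(maxit=25, improvement_threshold=1e-6, plot=True)
    #     print(model.converged, model.it)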

    def r_squared(self):
        """McFadden's pseudo-:math:`R^{2}` -- goodness of fit.

        :formula: .. math:: R^{2} = 1 - \\dfrac{LL(\\hat{\\beta})}{LL(\\bar{Y})}

        :return: Goodness of fit.
        :rtype: :obj:`float`
        """
        null_mod = self.__class__(formula=f'{self.Y_col} ~ 1', data=self.df)
        _r2 = 1 - self._log_likelihood() / null_mod._log_likelihood()
        # Cache under a different name: assigning to self.r_squared would shadow this method
        self._r2 = _r2
        return _r2

"""Mc Fadden's pseudo-:math:R^{2} adjusted -- Adjusted goodness of fit

:formula: .. math:: R^{2}_{adj} = 1 - \\dfrac{LL(\\hat{\\beta}) - p}{LL(\\bar{Y})}

:rtype: :obj:float
"""
null_mod = self.__class__(formula=f'{self.Y_col} ~ 1', data=self.df)
ar2 = 1 - (self._log_likelihood() - self.p) / null_mod._log_likelihood()
return ar2

    def summary(self, return_df=False):
        """Statistical summary for the GLM model.

        :param return_df: Return the summary as a Pandas DataFrame, otherwise print a formatted summary, defaults to False.
        :type return_df: :obj:`bool`

        :formulae: * **McFadden's** :math:`R^{2}`:

            .. math:: R^{2} = 1 - \\dfrac{LL(\\hat{\\beta})}{LL(\\bar{Y})}

            where :math:`LL` represents the log-likelihood.

        :references: * Student. (1908). The probable error of a mean. Biometrika, 1-25.
            * Wooldridge, J. M. (2016). `Introductory econometrics: A modern approach
              <https://faculty.arts.ubc.ca/nfortin/econ495/IntroductoryEconometrics_AModernApproach_FourthEdition_Jeffrey_Wooldridge.pdf>`_.
              Nelson Education.
            * Agresti, A. (2003). Categorical data analysis (Vol. 482). John Wiley & Sons.
            * Murphy, K. P. (2012). Machine learning: a probabilistic perspective. MIT press.

        :return: Model's summary.
        :rtype: :obj:`pandas.DataFrame` or :obj:`str`
        """
        # Fit the model if not already done
        if self.log_likelihood_hist == []:
            self.fit()

        # Initialize
        betas = self.beta
        X = self._get_X()

        t_values = betas / self._std_errors()

        p_values = [round(2 * (1 - scp.t.cdf(np.abs(i[0]), (len(X) - 1))), 5) for i in t_values]
        z = scp.norm.ppf(0.975)

        betas = [b[0] for b in self.beta]
        t_values = [t[0] for t in t_values]

        summary_df = pd.DataFrame()
        summary_df["Variables"] = (['(Intercept)'] + self.X_col) if self.fit_intercept else self.X_col
        summary_df["Coefficients"] = betas
        summary_df["Standard Errors"] = self._std_errors().flatten()  # flatten the (p, 1) array into a column
        summary_df["t-values"] = t_values
        summary_df["Probabilities"] = p_values
        summary_df["Significance"] = summary_df["Probabilities"].map(lambda x: get_significance(x))
        summary_df["CI_lo"] = summary_df["Coefficients"] - z * summary_df["Standard Errors"]
        summary_df["CI_hi"] = summary_df["Coefficients"] + z * summary_df["Standard Errors"]

        # Null model for the null log-likelihood
        null_mod = self.__class__(formula=f'{self.Y_col} ~ 1', data=self.df)

        # Key metrics for the summary
        _ll = round(self._log_likelihood(), 2)
        nll = round(null_mod._log_likelihood(), 2)
        _r2 = round(1 - _ll / nll, 5)
        ar2 = round(1 - (_ll - self.p) / nll, 5)
        _n_ = self.n
        _p_ = self.p
        _it = self.it
        _cv = '     True' if self.converged else "    False"
        _ct = ' ' + self.cov_type

        # Likelihood-ratio test
        G = 2 * (_ll - nll)
        lrp = round(scp.chi2.sf(G, self.p), 3)

        if return_df:
            return summary_df
        else:
            max_var = np.max([len(v) for v in summary_df.Variables])

            add_sp = ' ' * np.max([max_var - 17, 0])
            add_sep = '=' * np.max([max_var - 17, 0])
            space = np.max([max_var, 17])

            summ = f'|                                              Logit summary                                                {add_sp}|\n'
            summ += f"| Log-Likelihood         =              {_ll:10} | Null Log-Likelihood        =                {nll:10} {add_sp}|\n"
            summ += f"| LR test p-value        =              {lrp:10} | Covariance                 =                {_ct:10} {add_sp}|\n"
            summ += f"| n                      =              {_n_:10} | p                          =                {_p_:10} {add_sp}|\n"
            summ += f"| Iterations             =              {_it:10} | Convergence                =                 {_cv:5} {add_sp}|\n"
            summ += summary(summary_df)
            print(summ)
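
    # Example: get the summary as a DataFrame instead of printing it
    # (model fitted as in the sketches above):
    #
    #     summ_df = model.summary(return_df=True)
    #     print(summ_df[['Variables', 'Coefficients', 'Probabilities', 'Significance']])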

    def predict(self, new_data, return_proba=False):
        """Predicted :math:`\\hat{Y}` values for a new dataset.

        :param new_data: New data to evaluate with pandas DataFrame format.
        :type new_data: :obj:`pandas.DataFrame`
        :param return_proba: Whether to return probabilities or binary values, defaults to False.
        :type return_proba: :obj:`bool`, optional

        :formula: .. math:: f(X) = \\dfrac{1}{e^{-\\beta X} + 1}

        :return: Predictions :math:`\\hat{Y}`
        :rtype: :obj:`numpy.ndarray`
        """
        # Fit the model if not already done
        if self.log_likelihood_hist == []:
            self.fit()

        # Transform the data
        new_X = self._get_X(new_data=new_data)

        pred = self._prob(new_X)
        if return_proba:
            return np.asarray([x[0] for x in pred])
        else:
            return np.asarray([1 if x[0] > 0.5 else 0 for x in pred])
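
    # Prediction sketch; new_df is a hypothetical DataFrame containing the
    # variables used in the formula:
    #
    #     probs = model.predict(new_data=new_df, return_proba=True)  # probabilities in (0, 1)
    #     labels = model.predict(new_data=new_df)                    # 0/1 labels, 0.5 cutoff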

    def partial_effects(self, variables, new_data=None, average=False):
        """Computes Partial Effects (PE) and Average Partial Effects (APE).

        :param variables: List of variables for which to compute the PE/APE.
        :type variables: :obj:`list`
        :param new_data: Data to use for the computations, defaults to None (uses the training set).
        :type new_data: :obj:`pandas.DataFrame`, optional
        :param average: Whether to compute Average Partial Effects, defaults to False.
        :type average: :obj:`bool`, optional

        :formula: .. math:: PE(X_{i}) = \\beta_{i} \\dfrac{e^{-\\beta X}}{(e^{-\\beta X} + 1)^{2}}

            if :obj:`average = True`, :math:`APE = \\bar{PE}`

        :raises TypeError: If argument :obj:`variables` is neither :obj:`str` nor :obj:`list`.

        :return: Dictionary including Partial Effects (PE) or Average Partial Effects (APE).
        :rtype: :obj:`dict`
        """

        assert self.family == 'binomial', 'APE module only available for Logit model.'

        # Fit the model if not already done
        if self.log_likelihood_hist == []:
            self.fit()

        # Get the data
        X = self._get_X(new_data)

        # Get the coefficients as a DataFrame (easier to handle for filtering)
        if self.fit_intercept:
            # Append the name of the intercept
            cols = ['(Intercept)'] + self.X_col
        else:
            cols = self.X_col

        coefs = pd.DataFrame({'Coefficients': [x[0] for x in self.beta]}, index=cols)

        # Compute exp(-bX)
        exp_bx = np.exp(-X.dot(self.beta))

        # Check the type of argument variables
        if isinstance(variables, str):
            var = [variables]
        elif isinstance(variables, list):
            var = variables
        else:
            raise TypeError('Type for argument variables is not valid.')

        # Initialize the output dictionary
        pes = {}

        # Compute the PEs (or APEs) for each variable
        for v in var:
            b = coefs.Coefficients[v].min()
            pe = b * exp_bx / ((exp_bx + 1)**2)
            pe_out = pe.mean() if average else pe

            pes.update({v: pe_out})

        return pes
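
    # Partial-effects sketch (binomial family only; variable names are illustrative):
    #
    #     ape = model.partial_effects(variables=['x1', 'x2'], average=True)  # scalars
    #     pe = model.partial_effects(variables='x1')  # per-observation effects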