import numpy as np
import pandas as pd
import math
import warnings
import scipy.stats as scp
import matplotlib.pyplot as plt
from ..ml.initializations import init_params
from ..ml.activations import logit
from ..data.ProcessData import parse_formula
from ..misc import ConvergenceWarning, summary, get_significance
# from .glm_test import GLM
# TODO: HC covariance
# TODO: check probit
[docs]class GLM:
def __init__(self, formula, data, family='binomial', fit_intercept=False, initializer='zeros'):
"""Generalized Linear Model implemented with Newton-Raphson's method.
:param formula: Regression formula to be run of the form :obj:`y ~ x1 + x2`. See :py:meth:`statinf.data.ProcessData.parse_formula`.
:type formula: :obj:`str`
:param data: Input data with Pandas format.
:type data: :obj:`pandas.DataFrame`
:param family: Family distribution of the dependent variable, defaults to binomial.
:type family: :obj:`str`, optional
:param fit_intercept: Used for adding intercept in the regression formula, defaults to False.
:type fit_intercept: :obj:`bool`, optional
:param initializer: Method for initializing the first parameters (see :py:meth:`statinf.ml.initializations`), defaults to 'zeros'.
:type initializer: :obj:`str`, optional
.. note:: The modules allows Binomial (for Logit) and Gaussian (for Probit) distributions.
It will soon be extended to other distributions.
"""
# Parse the formula
self.df, self.X_col, self.Y_col = parse_formula(data=data, formula=formula, check_values=True, return_all=True)
self.formula = formula
self.fit_intercept = fit_intercept
self.n = self.df.shape[0]
self.p = len(self.X_col) + 1 if fit_intercept else len(self.X_col)
self.X = self.df[self.X_col].to_numpy()
self.Y = np.array(self.df[self.Y_col].values).reshape(self.n, 1)
self.cov_type = 'nonrobust'
self.family = family
self.log_likelihood = np.Inf
self.log_likelihood_hist = []
self.gradient_hist = []
self.variance_hist = []
self.coefs_hist = []
self.hessian_hist = []
# Initialize parameters
self.beta = init_params(rows=self.p, cols=1, method=initializer)
def _get_X(self, new_data=None):
"""Function to process data (and append unit variable if fit_intercept)
:param new_data: Data to transform, defaults to None (gets train data provided in __init__).
:type new_data: :obj:`pandas.DataFrame`, optional
:return: Formatted data
:rtype: :obj:`numpy.ndarray`
"""
if new_data is None:
X = self.X
else:
# If new_data, parse the formula to get transformations
df = parse_formula(data=new_data, formula=self.formula, check_values=True)
X = df[self.X_col].to_numpy()
# If fit_intercept, append unit variable
if self.fit_intercept:
X = np.hstack((np.ones((self.n, 1), dtype=self.X.dtype), self.X))
return X
def _prob(self, X):
""" Prob
"""
if self.family == 'binomial':
prob = logit(X, weights=self.beta) # (X.dot(self.beta))
elif self.family == 'gaussian':
prob = scp.norm.cdf(X.dot(self.beta), loc=0, scale=1)
else:
raise ValueError('Family distribution is not valid.')
return prob
def _log_likelihood(self, X=None, Y=None):
"""
Compute Log-Likelihood
"""
X_data = self._get_X() if X is None else X
Y_data = self.Y if Y is None else Y
# Individual log-likelihood
log_likelihood_id = np.array(Y_data * np.log(self._prob(X_data)) + (1 - Y_data) * np.log(1 - self._prob(X_data)))
# Overall log-likelihood
log_likelihood = log_likelihood_id.sum()
return log_likelihood
def _gradient(self):
"""
Gradient (Overall) not the score function
"""
X = self._get_X()
gradient = X.T.dot((self._prob(X) - self.Y)) / self.n # gradient of the likelihood
return gradient
def _hessian(self):
"""
Hessian - Fisher Information
"""
X = self._get_X()
pi = self._prob(X)
S = np.multiply(pi, (1 - pi)) * np.identity(self.n)
hessian = (1 / self.n) * X.T.dot(S).dot(X)
return hessian
def _sandwich(self):
X = self._get_X()
pi = self._prob(X)
res_2 = (self.Y - pi)**2 * np.identity(self.n)
XX_1 = np.linalg.inv(X.T.dot(X))
Xres_2X = X.T.dot(res_2.dot(X))
_sand = XX_1.dot(Xres_2X).dot(XX_1)
return _sand
[docs] def variance(self, cov_type='nonrobust'):
"""Compute the covariance matrix for the fitted model.
:param cov_type: Type of the covariance matrix, defaults to nonrobust.
:type cov_type: :obj:`str`, optional
:formulae: * **Non-robust covariance matrix**:
.. math:: \\sigma_{\\beta} = {(X'SX)}^{-1}
* **Sandwich covariance matrix**:
.. math:: \\sigma_{\\beta} = {(X'X)}^{-1} (X'\\hat{S}X) {(X'X)}^{-1}
with :math:`S = \\text{diag}(\\hat{p}_i(1 - \\hat{p}_i))` and
:math:`\\hat{S} = \\text{diag} \\left( (Y_i - \\hat{p}_i)^{2} \\right)`
.. note:: Only non-robust covariance matrix is currently available.
Sandwich estimate and :math:`HC0`, :math:`HC1`, :math:`HC2`, :math:`HC3` will soon be implemented.
:return: Fisher information matrix
:rtype: :obj:`numpy.array`
"""
self.cov_type = cov_type
if self.cov_type == 'nonrobust':
# Uses Fisher information matrix
_variance = np.linalg.inv(self._hessian())
# elif self.cov_type == 'sandwich':
# _variance = self._sandwich()
else:
error_msg = f"""Value for cov_type not valid. Got '{self.cov_type}'.
Please chose 'nonrobust'.
"""
raise ValueError(error_msg)
return np.array(_variance)
def _std_errors(self):
"""Compute the covariance matrix used for standard errors
"""
_variance = self.variance()
_std = np.sqrt((1 / self.n) * np.diag(_variance))
return np.array(_std).reshape(self.p, 1)
[docs] def fit(self, maxit=15, cov_type='nonrobust', improvement_threshold=0.0005, keep_hist=True, plot=False):
"""Fits the GLM regression model using Newton-Raphson method.
:param maxit: Maximum number of iterations, defaults to 15.
:type maxit: :obj:`int`, optional
:param cov_type: Type of the covariance matrix (non-robust or sandwich), defaults to nonrobust.
:type cov_type: :obj:`str`, optional
:param improvement_threshold: Threshold from which we consider the likelihood improved, defaults to 0.0005.
:type improvement_threshold: :obj:`float`, optional
:param keep_hist: Keeps training history (gradients, hessian, etc...), can be turned off for saving memory, defaults to True.
:type keep_hist: :obj:`bool`, optional
:param plot: Plots evolution of log-likelihood through the different iterations (requires :obj:`keep_hist = True`), defaults to False.
:type plot: :obj:`bool`, optional
:formulae: * **Log-likelihood**:
.. math:: l(\\beta) = y_{i} \\log \\left[ G(\\mathbf{x_i} \\beta) \\right] + (1 - y_{i}) \\log \\left[1 - G(\\mathbf{x_i} \\beta) \\right]
* **Newton's method**:
.. math:: \\hat{\\beta}_{s+1} = \\hat{\\beta}_{s} - H^{-1}_{s} G_{s}
with
.. math:: G_{s} = \\dfrac{\\partial}{\\partial \\beta_{s}} l(\\beta) = \\sum_{i=1}^{N} x_i (y_{i} - G(x_{i} \\beta))
and
.. math:: H_{s} = \\dfrac{\\partial^2}{\\partial \\beta_{s}^2} l(\\beta) = X'S_{s}X
where :math:`S = \\text{diag} \\left( (Y_i - \\hat{p}_i)^{2} \\right)` and
:math:`G` denotes the link function (:py:meth:`statinf.ml.activations.sigmoid` for logit or gaussian c.d.f for probit).
:references: * Wooldridge, J. M. (2010). Econometric analysis of cross section and panel data.
* Cameron, A. C., & Trivedi, P. K. (2005). Microeconometrics: methods and applications. Cambridge university press.
* McCullagh, P. (2018). Generalized linear models. Routledge.
"""
self.cov_type = cov_type.lower()
self.it = 0
diff = np.inf
old_log_like = np.inf
while (self.it < maxit) and (np.abs(diff) >= improvement_threshold):
self.it += 1
log_like = self._log_likelihood()
gradient = self._gradient()
variance = self.variance(cov_type=cov_type.lower())
self.beta -= variance.dot(gradient)
diff = old_log_like - log_like
old_log_like = log_like
if keep_hist:
self.log_likelihood = log_like
self.log_likelihood_hist.append(log_like)
self.gradient_hist.append(gradient)
self.variance_hist.append(variance)
# Check whether convergence was reached
self.converged = np.abs(diff) < improvement_threshold
if (self.converged is False) or (math.isnan(log_like)):
warnings.warn('Model did not converge.', ConvergenceWarning)
if plot:
# Plot the training and test loss
plt.title('GLM training history', loc='center')
plt.plot(self.log_likelihood_hist, label='Log-likelihood')
# plt.plot(self.test_losses, label='Test loss')
plt.legend()
plt.show()
[docs] def r_squared(self):
"""Mc Fadden's pseudo-:math:`R^{2}` -- Goodness of fit
:formula: .. math:: R^{2} = 1 - \\dfrac{LL(\\hat{\\beta})}{LL(\\bar{Y})}
:return: Goodness of fit.
:rtype: :obj:`float`
"""
null_mod = self.__class__(formula=f'{self.Y_col} ~ 1', data=self.df)
_r2 = 1 - self._log_likelihood() / null_mod._log_likelihood()
self.r_squared = _r2
return _r2
[docs] def adjusted_r_squared(self):
"""Mc Fadden's pseudo-:math:`R^{2}` adjusted -- Adjusted goodness of fit
:formula: .. math:: R^{2}_{adj} = 1 - \\dfrac{LL(\\hat{\\beta}) - p}{LL(\\bar{Y})}
:return: Adjusted goodness of fit.
:rtype: :obj:`float`
"""
null_mod = self.__class__(formula=f'{self.Y_col} ~ 1', data=self.df)
ar2 = 1 - (self._log_likelihood() - self.p) / null_mod._log_likelihood()
self.adjusted_r_squared = ar2
return ar2
[docs] def summary(self, return_df=False):
"""Statistical summary for GLM model
:param return_df: Return the summary as a Pandas DataFrame, else returns a string, defaults to False.
:type return_df: :obj:`bool`
:formulae: * **Mc Fadden's** :math:`R^{2}`:
.. math:: R^{2} = 1 - \\dfrac{LL(\\hat{\\beta})}{LL(\\bar{Y})}
where :math:`LL` represents the log-likelihood.
:references: * Student. (1908). The probable error of a mean. Biometrika, 1-25.
* Wooldridge, J. M. (2016). `Introductory econometrics: A modern approach
<https://faculty.arts.ubc.ca/nfortin/econ495/IntroductoryEconometrics_AModernApproach_FourthEdition_Jeffrey_Wooldridge.pdf>`_.
Nelson Education.
* Agresti, A. (2003). Categorical data analysis (Vol. 482). John Wiley & Sons.
* Murphy, K. P. (2012). Machine learning: a probabilistic perspective. MIT press.
:return: Model's summary.
:rtype: :obj:`pandas.DataFrame` or :obj:`str`
"""
# Fit model if not already done
if self.log_likelihood_hist == []:
self.fit()
# Initialize
betas = self.beta
X = self._get_X()
t_values = betas / self._std_errors()
p_values = [round(2 * (1 - scp.t.cdf(np.abs(i[0]), (len(X) - 1))), 5) for i in t_values]
z = scp.norm.ppf(0.975)
betas = [b[0] for b in self.beta]
t_values = [t[0] for t in t_values]
summary_df = pd.DataFrame()
summary_df["Variables"] = ['(Intercept)'] + self.X_col if self.fit_intercept else self.X_col
summary_df["Coefficients"] = betas
summary_df["Standard Errors"] = self._std_errors()
summary_df["t-values"] = t_values
summary_df["Probabilities"] = p_values
summary_df["Significance"] = summary_df["Probabilities"].map(lambda x: get_significance(x))
summary_df["CI_lo"] = summary_df["Coefficients"] - z * summary_df["Standard Errors"]
summary_df["CI_hi"] = summary_df["Coefficients"] + z * summary_df["Standard Errors"]
# Null model for null log likelihood
null_mod = self.__class__(formula=f'{self.Y_col} ~ 1', data=self.df)
# Key metrics for summary
_ll = round(self._log_likelihood(), 2)
nll = round(null_mod._log_likelihood(), 2)
_r2 = round(1 - _ll / nll, 5)
ar2 = round(1 - (_ll - self.p) / nll, 5)
_n_ = self.n
_p_ = self.p
_it = self.it
_cv = ' True' if self.converged else " False"
_ct = ' ' + self.cov_type
# Likelihood Ratio test
G = 2 * (_ll - nll)
lrp = round(scp.chi2.sf(G, self.p), 3)
if return_df:
return(summary_df)
else:
max_var = np.max([len(v) for v in summary_df.Variables])
add_sp = ' ' * np.max([max_var - 17, 0])
add_sep = '=' * np.max([max_var - 17, 0])
space = np.max([max_var, 17])
summ = f"============================================================================================================={add_sep}\n"
summ += f'| Logit summary {add_sp}|\n'
summ += f"============================================================================================================={add_sep}\n"
summ += f"| McFadden's R² = {_r2:10} | McFadden's R² Adj. = {ar2:10} {add_sp}|\n"
summ += f"| Log-Likelihood = {_ll:10} | Null Log-Likelihood = {nll:10} {add_sp}|\n"
summ += f"| LR test p-value = {lrp:10} | Covariance = {_ct:10} {add_sp}|\n"
summ += f"| n = {_n_:10} | p = {_p_:10} {add_sp}|\n"
summ += f"| Iterations = {_it:10} | Convergence = {_cv:5} {add_sp}|\n"
summ += summary(summary_df)
print(summ)
[docs] def predict(self, new_data, return_proba=False):
"""Predicted :math:`\\hat{Y}` values for for a new dataset
:param new_data: New data to evaluate with pandas data-frame format.
:type new_data: :obj:`pandas.DataFrame`
:param return_proba: Whether to return probabilities or binary values, defaults to False
:type return_proba: :obj:`bool`, optional
:formula: .. math:: f(X) = \\dfrac{1}{e^{-\\beta X} + 1}
:return: Predictions :math:`\\hat{Y}`
:rtype: :obj:`numpy.ndarray`
"""
# Fit model if not already done
if self.log_likelihood_hist == []:
self.fit()
# Tranform data
new_X = self._get_X(new_data=new_data)
pred = self._prob(new_X)
if return_proba:
return np.asarray([x[0] for x in pred])
else:
return np.asarray([1 if x[0] > 0.5 else 0 for x in pred])
[docs] def partial_effects(self, variables, new_data=None, average=False):
"""Computes Partial and Average Partial Effects (APE).
:param variables: List of variables for which to compute the PE/APE.
:type variables: :obj:`list`
:param new_data: Data to use for computations, optional, defaults to None (uses training set).
:type new_data: :obj:`pandas.DataFrame`
:param average: Whether to compute Average Partial Effects or not, defaults to False.
:type average: :obj:`bool`, optional
:formula: .. math:: PE(X_{i}) = \\beta_{i} \\dfrac{e^{-\\beta X}}{(e^{-\\beta X} + 1)^{2}}
if :obj:`average = True`, :math:`APE = \\bar{PE}`
:raises TypeError: If argument :obj:`variables` is neither :obj:`str` nor :obj:`list`.
:return: Dictionnary including Partial Effects (PE) or Average Partial Effects (APE).
:rtype: :obj:`dict`
"""
assert self.family == 'binomial', 'APE module only available for Logit model.'
# Fit model if not already done
if self.log_likelihood_hist == []:
self.fit()
# Get data
X = self._get_X(new_data)
# Get coefficients as a DataFrame (easier to handle for fitlering)
if self.fit_intercept:
# Append the name of the intercept
cols = ['(Intercept)'] + self.X_col
else:
cols = self.X_col
coefs = pd.DataFrame({'Coefficients': [x[0] for x in self.beta]}, index=cols)
# Compute exp(bX)
exp_bx = np.exp(-X.dot(self.beta))
# Check type for argument variables
if type(variables) == str:
var = [variables]
elif type(variables) == list:
var = variables
else:
raise TypeError('Type for argument variables is not valid.')
# Initialize the dictionnary for output
pes = {}
# Compute PEs (or APEs) for each variable
for v in var:
b = coefs.Coefficients[v].min()
pe = b * exp_bx / ((exp_bx + 1)**2)
pe_out = pe.mean() if average else pe
pes.update({v: pe_out})
return pes