Source code for statinf.stats.unsupervised

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random

from ..nonparametrics.kernels import gaussian


[docs]class KMeans(): def __init__(self, k=1, max_iter=100, init='random', random_state=0): """ K-means clustering implementation. .. warning:: This function is still under development. This is a beta version, please be aware that some functionalitie might not be available. The full stable version soon will be released. :param k: number of clusters, default is 1. :type k: :obj:`int` :param max_iter: number of iterations for convergence. :type max_iter: :obj:`int` :param init: initialization option, options are random or kmeans++ . :type init: :obj:`String` :param random_state: seed of the random state, default is 0. :type random_state: :obj:`int` :param labels: labels for each datapoint. :type labels: :obj:`numpy.array` :param centroids: coordinates of the centroids. :type centroids: :obj:`numpy.array` :references: * Friedman, J., Hastie, T., & Tibshirani, R. (2001). The elements of statistical learning (Vol. 1, No. 10). New York: Springer series in statistics. """ self.max_iter = max_iter self.init = init self.random_state = random_state self.k = k self.labels_ = 0 self.centroids = 0
[docs] def fit(self, X): """ Fit the model to the data using different initializations (random init or kmeans++) :param X: Input data. :type X: :obj:`numpy.array` """ # Initialize centroids randomly or using k-means ++ if type(X) == pd.core.frame.DataFrame: data = X.values elif type(X) == np.ndarray: data = X else: raise TypeError('X should be either a numpy array or a pandas DataFrame') if self.init == 'random': centroids = data.copy() r = np.random.RandomState(seed=self.random_state) r.shuffle(centroids) centroids = centroids[:self.k, :] elif self.init == 'kmeans++': candidates = data.copy()[:, :2] # Pick a first random centroid random_candidate_index = np.random.randint(0, candidates.shape[0] - 1) # Add it to the list of centroids centroids = [candidates[random_candidate_index, :]] # Remove the picked centroid from the list of potential centroids candidates = np.delete(candidates, random_candidate_index, axis=0) for k in range(self.k - 1): distances = self.get_distance(candidates, np.array(centroids).mean(axis=0).reshape(1, -1))[0] # Normalize distances to get the probabilities probabilities = distances / distances.sum() # Pick new centroid with a probability proportional to the distance random_candidate_index = np.random.choice(np.arange(0, candidates.shape[0]), p=probabilities) centroids.append(candidates[random_candidate_index, :]) # Remove the picked centroid from the list of potential centroids candidates = np.delete(candidates, random_candidate_index, axis=0) centroids = np.array(centroids) for _l in range(0, self.max_iter): labels_ = self.closest_centroid(data, centroids) # Find index of closest centroid to each point centroids = self.move_centroids(data, labels_, centroids) # Update the values of the new centroid self.labels_ = self.closest_centroid(data, centroids) self.centroids = centroids
[docs] def closest_centroid(self, points, centroids): """ Returns an array containing the index to the nearest centroid for each point :param points: features of each point. :type points: :obj:`numpy.array` :param centroids: list of the centroids coordinates. :type centroids: :obj:`list` """ distances = self.get_distance(points, centroids) return np.argmin(distances, axis=0)
[docs] def get_distance(self, points, centroids): """ Returns the euclidian distance between each point and the centroids. :param points: features of each point. :type points: :obj:`numpy.array` :param centroids: list of the centroids coordinates. :type centroids: :obj:`list` """ return np.sqrt(((points - centroids[:, np.newaxis])**2).sum(axis=2))
[docs] def move_centroids(self, points, closest, centroids): """ Returns the new centroids assigned from the points closest to them. :param points: features of each point. :type points: :obj:`numpy.array` :param closest: array with the index of closest centroid for each point. :type closest: :obj:`numpy.array` :param centroids: list of the centroids coordinates. :type centroids: :obj:`list` """ return np.array([points[closest == k].mean(axis=0) for k in range(centroids.shape[0])])
[docs] def silhouette_score(self, X, labels): """ To be added soon """ pass
[docs]class GaussianMixture: def __init__(self): """ Class for a gaussian mixture model, uses the EM algorithm to fit the model to the data. .. warning:: This function is still under development. This is a beta version, please be aware that some functionalitie might not be available. The full stable version soon will be released. :references: * Murphy, K. P. (2012). Machine learning: a probabilistic perspective. MIT press. :source: * Inspired by: https://github.com/ocontreras309/ML_Notebooks/blob/master/GMM_Implementation.ipynb """ self.clusters = [] self.likelihoods = None self.likelihood = np.Inf self.history = None self.scores = None def _initialize(self, X, k): """ Initialize the clusters using Kmeans :param X: Input data. :type X: :obj:`numpy.array` :param k: Number of (gaussian) clusters. :type k: :obj:`int` """ kmeans = KMeans(k=k) kmeans.fit(X) means = kmeans.centroids for _k in range(k): self.clusters.append({ 'pi_k': 1 / k, # Assign equal probabilities 'mu_k': means[_k], 'cov_k': np.identity(X.shape[1], dtype=np.float64) }) def _expect(self, X): """ Expectation step, computes the responsibilities using the means and covariances computed at the previous maximization step :param X: data :type X: :obj:`numpy.array` """ totals = np.zeros((X.shape[0], 1), dtype=np.float64) for cluster in self.clusters: pi_k = cluster['pi_k'] mu_k = cluster['mu_k'] cov_k = cluster['cov_k'] gamma_nk = (pi_k * gaussian(X, mu_k, cov_k)).astype(np.float64) cluster['gamma_nk'] = gamma_nk totals += gamma_nk for cluster in self.clusters: cluster['totals'] = totals cluster['gamma_nk'] /= totals def _maximize(self, X): """ Maximization step, computes the new values of the means and the covariances :param X: data :type X: :obj:`numpy.array` """ N = X.shape[0] for cluster in self.clusters: gamma_nk = cluster['gamma_nk'] Nk = gamma_nk.sum(axis=0)[0] cluster['pi_k'] = Nk / N mu_k = (1 / Nk) * (gamma_nk * X).sum(axis=0) diff = (X - mu_k).reshape(-1, 1) cov_k = np.zeros((X.shape[1], X.shape[1])) for j in range(X.shape[0]): diff = (X[j] - mu_k).reshape(-1, 1) cov_k += gamma_nk[j] * np.dot(diff, diff.T) cluster['cov_k'] = cov_k / Nk cluster['mu_k'] = mu_k def _get_likelihood(self, X): """ Returns the total likelihoods over the dataset :param X: data :type X: :obj:`numpy.array` """ likelihood = [] sample_likelihoods = np.log(np.array([cluster['totals'] for cluster in self.clusters])) self.likelihood = np.sum(sample_likelihoods) return self.likelihood
[docs] def fit(self, X, k, n_epochs=100, improvement_threshold=0.0005): """ Fitting function initialized by K-means algorithm. :param X: data. :type X: :obj:`numpy.ndarray` :param K: number of clusters (gaussians). :type K: :obj:`int` :param n_epochs: number of epochs, default is 100. :type n_epochs: :obj:`numpy.ndarray` :param improvement_threshold: Threshold from which we consider the likelihood improved, defaults to 0.0005. :type improvement_threshold: :obj:`float`, optional """ self._initialize(X, k) likelihoods = np.zeros(n_epochs) scores = np.zeros((X.shape[0], k)) history = [] diff = 1. i = 0 while (i < n_epochs) and (np.abs(diff) >= improvement_threshold): i += 1 clusters_snapshot = [] for cluster in self.clusters: clusters_snapshot.append({ 'mu_k': cluster['mu_k'].copy(), 'cov_k': cluster['cov_k'].copy() }) history.append(clusters_snapshot) self._expect(X) self._maximize(X) _likelihood_old = self.likelihood self.likelihood = self._get_likelihood(X) diff = _likelihood_old - self.likelihood likelihoods[i] = self.likelihood print(f'Epoch : {i}, Likelihood : {self.likelihood}') for i, cluster in enumerate(self.clusters): scores[:, i] = np.log(cluster['gamma_nk']).reshape(-1) self.likelihoods = likelihoods self.scores = scores self.history = history