This documentation page refers to a previous release of DIALS (1.14).
Click here to go to the corresponding page for the latest version of DIALS

Source code for dials.algorithms.symmetry.cosym.seed_clustering

"""Seed clustering method for cosym analysis."""
from __future__ import absolute_import, division, print_function

import logging

logger = logging.getLogger(__name__)

import math

import numpy as np
from scipy.cluster import hierarchy
import scipy.spatial.distance as ssd
from sklearn.neighbors import NearestNeighbors
from sklearn import metrics

from libtbx import Auto
from scitbx.array_family import flex
from dials.algorithms.symmetry.cosym import plot_matrix, plot_dendrogram


[docs]class seed_clustering(object): """Perform seed clustering of coordinates. Labels points into clusters such that cluster contains exactly one copy of each dataset, then performs silhouettete analysis on the resulting clusters to determine the true number of clusters present, under the constraint that only equal-sized clusterings are valid, i.e. each dataset should appear an equal number of times in each cluster. See also: https://en.wikipedia.org/wiki/Silhouette_(clustering) http://scikit-learn.org/stable/auto_examples/cluster/plot_kmeans_silhouette_analysis.html Attributes: cluster_labels (scitbx.array_family.flex.int): A label for each coordinate. """ def __init__( self, coordinates, n_datasets, n_sym_ops, min_silhouette_score, n_clusters=Auto, plot_prefix=None, ): """Initialise a seed_clustering object. Args: coordinates (scitbx.array_family.flex.double): The input array of coordinates on which to perform the analysis. The dimensions of the array should be (dim, `n_datasets` * `n_sym_ops`), where dim is the number of dimensions being used for the analysis. n_datasets (int): The number of datasets. n_sym_ops (int): The number of symmetry operations. min_silhouette_score (float): The minimum silhouette score to be used in automatic determination of the number of clusters. n_clusters (int): Optionally override the automatic determination of the number of clusters. """ self.coords = coordinates self.cluster_labels = self._label_clusters_first_pass(n_datasets, n_sym_ops) if flex.max(self.cluster_labels) == 0: # assume single cluster return dist_mat, linkage_matrix = self._hierarchical_clustering() self.cluster_labels, threshold = self._silhouette_analysis( self.cluster_labels, linkage_matrix, n_clusters=n_clusters, min_silhouette_score=min_silhouette_score, plot_prefix=plot_prefix, ) if plot_prefix is not None: plot_matrix( 1 - ssd.squareform(dist_mat), linkage_matrix, "%sseed_clustering_cos_angle_matrix.png" % plot_prefix, color_threshold=threshold, ) plot_dendrogram( linkage_matrix, "%sseed_clustering_cos_angle_dendrogram.png" % plot_prefix, color_threshold=threshold, ) def _label_clusters_first_pass(self, n_datasets, n_sym_ops): """First pass labelling of clusters. Labels points into clusters such that cluster contains exactly one copy of each dataset. Args: n_datasets (int): The number of datasets. n_sym_ops (int): The number of symmetry operations. Returns: cluster_labels (scitbx.array_family.flex.int): A label for each coordinate, labelled from 0 .. n_sym_ops. """ # initialise cluster labels: -1 signifies doesn't belong to a cluster cluster_labels = flex.int(self.coords.all()[0], -1) X_orig = self.coords.as_numpy_array() cluster_id = 0 while cluster_labels.count(-1) > 0: dataset_ids = ( flex.int_range(n_datasets * n_sym_ops) % n_datasets ).as_numpy_array() coord_ids = flex.int_range(dataset_ids.size).as_numpy_array() # select only those points that don't already belong to a cluster sel = np.where(cluster_labels == -1) X = X_orig[sel] dataset_ids = dataset_ids[sel] coord_ids = coord_ids[sel] # choose a high density point as seed for cluster nbrs = NearestNeighbors( n_neighbors=min(11, len(X)), algorithm="brute", metric="cosine" ).fit(X) distances, indices = nbrs.kneighbors(X) average_distance = flex.double([dist[1:].mean() for dist in distances]) i = flex.min_index(average_distance) d_id = dataset_ids[i] cluster = np.array([coord_ids[i]]) cluster_dataset_ids = np.array([d_id]) xis = np.array([X[i]]) for j in range(n_datasets - 1): # select only those rows that don't correspond to a dataset already # present in current cluster sel = np.where(dataset_ids != d_id) X = X[sel] dataset_ids = dataset_ids[sel] coord_ids = coord_ids[sel] assert len(X) > 0 # Find nearest neighbour in cosine-space to the current cluster centroid nbrs = NearestNeighbors( n_neighbors=min(1, len(X)), algorithm="brute", metric="cosine" ).fit(X) distances, indices = nbrs.kneighbors([xis.mean(axis=0)]) k = indices[0][0] d_id = dataset_ids[k] cluster = np.append(cluster, coord_ids[k]) cluster_dataset_ids = np.append(cluster_dataset_ids, d_id) xis = np.append(xis, [X[k]], axis=0) # label this cluster cluster_labels.set_selected(flex.size_t(cluster.tolist()), cluster_id) cluster_id += 1 return cluster_labels def _hierarchical_clustering(self): """Perform hierarchical clustering on cluster centroids. Returns: Tuple[numpy.ndarray, numpy.ndarray]: A tuple containing the distance matrix as output by :func:`scipy.spatial.distance.pdist` and the linkage matrix as output by :func:`scipy.cluster.hierarchy.linkage`. """ cluster_centroids = [] X = self.coords.as_numpy_array() for i in set(self.cluster_labels): sel = self.cluster_labels == i cluster_centroids.append( X[(self.cluster_labels == i).iselection().as_numpy_array()].mean(axis=0) ) # hierarchical clustering of cluster centroids, using cosine metric dist_mat = ssd.pdist(cluster_centroids, metric="cosine") return dist_mat, hierarchy.linkage(dist_mat, method="average") def _silhouette_analysis( self, cluster_labels, linkage_matrix, n_clusters, min_silhouette_score, plot_prefix=None, ): """Compare valid equal-sized clustering using silhouette scores. Args: cluster_labels (scitbx.array_family.flex.int): linkage_matrix (numpy.ndarray): The hierarchical clustering of centroids of the initial clustering as produced by :func:`scipy.cluster.hierarchy.linkage`. n_clusters (int): Optionally override the automatic determination of the number of clusters. min_silhouette_score (float): The minimum silhouette score to be used in automatic determination of the number of clusters. plot_prefix (bool): Optional plot_prefix to plot the silhouette analysis for each possible clustering. Returns: cluster_labels (scitbx.array_family.flex.int): A label for each coordinate. """ eps = 1e-6 X = self.coords.as_numpy_array() cluster_labels_input = cluster_labels distances = linkage_matrix[::, 2] distances = np.insert(distances, 0, 0) silhouette_scores = flex.double() thresholds = flex.double() threshold_n_clusters = flex.size_t() for threshold in distances[1:]: cluster_labels = cluster_labels_input.deep_copy() labels = hierarchy.fcluster( linkage_matrix, threshold - eps, criterion="distance" ).tolist() counts = [labels.count(l) for l in set(labels)] if len(set(counts)) > 1: # only equal-sized clusters are valid continue n = len(set(labels)) if n == 1: continue for i in range(len(labels)): cluster_labels.set_selected( cluster_labels_input == i, int(labels[i] - 1) ) silhouette_avg = metrics.silhouette_score( X, cluster_labels.as_numpy_array(), metric="cosine" ) # Compute the silhouette scores for each sample sample_silhouette_values = metrics.silhouette_samples( X, cluster_labels.as_numpy_array(), metric="cosine" ) silhouette_avg = sample_silhouette_values.mean() silhouette_scores.append(silhouette_avg) thresholds.append(threshold) threshold_n_clusters.append(n) count_negative = (sample_silhouette_values < 0).sum() logger.info("Clustering:") logger.info(" Number of clusters: %i" % n) logger.info( " Threshold score: %.3f (%.1f deg)" % (threshold, math.degrees(math.acos(1 - threshold))) ) logger.info(" Silhouette score: %.3f" % silhouette_avg) logger.info( " -ve silhouette scores: %.1f%%" % (100 * count_negative / sample_silhouette_values.size) ) if plot_prefix is not None: _plot_silhouette( sample_silhouette_values, cluster_labels.as_numpy_array(), file_name="%ssilhouette_%i.png" % (plot_prefix, n), ) if n_clusters is Auto: idx = flex.max_index(silhouette_scores) else: idx = flex.first_index(threshold_n_clusters, n_clusters) if idx is None: raise Sorry("No valid clustering with %i clusters" % n_clusters) if n_clusters is Auto and silhouette_scores[idx] < min_silhouette_score: # assume single cluster cluster_labels = flex.int(cluster_labels.size(), 0) else: threshold = thresholds[idx] - eps labels = hierarchy.fcluster(linkage_matrix, threshold, criterion="distance") cluster_labels = flex.double(cluster_labels.size(), -1) for i in range(len(labels)): cluster_labels.set_selected(cluster_labels_input == i, labels[i] - 1) return cluster_labels, threshold
def _plot_silhouette(sample_silhouette_values, cluster_labels, file_name): from matplotlib import pyplot as plt fig = plt.figure() ax1 = fig.gca() y_lower = 10 n_clusters = len(set(cluster_labels)) silhouette_avg = sample_silhouette_values.mean() for i in range(n_clusters): # Aggregate the silhouette scores for samples belonging to # cluster i, and sort them ith_cluster_silhouette_values = sample_silhouette_values[cluster_labels == i] ith_cluster_silhouette_values.sort() size_cluster_i = ith_cluster_silhouette_values.shape[0] y_upper = y_lower + size_cluster_i color = plt.cm.Spectral(float(i) / n_clusters) ax1.fill_betweenx( np.arange(y_lower, y_upper), 0, ith_cluster_silhouette_values, facecolor=color, edgecolor=color, alpha=0.7, ) # Label the silhouette plots with their cluster numbers at the middle ax1.text(-0.05, y_lower + 0.5 * size_cluster_i, str(i)) # Compute the new y_lower for next plot y_lower = y_upper + 10 # 10 for the 0 samples ax1.set_title("The silhouette plot for the various clusters.") ax1.set_xlabel("The silhouette coefficient values") ax1.set_ylabel("Cluster label") # The vertical line for average silhouette score of all the values ax1.axvline(x=silhouette_avg, color="red", linestyle="--") ax1.set_yticks([]) # Clear the yaxis labels / ticks ax1.set_xticks([-0.1, 0, 0.2, 0.4, 0.6, 0.8, 1]) fig.savefig(file_name) plt.close(fig)