"""Target function for cosym analysis."""
from __future__ import annotations
import concurrent.futures
import copy
import itertools
import logging
import numpy as np
import pandas as pd
from orderedset import OrderedSet
from scipy import sparse
import cctbx.sgtbx.cosets
from cctbx import miller, sgtbx
from cctbx.array_family import flex
from dials.algorithms.scaling.scaling_library import ExtendedDatasetStatistics
logger = logging.getLogger(__name__)
def _lattice_lower_upper_index(lattices, lattice_id):
lower_index = int(lattices[lattice_id])
upper_index = None
if lattice_id < len(lattices) - 1:
upper_index = int(lattices[lattice_id + 1])
else:
assert lattice_id == len(lattices) - 1
return lower_index, upper_index
def _compute_rij_matrix_one_row_block(
i,
lattices,
data,
indices,
sym_ops,
patterson_group,
weights=True,
min_pairs=3,
):
cs = data.crystal_symmetry()
n_lattices = len(lattices)
rij_cache = {}
NN = n_lattices * len(sym_ops)
rij_row = []
rij_col = []
rij_data = []
wij = None
if weights:
wij_row = []
wij_col = []
wij_data = []
i_lower, i_upper = _lattice_lower_upper_index(lattices, i)
intensities_i = data.data()[i_lower:i_upper]
sigmas_i = data.sigmas()[i_lower:i_upper]
cb_ops = [sgtbx.change_of_basis_op(cb_op_k) for cb_op_k in sym_ops]
for j in range(n_lattices):
j_lower, j_upper = _lattice_lower_upper_index(lattices, j)
intensities_j = data.data()[j_lower:j_upper]
sigmas_j = data.sigmas()[j_lower:j_upper]
for k, cb_op_k in enumerate(cb_ops):
indices_i = indices[cb_op_k.as_xyz()][i_lower:i_upper]
for kk, cb_op_kk in enumerate(cb_ops):
if i == j and k == kk:
# don't include correlation of dataset with itself
continue
ik = i + (n_lattices * k)
jk = j + (n_lattices * kk)
key = (i, j, str(cb_op_k.inverse() * cb_op_kk))
if key in rij_cache:
cc, n, n_pairs = rij_cache[key]
else:
indices_j = indices[cb_op_kk.as_xyz()][j_lower:j_upper]
matches = miller.match_indices(indices_i, indices_j)
pairs = matches.pairs()
isel_i = pairs.column(0)
isel_j = pairs.column(1)
isel_i = isel_i.select(
patterson_group.epsilon(indices_i.select(isel_i)) == 1
)
isel_j = isel_j.select(
patterson_group.epsilon(indices_j.select(isel_j)) == 1
)
ms = miller.set(
crystal_symmetry=cs, indices=indices_j.select(isel_j)
)
ma_j = miller.array(
miller_set=ms,
data=intensities_j.select(isel_j),
sigmas=sigmas_j.select(isel_j),
)
ms = miller.set(
crystal_symmetry=cs, indices=indices_i.select(isel_i)
)
ma_i = miller.array(
miller_set=ms,
data=intensities_i.select(isel_i),
sigmas=sigmas_i.select(isel_i),
)
n_pairs = ma_i.size()
if ma_i.size() < min_pairs:
n, cc = (None, None)
else:
corr, neff = ExtendedDatasetStatistics.weighted_cchalf(
ma_i, ma_j, assume_index_matching=True
)[0]
if neff:
cc = corr
n = neff
else:
n, cc = (None, None)
rij_cache[key] = (cc, n, n_pairs)
if (
n is None
or cc is None
or (min_pairs is not None and n_pairs < min_pairs)
):
continue
if weights:
wij_row.append(ik)
wij_col.append(jk)
wij_data.append(n)
rij_row.append(ik)
rij_col.append(jk)
rij_data.append(cc)
if weights and not any(wij_data):
raise RuntimeError(
f"Unable to calculate any correlations for dataset index {i} ({len(intensities_i)} reflections)."
+ "\nIncreasing min_reflections may overcome this problem."
)
rij = sparse.coo_matrix((rij_data, (rij_row, rij_col)), shape=(NN, NN))
if weights:
wij = sparse.coo_matrix((wij_data, (wij_row, wij_col)), shape=(NN, NN))
return rij, wij
[docs]
class Target:
"""Target function for cosym analysis.
Attributes:
dim (int): The number of dimensions used in the analysis.
"""
[docs]
def __init__(
self,
intensities,
lattice_ids,
weights=None,
min_pairs=3,
lattice_group=None,
dimensions=None,
nproc=1,
cc_weights=None,
):
r"""Initialise a Target object.
Args:
intensities (cctbx.miller.array): The intensities on which to perform
cosym analysis.
lattice_ids (np.ndarray): An array of equal size to
`intensities` which maps each reflection to a given lattice (dataset).
weights (str): Optionally include weights in the target function.
Allowed values are `None`, "count" and "standard_error". The default
is to use no weights. If "count" is set, then weights are equal to the
number of pairs of reflections used in calculating each value of the
rij matrix. If "standard_error" is used, then weights are defined as
:math:`w_{ij} = 1/s`, where :math:`s = (1-r_{ij}^2)/sqrt(N)`.
Where N=(n-2) or N=(neff-1) depending on the cc_weights option.
See also https://doi.org/10.1525/collabra.87615.
min_pairs (int): Only calculate the correlation coefficient between two
datasets if they have more than `min_pairs` of common reflections.
lattice_group (cctbx.sgtbx.space_group): Optionally set the lattice
group to be used in the analysis.
dimensions (int): Optionally override the number of dimensions to be used
in the analysis. If not set, then the number of dimensions used is
equal to the greater of 2 or the number of symmetry operations in the
lattice group.
"""
if weights is not None:
assert weights in ("count", "standard_error")
self._weights = weights
self._min_pairs = min_pairs
self._nproc = nproc
data = intensities.customized_copy(anomalous_flag=False)
cb_op_to_primitive = data.change_of_basis_op_to_primitive_setting()
data = data.change_basis(cb_op_to_primitive).map_to_asu()
# Convert to uint64 avoids crashes on Windows when later constructing
# flex.size_t (https://github.com/cctbx/cctbx_project/issues/591)
order = lattice_ids.argsort().astype(np.uint64)
sorted_data = data.data().select(flex.size_t(order))
sorted_indices = data.indices().select(flex.size_t(order))
sorted_sigmas = data.sigmas().select(flex.size_t(order))
self._lattice_ids = lattice_ids[order]
self._data = data.customized_copy(
indices=sorted_indices, data=sorted_data, sigmas=sorted_sigmas
)
assert isinstance(self._data.indices(), type(flex.miller_index()))
assert isinstance(self._data.data(), type(flex.double()))
# construct a lookup for the separate lattices
self._lattices = np.array(
[
np.where(self._lattice_ids == i)[0][0]
for i in np.unique(self._lattice_ids)
]
)
self.sym_ops = OrderedSet(["x,y,z"])
self._lattice_group = lattice_group
self.sym_ops.update(op.as_xyz() for op in self._generate_twin_operators())
if dimensions is None:
dimensions = max(2, len(self.sym_ops))
self.set_dimensions(dimensions)
self._lattice_group = copy.deepcopy(self._data.space_group())
for sym_op in self.sym_ops:
self._lattice_group.expand_smx(sym_op)
self._patterson_group = self._lattice_group.build_derived_patterson_group()
logger.debug(
"Lattice group: %s (%i symops)",
self._lattice_group.info().symbol_and_number(),
len(self._lattice_group),
)
logger.debug(
"Patterson group: %s", self._patterson_group.info().symbol_and_number()
)
if cc_weights == "sigma":
self.rij_matrix, self.wij_matrix = self._compute_rij_wij_ccweights()
else:
self.rij_matrix, self.wij_matrix = self._compute_rij_wij()
[docs]
def set_dimensions(self, dimensions):
"""Set the number of dimensions for analysis.
Args:
dimensions (int): The number of dimensions to be used.
"""
self.dim = dimensions
def _generate_twin_operators(self, lattice_symmetry_max_delta=5.0):
# see also mmtbx.scaling.twin_analyses.twin_laws
if self._lattice_group is None:
cb_op_to_minimum_cell = self._data.change_of_basis_op_to_minimum_cell()
minimum_cell_symmetry = self._data.crystal_symmetry().change_basis(
cb_op=cb_op_to_minimum_cell
)
self._lattice_group = sgtbx.lattice_symmetry.group(
reduced_cell=minimum_cell_symmetry.unit_cell(),
max_delta=lattice_symmetry_max_delta,
)
intensity_symmetry = minimum_cell_symmetry.reflection_intensity_symmetry(
anomalous_flag=self._data.anomalous_flag()
)
cb_op = cb_op_to_minimum_cell.inverse()
else:
cb_op = sgtbx.change_of_basis_op()
intensity_symmetry = self._data.reflection_intensity_symmetry()
operators = []
for partition in cctbx.sgtbx.cosets.left_decomposition(
g=self._lattice_group,
h=intensity_symmetry.space_group()
.build_derived_acentric_group()
.make_tidy(),
).partitions[1:]:
if partition[0].r().determinant() > 0:
operators.append(cb_op.apply(partition[0]))
return operators
def _compute_rij_wij_ccweights(self):
# Use flex-based methods for calculating matrices.
# Pre-calculate miller indices after application of each cb_op. Only calculate
# this once per cb_op instead of on-the-fly every time we need it.
indices = {}
epsilons = {}
space_group_type = self._data.space_group().type()
for cb_op in self.sym_ops:
cb_op = sgtbx.change_of_basis_op(cb_op)
indices_reindexed = cb_op.apply(self._data.indices())
miller.map_to_asu(space_group_type, False, indices_reindexed)
cb_op_str = cb_op.as_xyz()
indices[cb_op_str] = indices_reindexed
epsilons[cb_op_str] = self._patterson_group.epsilon(indices_reindexed)
rij_matrix = None
wij_matrix = None
with concurrent.futures.ProcessPoolExecutor(max_workers=self._nproc) as pool:
# note we use weights=True to help us work out where we have calculated rij,
# even if the weights phil option is None
futures = [
pool.submit(
_compute_rij_matrix_one_row_block,
i,
self._lattices,
self._data,
indices,
self.sym_ops,
self._patterson_group,
weights=True,
min_pairs=self._min_pairs,
)
for i, _ in enumerate(self._lattices)
]
for future in concurrent.futures.as_completed(futures):
rij, wij = future.result()
if rij_matrix is None:
rij_matrix = rij
else:
rij_matrix += rij
if wij is not None:
if wij_matrix is None:
wij_matrix = wij
else:
wij_matrix += wij
rij_matrix = rij_matrix.toarray().astype(np.float64)
if self._weights:
## use the counts as weights
wij_matrix = wij_matrix.toarray().astype(np.float64)
if self._weights == "standard_error":
# N.B. using effective n due to sigma weighting, which can be below 2
# but approches 1 in the limit, so rather say efective sample size
# for standard error calc is n-1
sel = np.where(wij_matrix > 1)
se = (1 - np.square(rij_matrix[sel])) / np.sqrt(wij_matrix[sel] - 1)
wij_matrix = np.zeros_like(rij_matrix)
wij_matrix[sel] = 1 / se
# rescale the weights matrix such that the sum of wij_matrix == the number of non-zero entries
scale = np.count_nonzero(wij_matrix) / np.sum(wij_matrix)
wij_matrix *= scale
else:
## No weights - i.e. equal weights in places where we can calculate an rij value,
## but also making sure our diagonal elements are zero as we exclude the
## self-correlation elements from rij and the cosym procedure - we need zero weights
## for uncalculate correlations so they aren't taken into account in the functional
## evaluation.
## at this point, wij matrix contains neff values where it was possible to calculate
## a pairwise correlation.
wij_matrix = wij_matrix.toarray().astype(np.float64)
sel = np.where(wij_matrix > 0)
wij_matrix[sel] = 1
return rij_matrix, wij_matrix
def _compute_rij_wij(self, use_cache=True):
"""Compute the rij_wij matrix.
Rij is a symmetric matrix of size (n x m, n x m), where n is the number of
datasets and m is the number of symmetry operations.
It is composed of (m, m) blocks of size (n, n), where each block contains the
correlation coefficients between cb_op_k applied to datasets 1..N with
cb_op_kk applied to datasets 1.. N.
If `use_cache=True`, then an optimisation is made to reflect the fact some elements
of the matrix are equivalent, i.e.:
CC[(a, cb_op_k), (b, cb_op_kk)] == CC[(a,), (b, cb_op_k.inverse() * cb_op_kk)]
"""
n_lattices = len(self._lattices)
n_sym_ops = len(self.sym_ops)
# Pre-calculate miller indices after application of each cb_op. Only calculate
# this once per cb_op instead of on-the-fly every time we need it.
indices = {}
epsilons = {}
space_group_type = self._data.space_group().type()
for cb_op in self.sym_ops:
cb_op = sgtbx.change_of_basis_op(cb_op)
indices_reindexed = cb_op.apply(self._data.indices())
miller.map_to_asu(space_group_type, False, indices_reindexed)
cb_op_str = cb_op.as_xyz()
indices[cb_op_str] = np.array(
[
h.iround().as_numpy_array()
for h in indices_reindexed.as_vec3_double().parts()
]
).transpose()
epsilons[cb_op_str] = self._patterson_group.epsilon(
indices_reindexed
).as_numpy_array()
intensities = self._data.data().as_numpy_array()
# Map indices to an array of flat 1d indices which can later be used for
# matching pairs of indices
offset = -np.min(np.concatenate(list(indices.values())), axis=0)
dims = np.max(np.concatenate(list(indices.values())), axis=0) + offset + 1
for cb_op, hkl in indices.items():
indices[cb_op] = np.ravel_multi_index((hkl + offset).T, dims)
# Create an empty 2D array of shape (m * n, L), where m is the number of sym
# ops, n is the number of lattices, and L is the number of unique miller indices
all_intensities = np.empty((n_sym_ops * n_lattices, np.prod(dims)))
# Populate all_intensities with intensity values, filling absent intensities
# with np.nan
all_intensities.fill(np.nan)
slices = np.append(self._lattices, intensities.size)
slices = list(map(slice, slices[:-1], slices[1:]))
for i, (mil_ind, eps) in enumerate(zip(indices.values(), epsilons.values())):
for j, selection in enumerate(slices):
# map (i, j) to a column in all_intensities
column = np.ravel_multi_index((i, j), (n_sym_ops, n_lattices))
epsilon_equals_one = eps[selection] == 1
valid_mil_ind = mil_ind[selection][epsilon_equals_one]
valid_intensities = intensities[selection][epsilon_equals_one]
all_intensities[column, valid_mil_ind] = valid_intensities
# Ideally we would use `np.ma.corrcoef` here, but it is broken, so use
# pd.DataFrame.corr() instead (see numpy/numpy#15601)
rij = (
pd.DataFrame(all_intensities)
.T.dropna(how="all")
.corr(min_periods=self._min_pairs)
.values
)
# Set any NaN correlation coefficients to zero
np.nan_to_num(rij, copy=False)
# Cosym does not make use of the on-diagonal correlation coefficients
np.fill_diagonal(rij, 0)
## First, populate a weights matrix of the number of pairs i.e. counts
## if we are not going to use weights, this helps us select where we
## calculated values, so that we can set them to constant weights
wij = np.zeros_like(rij)
right_up = np.triu_indices_from(wij, k=1)
# For each correlation coefficient, set the weight equal to the size of
# the sample used to calculate that coefficient
pairwise_combos = itertools.combinations(np.isfinite(all_intensities), 2)
def sample_size(x, y):
pairs = np.count_nonzero(x & y)
if pairs < self._min_pairs:
return 0
else:
return pairs
wij[right_up] = list(itertools.starmap(sample_size, pairwise_combos))
if self._weights:
## the weights are currently the pairwise sample sizes
if self._weights == "standard_error":
# Set each weights as the reciprocal of the standard error on the
# corresponding correlation coefficient
# http://www.sjsu.edu/faculty/gerstman/StatPrimer/correlation.pdf
with np.errstate(divide="ignore", invalid="ignore"):
reciprocal_se = np.sqrt((wij[right_up] - 2)) / (
1 - np.square(rij[right_up])
)
wij[right_up] = np.where(wij[right_up] > 2, reciprocal_se, 0)
# Symmetrise the wij matrix
wij += wij.T
for i in range(wij.shape[0]):
if not any(wij[i, :]):
while i > n_lattices:
i -= n_lattices
n_refl = sum(self._lattice_ids == i)
raise RuntimeError(
f"Unable to calculate any correlations for dataset index {i} ({n_refl} reflections)."
+ "\nIncreasing min_reflections may overcome this problem."
)
# rescale the weights matrix such that the sum of wij_matrix == the number of non-zero entries
scale = np.count_nonzero(wij) / np.sum(wij)
wij *= scale
else:
## we are not going to use weights, so set them to constant weights
## as we still needs zeros to avoid inclusion of uncalculate values in
## the functional evaluation.
sel = np.where(wij > 0)
wij[sel] = 1
# Symmetrise the wij matrix
wij += wij.T
return rij, wij
[docs]
def compute_functional(self, x: np.ndarray) -> float:
"""Compute the target function at coordinates `x`.
Args:
x (np.ndarray):
a flattened list of the N-dimensional vectors, i.e. coordinates in
the first dimension are stored first, followed by the coordinates in
the second dimension, etc.
Returns:
f (float): The value of the target function at coordinates `x`.
"""
assert (x.size // self.dim) == (len(self._lattices) * len(self.sym_ops))
x = x.reshape((self.dim, x.size // self.dim))
elements = np.square(self.rij_matrix - x.T @ x)
if self.wij_matrix is not None:
np.multiply(self.wij_matrix, elements, out=elements)
f = 0.5 * elements.sum()
return f
[docs]
def compute_functional_score_for_dimension_assessment(
self, x: np.ndarray, outlier_rejection: bool = True
) -> float:
if not outlier_rejection:
return self.compute_functional(x)
x = x.reshape((self.dim, x.size // self.dim))
elements = np.square(self.rij_matrix - x.T @ x)
if self.wij_matrix is not None:
np.multiply(self.wij_matrix, elements, out=elements)
q1, q2, q3 = np.quantile(elements, (0.25, 0.5, 0.75))
inliers = elements[elements < q2 + (q3 - q1)]
return 0.5 * inliers.sum()
[docs]
def compute_gradients_fd(self, x: np.ndarray, eps=1e-6) -> np.ndarray:
"""Compute the gradients at coordinates `x` using finite differences.
Args:
x (np.ndarray):
a flattened list of the N-dimensional vectors, i.e. coordinates in
the first dimension are stored first, followed by the coordinates in
the second dimension, etc.
eps (float):
The value of epsilon to use in finite difference calculations.
Returns:
grad (np.ndarray):
The gradients of the target function with respect to the parameters.
"""
x = copy.deepcopy(x)
grad = np.zeros(x.shape)
for i in range(x.size):
x[i] += eps # x + eps
fp = self.compute_functional(x)
x[i] -= 2 * eps # x - eps
fm = self.compute_functional(x)
x[i] += eps # reset to original values
grad[i] += (fp - fm) / (2 * eps)
return grad
[docs]
def compute_gradients(self, x: np.ndarray) -> np.ndarray:
"""Compute the gradients of the target function at coordinates `x`.
Args:
x (np.ndarray):
a flattened list of the N-dimensional vectors, i.e. coordinates in
the first dimension are stored first, followed by the coordinates in
the second dimension, etc.
Returns:
Tuple[float, np.ndarray]:
f: The value of the target function at coordinates `x`.
grad: The gradients of the target function with respect to the parameters.
"""
x = x.reshape((self.dim, x.size // self.dim))
if self.wij_matrix is not None:
wrij_matrix = np.multiply(self.wij_matrix, self.rij_matrix)
grad = -2 * x @ (wrij_matrix - np.multiply(self.wij_matrix, x.T @ x))
else:
grad = -2 * x @ (self.rij_matrix - x.T @ x)
return grad.flatten()
[docs]
def curvatures(self, x: np.ndarray) -> np.ndarray:
"""Compute the curvature of the target function at coordinates `x`.
Args:
x (np.ndarray):
a flattened list of the N-dimensional vectors, i.e. coordinates in
the first dimension are stored first, followed by the coordinates in
the second dimension, etc.
Returns:
curvs (np.ndarray):
The curvature of the target function with respect to the parameters.
"""
if self.wij_matrix is not None:
wij = self.wij_matrix
else:
wij = np.ones(self.rij_matrix.shape)
x = x.reshape((self.dim, x.size // self.dim))
curvs = 2 * np.square(x) @ wij
return curvs.flatten()
[docs]
def curvatures_fd(self, x: np.ndarray, eps=1e-6) -> np.ndarray:
"""Compute the curvatures at coordinates `x` using finite differences.
Args:
x (np.ndarray):
a flattened list of the N-dimensional vectors, i.e. coordinates in
the first dimension are stored first, followed by the coordinates in
the second dimension, etc.
eps (float):
The value of epsilon to use in finite difference calculations.
Returns:
curvs (np.ndarray):
The curvature of the target function with respect to the parameters.
"""
x = copy.deepcopy(x)
f = self.compute_functional(x)
curvs = np.zeros(x.shape)
for i in range(x.size):
x[i] += eps # x + eps
fp = self.compute_functional(x)
x[i] -= 2 * eps # x - eps
fm = self.compute_functional(x)
x[i] += eps # reset to original values
curvs[i] += (fm - 2 * f + fp) / (eps**2)
return curvs