This documentation page refers to a previous release of DIALS (2.2).
Click here to go to the corresponding page for the latest version of DIALS

Source code for dials.algorithms.symmetry.cosym.target

"""Target function for cosym analysis."""
from __future__ import absolute_import, division, print_function

import copy
import logging
import math

import cctbx.sgtbx.cosets
from cctbx import miller
from cctbx import sgtbx
from cctbx.array_family import flex
from libtbx import easy_mp
from scipy import sparse

logger = logging.getLogger(__name__)


[docs]class Target(object): """Target function for cosym analysis. Attributes: dim (int): The number of dimensions used in the analysis. """ def __init__( self, intensities, lattice_ids, weights=None, min_pairs=None, lattice_group=None, dimensions=None, nproc=1, ): r""""Intialise a Target object. Args: intensities (cctbx.miller.array): The intensities on which to perform cosym anaylsis. lattice_ids (scitbx.array_family.flex.int): An array of equal size to `intensities` which maps each reflection to a given lattice (dataset). weights (str): Optionally include weights in the target function. Allowed values are `None`, "count" and "standard_error". The default is to use no weights. If "count" is set, then weights are equal to the number of pairs of reflections used in calculating each value of the rij matrix. If "standard_error" is used, then weights are defined as :math:`w_{ij} = 1/s`, where :math:`s = \sqrt{(1-r_{ij}^2)/(n-2)}`. See also http://www.sjsu.edu/faculty/gerstman/StatPrimer/correlation.pdf. min_pairs (int): Only calculate the correlation coefficient between two datasets if they have more than `min_pairs` of common reflections. lattice_group (cctbx.sgtbx.space_group): Optionally set the lattice group to be used in the analysis. dimensions (int): Optionally override the number of dimensions to be used in the analysis. If not set, then the number of dimensions used is equal to the greater of 2 or the number of symmetry operations in the lattice group. nproc (int): number of processors to use for computing the rij matrix. """ if weights is not None: assert weights in ("count", "standard_error") self._weights = weights self._min_pairs = min_pairs self._nproc = nproc data = intensities.customized_copy(anomalous_flag=False) cb_op_to_primitive = data.change_of_basis_op_to_primitive_setting() data = data.change_basis(cb_op_to_primitive).map_to_asu() order = flex.sort_permutation(lattice_ids) sorted_lattice_id = flex.select(lattice_ids, order) sorted_data = data.data().select(order) sorted_indices = data.indices().select(order) self._lattice_ids = sorted_lattice_id self._data = data.customized_copy(indices=sorted_indices, data=sorted_data) assert isinstance(self._data.indices(), type(flex.miller_index())) assert isinstance(self._data.data(), type(flex.double())) # construct a lookup for the separate lattices last_id = -1 self._lattices = flex.int() for n, lid in enumerate(self._lattice_ids): if lid != last_id: last_id = lid self._lattices.append(n) self._sym_ops = {"x,y,z"} self._lattice_group = lattice_group self._sym_ops.update({op.as_xyz() for op in self._generate_twin_operators()}) if dimensions is None: dimensions = max(2, len(self._sym_ops)) self.set_dimensions(dimensions) self._lattice_group = copy.deepcopy(self._data.space_group()) for sym_op in self._sym_ops: self._lattice_group.expand_smx(sym_op) self._patterson_group = self._lattice_group.build_derived_patterson_group() logger.debug( "Lattice group: %s (%i symops)" % (self._lattice_group.info().symbol_and_number(), len(self._lattice_group)) ) logger.debug( "Patterson group: %s" % self._patterson_group.info().symbol_and_number() ) self._compute_rij_wij()
[docs] def set_dimensions(self, dimensions): """Set the number of dimensions for analysis. Args: dimensions (int): The number of dimensions to be used. """ self.dim = dimensions
def _generate_twin_operators(self, lattice_symmetry_max_delta=5.0): # see also mmtbx.scaling.twin_analyses.twin_laws if self._lattice_group is None: cb_op_to_minimum_cell = self._data.change_of_basis_op_to_minimum_cell() minimum_cell_symmetry = self._data.crystal_symmetry().change_basis( cb_op=cb_op_to_minimum_cell ) self._lattice_group = sgtbx.lattice_symmetry.group( reduced_cell=minimum_cell_symmetry.unit_cell(), max_delta=lattice_symmetry_max_delta, ) intensity_symmetry = minimum_cell_symmetry.reflection_intensity_symmetry( anomalous_flag=self._data.anomalous_flag() ) cb_op = cb_op_to_minimum_cell.inverse() else: cb_op = sgtbx.change_of_basis_op() intensity_symmetry = self._data.reflection_intensity_symmetry() operators = [] for partition in cctbx.sgtbx.cosets.left_decomposition( g=self._lattice_group, h=intensity_symmetry.space_group() .build_derived_acentric_group() .make_tidy(), ).partitions[1:]: if partition[0].r().determinant() > 0: operators.append(cb_op.apply(partition[0])) return operators def _lattice_lower_upper_index(self, lattice_id): lower_index = self._lattices[lattice_id] upper_index = None if lattice_id < len(self._lattices) - 1: upper_index = self._lattices[lattice_id + 1] else: assert lattice_id == len(self._lattices) - 1 return lower_index, upper_index def _compute_rij_wij(self, use_cache=True): """Compute the rij_wij matrix.""" n_lattices = self._lattices.size() n_sym_ops = len(self._sym_ops) NN = n_lattices * n_sym_ops self.rij_matrix = flex.double(flex.grid(NN, NN), 0.0) if self._weights is None: self.wij_matrix = None else: self.wij_matrix = flex.double(flex.grid(NN, NN), 0.0) indices = {} space_group_type = self._data.space_group().type() for cb_op in self._sym_ops: cb_op = sgtbx.change_of_basis_op(cb_op) indices_reindexed = cb_op.apply(self._data.indices()) miller.map_to_asu(space_group_type, False, indices_reindexed) indices[cb_op.as_xyz()] = indices_reindexed def _compute_rij_matrix_one_row_block(i): rij_cache = {} n_sym_ops = len(self._sym_ops) NN = n_lattices * n_sym_ops rij_row = [] rij_col = [] rij_data = [] if self._weights is not None: wij_row = [] wij_col = [] wij_data = [] else: wij = None i_lower, i_upper = self._lattice_lower_upper_index(i) intensities_i = self._data.data()[i_lower:i_upper] for j in range(n_lattices): j_lower, j_upper = self._lattice_lower_upper_index(j) intensities_j = self._data.data()[j_lower:j_upper] for k, cb_op_k in enumerate(self._sym_ops): cb_op_k = sgtbx.change_of_basis_op(cb_op_k) indices_i = indices[cb_op_k.as_xyz()][i_lower:i_upper] for kk, cb_op_kk in enumerate(self._sym_ops): if i == j and k == kk: # don't include correlation of dataset with itself continue cb_op_kk = sgtbx.change_of_basis_op(cb_op_kk) ik = i + (n_lattices * k) jk = j + (n_lattices * kk) key = (i, j, str(cb_op_k.inverse() * cb_op_kk)) if use_cache and key in rij_cache: cc, n = rij_cache[key] else: indices_j = indices[cb_op_kk.as_xyz()][j_lower:j_upper] matches = miller.match_indices(indices_i, indices_j) pairs = matches.pairs() isel_i = pairs.column(0) isel_j = pairs.column(1) isel_i = isel_i.select( self._patterson_group.epsilon(indices_i.select(isel_i)) == 1 ) isel_j = isel_j.select( self._patterson_group.epsilon(indices_j.select(isel_j)) == 1 ) corr = flex.linear_correlation( intensities_i.select(isel_i), intensities_j.select(isel_j), ) if corr.is_well_defined(): cc = corr.coefficient() n = corr.n() rij_cache[key] = (cc, n) else: cc = None n = None if ( n is None or cc is None or (self._min_pairs is not None and n < self._min_pairs) ): continue if self._weights == "count": wij_row.extend([ik, jk]) wij_col.extend([jk, ik]) wij_data.extend([n, n]) elif self._weights == "standard_error": assert n > 2 # http://www.sjsu.edu/faculty/gerstman/StatPrimer/correlation.pdf se = math.sqrt((1 - cc ** 2) / (n - 2)) wij = 1 / se wij_row.extend([ik, jk]) wij_col.extend([jk, ik]) wij_data.extend([wij, wij]) rij_row.append(ik) rij_col.append(jk) rij_data.append(cc) rij = sparse.coo_matrix((rij_data, (rij_row, rij_col)), shape=(NN, NN)) if self._weights is not None: wij = sparse.coo_matrix((wij_data, (wij_row, wij_col)), shape=(NN, NN)) return rij, wij args = [(i,) for i in range(n_lattices)] results = easy_mp.parallel_map( _compute_rij_matrix_one_row_block, args, processes=self._nproc, iterable_type=easy_mp.posiargs, method="multiprocessing", ) rij_matrix = None wij_matrix = None for i, (rij, wij) in enumerate(results): if rij_matrix is None: rij_matrix = rij else: rij_matrix += rij if wij is not None: if wij_matrix is None: wij_matrix = wij else: wij_matrix += wij self.rij_matrix = flex.double(rij_matrix.todense()) if wij_matrix is not None: import numpy as np self.wij_matrix = flex.double(wij_matrix.todense().astype(np.float64)) return self.rij_matrix, self.wij_matrix
[docs] def compute_functional(self, x): """Compute the target function at coordinates `x`. Args: x (scitbx.array_family.flex.double): a flattened list of the N-dimensional vectors, i.e. coordinates in the first dimension are stored first, followed by the coordinates in the second dimension, etc. Returns: f (float): The value of the target function at coordinates `x`. """ assert (x.size() // self.dim) == (self._lattices.size() * len(self._sym_ops)) inner = self.rij_matrix.deep_copy() NN = x.size() // self.dim for i in range(self.dim): coord = x[i * NN : (i + 1) * NN] outer_prod = coord.matrix_outer_product(coord) inner -= outer_prod elements = inner * inner if self.wij_matrix is not None: elements = self.wij_matrix * elements f = 0.5 * flex.sum(elements) return f
[docs] def compute_gradients_fd(self, x, eps=1e-6): """Compute the gradients at coordinates `x` using finite differences. Args: x (scitbx.array_family.flex.double): a flattened list of the N-dimensional vectors, i.e. coordinates in the first dimension are stored first, followed by the coordinates in the second dimension, etc. eps (float): The value of epsilon to use in finite difference calculations. Returns: grad (scitbx.array_family.flex.double): The gradients of the target function with respect to the parameters. """ grad = flex.double(x.size(), 0) for i in range(grad.size()): x[i] += eps # x + eps fp = self.compute_functional(x) x[i] -= 2 * eps # x - eps fm = self.compute_functional(x) x[i] += eps # reset to original values grad[i] += (fp - fm) / (2 * eps) return grad
[docs] def compute_functional_and_gradients(self, x): """Compute the target function and gradients at coordinates `x`. Args: x (scitbx.array_family.flex.double): a flattened list of the N-dimensional vectors, i.e. coordinates in the first dimension are stored first, followed by the coordinates in the second dimension, etc. Returns: Tuple[float, scitbx.array_family.flex.double]: f: The value of the target function at coordinates `x`. grad: The gradients of the target function with respect to the parameters. """ f = self.compute_functional(x) grad = flex.double() if self.wij_matrix is not None: wrij_matrix = self.wij_matrix * self.rij_matrix else: wrij_matrix = self.rij_matrix coords = [] NN = x.size() // self.dim for i in range(self.dim): coords.append(x[i * NN : (i + 1) * NN]) # term 1 for i in range(self.dim): grad.extend(wrij_matrix.matrix_multiply(coords[i])) for i in range(self.dim): tmp_array = flex.double() tmp = coords[i].matrix_outer_product(coords[i]) if self.wij_matrix is not None: tmp = self.wij_matrix * tmp for j in range(self.dim): tmp_array.extend(tmp.matrix_multiply(coords[j])) grad -= tmp_array grad *= -2 # grad_fd = self.compute_gradients_fd(x) # assert grad.all_approx_equal_relatively(grad_fd, relative_error=1e-4) return f, grad
[docs] def curvatures(self, x): """Compute the curvature of the target function. Args: x (scitbx.array_family.flex.double): a flattened list of the N-dimensional vectors, i.e. coordinates in the first dimension are stored first, followed by the coordinates in the second dimension, etc. Returns: curvs (scitbx.array_family.flex.double): The curvature of the target function with respect to the parameters. """ coords = [] NN = x.size() // self.dim for i in range(self.dim): coords.append(x[i * NN : (i + 1) * NN]) curvs = flex.double() if self.wij_matrix is not None: wij = self.wij_matrix else: wij = flex.double(self.rij_matrix.accessor(), 1) for i in range(self.dim): curvs.extend(wij.matrix_multiply(coords[i] * coords[i])) curvs *= 2 return curvs
[docs] def curvatures_fd(self, x, eps=1e-6): """Compute the curvatures at coordinates `x` using finite differences. Args: x (scitbx.array_family.flex.double): a flattened list of the N-dimensional vectors, i.e. coordinates in the first dimension are stored first, followed by the coordinates in the second dimension, etc. eps (float): The value of epsilon to use in finite difference calculations. Returns: curvs (scitbx.array_family.flex.double): The curvature of the target function with respect to the parameters. """ f = self.compute_functional(x) curvs = flex.double(x.size(), 0) for i in range(curvs.size()): x[i] += eps # x + eps fp = self.compute_functional(x) x[i] -= 2 * eps # x - eps fm = self.compute_functional(x) x[i] += eps # reset to original values curvs[i] += (fm - 2 * f + fp) / (eps ** 2) return curvs
[docs] def get_sym_ops(self): """Get the list of symmetry operations used in the analysis. Returns: List[cctbx.sgtbx.rt_mx]: The list of symmetry operations. """ return self._sym_ops