This documentation page refers to a previous release of DIALS (1.14).
See the DIALS documentation site for the corresponding page for the latest version of DIALS.

Source code for dials.algorithms.indexing.indexer

#!/usr/bin/env python
# -*- mode: python; coding: utf-8; indent-tabs-mode: nil; python-indent: 2 -*-
#
# dials.algorithms.indexing.indexer.py
#
#  Copyright (C) 2014 Diamond Light Source
#
#  Author: Richard Gildea
#
#  This code is distributed under the BSD license, a copy of which is
#  included in the root directory of this package.

from __future__ import absolute_import, division
from __future__ import print_function
import math
import logging

logger = logging.getLogger(__name__)

from dials.util import log

debug_handle = log.debug_handle(logger)

import libtbx
from dials.util import Sorry
import iotbx.phil
from scitbx import matrix

from dials.array_family import flex
from cctbx import crystal, sgtbx, xray

from dxtbx.model import Crystal
from dxtbx.model.experiment_list import Experiment, ExperimentList

from dials.algorithms.indexing.max_cell import find_max_cell

# PHIL parameter definitions controlling the estimation of the maximum
# plausible unit cell dimension (max_cell), used to bound the basis
# vector search.  Typo fix: "peform" -> "perform" in the step_size help.
max_cell_phil_str = """\
max_cell_estimation
  .expert_level = 1
{
  filter_ice = True
    .type = bool
    .help = "Filter out reflections at typical ice ring resolutions"
            "before max_cell estimation."
  filter_overlaps = True
    .type = bool
    .help = "Filter out reflections with overlapping bounding boxes before"
            "max_cell estimation."
  overlaps_border = 0
    .type = int(value_min=0)
    .help = "Optionally add a border around the bounding boxes before finding"
            "overlaps."
  multiplier = 1.3
    .type = float(value_min=0)
    .help = "Multiply the estimated maximum basis vector length by this value."
    .expert_level = 2
  step_size = 45
    .type = float(value_min=0)
    .help = "Step size, in degrees, of the blocks used to perform the max_cell "
            "estimation."
    .expert_level = 2
  nearest_neighbor_percentile = None
    .type = float(value_min=0, value_max=1)
    .help = "Percentile of NN histogram to use for max cell determination."
    .expert_level = 2
  histogram_binning = linear *log
    .type = choice
    .help = "Choose between linear or logarithmic bins for nearest neighbour"
            "histogram analysis."
  nn_per_bin = 5
    .type = int(value_min=1)
    .help = "Target number of nearest neighbours per histogram bin."
  max_height_fraction = 0.25
    .type = float(value_min=0, value_max=1)
    .expert_level=2
}
"""

# PHIL parameter definitions for the indexing step only (refinement
# parameters are added separately in master_phil_scope below).  The %s
# placeholder is filled with max_cell_phil_str.  Typo fixes in help text:
# "comparision" -> "comparison", "candiate" -> "candidate".
index_only_phil_str = (
    """\
indexing {
  nproc = 1
    .type = int(value_min=1)
    .help = "The number of processes to use."
  mm_search_scope = 4.0
    .help = "Global radius of origin offset search."
    .type = float(value_min=0)
    .expert_level = 1
  wide_search_binning = 2
    .help = "Modify the coarseness of the wide grid search for the beam centre."
    .type = float(value_min=0)
    .expert_level = 1
  min_cell_volume = 25
    .type = float(value_min=0)
    .help = "Minimum unit cell volume (in Angstrom^3)."
    .expert_level = 1
  min_cell = 3
    .type = float(value_min=0)
    .help = "Minimum length of candidate unit cell basis vectors (in Angstrom)."
    .expert_level = 1
  max_cell = Auto
    .type = float(value_min=0)
    .help = "Maximum length of candidate unit cell basis vectors (in Angstrom)."
    .expert_level = 1
  %s
  fft3d {
    peak_search = *flood_fill clean
      .type = choice
      .expert_level = 2
    peak_volume_cutoff = 0.15
      .type = float
      .expert_level = 2
    reciprocal_space_grid {
      n_points = 256
        .type = int(value_min=0)
        .expert_level = 1
      d_min = Auto
        .type = float(value_min=0)
        .help = "The high resolution limit in Angstrom for spots to include in "
                "the initial indexing."
    }
  }
  sigma_phi_deg = None
    .type = float(value_min=0)
    .help = "Override the phi sigmas for refinement. Mainly intended for single-shot"
            "rotation images where the phi sigma is almost certainly incorrect."
    .expert_level = 2
  b_iso = Auto
    .type = float(value_min=0)
    .expert_level = 2
  rmsd_cutoff = 15
    .type = float(value_min=0)
    .expert_level = 1
  scan_range = None
    .help = "The range of images to use in indexing. Number of arguments"
            "must be a factor of two. Specifying \"0 0\" will use all images"
            "by default. The given range follows C conventions"
            "(e.g. j0 <= j < j1)."
    .type = ints(size=2)
    .multiple = True
  known_symmetry {
    space_group = None
      .type = space_group
      .help = "Target space group for indexing."
    unit_cell = None
      .type = unit_cell
      .help = "Target unit cell for indexing."
    relative_length_tolerance = 0.1
      .type = float
      .help = "Relative tolerance for unit cell lengths in unit cell comparison."
      .expert_level = 1
    absolute_angle_tolerance = 5
      .type = float
      .help = "Angular tolerance (in degrees) in unit cell comparison."
      .expert_level = 1
    max_delta = 5
      .type = float(value_min=0)
      .help = "Maximum allowed Le Page delta used in searching for basis vector"
              "combinations that are consistent with the given symmetry."
      .expert_level = 1
  }
  basis_vector_combinations
    .expert_level = 1
  {
    max_combinations = None
      .type = int(value_min=1)
      .help = "Maximum number of basis vector combinations to test for agreement"
              "with input symmetry."
    max_refine = Auto
      .type = int(value_min=1)
      .help = "Maximum number of putative crystal models to test. Default"
              "for rotation sweeps: 50, for still images: 5"
      .expert_level = 1
    sys_absent_threshold = 0.9
      .type = float(value_min=0.0, value_max=1.0)
    solution_scorer = filter *weighted
      .type = choice
      .expert_level = 1
    filter
      .expert_level = 1
    {
      check_doubled_cell = True
        .type = bool
      likelihood_cutoff = 0.8
        .type = float(value_min=0, value_max=1)
      volume_cutoff = 1.25
        .type = float(value_min=1)
      n_indexed_cutoff = 0.9
        .type = float(value_min=0, value_max=1)
    }
    weighted
      .expert_level = 1
    {
      power = 1
        .type = int(value_min=1)
      volume_weight = 1
        .type = float(value_min=0)
      n_indexed_weight = 1
        .type = float(value_min=0)
      rmsd_weight = 1
        .type = float(value_min=0)
    }
  }
  index_assignment {
    method = *simple local
      .type = choice
      .help = "Choose between simple 'global' index assignment and xds-style "
              "'local' index assignment."
      .expert_level = 1
    simple {
      hkl_tolerance = 0.3
        .type = float(value_min=0, value_max=0.5)
        .help = "Maximum allowable deviation from integer-ness for assigning "
                "a miller index to a reciprocal lattice vector."
    }
    local
      .expert_level = 1
    {
      epsilon = 0.05
        .type = float
        .help = "This corresponds to the xds parameter INDEX_ERROR="
      delta = 8
        .type = int
        .help = "This corresponds to the xds parameter INDEX_MAGNITUDE="
      l_min = 0.8
        .type = float
        .help = "This corresponds to the xds parameter INDEX_QUALITY="
      nearest_neighbours = 20
        .type = int(value_min=1)
    }
  }
  check_misindexing {
    grid_search_scope = 0
      .type = int
      .help = "Search scope for testing misindexing on h, k, l."
  }
  optimise_initial_basis_vectors = False
    .type = bool
    .expert_level = 2
  debug = False
    .type = bool
    .expert_level = 1
  debug_plots = False
    .type = bool
    .help = "Requires matplotlib"
    .expert_level = 1
  combine_scans = False
    .type = bool
    .expert_level = 1
  refinement_protocol {
    mode = *refine_shells repredict_only
      .type = choice
      .expert_level = 1
      .help = "refine_shells: refine in increasing resolution cutoffs after indexing."
              "repredict_only: do not refine after indexing, just update spot"
              "predictions."
    n_macro_cycles = 5
      .type = int(value_min=1)
      .help = "Maximum number of macro cycles of refinement, reindexing all"
              "reflections using updated geometry at the beginning of each"
              "cycle. Does not apply to stills.indexer=stills."
    d_min_step = Auto
      .type = float(value_min=0.0)
      .help = "Reduction per step in d_min for reflections to include in refinement."
    d_min_start = None
      .type = float(value_min=0.0)
    d_min_final = None
      .type = float(value_min=0.0)
      .help = "Do not ever include reflections below this value in refinement."
    verbosity = 1
      .type = int(value_min=0)
    disable_unit_cell_volume_sanity_check = False
      .type = bool
      .help = "Disable sanity check on unrealistic increases in unit cell volume"
              "during refinement."
      .expert_level = 1
  }
  method = *fft3d fft1d real_space_grid_search
    .type = choice
  multiple_lattice_search
    .expert_level = 1
  {
    cluster_analysis_search = False
      .type = bool
      .help = "Perform cluster analysis search for multiple lattices."
    recycle_unindexed_reflections_cutoff = 0.1
      .type = float(value_min=0, value_max=1)
      .help = "Attempt another cycle of indexing on the unindexed reflections "
              "if more than the fraction of input reflections are unindexed."
    minimum_angular_separation = 5
      .type = float(value_min=0)
      .help = "The minimum angular separation (in degrees) between two lattices."
    max_lattices = 1
      .type = int
    cluster_analysis {
      method = *dbscan hcluster
        .type = choice
      hcluster {
        linkage {
          method = *ward
            .type = choice
          metric = *euclidean
            .type = choice
        }
        cutoff = 15
          .type = float(value_min=0)
        cutoff_criterion = *distance inconsistent
          .type = choice
      }
      dbscan {
        eps = 0.05
          .type = float(value_min=0.0)
        min_samples = 30
          .type = int(value_min=1)
      }
      min_cluster_size = 20
        .type = int(value_min=0)
      intersection_union_ratio_cutoff = 0.4
        .type = float(value_min=0.0, value_max=1.0)
    }
  }
  fft1d
    .expert_level = 1
  {
    characteristic_grid = None
      .help = Sampling frequency in radians. See Steller 1997. If None, \
              determine a grid sampling automatically using the input \
              reflections, using at most 0.029 radians.
      .type = float(value_min=0)
  }
  real_space_grid_search
    .expert_level = 1
  {
    characteristic_grid = 0.02
      .type = float(value_min=0)
  }
  stills {
    indexer = *Auto stills sweeps
      .type = choice
      .help = Use the stills or sweeps indexer.  Auto: choose based on the input \
              imagesets (stills or sweeps).
      .expert_level = 1
    ewald_proximity_resolution_cutoff = 2.0
      .type = float
      .help = For still images, this high-resolution cutoff is used to calculate
      .help = the acceptable volume of reciprocal space for spot prediction
    refine_all_candidates = True
      .type = bool
      .help = If False, no attempt is made to refine the model from initial basis \
              vector selection. The indexing solution with the best RMSD is chosen.
    candidate_outlier_rejection = True
      .type = bool
      .expert_level = 1
      .help = If True, while refining candidate basis solutions, also apply Sauter/ \
              Poon (2010) outlier rejection
    refine_candidates_with_known_symmetry = False
      .type = bool
      .expert_level = 2
      .help = If False, when choosing the best set of candidate basis solutions, \
              refine the candidates in the P1 setting. If True, after indexing \
              in P1, convert the candidates to the known symmetry and apply the \
              corresponding change of basis to the indexed reflections.
    rmsd_min_px = 2
      .type = float
      .help = Minimum acceptable RMSD for choosing candidate basis solutions \
              (in pixels)
    ewald_proximal_volume_max = 0.0025
      .type = float
      .help = Maximum acceptable ewald proximal volume when choosing candidate \
              basis solutions
    isoforms
      .help = Constrain the unit cell to specific values during refinement after initial indexing.
      .multiple=True
    {
      name=None
        .type=str
      cell=None
        .type=unit_cell
      lookup_symbol=None
        .type=str
        .help=The sgtbx lookup symbol of the reflections pointgroup
      rmsd_target_mm=None
        .type=float
        .help=Maximum acceptable DIALS positional rmsd, in mm
      beam_restraint=None
        .type=floats(size=2)
        .help=Known beam position in mm X,Y, rmsd_target_mm is reused here as a circle of confusion
        .help=to assure that no images are accepted where the lattice is misindexed by a unit shift.
    }
  }
}
"""
    % max_cell_phil_str
)

# Parse the indexing-only PHIL definitions into a scope object.
index_only_phil_scope = iotbx.phil.parse(index_only_phil_str, process_includes=True)

# The master scope combines the indexing parameters with the refinement
# parameters pulled in via the include from dials.algorithms.refinement.
master_phil_scope = iotbx.phil.parse(
    """
%s
include scope dials.algorithms.refinement.refiner.phil_scope
"""
    % index_only_phil_str,
    process_includes=True,
)

# override default refinement parameters
master_phil_scope = master_phil_scope.fetch(
    source=iotbx.phil.parse(
        """\
refinement {
  reflections {
    reflections_per_degree=100
  }
}
"""
    )
)

# Fully-resolved default parameter values extracted from the master scope.
master_params = master_phil_scope.fetch().extract()


def filter_reflections_by_scan_range(reflections, scan_range):
    """Select the reflections whose observed frame number lies in a scan range.

    Args:
        reflections: A reflection table with an "xyzobs.px.value" column.
        scan_range: A list of (start, end) image-number pairs following C
            conventions (start <= frame < end); None entries are skipped.

    Returns:
        A new reflection table containing only the selected reflections.
    """
    in_scan_range = flex.bool(len(reflections), False)
    # The z component of the observed pixel centroid is the frame number.
    frame_number = reflections["xyzobs.px.value"].parts()[2]
    # FIX: the original loop variable re-used the name `scan_range`,
    # shadowing the parameter; renamed for clarity (behaviour unchanged).
    for one_range in scan_range:
        if one_range is None:
            continue
        range_start, range_end = one_range
        in_scan_range.set_selected(
            (frame_number >= range_start) & (frame_number < range_end), True
        )
    return reflections.select(in_scan_range)
class vector_group(object):
    """A group of candidate basis vectors collected with their lengths and
    functional volumes, providing the component-wise mean vector of the group.
    """

    def __init__(self):
        self.vectors = []
        self.lengths = []
        self.volumes = []
        self._mean = None  # lazily-computed cache of the mean vector

    def append(self, vector, length, volume):
        """Add a vector (with its length and volume) to the group."""
        self.vectors.append(vector)
        self.lengths.append(length)
        self.volumes.append(volume)
        # FIX: invalidate the cache instead of eagerly recomputing the mean
        # on every append (was O(n) per append); mean() recomputes on demand.
        self._mean = None

    def mean(self):
        """Return the (cached) component-wise mean of the stored vectors."""
        if self._mean is None:
            self._mean = self.compute_mean()
        return self._mean

    def compute_mean(self):
        """Compute the component-wise mean of the stored vectors.

        Raises ZeroDivisionError if the group is empty (unchanged behaviour).
        """
        sum_x = 0
        sum_y = 0
        sum_z = 0
        for v in self.vectors:
            sum_x += v.elems[0]
            sum_y += v.elems[1]
            sum_z += v.elems[2]
        return matrix.col((sum_x, sum_y, sum_z)) / len(self.vectors)
def is_approximate_integer_multiple(
    vec_a, vec_b, relative_tolerance=0.2, angular_tolerance=5.0
):
    """Return True if the longer vector is close to an integer multiple of
    the shorter one.

    The vectors qualify when they are parallel or antiparallel to within
    angular_tolerance degrees and the ratio of their lengths deviates from
    the nearest integer by less than relative_tolerance.
    """
    # Order the pair so that vec_a is the shorter of the two.
    if vec_a.length() > vec_b.length():
        vec_a, vec_b = vec_b, vec_a
    shorter = vec_a.length()
    longer = vec_b.length()

    # Reject pairs that are not (anti)parallel within tolerance.
    angle = vec_a.angle(vec_b, deg=True)
    aligned = angle < angular_tolerance or abs(180 - angle) < angular_tolerance
    if not aligned:
        return False

    # Accept when the length ratio is close to a whole number.
    ratio = longer / shorter
    return abs(round(ratio) - ratio) < relative_tolerance
# Multiplicative factor converting an angle in degrees to radians.
deg_to_radians = math.pi / 180
class indexer_base(object):
    """Base class for the DIALS indexers.

    Stores the strong reflections and input imagesets, selects an
    outlier-rejection default appropriate for indexing, shares equivalent
    experimental models between imagesets, then hands over to the
    symmetry/indexing setup performed by the subclasses.

    NOTE(review): formatting reconstructed from a collapsed documentation
    extract; statement nesting inside the model-sharing loop is assumed
    (beam/scan sharing under the shared-goniometer branch) — confirm
    against the upstream DIALS source.
    """

    def __init__(self, reflections, imagesets, params=None):
        # reflections: observed spot list; imagesets: imagesets to index.
        self.reflections = reflections
        self.imagesets = imagesets
        if params is None:
            params = master_params
        self.params = params.indexing
        self.all_params = params
        self.refined_experiments = None
        self.hkl_offset = None

        # Resolve the "auto" outlier rejection choice: stills (no
        # goniometer) use sauter_poon, rotation data use tukey.
        if self.all_params.refinement.reflections.outlier.algorithm in (
            "auto",
            libtbx.Auto,
        ):
            if self.imagesets[0].get_goniometer() is None:
                self.all_params.refinement.reflections.outlier.algorithm = "sauter_poon"
            else:
                # different default to dials.refine
                # tukey is faster and more appropriate at the indexing step
                self.all_params.refinement.reflections.outlier.algorithm = "tukey"

        # Share models between imagesets wherever they are equivalent, so
        # downstream refinement treats them as a single shared model.
        for imageset in imagesets[1:]:
            if imageset.get_detector().is_similar_to(self.imagesets[0].get_detector()):
                imageset.set_detector(self.imagesets[0].get_detector())
            if imageset.get_goniometer().is_similar_to(
                self.imagesets[0].get_goniometer()
            ):
                imageset.set_goniometer(self.imagesets[0].get_goniometer())
                # can only share a beam if we share a goniometer?
                if imageset.get_beam().is_similar_to(self.imagesets[0].get_beam()):
                    imageset.set_beam(self.imagesets[0].get_beam())
                if (
                    self.params.combine_scans
                    and imageset.get_scan() == self.imagesets[0].get_scan()
                ):
                    imageset.set_scan(self.imagesets[0].get_scan())

        # Keep only the reflections flagged as strong, when any are flagged.
        if "flags" in self.reflections:
            strong_sel = self.reflections.get_flags(self.reflections.flags.strong)
            if strong_sel.count(True) > 0:
                self.reflections = self.reflections.select(strong_sel)
        if "flags" not in self.reflections or strong_sel.count(True) == 0:
            # backwards compatibility for testing
            self.reflections.set_flags(
                flex.size_t_range(len(self.reflections)), self.reflections.flags.strong
            )

        self._setup_symmetry()
        self.d_min = None

        self.setup_indexing()
[docs] @staticmethod def from_parameters(reflections, imagesets, known_crystal_models=None, params=None): if params is None: params = master_params if known_crystal_models is not None: from dials.algorithms.indexing.known_orientation import ( indexer_known_orientation, ) if params.indexing.known_symmetry.space_group is None: params.indexing.known_symmetry.space_group = ( known_crystal_models[0].get_space_group().info() ) idxr = indexer_known_orientation( reflections, imagesets, params, known_crystal_models ) else: has_stills = False has_sweeps = False for imageset in imagesets: if ( imageset.get_goniometer() is None or imageset.get_scan() is None or imageset.get_scan().get_oscillation()[1] == 0 ): if has_sweeps: raise Sorry( "Please provide only stills or only sweeps, not both" ) has_stills = True else: if has_stills: raise Sorry( "Please provide only stills or only sweeps, not both" ) has_sweeps = True assert not (has_stills and has_sweeps) use_stills_indexer = has_stills if not ( params.indexing.stills.indexer is libtbx.Auto or params.indexing.stills.indexer.lower() == "auto" ): if params.indexing.stills.indexer == "stills": use_stills_indexer = True elif params.indexing.stills.indexer == "sweeps": use_stills_indexer = False else: assert False if params.indexing.basis_vector_combinations.max_refine is libtbx.Auto: if use_stills_indexer: params.indexing.basis_vector_combinations.max_refine = 5 else: params.indexing.basis_vector_combinations.max_refine = 50 if use_stills_indexer: # Ensure the indexer and downstream applications treat this as set of stills from dxtbx.imageset import ImageSet # , MemImageSet reset_sets = [] for i in xrange(len(imagesets)): imagesweep = imagesets.pop(0) imageset = ImageSet(imagesweep.data(), imagesweep.indices()) # if isinstance(imageset, MemImageSet): # imageset = MemImageSet(imagesweep._images, imagesweep.indices()) # else: # imageset = ImageSet(imagesweep.reader(), imagesweep.indices()) # imageset._models = imagesweep._models 
imageset.set_scan(None) imageset.set_goniometer(None) reset_sets.append(imageset) imagesets.extend(reset_sets) if params.indexing.method == "fft3d": if use_stills_indexer: from dials.algorithms.indexing.stills_indexer import ( stills_indexer_fft3d as indexer_fft3d, ) else: from dials.algorithms.indexing.fft3d import indexer_fft3d idxr = indexer_fft3d(reflections, imagesets, params=params) elif params.indexing.method == "fft1d": if use_stills_indexer: from dials.algorithms.indexing.stills_indexer import ( stills_indexer_fft1d as indexer_fft1d, ) else: from dials.algorithms.indexing.fft1d import indexer_fft1d idxr = indexer_fft1d(reflections, imagesets, params=params) elif params.indexing.method == "real_space_grid_search": if use_stills_indexer: from dials.algorithms.indexing.stills_indexer import ( stills_indexer_real_space_grid_search as indexer_real_space_grid_search, ) else: from dials.algorithms.indexing.real_space_grid_search import ( indexer_real_space_grid_search, ) idxr = indexer_real_space_grid_search( reflections, imagesets, params=params ) return idxr
def _setup_symmetry(self):
    """Derive the target symmetry objects from the known_symmetry parameters.

    Populates self.target_symmetry_reference_setting and
    self.target_symmetry_primitive, plus the change-of-basis operators
    relating the input, reference and primitive settings
    (cb_op_inp_ref, cb_op_ref_inp, cb_op_reference_to_primitive,
    cb_op_primitive_inp).

    :raises Sorry: if the given unit cell is incompatible with the given
        space group
    """
    self.target_symmetry_primitive = None
    self.target_symmetry_reference_setting = None
    self.cb_op_inp_ref = None

    target_unit_cell = self.params.known_symmetry.unit_cell
    target_space_group = self.params.known_symmetry.space_group
    if target_space_group is not None:
        target_space_group = target_space_group.group()
        # Indexing can only determine lattice symmetry, so work with the
        # derived Patterson group rather than the full space group.
        target_space_group = target_space_group.build_derived_patterson_group()

    if target_unit_cell is not None or target_space_group is not None:
        if target_unit_cell is not None and target_space_group is not None:
            # Both cell and space group given: find the metric subgroup whose
            # Bravais type matches the target and whose cell agrees with the
            # given cell (in any of several equivalent settings).
            from cctbx.sgtbx.bravais_types import bravais_lattice

            target_bravais_t = bravais_lattice(
                group=target_space_group.info().reference_setting().group()
            )
            best_subgroup = None
            best_angular_difference = 1e8

            from cctbx.sgtbx import lattice_symmetry

            space_groups = [target_space_group]
            # For centred lattices also try P1, in case the user supplied the
            # centred cell rather than the primitive one.
            if target_space_group.conventional_centring_type_symbol() != "P":
                space_groups.append(sgtbx.space_group())
            for target in space_groups:
                cs = crystal.symmetry(
                    unit_cell=target_unit_cell,
                    space_group=target,
                    assert_is_compatible_unit_cell=False,
                )
                target_best_cell = cs.best_cell().unit_cell()
                subgroups = lattice_symmetry.metric_subgroups(cs, max_delta=0.1)
                for subgroup in subgroups.result_groups:
                    bravais_t = bravais_lattice(
                        group=subgroup["ref_subsym"].space_group()
                    )
                    if bravais_t == target_bravais_t:
                        # allow for the cell to be given as best cell, reference setting
                        # primitive settings, or minimum cell
                        best_subsym = subgroup["best_subsym"]
                        ref_subsym = best_subsym.as_reference_setting()
                        if not (
                            best_subsym.unit_cell().is_similar_to(target_unit_cell)
                            or ref_subsym.unit_cell().is_similar_to(
                                target_unit_cell
                            )
                            or ref_subsym.primitive_setting()
                            .unit_cell()
                            .is_similar_to(target_unit_cell)
                            or best_subsym.primitive_setting()
                            .unit_cell()
                            .is_similar_to(target_unit_cell)
                            or best_subsym.minimum_cell()
                            .unit_cell()
                            .is_similar_to(target_unit_cell.minimum_cell())
                            or best_subsym.unit_cell().is_similar_to(
                                target_best_cell
                            )
                        ):
                            continue
                        # Keep the subgroup with the smallest angular distortion.
                        if (
                            subgroup["max_angular_difference"]
                            < best_angular_difference
                        ):
                            best_subgroup = subgroup
                            best_angular_difference = subgroup[
                                "max_angular_difference"
                            ]

            if best_subgroup is None:
                raise Sorry("Unit cell incompatible with space group")

            cb_op_inp_best = best_subgroup["cb_op_inp_best"]
            best_subsym = best_subgroup["best_subsym"]
            cb_op_best_ref = best_subsym.change_of_basis_op_to_reference_setting()
            self.cb_op_inp_ref = cb_op_best_ref * cb_op_inp_best
            self.target_symmetry_reference_setting = crystal.symmetry(
                unit_cell=target_unit_cell.change_basis(self.cb_op_inp_ref),
                space_group=target_space_group.info()
                .as_reference_setting()
                .group(),
            )

        elif target_unit_cell is not None:
            # Cell only: treat as P1; input is already the reference setting.
            self.target_symmetry_reference_setting = crystal.symmetry(
                unit_cell=target_unit_cell, space_group=sgtbx.space_group()
            )
            self.cb_op_inp_ref = sgtbx.change_of_basis_op()

        elif target_space_group is not None:
            # Space group only: no cell constraint.
            self.cb_op_inp_ref = (
                target_space_group.info().change_of_basis_op_to_reference_setting()
            )
            self.target_symmetry_reference_setting = crystal.symmetry(
                space_group=target_space_group.change_basis(self.cb_op_inp_ref)
            )

        # Derive the primitive-setting symmetry and the remaining operators.
        self.cb_op_reference_to_primitive = (
            self.target_symmetry_reference_setting.change_of_basis_op_to_primitive_setting()
        )
        if target_unit_cell is not None:
            self.target_symmetry_primitive = self.target_symmetry_reference_setting.change_basis(
                self.cb_op_reference_to_primitive
            )
        else:
            self.target_symmetry_primitive = crystal.symmetry(
                space_group=self.target_symmetry_reference_setting.space_group().change_basis(
                    self.cb_op_reference_to_primitive
                )
            )
        self.cb_op_ref_inp = self.cb_op_inp_ref.inverse()
        self.cb_op_primitive_inp = (
            self.cb_op_ref_inp * self.cb_op_reference_to_primitive.inverse()
        )

        if self.target_symmetry_reference_setting is not None:
            logger.debug("Target symmetry (reference setting):")
            self.target_symmetry_reference_setting.show_summary(f=debug_handle)
        if self.target_symmetry_primitive is not None:
            logger.debug("Target symmetry (primitive cell):")
            self.target_symmetry_primitive.show_summary(f=debug_handle)
        logger.debug(
            "cb_op reference->primitive: " + str(self.cb_op_reference_to_primitive)
        )
        logger.debug("cb_op primitive->input: " + str(self.cb_op_primitive_inp))
def setup_indexing(self):
    """Prepare self.reflections for indexing.

    Maps spot centroids from pixel/image units to mm/radians and then to
    reciprocal space, filters by scan range, estimates max_cell, optionally
    overrides the phi variances, and resets all lattice ids to -1
    (unindexed).

    :raises Sorry: if no reflections remain after filtering, or if max_cell
        estimation fails because there are too few spots
    """
    reflections_input = self.reflections
    self.reflections = flex.reflection_table()
    for i, imageset in enumerate(self.imagesets):
        # Older files may lack an imageset_id column; fall back to 'id'.
        if "imageset_id" not in reflections_input:
            reflections_input["imageset_id"] = reflections_input["id"]
        sel = reflections_input["imageset_id"] == i
        self.reflections.extend(
            self.map_spots_pixel_to_mm_rad(
                reflections_input.select(sel),
                imageset.get_detector(),
                imageset.get_scan(),
            )
        )
    self.filter_reflections_by_scan_range()
    if len(self.reflections) == 0:
        raise Sorry("No reflections left to index!")

    # Map the mm/radian centroids into reciprocal space per imageset and
    # compute the entering flags while the per-imageset models are at hand.
    spots_mm = self.reflections
    self.reflections = flex.reflection_table()
    for i, imageset in enumerate(self.imagesets):
        spots_sel = spots_mm.select(spots_mm["imageset_id"] == i)
        self.map_centroids_to_reciprocal_space(
            spots_sel,
            imageset.get_detector(),
            imageset.get_beam(),
            imageset.get_goniometer(),
        )
        spots_sel["entering"] = self.calculate_entering_flags(
            spots_sel,
            beam=imageset.get_beam(),
            goniometer=imageset.get_goniometer(),
        )
        self.reflections.extend(spots_sel)

    try:
        self.find_max_cell()
    except AssertionError as e:
        if "too few spots" in str(e).lower():
            raise Sorry(e)
        # BUGFIX: previously any other AssertionError was silently swallowed
        # here, leaving max_cell unset and hiding the underlying failure.
        raise

    # Optionally override the phi component of the centroid variances with
    # a user-supplied sigma_phi.
    if self.params.sigma_phi_deg is not None:
        var_x, var_y, _ = self.reflections["xyzobs.mm.variance"].parts()
        var_phi_rad = flex.double(
            var_x.size(), (math.pi / 180 * self.params.sigma_phi_deg) ** 2
        )
        self.reflections["xyzobs.mm.variance"] = flex.vec3_double(
            var_x, var_y, var_phi_rad
        )

    if self.params.debug:
        self.debug_write_reciprocal_lattice_points_as_pdb()

    # -1 marks a reflection as not yet assigned to any lattice.
    self.reflections["id"] = flex.int(len(self.reflections), -1)
def index(self):
    """Run the main indexing/refinement loop.

    Repeatedly searches for new lattices, assigns reflections to them,
    applies any known space-group symmetry after the first assignment,
    refines the models over several macro-cycles with an increasing
    resolution range, and finally computes pixel-frame predicted centroids.
    Results are left in self.refined_experiments / self.refined_reflections.

    :raises Sorry: if no lattice can be found or none of the experiments
        could be refined
    """
    experiments = ExperimentList()

    had_refinement_error = False
    have_similar_crystal_models = False

    # Outer loop: each pass attempts to add one more lattice.
    while True:
        if had_refinement_error or have_similar_crystal_models:
            break
        max_lattices = self.params.multiple_lattice_search.max_lattices
        if max_lattices is not None and len(experiments) >= max_lattices:
            break
        if len(experiments) > 0:
            # Stop searching when too few unindexed reflections remain in
            # the resolution range already indexed.
            cutoff_fraction = (
                self.params.multiple_lattice_search.recycle_unindexed_reflections_cutoff
            )
            d_spacings = 1 / self.reflections["rlp"].norms()
            d_min_indexed = flex.min(d_spacings.select(self.indexed_reflections))
            min_reflections_for_indexing = cutoff_fraction * len(
                self.reflections.select(d_spacings > d_min_indexed)
            )
            crystal_ids = self.reflections.select(d_spacings > d_min_indexed)["id"]
            if (crystal_ids == -1).count(True) < min_reflections_for_indexing:
                # NOTE(review): this logs the threshold, not the actual
                # number of unindexed reflections remaining — looks like the
                # wrong variable is interpolated; confirm intent.
                logger.info(
                    "Finish searching for more lattices: %i unindexed reflections remaining."
                    % (min_reflections_for_indexing)
                )
                break

        n_lattices_previous_cycle = len(experiments)

        if self.d_min is None:
            self.d_min = self.params.refinement_protocol.d_min_start

        if len(experiments) == 0:
            experiments.extend(self.find_lattices())
        else:
            try:
                new = self.find_lattices()
                experiments.extend(new)
            except Sorry:
                logger.info("Indexing remaining reflections failed")

        # Auto-derive the per-macro-cycle resolution step from the data.
        if self.params.refinement_protocol.d_min_step is libtbx.Auto:
            n_cycles = self.params.refinement_protocol.n_macro_cycles
            if self.d_min is None or n_cycles == 1:
                self.params.refinement_protocol.d_min_step = 0
            else:
                d_spacings = 1 / self.reflections["rlp"].norms()
                d_min_all = flex.min(d_spacings)
                self.params.refinement_protocol.d_min_step = (
                    self.d_min - d_min_all
                ) / (n_cycles - 1)
                logger.info(
                    "Using d_min_step %.1f"
                    % self.params.refinement_protocol.d_min_step
                )

        if len(experiments) == 0:
            raise Sorry("No suitable lattice could be found.")
        elif len(experiments) == n_lattices_previous_cycle:
            # no more lattices found
            break

        # Inner loop: macro-cycles of index -> (symmetrize) -> refine,
        # extending the resolution limit each cycle.
        for i_cycle in range(self.params.refinement_protocol.n_macro_cycles):
            if (
                i_cycle > 0
                and self.d_min is not None
                and self.params.refinement_protocol.d_min_step > 0
            ):
                d_min = self.d_min - self.params.refinement_protocol.d_min_step
                d_min = max(d_min, 0)
                d_min = max(d_min, self.params.refinement_protocol.d_min_final)
                if d_min >= 0:
                    self.d_min = d_min
                    logger.info("Increasing resolution to %.2f Angstrom" % d_min)

            # reset reflection lattice flags
            # the lattice a given reflection belongs to: a value of -1 indicates
            # that a reflection doesn't belong to any lattice so far
            self.reflections["id"] = flex.int(len(self.reflections), -1)

            self.index_reflections(experiments, self.reflections)

            if i_cycle == 0 and self.params.known_symmetry.space_group is not None:
                # now apply the space group symmetry only after the first indexing
                # need to make sure that the symmetrized orientation is similar to the P1 model
                target_space_group = self.target_symmetry_primitive.space_group()
                for i_cryst, cryst in enumerate(experiments.crystals()):
                    if i_cryst >= n_lattices_previous_cycle:
                        new_cryst, cb_op_to_primitive = self.apply_symmetry(
                            cryst, target_space_group
                        )
                        if self.cb_op_primitive_inp is not None:
                            new_cryst = new_cryst.change_basis(
                                self.cb_op_primitive_inp
                            )
                        cryst.update(new_cryst)
                        cryst.set_space_group(
                            self.params.known_symmetry.space_group.group()
                        )
                        # Reindex the miller indices of the reflections
                        # belonging to this crystal to match the new basis.
                        for i_expt, expt in enumerate(experiments):
                            if expt.crystal is not cryst:
                                continue
                            if not cb_op_to_primitive.is_identity_op():
                                miller_indices = self.reflections[
                                    "miller_index"
                                ].select(self.reflections["id"] == i_expt)
                                miller_indices = cb_op_to_primitive.apply(
                                    miller_indices
                                )
                                self.reflections["miller_index"].set_selected(
                                    self.reflections["id"] == i_expt,
                                    miller_indices,
                                )
                            if self.cb_op_primitive_inp is not None:
                                miller_indices = self.reflections[
                                    "miller_index"
                                ].select(self.reflections["id"] == i_expt)
                                miller_indices = self.cb_op_primitive_inp.apply(
                                    miller_indices
                                )
                                self.reflections["miller_index"].set_selected(
                                    self.reflections["id"] == i_expt,
                                    miller_indices,
                                )

            logger.info("\nIndexed crystal models:")
            self.show_experiments(experiments, self.reflections, d_min=self.d_min)

            # Reject the newest lattice if its orientation is too close to
            # an existing one.
            if len(experiments) > 1:
                from dials.algorithms.indexing.compare_orientation_matrices import (
                    difference_rotation_matrix_axis_angle,
                )

                cryst_b = experiments.crystals()[-1]
                have_similar_crystal_models = False
                for i_a, cryst_a in enumerate(experiments.crystals()[:-1]):
                    R_ab, axis, angle, cb_op_ab = difference_rotation_matrix_axis_angle(
                        cryst_a, cryst_b
                    )
                    min_angle = (
                        self.params.multiple_lattice_search.minimum_angular_separation
                    )
                    if abs(angle) < min_angle:  # degrees
                        logger.info(
                            "Crystal models too similar, rejecting crystal %i:"
                            % (len(experiments))
                        )
                        logger.info(
                            "Rotation matrix to transform crystal %i to crystal %i"
                            % (i_a + 1, len(experiments))
                        )
                        logger.info(R_ab)
                        logger.info(
                            "Rotation of %.3f degrees" % angle
                            + " about axis (%.3f, %.3f, %.3f)" % axis
                        )
                        # show_rotation_matrix_differences([cryst_a, cryst_b])
                        have_similar_crystal_models = True
                        del experiments[-1]
                        break
                if have_similar_crystal_models:
                    break

            logger.info("")
            logger.info("#" * 80)
            logger.info("Starting refinement (macro-cycle %i)" % (i_cycle + 1))
            logger.info("#" * 80)
            logger.info("")
            self.indexed_reflections = self.reflections["id"] > -1

            # Exclude from refinement anything beyond the current d_min
            # or not assigned to a lattice.
            sel = flex.bool(len(self.reflections), False)
            lengths = 1 / self.reflections["rlp"].norms()
            if self.d_min is not None:
                isel = (lengths <= self.d_min).iselection()
                sel.set_selected(isel, True)
            sel.set_selected(self.reflections["id"] == -1, True)
            self.reflections.unset_flags(sel, self.reflections.flags.indexed)
            self.unindexed_reflections = self.reflections.select(sel)

            reflections_for_refinement = self.reflections.select(
                self.indexed_reflections
            )
            if self.params.refinement_protocol.mode == "repredict_only":
                # Skip refinement; just re-predict with the current models.
                refined_experiments, refined_reflections = (
                    experiments,
                    reflections_for_refinement,
                )
                from dials.algorithms.refinement.prediction.managed_predictors import (
                    ExperimentsPredictorFactory,
                )

                ref_predictor = ExperimentsPredictorFactory.from_experiments(
                    experiments,
                    spherical_relp=self.all_params.refinement.parameterisation.spherical_relp_model,
                )
                ref_predictor(refined_reflections)
            else:
                try:
                    refined_experiments, refined_reflections = self.refine(
                        experiments, reflections_for_refinement
                    )
                except Sorry as e:
                    # With more than one lattice, drop the newest and retry;
                    # with a single lattice there is nothing left to try.
                    if len(experiments) == 1:
                        raise
                    had_refinement_error = True
                    logger.info("Refinement failed:")
                    logger.info(e)
                    del experiments[-1]
                    break

            # sanity check for unrealistic unit cell volume increase during refinement
            # usually this indicates too many parameters are being refined given the
            # number of observations provided.
            if (
                not self.params.refinement_protocol.disable_unit_cell_volume_sanity_check
            ):
                for orig_expt, refined_expt in zip(
                    experiments, refined_experiments
                ):
                    uc1 = orig_expt.crystal.get_unit_cell()
                    uc2 = refined_expt.crystal.get_unit_cell()
                    volume_change = abs(uc1.volume() - uc2.volume()) / uc1.volume()
                    cutoff = 0.5
                    if volume_change > cutoff:
                        msg = "\n".join(
                            (
                                "Unrealistic unit cell volume increase during refinement of %.1f%%.",
                                "Please try refining fewer parameters, either by enforcing symmetry",
                                "constraints (space_group=) and/or disabling experimental geometry",
                                "refinement (detector.fix=all and beam.fix=all). To disable this",
                                "sanity check set disable_unit_cell_volume_sanity_check=True.",
                            )
                        ) % (100 * volume_change)
                        raise Sorry(msg)

            self.refined_reflections = refined_reflections
            self.refined_reflections.unset_flags(
                self.refined_reflections["id"] < 0,
                self.refined_reflections.flags.indexed,
            )

            # Push the refined models back onto the imagesets so downstream
            # consumers see the updated geometry.
            for i, imageset in enumerate(self.imagesets):
                ref_sel = self.refined_reflections.select(
                    self.refined_reflections["imageset_id"] == i
                )
                ref_sel = ref_sel.select(ref_sel["id"] >= 0)
                for i_expt in set(ref_sel["id"]):
                    expt = refined_experiments[i_expt]
                    imageset.set_detector(expt.detector)
                    imageset.set_beam(expt.beam)
                    imageset.set_goniometer(expt.goniometer)
                    imageset.set_scan(expt.scan)
                    expt.imageset = imageset

            if not (
                self.all_params.refinement.parameterisation.beam.fix == "all"
                and self.all_params.refinement.parameterisation.detector.fix
                == "all"
            ):
                # Experimental geometry may have changed - re-map centroids to
                # reciprocal space
                spots_mm = self.reflections
                self.reflections = flex.reflection_table()
                for i, imageset in enumerate(self.imagesets):
                    spots_sel = spots_mm.select(spots_mm["imageset_id"] == i)
                    self.map_centroids_to_reciprocal_space(
                        spots_sel,
                        imageset.get_detector(),
                        imageset.get_beam(),
                        imageset.get_goniometer(),
                    )
                    self.reflections.extend(spots_sel)

            # update for next cycle
            experiments = refined_experiments
            self.refined_experiments = refined_experiments

            logger.info("\nRefined crystal models:")
            self.show_experiments(
                self.refined_experiments, self.reflections, d_min=self.d_min
            )

            if (
                i_cycle >= 2
                and self.d_min == self.params.refinement_protocol.d_min_final
            ):
                logger.info("Target d_min_final reached: finished with refinement")
                break

    if not "refined_experiments" in locals():
        raise Sorry("None of the experiments could refine.")

    if len(self.refined_experiments) > 1:
        from dials.algorithms.indexing.compare_orientation_matrices import (
            rotation_matrix_differences,
        )

        logger.info(
            rotation_matrix_differences(self.refined_experiments.crystals())
        )

    # Convert the refined mm/radian predicted centroids back to pixel/frame
    # coordinates, panel by panel.
    self.refined_reflections["xyzcal.px"] = flex.vec3_double(
        len(self.refined_reflections)
    )
    for i, imageset in enumerate(self.imagesets):
        imgset_sel = self.refined_reflections["imageset_id"] == i
        # set xyzcal.px field in self.refined_reflections
        refined_reflections = self.refined_reflections.select(imgset_sel)
        panel_numbers = flex.size_t(refined_reflections["panel"])
        xyzcal_mm = refined_reflections["xyzcal.mm"]
        x_mm, y_mm, z_rad = xyzcal_mm.parts()
        xy_cal_mm = flex.vec2_double(x_mm, y_mm)
        xy_cal_px = flex.vec2_double(len(xy_cal_mm))
        for i_panel in range(len(imageset.get_detector())):
            panel = imageset.get_detector()[i_panel]
            sel = panel_numbers == i_panel
            isel = sel.iselection()
            ref_panel = refined_reflections.select(panel_numbers == i_panel)
            xy_cal_px.set_selected(
                sel, panel.millimeter_to_pixel(xy_cal_mm.select(sel))
            )
        x_px, y_px = xy_cal_px.parts()
        scan = imageset.get_scan()
        if scan is not None:
            z_px = scan.get_array_index_from_angle(z_rad, deg=False)
        else:
            # must be a still image, z centroid not meaningful
            z_px = z_rad
        xyzcal_px = flex.vec3_double(x_px, y_px, z_px)
        self.refined_reflections["xyzcal.px"].set_selected(imgset_sel, xyzcal_px)
def show_experiments(self, experiments, reflections, d_min=None):
    """Log a summary of the crystal models and per-imageset indexing counts.

    :param experiments: experiments whose crystal models are reported
    :param reflections: reflection table with 'rlp', 'id', 'imageset_id'
        columns and indexed flags
    :param d_min: if given, only reflections with d-spacing above this
        limit are counted
    """
    if d_min is not None:
        # Restrict the counts to reflections inside the resolution limit.
        d_spacings = 1 / reflections["rlp"].norms()
        reflections = reflections.select(d_spacings > d_min)

    # One summary line plus the crystal model per experiment.
    for num, experiment in enumerate(experiments):
        n_assigned = (reflections["id"] == num).count(True)
        logger.info("model %i (%i reflections):" % (num + 1, n_assigned))
        logger.info(experiment.crystal)

    # Tabulate indexed vs unindexed counts for each imageset.
    flags_indexed = reflections.get_flags(reflections.flags.indexed)
    imageset_ids = reflections["imageset_id"]
    table_rows = [["Imageset", "#indexed", "#unindexed"]]
    for iset in range(flex.max(imageset_ids) + 1):
        per_set = flags_indexed.select(imageset_ids == iset)
        table_rows.append(
            [str(iset), str(per_set.count(True)), str(per_set.count(False))]
        )

    from libtbx import table_utils

    logger.info(
        table_utils.format(table_rows, has_header=True, prefix="| ", postfix=" |")
    )
def find_max_cell(self):
    """Resolve self.params.max_cell, estimating it when set to Auto.

    With a known target unit cell, the estimate is the longest target
    basis vector scaled by the configured multiplier; otherwise the
    maximum cell is estimated from the observed reflections.
    """
    options = self.params.max_cell_estimation
    if self.params.max_cell is not libtbx.Auto:
        return
    if self.params.known_symmetry.unit_cell is not None:
        # Longest axis of the primitive target cell, times a safety factor.
        cell_params = self.target_symmetry_primitive.unit_cell().parameters()
        self.params.max_cell = options.multiplier * max(cell_params[:3])
        logger.info("Using max_cell: %.1f Angstrom" % (self.params.max_cell))
    else:
        # Estimate from the spot positions; NB this calls the module-level
        # find_max_cell function, not this method.
        result = find_max_cell(
            self.reflections,
            max_cell_multiplier=options.multiplier,
            step_size=options.step_size,
            nearest_neighbor_percentile=options.nearest_neighbor_percentile,
            histogram_binning=options.histogram_binning,
            nn_per_bin=options.nn_per_bin,
            max_height_fraction=options.max_height_fraction,
            filter_ice=options.filter_ice,
            filter_overlaps=options.filter_overlaps,
            overlaps_border=options.overlaps_border,
        )
        self.params.max_cell = result.max_cell
        logger.info("Found max_cell: %.1f Angstrom" % (self.params.max_cell))
def filter_reflections_by_scan_range(self):
    """Restrict self.reflections to the configured scan range(s).

    No-op when no scan_range parameters were given. Delegates to the
    module-level filter_reflections_by_scan_range function (this method
    shadows its name).
    """
    scan_ranges = self.params.scan_range
    if not len(scan_ranges):
        return
    self.reflections = filter_reflections_by_scan_range(
        self.reflections, scan_ranges
    )
@staticmethod
def calculate_entering_flags(reflections, beam, goniometer):
    """Return a boolean array flagging reflections entering the Ewald sphere.

    :param reflections: reflection table with an 's1' column
    :param beam: dxtbx beam model
    :param goniometer: dxtbx goniometer model, or None for stills (all
        flags False in that case)
    :returns: flex.bool array, True where a reflection is entering
    """
    if goniometer is None:
        # Still image: the entering/exiting distinction does not apply.
        return flex.bool(len(reflections), False)

    rotation_axis = matrix.col(goniometer.get_rotation_axis())
    beam_vector = matrix.col(beam.get_s0())

    # calculate a unit vector normal to the spindle-beam plane for this
    # experiment, such that the vector placed at the centre of the Ewald sphere
    # points to the hemisphere in which reflections cross from inside to outside
    # of the sphere (reflections are exiting). NB this vector is in +ve Y
    # direction when using imgCIF coordinate frame.
    exit_normal = beam_vector.cross(rotation_axis)

    # Entering reflections lie on the opposite side of that plane.
    return reflections["s1"].dot(exit_normal) < 0.0
@staticmethod
def map_spots_pixel_to_mm_rad(spots, detector, scan):
    """Map spot centroids from pixel/image number to mm/radian.

    Used to convert spot centroids coming from e.g. dials.find_spots which are in
    pixel/image number units to mm/radian units as required for indexing and
    refinement.

    :param spots: a reflection table containing the columns 'xyzobs.px.value',
                  'xyzobs.px.variance' and 'panel'
    :type spots: dials.array_family.flex.reflection_table
    :param detector: a dxtbx detector object
    :type detector: dxtbx.model.detector.Detector
    :param scan: a dxtbx scan object. May be None, e.g. for a still image
    :type scan: dxtbx.model.scan.Scan
    :returns: A copy of the input reflection table containing the additional keys
              'xyzobs.mm.value' and 'xyzobs.mm.variance'
    :rtype: dials.array_family.flex.reflection_table
    """
    from dials.algorithms.centroid import centroid_px_to_mm_panel

    ## ideally don't copy, but have separate spot attributes for mm and pixel
    import copy

    # Deep copy so the caller's table is left untouched.
    spots = copy.deepcopy(spots)

    spots["xyzobs.mm.value"] = flex.vec3_double(len(spots))
    spots["xyzobs.mm.variance"] = flex.vec3_double(len(spots))
    panel_numbers = flex.size_t(spots["panel"])
    # Convert panel by panel since each panel has its own pixel geometry.
    for i_panel in range(len(detector)):
        sel = panel_numbers == i_panel
        isel = sel.iselection()
        spots_panel = spots.select(panel_numbers == i_panel)
        # e.g. data imported from XDS; no variance known then; since is used
        # only for weights assign as 1 => uniform weights
        if not "xyzobs.px.variance" in spots_panel:
            spots_panel["xyzobs.px.variance"] = flex.vec3_double(
                len(spots_panel), (1, 1, 1)
            )
        centroid_position, centroid_variance, _ = centroid_px_to_mm_panel(
            detector[i_panel],
            scan,
            spots_panel["xyzobs.px.value"],
            spots_panel["xyzobs.px.variance"],
            flex.vec3_double(len(spots_panel), (1, 1, 1)),
        )
        spots["xyzobs.mm.value"].set_selected(sel, centroid_position)
        spots["xyzobs.mm.variance"].set_selected(sel, centroid_variance)
    return spots
@staticmethod
def map_centroids_to_reciprocal_space(
    spots_mm, detector, beam, goniometer, calculated=False
):
    """Map mm/radian spot centroids to reciprocal space.

    Used to convert spot centroids provided in mm/radian units to reciprocal space
    as required for indexing. Adds the column 'rlp' to the reflection table, which
    contains a :py:class:`.flex.vec3_double` array of the reciprocal lattice vectors.

    :param spots_mm: a reflection table containing the column 'xyzobs.mm.value'
    :type spots_mm: dials.array_family.flex.reflection_table
    :param detector: a dxtbx detector object
    :type detector: dxtbx.model.detector.Detector
    :param beam: a dxtbx beam object
    :type beam: dxtbx.model.beam.Beam
    :param goniometer: a dxtbx goniometer object. May be None, e.g. for a still image
    :type goniometer: dxtbx.model.goniometer.Goniometer
    :param calculated: if True use 'xyzcal.mm' centroids instead of
        'xyzobs.mm.value'
    """
    # NB mutates spots_mm in place: fills/overwrites the 's1' and 'rlp' columns.
    if "s1" not in spots_mm:
        spots_mm["s1"] = flex.vec3_double(len(spots_mm))
    spots_mm["rlp"] = flex.vec3_double(len(spots_mm))
    panel_numbers = flex.size_t(spots_mm["panel"])
    for i_panel in range(len(detector)):
        sel = panel_numbers == i_panel
        spots_panel = spots_mm.select(panel_numbers == i_panel)
        if calculated:
            x, y, rot_angle = spots_panel["xyzcal.mm"].parts()
        else:
            x, y, rot_angle = spots_panel["xyzobs.mm.value"].parts()
        # Diffracted beam vector, normalised to |s1| = 1/wavelength.
        s1 = detector[i_panel].get_lab_coord(flex.vec2_double(x, y))
        s1 = s1 / s1.norms() * (1 / beam.get_wavelength())
        spots_mm["s1"].set_selected(sel, s1)
        # Scattering vector S = s1 - s0.
        S = s1 - beam.get_s0()
        if goniometer is not None:
            # Undo, in order: setting rotation, scan rotation about the
            # rotation axis, then the fixed rotation, to recover the
            # reciprocal lattice point in the crystal frame.
            setting_rotation = matrix.sqr(goniometer.get_setting_rotation())
            rotation_axis = goniometer.get_rotation_axis_datum()
            fixed_rotation = matrix.sqr(goniometer.get_fixed_rotation())
            spots_mm["rlp"].set_selected(sel, tuple(setting_rotation.inverse()) * S)
            spots_mm["rlp"].set_selected(
                sel,
                spots_mm["rlp"]
                .select(sel)
                .rotate_around_origin(rotation_axis, -rot_angle),
            )
            spots_mm["rlp"].set_selected(
                sel, tuple(fixed_rotation.inverse()) * spots_mm["rlp"].select(sel)
            )
        else:
            # Still image: no goniometer rotations to undo.
            spots_mm["rlp"].set_selected(sel, S)
def find_candidate_orientation_matrices(self, candidate_basis_vectors):
    """Build candidate P1 crystal models from triplets of basis vectors.

    Tries ordered combinations of three candidate basis vectors, rejects
    near-degenerate geometries, enforces a right-handed basis, reduces to
    the Niggli cell, and (when a target symmetry is known) keeps only
    models compatible with it.

    :param candidate_basis_vectors: list of candidate basis vectors
        (matrix.col-like, shortest first)
    :returns: list of candidate Crystal models (possibly truncated at
        basis_vector_combinations.max_refine)
    """
    candidate_crystal_models = []
    vectors = candidate_basis_vectors

    if (
        self.target_symmetry_primitive is not None
        and self.target_symmetry_primitive.unit_cell() is not None
    ):
        target_min_cell = self.target_symmetry_primitive.minimum_cell().unit_cell()
    else:
        target_min_cell = None

    # select unique combinations of input vectors to test
    # the order of combinations is such that combinations comprising vectors
    # nearer the beginning of the input list will appear before combinations
    # comprising vectors towards the end of the list
    n = len(vectors)
    # hardcoded limit on number of vectors, fixes issue #72
    # https://github.com/dials/dials/issues/72
    n = min(n, 100)
    vectors = vectors[:n]
    combinations = flex.vec3_int(flex.nested_loop((n, n, n)))
    combinations = combinations.select(
        flex.sort_permutation(combinations.as_vec3_double().norms())
    )

    from rstbx.indexing_api import tools

    # Candidate cell-doubling transformations from rstbx...
    transformations = [
        sgtbx.change_of_basis_op(
            str(sgtbx.rt_mx(sgtbx.rot_mx(T["trans"].transpose().as_int())))
        )
        for T in tools.R
        if T["mod"] < 5
    ]
    # ...immediately discarded: only the identity op is actually used.
    transformations = []  # XXX temporarily disable cell doubling checks
    transformations.insert(0, sgtbx.change_of_basis_op())

    # select only those combinations where j > i and k > j
    i, j, k = combinations.as_vec3_double().parts()
    sel = flex.bool(len(combinations), True)
    sel &= j > i
    sel &= k > j
    combinations = combinations.select(sel)

    max_combinations = self.params.basis_vector_combinations.max_combinations
    if max_combinations is not None and max_combinations < len(combinations):
        combinations = combinations[:max_combinations]

    half_pi = 0.5 * math.pi
    min_angle = 20 / 180 * math.pi  # 20 degrees, arbitrary cutoff
    for i, j, k in combinations:
        a = vectors[i]
        b = vectors[j]
        # Reject nearly-parallel (or anti-parallel) a,b pairs.
        angle = a.angle(b)
        if angle < min_angle or (math.pi - angle) < min_angle:
            continue
        a_cross_b = a.cross(b)
        gamma = a.angle(b)
        if gamma < half_pi:
            # all angles obtuse if possible please
            b = -b
            gamma = math.pi - gamma
            a_cross_b = -a_cross_b
        c = vectors[k]
        # Reject c nearly in the a,b plane.
        if abs(half_pi - a_cross_b.angle(c)) < min_angle:
            continue
        alpha = b.angle(c, deg=True)
        # NOTE(review): alpha is in DEGREES (deg=True) but half_pi is in
        # radians (~1.57), so this flip triggers only for alpha < ~1.6
        # degrees — presumably a units bug (compare the gamma test above);
        # confirm intended behaviour before changing.
        if alpha < half_pi:
            c = -c
        # beta = c.angle(a, deg=True)
        if a_cross_b.dot(c) < 0:
            # we want right-handed basis set, therefore invert all vectors
            a = -a
            b = -b
            c = -c
            # assert a.cross(b).dot(c) > 0
        model = Crystal(a, b, c, space_group_symbol="P 1")
        uc = model.get_unit_cell()
        model_orig = model
        best_model = None
        min_bmsd = 1e8
        cb_op_to_niggli = uc.change_of_basis_op_to_niggli_cell()
        model = model.change_basis(cb_op_to_niggli)
        if self.target_symmetry_primitive is not None:
            # With a symmetry target, only keep models whose reduced cell is
            # consistent with the target lattice symmetry (and unit cell if
            # one was given).
            max_delta = self.params.known_symmetry.max_delta
            from dials.algorithms.indexing.symmetry import find_matching_symmetry

            for T in transformations:
                uc = model.get_unit_cell()
                det = T.c().r().determinant()
                if self.target_symmetry_primitive.unit_cell() is None:
                    if det > 1:
                        break
                else:
                    primitive_volume = (
                        self.target_symmetry_primitive.unit_cell().volume()
                    )
                    # Apply a volume-increasing transform only if the volume
                    # ratio matches its determinant.
                    if det > 1 and abs(uc.volume() / primitive_volume - det) < 1e-1:
                        uc = uc.change_basis(T)
                best_subgroup = find_matching_symmetry(
                    uc,
                    self.target_symmetry_primitive.space_group(),
                    max_delta=max_delta,
                )
                cb_op_extra = None
                if best_subgroup is None:
                    if self.target_symmetry_reference_setting is not None:
                        # if we have been told we have a centred unit cell check that
                        # indexing hasn't found the centred unit cell instead of the
                        # primitive cell
                        best_subgroup = find_matching_symmetry(
                            uc,
                            self.target_symmetry_reference_setting.space_group().build_derived_point_group(),
                            max_delta=max_delta,
                        )
                        cb_op_extra = self.cb_op_reference_to_primitive
                        if best_subgroup is None:
                            continue
                    else:
                        continue
                # Compose the operator taking this model to the primitive
                # setting of the target symmetry.
                cb_op_inp_best = best_subgroup["cb_op_inp_best"]
                best_subsym = best_subgroup["best_subsym"]
                cb_op_best_ref = (
                    best_subsym.change_of_basis_op_to_reference_setting()
                )
                ref_subsym = best_subsym.change_basis(cb_op_best_ref)
                cb_op_ref_primitive = (
                    ref_subsym.change_of_basis_op_to_primitive_setting()
                )
                cb_op_to_primitive = (
                    cb_op_ref_primitive * cb_op_best_ref * cb_op_inp_best * T
                )
                if cb_op_extra is not None:
                    cb_op_to_primitive = cb_op_extra * cb_op_to_primitive
                best_model = model.change_basis(cb_op_to_primitive)

                def is_similar_cell(uc1, uc2):
                    # Cell comparison with the user-configured tolerances.
                    return uc1.is_similar_to(
                        uc2,
                        relative_length_tolerance=self.params.known_symmetry.relative_length_tolerance,
                        absolute_angle_tolerance=self.params.known_symmetry.absolute_angle_tolerance,
                    )

                if self.target_symmetry_primitive.unit_cell() is not None and not (
                    is_similar_cell(
                        best_model.get_unit_cell(),
                        self.target_symmetry_primitive.unit_cell(),
                    )
                    or is_similar_cell(
                        best_model.get_unit_cell().minimum_cell(), target_min_cell
                    )
                ):
                    best_model = None
                    continue
                else:
                    break
        else:
            best_model = model

        if best_model is None:
            continue

        uc = best_model.get_unit_cell()
        params = uc.parameters()
        if uc.volume() > (params[0] * params[1] * params[2] / 100):
            # unit cell volume cutoff from labelit 2004 paper
            candidate_crystal_models.append(best_model)
            # Early exit once enough candidates have been collected.
            if (
                len(candidate_crystal_models)
                == self.params.basis_vector_combinations.max_refine
            ):
                return candidate_crystal_models
    return candidate_crystal_models
def choose_best_orientation_matrix(self, candidate_orientation_matrices):
    """Score candidate crystal models by trial indexing + refinement.

    Each candidate is used to index the unindexed reflections, optionally
    corrected for non-primitive bases and known symmetry, then refined in
    parallel with a restricted refinement protocol. Solutions are ranked by
    the configured solution scorer.

    :param candidate_orientation_matrices: candidate Crystal models
    :returns: (best_crystal, n_indexed) or (None, None) if no candidate
        survived
    """
    solution_scorer = self.params.basis_vector_combinations.solution_scorer
    if solution_scorer == "weighted":
        weighted_params = self.params.basis_vector_combinations.weighted
        solutions = SolutionTrackerWeighted(
            power=weighted_params.power,
            volume_weight=weighted_params.volume_weight,
            n_indexed_weight=weighted_params.n_indexed_weight,
            rmsd_weight=weighted_params.rmsd_weight,
        )
    else:
        filter_params = self.params.basis_vector_combinations.filter
        solutions = SolutionTrackerFilter(
            check_doubled_cell=filter_params.check_doubled_cell,
            likelihood_cutoff=filter_params.likelihood_cutoff,
            volume_cutoff=filter_params.volume_cutoff,
            n_indexed_cutoff=filter_params.n_indexed_cutoff,
        )

    def run_one_refinement(args):
        # Worker executed (possibly in a subprocess) for one candidate:
        # optional hkl-offset correction, then a short refinement run.
        # Returns a Solution, or None on refinement failure.
        params, reflections, experiments = args
        indexed_reflections = reflections.select(reflections["id"] > -1)
        from dials.command_line import check_indexing_symmetry

        grid_search_scope = params.indexing.check_misindexing.grid_search_scope
        best_offset = (0, 0, 0)
        best_cc = 0.0
        best_nref = 0
        if grid_search_scope > 0:
            # Search a grid of hkl offsets for possible misindexing.
            offsets, ccs, nref = check_indexing_symmetry.get_indexing_offset_correlation_coefficients(
                indexed_reflections,
                experiments.crystals()[0],
                grid=grid_search_scope,
                map_to_asu=True,
            )
            if len(offsets) > 1:
                max_nref = flex.max(nref)
                # select "best" solution - needs nref > 0.5 max nref && highest CC
                # FIXME perform proper statistical test in here do not like heuristics
                for offset, cc, n in zip(offsets, ccs, nref):
                    if n < (max_nref // 2):
                        continue
                    if cc > best_cc:
                        best_cc = cc
                        best_offset = offset
                        best_nref = n
                # print offsets[13], nref[13], '%.2f' %ccs[13] # (0,0,0)
                # print best_offset, best_nref, '%.2f' %best_cc
                if best_offset != (0, 0, 0):
                    logger.debug(
                        "Applying h,k,l offset: (%i, %i, %i)" % best_offset
                        + " [cc = %.2f]" % best_cc
                    )
                    indexed_reflections["miller_index"] = apply_hkl_offset(
                        indexed_reflections["miller_index"], best_offset
                    )

        from dials.algorithms.refinement import RefinerFactory

        # Silence the refinement logger for the duration of the trial run.
        reflogger = logging.getLogger("dials.algorithms.refinement")
        level = reflogger.getEffectiveLevel()
        reflogger.setLevel(logging.ERROR)
        try:
            refiner = RefinerFactory.from_parameters_data_experiments(
                params, indexed_reflections, experiments
            )
            refiner.run()
        except (RuntimeError, ValueError, Sorry) as e:
            # Failed trial refinement: candidate is simply dropped.
            return
        else:
            rmsds = refiner.rmsds()
            xy_rmsds = math.sqrt(rmsds[0] ** 2 + rmsds[1] ** 2)
            model_likelihood = 1.0 - xy_rmsds
            soln = Solution(
                model_likelihood=model_likelihood,
                crystal=experiments.crystals()[0],
                rmsds=rmsds,
                n_indexed=len(indexed_reflections),
                fraction_indexed=float(len(indexed_reflections))
                / len(reflections),
                hkl_offset=best_offset,
            )
            return soln
        finally:
            reflogger.setLevel(level)

    import copy

    # Restricted refinement protocol for quick trial refinements.
    params = copy.deepcopy(self.all_params)
    params.refinement.parameterisation.auto_reduction.action = "fix"
    params.refinement.parameterisation.scan_varying = False
    params.refinement.refinery.max_iterations = 4
    params.refinement.reflections.reflections_per_degree = min(
        params.refinement.reflections.reflections_per_degree, 20
    )
    if params.refinement.reflections.outlier.block_width is libtbx.Auto:
        # auto block_width determination is potentially too expensive to do at
        # this stage: instead set separate_blocks=False and increase value
        # of tukey.iqr_multiplier to be more tolerant of outliers
        params.refinement.reflections.outlier.separate_blocks = False
        params.refinement.reflections.outlier.tukey.iqr_multiplier = (
            2 * params.refinement.reflections.outlier.tukey.iqr_multiplier
        )

    args = []

    from dials.algorithms.indexing.compare_orientation_matrices import (
        difference_rotation_matrix_axis_angle,
    )

    # Pre-screen each candidate and collect the survivors' worker arguments.
    for cm in candidate_orientation_matrices:
        sel = self.reflections["id"] == -1
        if self.d_min is not None:
            sel &= 1 / self.reflections["rlp"].norms() > self.d_min
        xo, yo, zo = self.reflections["xyzobs.mm.value"].parts()
        imageset_id = self.reflections["imageset_id"]
        experiments = ExperimentList()
        for i_imageset, imageset in enumerate(self.imagesets):
            scan = imageset.get_scan()
            if scan is not None:
                start, end = scan.get_oscillation_range()
                if (end - start) > 360:
                    # only use reflections from the first 360 degrees of the scan
                    sel.set_selected(
                        (imageset_id == i_imageset)
                        & (zo > ((start * math.pi / 180) + 2 * math.pi)),
                        False,
                    )
            experiments.append(
                Experiment(
                    imageset=imageset,
                    beam=imageset.get_beam(),
                    detector=imageset.get_detector(),
                    goniometer=imageset.get_goniometer(),
                    scan=scan,
                    crystal=cm,
                )
            )
        refl = self.reflections.select(sel)
        self.index_reflections(experiments, refl)
        if refl.get_flags(refl.flags.indexed).count(True) == 0:
            continue

        from rstbx.dps_core.cell_assessment import SmallUnitCellVolume

        threshold = self.params.basis_vector_combinations.sys_absent_threshold
        if threshold and (
            self.target_symmetry_primitive is None
            or self.target_symmetry_primitive.unit_cell() is None
        ):
            # Check for (and correct) accidental indexing on a sub-lattice.
            try:
                self.correct_non_primitive_basis(experiments, refl, threshold)
                if refl.get_flags(refl.flags.indexed).count(True) == 0:
                    continue
            except SmallUnitCellVolume:
                logger.debug(
                    "correct_non_primitive_basis SmallUnitCellVolume error for unit cell %s:"
                    % experiments[0].crystal.get_unit_cell()
                )
                continue
            except RuntimeError as e:
                if "Krivy-Gruber iteration limit exceeded" in str(e):
                    logger.debug(
                        "correct_non_primitive_basis Krivy-Gruber iteration limit exceeded error for unit cell %s:"
                        % experiments[0].crystal.get_unit_cell()
                    )
                    continue
                raise

        if (
            experiments[0].crystal.get_unit_cell().volume()
            < self.params.min_cell_volume
        ):
            continue

        if self.params.known_symmetry.space_group is not None:
            # Constrain the candidate to the known symmetry, reindexing the
            # assigned reflections accordingly.
            target_space_group = self.target_symmetry_primitive.space_group()
            new_crystal, cb_op_to_primitive = self.apply_symmetry(
                experiments[0].crystal, target_space_group
            )
            if new_crystal is None:
                continue
            experiments[0].crystal.update(new_crystal)
            if not cb_op_to_primitive.is_identity_op():
                sel = refl["id"] > -1
                miller_indices = refl["miller_index"].select(sel)
                miller_indices = cb_op_to_primitive.apply(miller_indices)
                refl["miller_index"].set_selected(sel, miller_indices)

        # NOTE(review): dead code — `if 0 and ...` never executes; also
        # references self.cb_op_primitive_to_given which is not set anywhere
        # visible here. Candidate for removal.
        if 0 and self.cb_op_primitive_to_given is not None:
            sel = refl["id"] > -1
            experiments[0].crystal.update(
                experiments[0].crystal.change_basis(self.cb_op_primitive_to_given)
            )
            miller_indices = refl["miller_index"].select(sel)
            miller_indices = self.cb_op_primitive_to_given.apply(miller_indices)
            refl["miller_index"].set_selected(sel, miller_indices)

        if (
            self.refined_experiments is not None
            and len(self.refined_experiments) > 0
        ):
            # Skip candidates too close in orientation to already-refined
            # crystals.
            orientation_too_similar = False
            cryst_b = experiments[0].crystal
            for i_a, cryst_a in enumerate(self.refined_experiments.crystals()):
                R_ab, axis, angle, cb_op_ab = difference_rotation_matrix_axis_angle(
                    cryst_a, cryst_b
                )
                min_angle = (
                    self.params.multiple_lattice_search.minimum_angular_separation
                )
                if abs(angle) < min_angle:  # degrees
                    orientation_too_similar = True
                    break
            if orientation_too_similar:
                logger.debug("skipping crystal: too similar to other crystals")
                continue
        args.append((params, refl, experiments))

    from libtbx import easy_mp

    # Run the trial refinements in parallel.
    results = easy_mp.parallel_map(
        run_one_refinement,
        args,
        processes=self.params.nproc,
        preserve_exception_message=True,
    )
    for soln in results:
        if soln is None:
            continue
        solutions.append(soln)

    if len(solutions):
        logger.info("Candidate solutions:")
        logger.info(str(solutions))
        best_solution = solutions.best_solution()
        logger.debug(
            "best model_likelihood: %.2f" % best_solution.model_likelihood
        )
        logger.debug("best n_indexed: %i" % best_solution.n_indexed)
        self.hkl_offset = best_solution.hkl_offset
        return best_solution.crystal, best_solution.n_indexed
    else:
        return None, None
    def correct_non_primitive_basis(self, experiments, reflections, threshold):
        """Detect and correct a non-primitive (centred) basis choice.

        Repeatedly tests the indexed Miller indices for systematic-absence
        patterns (via detect_non_primitive_basis); while a correction matrix
        is found, transforms the crystal model to the corresponding Niggli
        cell and re-indexes all reflections against the updated model.

        :param experiments: ExperimentList holding exactly one crystal model
            (asserted below); the crystal model is modified in place
        :param reflections: reflection table; the "id" column and indexed
            flags are reset and reassigned on each iteration
        :param threshold: fraction of reflections that must satisfy an
            absence condition for a correction to be applied
        """
        assert len(experiments.crystals()) == 1
        while True:
            # only consider reflections that were actually indexed
            sel = reflections["miller_index"] != (0, 0, 0)
            if sel.count(True) == 0:
                break
            T = detect_non_primitive_basis(
                reflections["miller_index"].select(sel), threshold=threshold
            )
            if T is None:
                # no further absence pattern detected
                break

            crystal_model = experiments.crystals()[0]
            direct_matrix = matrix.sqr(crystal_model.get_A()).inverse()
            # apply the correction in direct space: M = (T^-1)^T
            M = T.inverse().transpose()
            new_direct_matrix = M * direct_matrix
            crystal_model.set_A(new_direct_matrix.inverse())

            from rstbx.dps_core.cell_assessment import unit_cell_too_small

            # raises SmallUnitCellVolume for an implausibly small cell
            # (caught by the caller)
            unit_cell_too_small(crystal_model.get_unit_cell())
            cb_op = crystal_model.get_unit_cell().change_of_basis_op_to_niggli_cell()
            crystal_model.update(crystal_model.change_basis(cb_op))

            # reset all index assignments and re-index with the new basis
            reflections["id"] = flex.int(len(reflections), -1)
            reflections.unset_flags(
                flex.bool(len(reflections), True), reflections.flags.indexed
            )
            self.index_reflections(experiments, reflections)
    def apply_symmetry(self, crystal_model, target_space_group):
        """Constrain a crystal model to the target space group's lattice symmetry.

        Uses the iotbx/Lepage lattice-symmetry machinery to find the subgroup
        whose Bravais lattice matches that of the target space group (within
        known_symmetry.max_delta), symmetrises the orientation matrix
        accordingly, and returns the model in the primitive setting.

        :param crystal_model: input crystal model (not modified)
        :param target_space_group: target space group (any setting)
        :returns: (model, cb_op_inp_primitive) where cb_op_inp_primitive maps
            input indices to the primitive setting, or (None, None) if no
            subgroup with a matching Bravais lattice is found
        """
        A = crystal_model.get_A()

        from cctbx.crystal_orientation import crystal_orientation
        from cctbx.sgtbx.bravais_types import bravais_lattice
        from rstbx import dps_core  # import dependency
        from rstbx.dps_core.lepage import iotbx_converter

        max_delta = self.params.known_symmetry.max_delta
        items = iotbx_converter(crystal_model.get_unit_cell(), max_delta=max_delta)
        target_sg_ref = target_space_group.info().reference_setting().group()
        # among subgroups with the right Bravais lattice, pick the one with
        # the smallest angular distortion from the input cell
        best_angular_difference = 1e8
        best_subgroup = None
        for item in items:
            if bravais_lattice(group=target_sg_ref) != bravais_lattice(
                group=item["ref_subsym"].space_group()
            ):
                continue
            if item["max_angular_difference"] < best_angular_difference:
                best_angular_difference = item["max_angular_difference"]
                best_subgroup = item

        if best_subgroup is None:
            return None, None

        # symmetrise the orientation matrix in the subgroup's "best" setting
        cb_op_inp_best = best_subgroup["cb_op_inp_best"]
        orient = crystal_orientation(A, True)
        orient_best = orient.change_basis(
            matrix.sqr(cb_op_inp_best.c().as_double_array()[0:9]).transpose()
        )
        constrain_orient = orient_best.constrain(best_subgroup["system"])

        # chain the change-of-basis operators: input -> best -> reference
        # -> primitive setting
        best_subsym = best_subgroup["best_subsym"]
        cb_op_best_ref = best_subsym.change_of_basis_op_to_reference_setting()
        target_sg_best = target_sg_ref.change_basis(cb_op_best_ref.inverse())
        ref_subsym = best_subsym.change_basis(cb_op_best_ref)
        cb_op_ref_primitive = ref_subsym.change_of_basis_op_to_primitive_setting()
        primitive_subsym = ref_subsym.change_basis(cb_op_ref_primitive)
        cb_op_best_primitive = cb_op_ref_primitive * cb_op_best_ref
        cb_op_inp_primitive = cb_op_ref_primitive * cb_op_best_ref * cb_op_inp_best

        # rebuild the crystal from the constrained direct matrix
        direct_matrix = constrain_orient.direct_matrix()
        a = matrix.col(direct_matrix[:3])
        b = matrix.col(direct_matrix[3:6])
        c = matrix.col(direct_matrix[6:9])
        model = Crystal(a, b, c, space_group=target_sg_best)
        assert target_sg_best.is_compatible_unit_cell(model.get_unit_cell())
        model = model.change_basis(cb_op_best_primitive)
        return model, cb_op_inp_primitive
[docs] def index_reflections(self, experiments, reflections): if self.params.index_assignment.method == "local": params_local = self.params.index_assignment.local from dials.algorithms.indexing import index_reflections_local index_reflections_local( reflections, experiments, self.d_min, epsilon=params_local.epsilon, delta=params_local.delta, l_min=params_local.l_min, nearest_neighbours=params_local.nearest_neighbours, ) else: params_simple = self.params.index_assignment.simple from dials.algorithms.indexing import index_reflections index_reflections( reflections, experiments, self.d_min, tolerance=params_simple.hkl_tolerance, ) if self.hkl_offset is not None and self.hkl_offset != (0, 0, 0): reflections["miller_index"] = apply_hkl_offset( reflections["miller_index"], self.hkl_offset ) self.hkl_offset = None
[docs] def refine(self, experiments, reflections): from dials.algorithms.indexing.refinement import refine refiner, refined, outliers = refine( self.all_params, reflections, experiments, verbosity=self.params.refinement_protocol.verbosity, debug_plots=self.params.debug_plots, ) if outliers is not None: reflections["id"].set_selected(outliers, -1) predicted = refiner.predict_for_indexed() verbosity = self.params.refinement_protocol.verbosity reflections["xyzcal.mm"] = predicted["xyzcal.mm"] reflections["entering"] = predicted["entering"] reflections.unset_flags( flex.bool(len(reflections), True), reflections.flags.centroid_outlier ) assert ( reflections.get_flags(reflections.flags.centroid_outlier).count(True) == 0 ) reflections.set_flags( predicted.get_flags(predicted.flags.centroid_outlier), reflections.flags.centroid_outlier, ) reflections.set_flags( refiner.selection_used_for_refinement(), reflections.flags.used_in_refinement, ) return refiner.get_experiments(), reflections
[docs] def debug_show_candidate_basis_vectors(self): vectors = self.candidate_basis_vectors logger.debug("Candidate basis vectors:") for i, v in enumerate(vectors): logger.debug("%s %s" % (i, v.length())) # , vector_heights[i] if self.params.debug: # print a table of the angles between each pair of vectors from cStringIO import StringIO s = StringIO() angles = flex.double(len(vectors) ** 2) angles.reshape(flex.grid(len(vectors), len(vectors))) for i in range(len(vectors)): v_i = vectors[i] for j in range(i + 1, len(vectors)): v_j = vectors[j] angles[i, j] = v_i.angle(v_j, deg=True) print((" " * 7), end=" ", file=s) for i in range(len(vectors)): print("%7.3f" % vectors[i].length(), end=" ", file=s) print(file=s) for i in range(len(vectors)): print("%7.3f" % vectors[i].length(), end=" ", file=s) for j in range(len(vectors)): if j <= i: print((" " * 7), end=" ", file=s) else: print("%5.1f " % angles[i, j], end=" ", file=s) print(file=s) logger.debug(s.getvalue())
    def debug_plot_candidate_basis_vectors(self):
        """Show an interactive 3D matplotlib plot of the candidate basis vectors.

        Each vector and its negative are drawn as arrows from the origin, and
        the vector endpoints are drawn as points.  Blocks on pyplot.show().
        """
        from matplotlib import pyplot
        from mpl_toolkits.mplot3d import Axes3D  # import dependency

        fig = pyplot.figure()
        ax = fig.add_subplot(111, projection="3d")
        ax.scatter([0], [0], [0], marker="+", s=50)

        # http://stackoverflow.com/questions/11140163/python-matplotlib-plotting-a-3d-cube-a-sphere-and-a-vector
        # draw a vector
        from matplotlib.patches import FancyArrowPatch
        from mpl_toolkits.mplot3d import proj3d

        class Arrow3D(FancyArrowPatch):
            # FancyArrowPatch subclass that projects its 3D endpoints into
            # the 2D axes at draw time
            def __init__(self, xs, ys, zs, *args, **kwargs):
                FancyArrowPatch.__init__(self, (0, 0), (0, 0), *args, **kwargs)
                self._verts3d = xs, ys, zs

            def draw(self, renderer):
                xs3d, ys3d, zs3d = self._verts3d
                xs, ys, zs = proj3d.proj_transform(xs3d, ys3d, zs3d, renderer.M)
                self.set_positions((xs[0], ys[0]), (xs[1], ys[1]))
                FancyArrowPatch.draw(self, renderer)

        for v in self.candidate_basis_vectors:
            x, y, z = v.elems
            a = Arrow3D(
                [0, x],
                [0, y],
                [0, z],
                mutation_scale=10,
                lw=1,
                arrowstyle="-|>",
                color="k",
            )
            ax.add_artist(a)
            # also draw the inverse vector
            a = Arrow3D(
                [0, -x],
                [0, -y],
                [0, -z],
                mutation_scale=10,
                lw=1,
                arrowstyle="-|>",
                color="k",
            )
            ax.add_artist(a)

        x, y, z = zip(*self.candidate_basis_vectors)
        ax.scatter(x, y, z, marker=".", s=1)
        ax.scatter([-i for i in x], [-i for i in y], [-i for i in z], marker=".", s=1)
        pyplot.show()
    def debug_write_reciprocal_lattice_points_as_pdb(
        self, file_name="reciprocal_lattice.pdb"
    ):
        """Write the reciprocal lattice points to PDB file(s) for inspection.

        Each reflection's rlp is written as a carbon scatterer in a large
        dummy P1 cell, one file per detector panel.  For multi-panel
        detectors the file_name argument is replaced by
        "reciprocal_lattice_%i.pdb".
        """
        from cctbx import crystal, xray

        # large dummy P1 cell used as the symmetry of the fake structure
        cs = crystal.symmetry(
            unit_cell=(1000, 1000, 1000, 90, 90, 90), space_group="P1"
        )
        for i_panel in range(len(self.imagesets[0].get_detector())):
            if len(self.imagesets[0].get_detector()) > 1:
                file_name = "reciprocal_lattice_%i.pdb" % i_panel
            with open(file_name, "wb") as f:
                xs = xray.structure(crystal_symmetry=cs)
                # only reflections recorded on this panel
                reflections = self.reflections.select(
                    self.reflections["panel"] == i_panel
                )
                for site in reflections["rlp"]:
                    xs.add_scatterer(xray.scatterer("C", site=site))
                xs.sites_mod_short()
                f.write(xs.as_pdb_file())
    def debug_write_ccp4_map(self, map_data, file_name):
        """Write map_data to a CCP4 map file using the FFT cell (self.fft_cell).

        :param map_data: flex array of map values; the gridding is taken
            from its accessor (map_data.all())
        :param file_name: output path for the CCP4 map
        """
        from iotbx import ccp4_map

        gridding_first = (0, 0, 0)
        gridding_last = map_data.all()
        labels = ["cctbx.miller.fft_map"]
        ccp4_map.write_ccp4_map(
            file_name=file_name,
            unit_cell=self.fft_cell,
            space_group=sgtbx.space_group("P1"),
            gridding_first=gridding_first,
            gridding_last=gridding_last,
            map_data=map_data,
            labels=flex.std_string(labels),
        )
[docs] def export_as_json(self, experiments, file_name="experiments.json", compact=False): from dxtbx.serialize import dump assert experiments.is_consistent() dump.experiment_list(experiments, file_name)
    def export_reflections(self, reflections, file_name="reflections.pickle"):
        """Write the reflection table to file_name as a Python pickle."""
        reflections.as_pickle(file_name)
    def find_lattices(self):
        """Search for candidate crystal lattices.

        Abstract hook: concrete indexer subclasses must implement this.
        """
        raise NotImplementedError()
from libtbx import group_args
class Solution(group_args):
    """Attribute bag describing one candidate indexing solution.

    Inherits libtbx group_args, so arbitrary keyword arguments passed to the
    constructor (model_likelihood, crystal, rmsds, n_indexed,
    fraction_indexed, hkl_offset, ...) become attributes.
    """

    pass
def filter_doubled_cell(solutions):
    """Remove solutions whose unit cell is a doubling of another solution's cell.

    For each candidate, trial cells are constructed by halving one or more of
    its real-space axes.  If such a halved cell closely matches another
    solution's cell (relative length tolerance 5%, misorientation < 1 degree)
    and the candidate does not index appreciably more reflections (>10%) than
    that other solution, the candidate is rejected as a doubled cell.

    :param solutions: sequence of Solution-like objects exposing .crystal and
        .n_indexed
    :returns: list of accepted solutions, in the original order
    """
    from dials.algorithms.indexing.compare_orientation_matrices import (
        difference_rotation_matrix_axis_angle,
    )

    accepted_solutions = []
    for s1 in solutions:
        doubled_cell = False
        # the axis vectors are invariant over the multiplier loop: fetch once
        a, b, c = s1.crystal.get_real_space_vectors()
        for (m1, m2, m3) in (
            (2, 1, 1),
            (1, 2, 1),
            (1, 1, 2),
            (2, 2, 1),
            (2, 1, 2),
            (1, 2, 2),
            (2, 2, 2),
        ):
            if doubled_cell:
                break
            new_cryst = Crystal(
                real_space_a=1 / m1 * a,
                real_space_b=1 / m2 * b,
                real_space_c=1 / m3 * c,
                space_group=s1.crystal.get_space_group(),
            )
            new_unit_cell = new_cryst.get_unit_cell()
            for s2 in solutions:
                if s2 is s1:
                    continue
                if new_unit_cell.is_similar_to(
                    s2.crystal.get_unit_cell(), relative_length_tolerance=0.05
                ):
                    R, axis, angle, cb = difference_rotation_matrix_axis_angle(
                        new_cryst, s2.crystal
                    )
                    if (angle < 1) and (s1.n_indexed < (1.1 * s2.n_indexed)):
                        doubled_cell = True
                        break
        if not doubled_cell:
            accepted_solutions.append(s1)

    return accepted_solutions
# Tracker for solutions based on code in rstbx/dps_core/basis_choice.py
class SolutionTrackerFilter(object):
    """Collect candidate indexing solutions and maintain a filtered shortlist.

    Every appended solution is kept in all_solutions; filtered_solutions is
    recomputed on each append by chaining filters on number of indexed
    reflections, doubled cells, model likelihood and unit-cell volume.
    (Based on code in rstbx/dps_core/basis_choice.py.)
    """

    def __init__(
        self,
        check_doubled_cell=True,
        likelihood_cutoff=0.8,
        volume_cutoff=1.25,
        n_indexed_cutoff=0.9,
    ):
        self.check_doubled_cell = check_doubled_cell
        self.likelihood_cutoff = likelihood_cutoff
        self.volume_cutoff = volume_cutoff
        self.n_indexed_cutoff = n_indexed_cutoff
        self.all_solutions = []
        self.filtered_solutions = []

    def append(self, item):
        """Add a solution and refresh the filtered shortlist."""
        self.all_solutions.append(item)
        self.update_analysis()

    def __len__(self):
        return len(self.filtered_solutions)

    def filter_by_likelihood(self, solutions):
        """Keep solutions whose likelihood is within likelihood_cutoff of the best."""
        # shift likelihoods by the smallest non-negative integer that makes
        # the best value positive, so the ratio test below is well defined
        best = max(s.model_likelihood for s in solutions)
        shift = 0
        while (best + shift) <= 0:
            shift += 1
        threshold = self.likelihood_cutoff * (best + shift)
        return [s for s in solutions if (s.model_likelihood + shift) >= threshold]

    def filter_by_volume(self, solutions):
        """Keep solutions whose cell volume is close to the smallest one."""
        # filter by volume - prefer solutions with a smaller unit cell
        smallest = min(s.crystal.get_unit_cell().volume() for s in solutions)
        limit = self.volume_cutoff * smallest
        return [s for s in solutions if s.crystal.get_unit_cell().volume() < limit]

    def filter_by_n_indexed(self, solutions, n_indexed_cutoff=None):
        """Keep solutions indexing at least cutoff * (best n_indexed) spots."""
        cutoff = self.n_indexed_cutoff if n_indexed_cutoff is None else n_indexed_cutoff
        # prefer solutions that account for more of the diffracted spots
        most = max(s.n_indexed for s in solutions)
        return [s for s in solutions if s.n_indexed >= cutoff * most]

    def update_analysis(self):
        """Recompute filtered_solutions from all_solutions."""
        # pre-filter out solutions that only account for a very small
        # percentage (5%) of the indexed spots relative to the best one
        candidates = self.filter_by_n_indexed(self.all_solutions, n_indexed_cutoff=0.05)
        if self.check_doubled_cell:
            candidates = filter_doubled_cell(candidates)
        candidates = self.filter_by_likelihood(candidates)
        candidates = self.filter_by_volume(candidates)
        candidates = self.filter_by_n_indexed(candidates)
        self.filtered_solutions = candidates
        return

    def best_solution(self):
        """Return the first filtered solution attaining the best likelihood."""
        self.best_filtered_liklihood = max(
            s.model_likelihood for s in self.filtered_solutions
        )
        for s in self.filtered_solutions:
            if s.model_likelihood == self.best_filtered_liklihood:
                return s

    def __str__(self):
        from libtbx import table_utils

        rows = [
            ["unit_cell", "volume", "n_indexed", "fraction_indexed", "likelihood"]
        ]
        for s in self.all_solutions:
            rows.append(
                [
                    format(
                        s.crystal.get_unit_cell(),
                        "{:.2f} {:.2f} {:.2f} {:.1f} {:.1f} {:.1f}",
                    ),
                    "%.0f" % s.crystal.get_unit_cell().volume(),
                    str(s.n_indexed),
                    "%.0f" % (s.fraction_indexed * 100),
                    "%.2f" % s.model_likelihood,
                ]
            )
        return table_utils.format(rows=rows, has_header=True)
class SolutionTrackerWeighted(object):
    """Rank candidate indexing solutions by a weighted combination of scores.

    Solutions are scored on unit-cell volume, fraction of reflections
    indexed and positional rmsd (each as a log2 value relative to the best
    solution, weighted, and raised to `power`); the lowest combined score
    wins.
    """

    def __init__(self, power=2, volume_weight=1, n_indexed_weight=1, rmsd_weight=1):
        # exponent applied to each individual score before summing
        self.volume_weight = volume_weight
        self.n_indexed_weight = n_indexed_weight
        self.rmsd_weight = rmsd_weight
        self.power = power
        self.all_solutions = []

    def append(self, item):
        """Add a candidate solution."""
        self.all_solutions.append(item)

    def __len__(self):
        return len(self.all_solutions)

    def score_by_volume(self, reverse=False):
        """Score solutions on unit-cell volume (0 = smallest volume).

        NOTE(review): the reverse parameter is currently unused.
        """
        # smaller volume = better
        volumes = flex.double(
            s.crystal.get_unit_cell().volume() for s in self.all_solutions
        )
        score = flex.log(volumes) / math.log(2)
        return self.volume_weight * (score - flex.min(score))

    def score_by_rmsd_xy(self, reverse=False):
        """Score solutions on in-plane (x, y) rmsd (0 = smallest rmsd).

        NOTE(review): the reverse parameter is currently unused.
        """
        # smaller rmsds = better
        rmsd_x, rmsd_y, rmsd_z = flex.vec3_double(
            s.rmsds for s in self.all_solutions
        ).parts()
        rmsd_xy = flex.sqrt(flex.pow2(rmsd_x) + flex.pow2(rmsd_y))
        score = flex.log(rmsd_xy) / math.log(2)
        return self.rmsd_weight * (score - flex.min(score))

    def score_by_rmsd_z(self, reverse=False):
        """Score solutions on rmsd_z (unweighted; not used in solution_scores).

        NOTE(review): the reverse parameter is currently unused.
        """
        # smaller rmsds = better
        rmsd_x, rmsd_y, rmsd_z = flex.vec3_double(
            s.rmsds for s in self.all_solutions
        ).parts()
        score = flex.log(rmsd_z) / math.log(2)
        return score - flex.min(score)

    def score_by_fraction_indexed(self, reverse=False):
        """Score solutions on fraction indexed (0 = largest fraction).

        NOTE(review): the reverse parameter is currently unused.
        """
        # more indexed reflections = better
        fraction_indexed = flex.double(s.fraction_indexed for s in self.all_solutions)
        score = flex.log(fraction_indexed) / math.log(2)
        return self.n_indexed_weight * (-score + flex.max(score))

    def best_solution(self):
        """Return the solution with the lowest combined score."""
        scores = self.solution_scores()
        perm = flex.sort_permutation(scores)
        return self.all_solutions[perm[0]]

    def solution_scores(self):
        """Combine the individual scores: sum of score**power per solution."""
        scores = sum(
            flex.pow(score.as_double(), self.power)
            for score in (
                self.score_by_fraction_indexed(),
                self.score_by_volume(),
                self.score_by_rmsd_xy(),
                # self.score_by_rmsd_z(),
            )
        )
        return scores

    def __str__(self):
        # tabulate all solutions, best (lowest overall score) first
        rows = []
        rows.append(
            [
                "unit_cell",
                "volume",
                "volume score",
                "#indexed",
                "% indexed",
                "% indexed score",
                "rmsd_xy",
                "rmsd_xy score",
                #'rmsd_z', 'rank_rmsd_z',
                "overall score",
            ]
        )

        score_by_fraction_indexed = self.score_by_fraction_indexed()
        score_by_volume = self.score_by_volume()
        score_by_rmsd_xy = self.score_by_rmsd_xy()
        score_by_rmsd_z = self.score_by_rmsd_z()
        overall_scores = self.solution_scores()

        perm = flex.sort_permutation(overall_scores)

        rmsd_x, rmsd_y, rmsd_z = flex.vec3_double(
            s.rmsds for s in self.all_solutions
        ).parts()
        rmsd_xy = flex.sqrt(flex.pow2(rmsd_x) + flex.pow2(rmsd_y))
        # convert rmsd_z from radians to degrees for display
        rmsd_z *= 180 / math.pi

        for i in perm:
            s = self.all_solutions[i]
            rows.append(
                [
                    format(
                        s.crystal.get_unit_cell(),
                        "{:.2f} {:.2f} {:.2f} {:.1f} {:.1f} {:.1f}",
                    ),
                    "%.0f" % s.crystal.get_unit_cell().volume(),
                    "%.2f" % score_by_volume[i],
                    str(s.n_indexed),
                    "%.0f" % (s.fraction_indexed * 100),
                    "%.2f" % score_by_fraction_indexed[i],
                    "%.2f" % rmsd_xy[i],
                    "%.2f" % score_by_rmsd_xy[i],
                    # "%.2f" %rmsd_z[i], "%.2f" %score_by_rmsd_z[i],
                    "%.2f" % overall_scores[i],
                ]
            )
        from libtbx import table_utils

        return table_utils.format(rows=rows, has_header=True)
def detect_non_primitive_basis(miller_indices, threshold=0.9):
    """Look for a systematic-absence pattern indicating a non-primitive basis.

    Runs the rstbx absence tests over the given Miller indices; if, for some
    test, more than `threshold` of the indices fall in the zero-remainder
    class, the corresponding change-of-basis correction is returned.

    :param miller_indices: flex.miller_index array to test
    :param threshold: fraction of indices that must satisfy the absence
        condition for it to be reported
    :returns: the matching test's "trans" correction matrix, or None if no
        pattern is detected
    """
    from rstbx.indexing_api import tools

    for test in tools.R:
        cum = tools.cpp_absence_test(miller_indices, test["mod"], test["vec"])
        # range() rather than the Python 2-only xrange(); test["mod"] is small
        for counter in range(test["mod"]):
            if float(cum[counter]) / miller_indices.size() > threshold and counter == 0:
                # (if counter != 0 there is no obvious way to correct this)
                logger.debug(
                    "Detected exclusive presence of %dH %dK %dL = %dn, remainder %d"
                    % (
                        test["vec"][0],
                        test["vec"][1],
                        test["vec"][2],
                        test["mod"],
                        counter,
                    )
                )
                logger.debug(
                    "%s, %s, %s"
                    % (
                        test["vec"],
                        test["mod"],
                        float(cum[counter]) / miller_indices.size(),
                    )
                )
                # flag = {'vec':test['vec'],'mod':test['mod'],
                #'remainder':counter, 'trans':test['trans'].elems}
                return test["trans"]
def optimise_basis_vectors(reciprocal_lattice_points, vectors):
    """Refine each candidate basis vector against the reciprocal lattice points.

    Each vector is minimised independently with basis_vector_minimser
    (L-BFGS, see basis_vector_target below).

    :param reciprocal_lattice_points: flex.vec3_double of observed rlps
    :param vectors: iterable of candidate basis vectors
    :returns: flex.vec3_double of optimised vectors, in input order
    """
    optimised = flex.vec3_double()
    for vector in vectors:
        minimised = basis_vector_minimser(reciprocal_lattice_points, vector)
        optimised.append(tuple(minimised.x))
    return optimised
from scitbx import lbfgs # Optimise the initial basis vectors as per equation 11.4.3.4 of # Otwinowski et al, International Tables Vol. F, chapter 11.4 pp. 282-295
class basis_vector_target(object):
    """Functional and gradient evaluator for basis-vector refinement.

    Implements the target of equation 11.4.3.4 of Otwinowski et al.,
    International Tables Vol. F, ch. 11.4: for a trial basis vector v the
    functional is -sum(cos(2*pi*S.v)) over the reciprocal lattice points S,
    which is minimised when every S.v is close to an integer.
    """

    def __init__(self, reciprocal_lattice_points):
        self.reciprocal_lattice_points = reciprocal_lattice_points
        # cache the x, y, z component arrays for the gradient computation
        self._xyz_parts = reciprocal_lattice_points.parts()

    def compute_functional_and_gradients(self, vector):
        """Return (f, g) for the 3-component trial vector."""
        assert len(vector) == 3
        two_pi = 2 * math.pi
        phases = two_pi * self.reciprocal_lattice_points.dot(vector)
        f = -flex.sum(flex.cos(phases))
        sin_phases = flex.sin(phases)
        # d f / d v_i = sum(2*pi * S_i * sin(2*pi*S.v))
        gradient_components = [
            flex.sum(two_pi * part * sin_phases) for part in self._xyz_parts
        ]
        g = flex.double(gradient_components)
        return f, g
class basis_vector_minimser(object):
    """L-BFGS minimiser for a single basis vector (see basis_vector_target).

    The refined vector is available as self.x after construction.
    """

    def __init__(
        self,
        reciprocal_lattice_points,
        vector,
        lbfgs_termination_params=None,
        lbfgs_core_params=None,
    ):
        """Run the minimisation.

        :param reciprocal_lattice_points: flex.vec3_double of observed rlps
        :param vector: starting basis vector (flex.double or 3-sequence)
        :param lbfgs_termination_params: optional lbfgs termination parameters
        :param lbfgs_core_params: optional lbfgs.core_parameters; by default a
            fresh core_parameters(m=20) is created per instance.  (Previously
            a single instance created at function-definition time was shared
            by all minimisers - the classic mutable-default-argument hazard.)
        """
        if lbfgs_core_params is None:
            lbfgs_core_params = lbfgs.core_parameters(m=20)
        self.reciprocal_lattice_points = reciprocal_lattice_points
        # work on a copy so the caller's vector is never modified
        if not isinstance(vector, flex.double):
            self.x = flex.double(vector)
        else:
            self.x = vector.deep_copy()
        self.n = len(self.x)
        assert self.n == 3
        self.target = basis_vector_target(self.reciprocal_lattice_points)
        self.minimizer = lbfgs.run(
            target_evaluator=self,
            termination_params=lbfgs_termination_params,
            core_params=lbfgs_core_params,
        )
        # print "number of iterations:", self.minimizer.iter()

    def compute_functional_and_gradients(self):
        """lbfgs callback: evaluate the target at the current self.x."""
        f, g = self.target.compute_functional_and_gradients(tuple(self.x))
        # g_fd = _gradient_fd(self.target, tuple(self.x))
        # from libtbx.test_utils import approx_equal
        # assert approx_equal(g, g_fd, eps=1e-3)
        return f, g

    def callback_after_step(self, minimizer):
        """lbfgs per-step callback (no-op)."""
        # print tuple(self.x)
        return
def _gradient_fd(target, vector, eps=1e-6):
    """Central finite-difference gradient of target's functional at vector.

    Used for debugging the analytical gradients of basis_vector_target.

    :param target: object with compute_functional_and_gradients(v) -> (f, g)
    :param vector: point at which to estimate the gradient
    :param eps: half-width of the central difference step
    :returns: list of per-component gradient estimates
    """
    gradient = []
    for index in range(len(vector)):
        trial = list(vector)
        trial[index] -= eps
        f_minus, _ = target.compute_functional_and_gradients(trial)
        trial[index] += 2 * eps
        f_plus, _ = target.compute_functional_and_gradients(trial)
        gradient.append((f_plus - f_minus) / (2 * eps))
    return gradient
def reject_weight_outliers_selection(reflections, sigma_cutoff=5):
    """Select reflections whose centroid weights are not extreme outliers.

    For each of the x, y, z centroid variance components the log-weights
    ln(1/variance) are computed; a reflection is kept if, for every
    component, its log-weight lies below
    mean + sigma_cutoff * bias-corrected standard deviation.

    :param reflections: iterable of reflections exposing .centroid_variance
    :param sigma_cutoff: number of standard deviations for the cutoff
    :returns: flex.bool selection of reflections to keep
    """
    from scitbx.math import basic_statistics

    variances = flex.vec3_double([r.centroid_variance for r in reflections])
    selection = None
    for v in variances.parts():
        # weights are inverse variances; work in log space
        w = 1 / v
        ln_w = flex.log(w)
        stats = basic_statistics(ln_w)
        sel = ln_w < (
            sigma_cutoff * stats.bias_corrected_standard_deviation + stats.mean
        )
        # AND the per-component selections together
        if selection is None:
            selection = sel
        else:
            selection &= sel
    return selection
def hist_outline(hist):
    """Return (bins, data) tracing the stepped outline of a histogram.

    Each slot contributes two points (its left and right edge) at the slot
    count, and a zero-height point is added at either end so the outline
    closes down to the baseline.

    :param hist: flex.histogram-like object (slot_width, slots, slot_centers)
    :returns: (bins, data) flex.double arrays of length 2 * n_slots + 2
    """
    width = hist.slot_width()
    half_width = 0.5 * width
    centers = hist.slot_centers()
    counts = hist.slots()
    n_slots = len(counts)

    n_points = n_slots * 2 + 2
    bins = flex.double(n_points, 0)
    data = flex.double(n_points, 0)
    for i in range(n_slots):
        left = 2 * i + 1
        right = left + 1
        bins[left] = centers[i] - half_width
        bins[right] = centers[i] + half_width
        data[left] = counts[i]
        data[right] = counts[i]

    # close the outline to zero one slot-width beyond each end
    bins[0] = bins[1] - width
    bins[-1] = bins[-2] + width
    data[0] = 0
    data[-1] = 0

    return (bins, data)
def plot_centroid_weights_histograms(reflections, n_slots=50):
    """Show histograms of the log centroid weights in x, y and z.

    Weights are the inverse centroid variances; one subplot per component,
    drawn as an outline on a log-scaled y axis.  Blocks on pyplot.show().

    :param reflections: iterable of reflections exposing .centroid_variance
    :param n_slots: number of histogram bins per component
    """
    from matplotlib import pyplot

    variances = flex.vec3_double([r.centroid_variance for r in reflections])
    vx, vy, vz = variances.parts()
    wx = 1 / vx
    wy = 1 / vy
    wz = 1 / vz
    # hx = flex.histogram(vx, n_slots=n_slots)
    # hy = flex.histogram(vy, n_slots=n_slots)
    # hz = flex.histogram(vz, n_slots=n_slots)
    # histogram the log-weights rather than the weights themselves
    wx = flex.log(wx)
    wy = flex.log(wy)
    wz = flex.log(wz)
    hx = flex.histogram(wx, n_slots=n_slots)
    hy = flex.histogram(wy, n_slots=n_slots)
    hz = flex.histogram(wz, n_slots=n_slots)
    fig = pyplot.figure()

    # outliers = reflections.select(wx > 50)
    # for refl in outliers:
    # print refl

    for i, h in enumerate([hx, hy, hz]):
        ax = fig.add_subplot(311 + i)

        slots = h.slots().as_double()
        bins, data = hist_outline(h)
        log_scale = True
        if log_scale:
            data.set_selected(
                data == 0, 0.1
            )  # otherwise lines don't get drawn when we have some empty bins
            ax.set_yscale("log")
        ax.plot(bins, data, "-k", linewidth=2)

        # pyplot.suptitle(title)
        # restrict the x range to the populated part of the histogram
        data_min = min([slot.low_cutoff for slot in h.slot_infos() if slot.n > 0])
        data_max = max([slot.low_cutoff for slot in h.slot_infos() if slot.n > 0])
        ax.set_xlim(data_min, data_max + h.slot_width())
    pyplot.show()
def apply_hkl_offset(indices, offset):
    """Shift every Miller index by a constant (dh, dk, dl) offset.

    :param indices: flex.miller_index array
    :param offset: 3-sequence of integer offsets (dh, dk, dl)
    :returns: new flex.miller_index array with the offset applied
    """
    dh, dk, dl = offset
    h, k, l = indices.as_vec3_double().parts()
    return flex.miller_index(
        (h + dh).iround(), (k + dk).iround(), (l + dl).iround()
    )