This documentation page refers to a previous release of DIALS (1.14).
Click here to go to the corresponding page for the latest version of DIALS

Source code for dials.algorithms.refinement.refiner

#!/usr/bin/env python
#
# dials.algorithms.refinement.refiner.py
#
#  Copyright (C) 2013 Diamond Light Source and STFC Rutherford Appleton
#                     Laboratory, UK.
#
#  Authors: James Parkhurst, David Waterman
#
#  This code is distributed under the BSD license, a copy of which is
#  included in the root directory of this package.

"""Refiner is the refinement module public interface. RefinerFactory is
what should usually be used to construct a Refiner."""

from __future__ import absolute_import, division
from __future__ import print_function
import copy
import logging

logger = logging.getLogger(__name__)

from dxtbx.model.experiment_list import ExperimentList
from dials.array_family import flex
from dials.algorithms.refinement.refinement_helpers import ordinal_number
from libtbx.phil import parse
from dials.util import Sorry
import libtbx

# The include scope directive does not work here. For example:
#
#   include scope dials.algorithms.refinement.outlier_detection.phil_scope
#
# results in:
#
#   AttributeError: 'module' object has no attribute 'refinement'
#
# to work around this, just include external phil scopes as strings
from dials.algorithms.refinement.reflection_manager import (
    phil_str as reflections_phil_str,
)
from dials.algorithms.refinement.target import phil_str as target_phil_str
from dials.algorithms.refinement.parameterisation import (
    phil_str as parameterisation_phil_str,
)
from dials.algorithms.refinement.engine import refinery_phil_str

format_data = {
    "reflections_phil": reflections_phil_str,
    "target_phil": target_phil_str,
    "parameterisation_phil": parameterisation_phil_str,
    "refinery_phil": refinery_phil_str,
}
phil_scope = parse(
    """
refinement
  .help = "Parameters to configure the refinement"
{

  mp
    .expert_level = 2
  {
    nproc = 1
      .type = int(value_min=1)
      .help = "The number of processes to use. Not all choices of refinement"
              "engine support nproc > 1. Where multiprocessing is possible,"
              "it is helpful only in certain circumstances, so this is not"
              "recommended for typical use."
  }

  verbosity = 0
    .help = "verbosity level"
    .type = int(value_min=0)
    .expert_level = 1

  parameterisation
    .help = "Parameters to control the parameterisation of experimental models"
  {
    %(parameterisation_phil)s
  }

  %(refinery_phil)s

  target
    .help = "Parameters to configure the target function"
    .expert_level = 1
  {
    %(target_phil)s
  }

  reflections
    .help = "Parameters used by the reflection manager"
  {
    %(reflections_phil)s
  }

}
"""
    % format_data,
    process_includes=True,
)


def _copy_experiments_for_refining(experiments):
    """
    Make a partial copy of experiments, copying properties used in refinement.

    Any experiment property that can be altered by refinement e.g. Beam,
    Detector and Goniometer will be deep-copied, whereas anything that the
    refiner doesn't touch (e.g. Scan, ImageSet) will be left as original
    references.

    This makes it safe to pass an object into the refiner or get an object
    out of the refiner without having to worry about your copy being
    unexpectedly altered, but saves time by avoiding the copying of potentially
    expensive experiment properties (e.g. ImageSet and its attributes).

    Args
      experiments (Experiment or ExperimentList or Iterable[Experiment]):

    Returns:
      ExperimentList: The copied experiments in new ExperimentList
    """
    # Look for a non-list e.g. a single experiment and convert to a list
    if not hasattr(experiments, "__iter__"):
        experiments = [experiments]
    out_list = ExperimentList()
    # Save a map of object id to copies so shared objects remain shared
    id_memo = {}

    # Copy each experiment individually
    for experiment in experiments:
        # Be inclusive about the initial copy
        new_exp = copy.copy(experiment)

        # Ensure every 'refined' attribute is uniquely copied
        for model in ["beam", "goniometer", "detector", "crystal"]:
            original = getattr(experiment, model)
            if id(original) not in id_memo:
                id_memo[id(original)] = copy.deepcopy(original)
            # assign the new copy to the experiment
            setattr(new_exp, model, id_memo[id(original)])

        # Collect this together
        out_list.append(new_exp)

    return out_list


[docs]class RefinerFactory(object): """Factory class to create refiners""" @classmethod def _filter_reflections(cls, reflections): """Return a copy of the input reflections filtered to keep only those columns that are required by refinement""" cols = [ "id", "miller_index", "panel", "s1", "xyzobs.mm.value", "xyzobs.px.value", "xyzcal.px", "xyzobs.mm.variance", "flags", "delpsical.weights", ] # NB xyzobs.px.value & xyzcal.px required by SauterPoon outlier rejector # NB delpsical.weights is used by ExternalDelPsiWeightingStrategy rt = flex.reflection_table() # copy columns to the new table. Could use the select method # for this except that 's1' is optional in the input so would want # to copy that in like this if present anyway for k in cols: if k in reflections.keys(): rt[k] = reflections[k] return rt
[docs] @classmethod def from_parameters_data_experiments( cls, params, reflections, experiments, verbosity=None ): # TODO Checks on the input # E.g. does every experiment contain at least one overlapping model with at # least one other experiment? Are all the experiments either rotation series # or stills (the combination of both not yet supported)? # if no verbosity override is given, take from the parameters if verbosity is None: verbosity = params.refinement.verbosity # copy the experiments experiments = _copy_experiments_for_refining(experiments) # copy and filter the reflections reflections = cls._filter_reflections(reflections) return cls._build_components( params, reflections, experiments, verbosity=verbosity )
@classmethod def _build_components(cls, params, reflections, experiments, verbosity): """low level build""" if verbosity == 0: logging.getLogger("dials.algorithms.refinement").setLevel(logging.WARNING) # Currently a refinement job can only have one parameterisation of the # prediction equation. This can either be of the XYDelPsi (stills) type, the # XYPhi (scans) type or the scan-varying XYPhi type with a varying crystal # model single_as_still = params.refinement.parameterisation.treat_single_image_as_still exps_are_stills = [] for exp in experiments: if exp.scan is None: exps_are_stills.append(True) elif exp.scan.get_num_images() == 1: if single_as_still: exps_are_stills.append(True) elif exp.scan.get_oscillation()[1] == 0.0: exps_are_stills.append(True) else: exps_are_stills.append(False) else: if exp.scan.get_oscillation()[1] <= 0.0: raise Sorry("Cannot refine a zero-width scan") exps_are_stills.append(False) # check experiment types are consistent if not all(exps_are_stills[0] == e for e in exps_are_stills): raise Sorry("Cannot refine a mixture of stills and scans") do_stills = exps_are_stills[0] # calculate reflection block_width if required for scan-varying refinement if params.refinement.parameterisation.scan_varying: from dials.algorithms.refinement.reflection_manager import BlockCalculator block_calculator = BlockCalculator(experiments, reflections) if params.refinement.parameterisation.compose_model_per == "block": reflections = block_calculator.per_width( params.refinement.parameterisation.block_width, deg=True ) elif params.refinement.parameterisation.compose_model_per == "image": reflections = block_calculator.per_image() logger.debug("\nBuilding reflection manager") logger.debug("Input reflection list size = %d observations", len(reflections)) # create reflection manager from dials.algorithms.refinement.reflection_manager import ( ReflectionManagerFactory, ) refman = ReflectionManagerFactory.from_parameters_reflections_experiments( params.refinement.reflections, reflections, experiments, do_stills, verbosity, ) logger.debug( "Number of observations that pass initial inclusion criteria = %d", refman.get_accepted_refs_size(), ) sample_size = refman.get_sample_size() if sample_size > 0: logger.debug("Working set size = %d observations", sample_size) logger.debug("Reflection manager built\n") # configure use of sparse data types params = cls.config_sparse(params, experiments) do_sparse = params.refinement.parameterisation.sparse # create managed reflection predictor from dials.algorithms.refinement.prediction.managed_predictors import ( ExperimentsPredictorFactory, ) ref_predictor = ExperimentsPredictorFactory.from_experiments( experiments, force_stills=do_stills, spherical_relp=params.refinement.parameterisation.spherical_relp_model, ) # Predict for the managed observations, set columns for residuals and set # the used_in_refinement flag to the predictions obs = refman.get_obs() ref_predictor(obs) x_obs, y_obs, phi_obs = obs["xyzobs.mm.value"].parts() x_calc, y_calc, phi_calc = obs["xyzcal.mm"].parts() obs["x_resid"] = x_calc - x_obs obs["y_resid"] = y_calc - y_obs obs["phi_resid"] = phi_calc - phi_obs # determine whether to do basic centroid analysis to automatically # determine outlier rejection block if params.refinement.reflections.outlier.block_width is libtbx.Auto: ca = refman.get_centroid_analyser() analysis = ca(calc_average_residuals=False, calc_periodograms=False) else: analysis = None # Now predictions and centroid analysis are available, so we can finalise # the reflection manager refman.finalise(analysis) # Create model parameterisations logger.debug("Building prediction equation parameterisation") pred_param, param_reporter = cls.config_parameterisation( params.refinement.parameterisation, experiments, refman, do_stills ) logger.debug("Prediction equation parameterisation built") logger.debug("Parameter order : name mapping") for i, e in enumerate(pred_param.get_param_names()): logger.debug("Parameter %03d : %s", i + 1, e) # Build a restraints parameterisation (if requested). # Only unit cell restraints are supported at the moment. restraints_parameterisation = cls.config_restraints( params.refinement.parameterisation, pred_param ) # Build a constraints manager, if requested from dials.algorithms.refinement.constraints import ConstraintManagerFactory cmf = ConstraintManagerFactory(params, pred_param, verbosity) constraints_manager = cmf() # Create target function logger.debug("Building target function") target = cls.config_target( params.refinement.target, experiments, refman, ref_predictor, pred_param, restraints_parameterisation, do_stills, do_sparse, ) logger.debug("Target function built") # create refinery logger.debug("Building refinement engine") refinery = cls.config_refinery( params, target, pred_param, constraints_manager, verbosity ) logger.debug("Refinement engine built") # build refiner interface and return if params.refinement.parameterisation.scan_varying: refiner = ScanVaryingRefiner else: refiner = Refiner return refiner( reflections, experiments, pred_param, param_reporter, refman, target, refinery, verbosity=verbosity, )
[docs] @staticmethod def config_sparse(params, experiments): """Configure whether to use sparse datatypes""" # Automatic selection for sparse parameter if params.refinement.parameterisation.sparse == libtbx.Auto: if len(experiments) > 1: params.refinement.parameterisation.sparse = True else: params.refinement.parameterisation.sparse = False if params.refinement.refinery.engine == "SparseLevMar": params.refinement.parameterisation.sparse = True if params.refinement.mp.nproc > 1: if params.refinement.refinery.engine != "SparseLevMar": # sparse vectors cannot be pickled, so can't use easy_mp here params.refinement.parameterisation.sparse = False else: pass # but SparseLevMar requires sparse jacobian; does not implement mp # Check incompatible selection elif ( params.refinement.parameterisation.sparse and params.refinement.mp.nproc > 1 ): logger.warning( "Could not set sparse=True and nproc={0}".format( params.refinement.mp.nproc ) ) logger.warning("Resetting sparse=False") params.refinement.parameterisation.sparse = False return params
[docs] @staticmethod def config_parameterisation(params, experiments, refman, do_stills=False): from dials.algorithms.refinement.parameterisation import ( build_prediction_parameterisation, ) pred_param = build_prediction_parameterisation( params, experiments, refman, do_stills ) # Parameter reporting from dials.algorithms.refinement.parameterisation.parameter_report import ( ParameterReporter, ) param_reporter = ParameterReporter( pred_param.get_detector_parameterisations(), pred_param.get_beam_parameterisations(), pred_param.get_crystal_orientation_parameterisations(), pred_param.get_crystal_unit_cell_parameterisations(), pred_param.get_goniometer_parameterisations(), ) return pred_param, param_reporter
[docs] @staticmethod def config_restraints(params, pred_param): """Given a set of user parameters plus a model parameterisation, create restraints plus a parameterisation of these restraints Params: params: The input PHIL parameters pred_param: A PredictionParameters object Returns: A restraints parameterisation or None """ if not any( [ params.crystal.unit_cell.restraints.tie_to_target, params.crystal.unit_cell.restraints.tie_to_group, ] ): return None det_params = pred_param.get_detector_parameterisations() beam_params = pred_param.get_beam_parameterisations() xl_ori_params = pred_param.get_crystal_orientation_parameterisations() xl_uc_params = pred_param.get_crystal_unit_cell_parameterisations() gon_params = pred_param.get_goniometer_parameterisations() from dials.algorithms.refinement.restraints import RestraintsParameterisation rp = RestraintsParameterisation( detector_parameterisations=det_params, beam_parameterisations=beam_params, xl_orientation_parameterisations=xl_ori_params, xl_unit_cell_parameterisations=xl_uc_params, goniometer_parameterisations=gon_params, ) # Shorten params path # FIXME Only unit cell restraints currently supported # beam_r = params.beam.restraints cell_r = params.crystal.unit_cell.restraints # orientation_r = params.crystal.orientation.restraints # detector_r = params.detector.restraints for tie in cell_r.tie_to_target: if len(tie.values) != 6: raise Sorry( "6 cell parameters must be provided as the tie_to_target.values." ) if len(tie.sigmas) != 6: raise Sorry( "6 sigmas must be provided as the tie_to_target.sigmas. " "Note that individual sigmas of 0.0 will remove " "the restraint for the corresponding cell parameter." ) if tie.id is None: # get one experiment id for each parameterisation to apply to all tie.id = [e.get_experiment_ids()[0] for e in xl_uc_params] for exp_id in tie.id: rp.add_restraints_to_target_xl_unit_cell(exp_id, tie.values, tie.sigmas) for tie in cell_r.tie_to_group: if len(tie.sigmas) != 6: raise Sorry( "6 sigmas must be provided as the tie_to_group.sigmas. " "Note that individual sigmas of 0.0 will remove " "the restraint for the corresponding cell parameter." ) if tie.id is None: rp.add_restraints_to_group_xl_unit_cell(tie.target, "all", tie.sigmas) else: rp.add_restraints_to_group_xl_unit_cell(tie.target, tie.id, tie.sigmas) return rp
[docs] @staticmethod def config_refinery(params, target, pred_param, constraints_manager, verbosity): """Given a set of parameters, a target class, a prediction parameterisation class and a constraints_manager (which could be None), build a refinery Params: params The input parameters Returns: The refinery instance """ # Shorten parameter path options = params.refinement.refinery if options.engine == "SimpleLBFGS": from dials.algorithms.refinement.engine import SimpleLBFGS as refinery elif options.engine == "LBFGScurvs": from dials.algorithms.refinement.engine import LBFGScurvs as refinery elif options.engine == "GaussNewton": from dials.algorithms.refinement.engine import ( GaussNewtonIterations as refinery, ) elif options.engine == "LevMar": from dials.algorithms.refinement.engine import ( LevenbergMarquardtIterations as refinery, ) elif options.engine == "SparseLevMar": from dials.algorithms.refinement.sparse_engine import ( SparseLevenbergMarquardtIterations as refinery, ) else: raise RuntimeError( "Refinement engine " + options.engine + " not recognised" ) logger.debug("Selected refinement engine type: %s", options.engine) engine = refinery( target=target, prediction_parameterisation=pred_param, constraints_manager=constraints_manager, log=options.log, verbosity=verbosity, tracking=options.journal, max_iterations=options.max_iterations, ) if params.refinement.mp.nproc > 1: nproc = params.refinement.mp.nproc try: engine.set_nproc(nproc) except NotImplementedError: logger.warning( "Could not set nproc={0} for refinement engine of type {1}".format( nproc, options.engine ) ) return engine
# Overload to allow subclasses of RefinerFactory to use a different # TargetFactory
[docs] @staticmethod def config_target( params, experiments, reflection_manager, predictor, pred_param, restraints_param, do_stills, do_sparse, ): from dials.algorithms.refinement.target import TargetFactory target = TargetFactory.from_parameters_and_experiments( params, experiments, reflection_manager, predictor, pred_param, restraints_param, do_stills, do_sparse, ) return target
[docs]class Refiner(object): """Public interface for performing DIALS refinement. Public methods: run rmsds get_experiments get_matches get_param_reporter parameter_correlation_plot selection_used_for_refinement predict_for_reflection_table predict_for_indexed Notes: * The return value of run is a recorded history of the refinement * The experiments accessor provides a copy of the experiments used by refinement * The model accessors provide copies of those models that might be modified by refinement (beam, crystal and detector) TO BE DEPRECATED * get_matches exposes the function of the same name from the privately stored reflection manager * The return value of selection_used_for_refinement is a flex.bool """ def __init__( self, reflections, experiments, pred_param, param_reporter, refman, target, refinery, verbosity=0, ): """ Mandatory arguments: reflections - Input ReflectionList data experiments - a dxtbx ExperimentList object pred_param - An object derived from the PredictionParameterisation class param_reporter -A ParameterReporter object refman - A ReflectionManager object target - An object derived from the Target class refinery - An object derived from the Refinery class Optional arguments: verbosity - An integer verbosity level """ # the experimental models self._experiments = experiments # refinement module main objects self._pred_param = pred_param self._refman = refman self._target = target self._refinery = refinery # parameter reporter self._param_report = param_reporter self._verbosity = verbosity return
[docs] def get_experiments(self): """Return a copy of the current refiner experiments""" return _copy_experiments_for_refining(self._experiments)
[docs] def rmsds(self): """Return rmsds of the current model""" # ensure predictions for the matches are up to date self._refinery.prepare_for_step() return self._target.rmsds()
[docs] def rmsds_for_reflection_table(self, reflections): """Calculate unweighted RMSDs for the specified reflections""" # ensure predictions for these reflections are up to date preds = self.predict_for_reflection_table(reflections) return self._target.rmsds_for_reflection_table(preds)
[docs] def get_matches(self): """Delegated to the reflection manager""" # FIXME Consider: Does this information really need to be exposed by the # public API (indexing code seems to use it, but is it necessary?) return self._refman.get_matches()
[docs] def get_free_reflections(self): """Delegated to the reflection manager""" return self._refman.get_free_reflections()
[docs] def get_param_reporter(self): """Get the ParameterReport object linked to this Refiner""" return self._param_report
[docs] def get_parameter_correlation_matrix(self, step, col_select=None): """Return the correlation matrix between columns of the Jacobian at the specified refinement step. The parameter col_select can be used to select subsets of the full number of columns. The column labels are also returned as a list of strings""" corrmats = self._refinery.get_correlation_matrix_for_step(step) if corrmats is None: return None, None all_labels = self._pred_param.get_param_names() from dials.algorithms.refinement.refinement_helpers import string_sel if col_select is None: col_select = range(len(all_labels)) sel = string_sel(col_select, all_labels) labels = [e for e, s in zip(all_labels, sel) if s] num_cols = num_rows = len(labels) if num_cols == 0: return None, None for k, corrmat in corrmats.items(): assert corrmat.is_square_matrix() from scitbx.array_family import flex idx = flex.bool(sel).iselection() sub_corrmat = flex.double(flex.grid(num_cols, num_cols)) for (i, x) in enumerate(idx): for (j, y) in enumerate(idx): sub_corrmat[i, j] = corrmat[x, y] corrmats[k] = sub_corrmat return (corrmats, labels)
[docs] def print_step_table(self): """print useful output about refinement steps in the form of a simple table""" from libtbx.table_utils import simple_table from math import pi rad2deg = 180 / pi logger.info("\nRefinement steps:") rmsd_multipliers = [] header = ["Step", "Nref"] for (name, units) in zip(self._target.rmsd_names, self._target.rmsd_units): if units == "mm": header.append(name + "\n(mm)") rmsd_multipliers.append(1.0) elif units == "rad": # convert radians to degrees for reporting header.append(name + "\n(deg)") rmsd_multipliers.append(rad2deg) else: # leave unknown units alone header.append(name + "\n(" + units + ")") rows = [] for i in range(self._refinery.history.get_nrows()): rmsds = [ r * m for (r, m) in zip(self._refinery.history["rmsd"][i], rmsd_multipliers) ] rows.append( [str(i), str(self._refinery.history["num_reflections"][i])] + ["%.5g" % r for r in rmsds] ) st = simple_table(rows, header) logger.info(st.format()) logger.info(self._refinery.history.reason_for_termination) return
[docs] def print_out_of_sample_rmsd_table(self): """print out-of-sample RSMDs per step, if these were tracked""" from libtbx.table_utils import simple_table from math import pi rad2deg = 180 / pi # check if it makes sense to proceed if not "out_of_sample_rmsd" in self._refinery.history: return nref = len(self.get_free_reflections()) if nref < 10: return # don't do anything if very few refs logger.info("\nRMSDs for out-of-sample (free) reflections:") rmsd_multipliers = [] header = ["Step", "Nref"] for (name, units) in zip(self._target.rmsd_names, self._target.rmsd_units): if units == "mm": header.append(name + "\n(mm)") rmsd_multipliers.append(1.0) elif units == "rad": # convert radians to degrees for reporting header.append(name + "\n(deg)") rmsd_multipliers.append(rad2deg) else: # leave unknown units alone header.append(name + "\n(" + units + ")") rows = [] for i in range(self._refinery.history.get_nrows()): rmsds = [ r * m for r, m in zip( self._refinery.history["out_of_sample_rmsd"][i], rmsd_multipliers ) ] rows.append([str(i), str(nref)] + ["%.5g" % e for e in rmsds]) st = simple_table(rows, header) logger.info(st.format()) return
[docs] def print_exp_rmsd_table(self): """print useful output about refinement steps in the form of a simple table""" from libtbx.table_utils import simple_table from math import pi rad2deg = 180 / pi logger.info("\nRMSDs by experiment:") header = ["Exp\nid", "Nref"] for (name, units) in zip(self._target.rmsd_names, self._target.rmsd_units): if name == "RMSD_X" or name == "RMSD_Y" and units == "mm": header.append(name + "\n(px)") elif name == "RMSD_Phi" and units == "rad": # will convert radians to images for reporting of scans header.append("RMSD_Z" + "\n(images)") elif units == "rad": # will convert other angles in radians to degrees (e.g. for # RMSD_DeltaPsi and RMSD_2theta) header.append(name + "\n(deg)") else: # skip other/unknown RMSDs pass rows = [] for iexp, exp in enumerate(self._experiments): detector = exp.detector px_sizes = [p.get_pixel_size() for p in detector] it = iter(px_sizes) px_size = next(it) if not all(tst == px_size for tst in it): logger.info( "The detector in experiment %d does not have the same pixel " + "sizes on each panel. Skipping...", iexp, ) continue px_per_mm = [1.0 / e for e in px_size] scan = exp.scan try: temp = scan.get_oscillation(deg=False) images_per_rad = 1.0 / abs(scan.get_oscillation(deg=False)[1]) except (AttributeError, ZeroDivisionError): images_per_rad = None raw_rmsds = self._target.rmsds_for_experiment(iexp) if raw_rmsds is None: continue # skip experiments where rmsd cannot be calculated num = self._target.get_num_matches_for_experiment(iexp) rmsds = [] for (name, units, rmsd) in zip( self._target.rmsd_names, self._target.rmsd_units, raw_rmsds ): if name == "RMSD_X" and units == "mm": rmsds.append(rmsd * px_per_mm[0]) elif name == "RMSD_Y" and units == "mm": rmsds.append(rmsd * px_per_mm[1]) elif name == "RMSD_Phi" and units == "rad": rmsds.append(rmsd * images_per_rad) elif units == "rad": rmsds.append(rmsd * rad2deg) rows.append([str(iexp), str(num)] + ["%.5g" % r for r in rmsds]) if len(rows) > 0: truncated = False max_rows = 100 if self._verbosity < 3 and len(rows) > max_rows: rows = rows[0:max_rows] truncated = True st = simple_table(rows, header) logger.info(st.format()) if truncated: logger.info( "Table truncated to show the first %d experiments only", max_rows ) logger.info("Re-run with verbosity >= 3 to show all experiments") return
[docs] def print_panel_rmsd_table(self): """print useful output about refinement steps in the form of a simple table""" from libtbx.table_utils import simple_table from math import pi rad2deg = 180 / pi if len(self._experiments.scans()) > 1: logger.warning( "Multiple scans present. Only the first scan will be used " "to determine the image width for reporting RMSDs" ) scan = self._experiments.scans()[0] try: temp = scan.get_oscillation(deg=False) images_per_rad = 1.0 / abs(scan.get_oscillation(deg=False)[1]) except AttributeError: images_per_rad = None for idetector, detector in enumerate(self._experiments.detectors()): if len(detector) == 1: continue logger.info("\nDetector {0} RMSDs by panel:".format(idetector + 1)) header = ["Panel\nid", "Nref"] for (name, units) in zip(self._target.rmsd_names, self._target.rmsd_units): if name == "RMSD_X" or name == "RMSD_Y" and units == "mm": header.append(name + "\n(px)") elif ( name == "RMSD_Phi" and units == "rad" ): # convert radians to images for reporting of scans header.append("RMSD_Z" + "\n(images)") elif ( name == "RMSD_DeltaPsi" and units == "rad" ): # convert radians to degrees for reporting of stills header.append(name + "\n(deg)") else: # skip RMSDs that cannot be expressed in image/scan space pass rows = [] for ipanel, panel in enumerate(detector): px_size = panel.get_pixel_size() px_per_mm = [1.0 / e for e in px_size] num = self._target.get_num_matches_for_panel(ipanel) if num <= 0: continue raw_rmsds = self._target.rmsds_for_panel(ipanel) if raw_rmsds is None: continue # skip panels where rmsd cannot be calculated rmsds = [] for (name, units, rmsd) in zip( self._target.rmsd_names, self._target.rmsd_units, raw_rmsds ): if name == "RMSD_X" and units == "mm": rmsds.append(rmsd * px_per_mm[0]) elif name == "RMSD_Y" and units == "mm": rmsds.append(rmsd * px_per_mm[1]) elif name == "RMSD_Phi" and units == "rad": rmsds.append(rmsd * images_per_rad) elif name == "RMSD_DeltaPsi" and units == "rad": rmsds.append(rmsd * rad2deg) rows.append([str(ipanel), str(num)] + ["%.5g" % r for r in rmsds]) if len(rows) > 0: st = simple_table(rows, header) logger.info(st.format()) return
[docs] def run(self): """Run refinement""" #################################### # Do refinement and return history # #################################### if self._verbosity > 1: logger.debug("\nExperimental models before refinement:") for i, beam in enumerate(self._experiments.beams()): logger.debug(ordinal_number(i) + " " + str(beam)) for i, detector in enumerate(self._experiments.detectors()): logger.debug(ordinal_number(i) + " " + str(detector)) for i, goniometer in enumerate(self._experiments.goniometers()): if goniometer is None: continue logger.debug(ordinal_number(i) + " " + str(goniometer)) for i, scan in enumerate(self._experiments.scans()): if scan is None: continue logger.debug(ordinal_number(i) + " " + str(scan)) for i, crystal in enumerate(self._experiments.crystals()): logger.debug(ordinal_number(i) + " " + str(crystal)) self._refinery.run() # These involve calculation, so skip them when verbosity is zero if self._verbosity > 0: self.print_step_table() self.print_out_of_sample_rmsd_table() self.print_exp_rmsd_table() det_npanels = [len(d) for d in self._experiments.detectors()] if any(n > 1 for n in det_npanels): self.print_panel_rmsd_table() # Perform post-run tasks to write the refined states back to the models self._update_models() if self._verbosity > 1: logger.debug("\nExperimental models after refinement:") for i, beam in enumerate(self._experiments.beams()): logger.debug(ordinal_number(i) + " " + str(beam)) for i, detector in enumerate(self._experiments.detectors()): logger.debug(ordinal_number(i) + " " + str(detector)) for i, goniometer in enumerate(self._experiments.goniometers()): if goniometer is None: continue logger.debug(ordinal_number(i) + " " + str(goniometer)) for i, scan in enumerate(self._experiments.scans()): if scan is None: continue logger.debug(ordinal_number(i) + " " + str(scan)) for i, crystal in enumerate(self._experiments.crystals()): logger.debug(ordinal_number(i) + " " + str(crystal)) # Report on the refined parameters logger.debug(str(self._param_report)) # Return the refinement history return self._refinery.history
def _update_models(self): """Perform any extra tasks required to update the models after refinement. Does nothing here, but used by subclasses""" pass
[docs] def selection_used_for_refinement(self): """Return a selection as a flex.bool in terms of the input reflection data of those reflections that were used in the final step of refinement.""" from scitbx.array_family import flex matches = self._refman.get_matches() selection = flex.bool(len(self._refman.get_indexed()), False) try: # new reflection table format for matches isel = matches["iobs"] selection.set_selected(isel, True) except TypeError: # old ObsPredMatch format for matches for m in matches: selection[m.iobs] = True return selection
[docs] def predict_for_indexed(self): """perform prediction for all the indexed reflections passed into refinement and additionally set the used_in_refinement flag. Do not compose the derivatives of states of the model as this is expensive and they are not needed outside of a refinement run""" reflections = self.predict_for_reflection_table( self._refman.get_indexed(), skip_derivatives=True ) reflections.sort("iobs") mask = self.selection_used_for_refinement() reflections.set_flags(mask, reflections.flags.used_in_refinement) return reflections
[docs] def predict_for_reflection_table(self, reflections, skip_derivatives=False): """perform prediction for all reflections in the supplied table""" # delegate to the target object, which has access to the predictor return self._target.predict_for_reflection_table(reflections, skip_derivatives)
[docs]class ScanVaryingRefiner(Refiner): """Includes functionality to update the models with their states at scan-points after scan-varying refinement""" def _update_models(self): for iexp, exp in enumerate(self._experiments): ar_range = exp.scan.get_array_range() obs_image_numbers = range(ar_range[0], ar_range[1] + 1) # write scan-varying s0 vectors back to beam models s0_list = self._pred_param.get_varying_s0(obs_image_numbers, iexp) if s0_list is not None: exp.beam.set_s0_at_scan_points(s0_list) # write scan-varying setting rotation matrices back to goniometer models S_list = self._pred_param.get_varying_setting_rotation( obs_image_numbers, iexp ) if S_list is not None: exp.goniometer.set_setting_rotation_at_scan_points(S_list) # write scan-varying crystal setting matrices back to crystal models A_list = self._pred_param.get_varying_UB(obs_image_numbers, iexp) if A_list is not None: exp.crystal.set_A_at_scan_points(A_list) # get state covariance matrices the whole range of images. We select # the first element of this at each image because crystal scan-varying # parameterisations are not multi-state state_cov_list = [ self._pred_param.calculate_model_state_uncertainties( obs_image_number=t, experiment_id=iexp ) for t in range(ar_range[0], ar_range[1] + 1) ] if "U_cov" in state_cov_list[0]: u_cov_list = [e["U_cov"] for e in state_cov_list] else: u_cov_list = None if "B_cov" in state_cov_list[0]: b_cov_list = [e["B_cov"] for e in state_cov_list] else: b_cov_list = None # return these to the model parameterisations to be set in the models self._pred_param.set_model_state_uncertainties(u_cov_list, b_cov_list, iexp)