Source code for dials.algorithms.refinement.refiner

"""Refiner is the refinement module public interface. RefinerFactory is
what should usually be used to construct a Refiner."""

from __future__ import annotations

import copy
import logging
import math

import libtbx
from dxtbx.model.experiment_list import ExperimentList
from libtbx.phil import parse

import dials.util
from dials.algorithms.refinement import DialsRefineConfigError
from dials.algorithms.refinement.constraints import ConstraintManagerFactory
from dials.algorithms.refinement.engine import AdaptLstbx, refinery_phil_str
from dials.algorithms.refinement.parameterisation import (
    build_prediction_parameterisation,
)
from dials.algorithms.refinement.parameterisation import (
    phil_str as parameterisation_phil_str,
)
from dials.algorithms.refinement.parameterisation.autoreduce import AutoReduce
from dials.algorithms.refinement.parameterisation.parameter_report import (
    ParameterReporter,
)
from dials.algorithms.refinement.prediction.managed_predictors import (
    ExperimentsPredictorFactory,
)
from dials.algorithms.refinement.refinement_helpers import ordinal_number, string_sel
from dials.algorithms.refinement.reflection_manager import ReflectionManagerFactory
from dials.algorithms.refinement.reflection_manager import (
    phil_str as reflections_phil_str,
)
from dials.algorithms.refinement.restraints import RestraintsParameterisation
from dials.algorithms.refinement.target import TargetFactory
from dials.algorithms.refinement.target import phil_str as target_phil_str
from dials.array_family import flex
from dials.util.system import MEMORY_LIMIT

logger = logging.getLogger(__name__)

# The include scope directive does not work here. For example:
#
#   include scope dials.algorithms.refinement.outlier_detection.phil_scope
#
# results in:
#
#   AttributeError: 'module' object has no attribute 'refinement'
#
# to work around this, just include external phil scopes as strings
format_data = {
    "reflections_phil": reflections_phil_str,
    "target_phil": target_phil_str,
    "parameterisation_phil": parameterisation_phil_str,
    "refinery_phil": refinery_phil_str,
}
phil_scope = parse(
    """
refinement
  .help = "Parameters to configure the refinement"
{{

  mp
    .expert_level = 2
  {{
    nproc = 1
      .type = int(value_min=1)
      .help = "The number of processes to use. Not all choices of refinement"
              "engine support nproc > 1. Where multiprocessing is possible,"
              "it is helpful only in certain circumstances, so this is not"
              "recommended for typical use."
  }}

  parameterisation
    .help = "Parameters to control the parameterisation of experimental models"
  {{
    {parameterisation_phil}
  }}

  {refinery_phil}

  target
    .help = "Parameters to configure the target function"
    .expert_level = 1
  {{
    {target_phil}
  }}

  reflections
    .help = "Parameters used by the reflection manager"
  {{
    {reflections_phil}
  }}

}}
""".format(**format_data),
    process_includes=True,
)

RAD2DEG = 180 / math.pi


def _copy_experiments_for_refining(experiments):
    """
    Make a partial copy of experiments, copying properties used in refinement.

    Any experiment property that can be altered by refinement e.g. Beam,
    Detector and Goniometer will be deep-copied, whereas anything that the
    refiner doesn't touch (e.g. Scan, ImageSet) will be left as original
    references.

    This makes it safe to pass an object into the refiner or get an object
    out of the refiner without having to worry about your copy being
    unexpectedly altered, but saves time by avoiding the copying of potentially
    expensive experiment properties (e.g. ImageSet and its attributes).

    Args
      experiments (Experiment or ExperimentList or Iterable[Experiment]):

    Returns:
      ExperimentList: The copied experiments in new ExperimentList
    """
    # Look for a non-list e.g. a single experiment and convert to a list
    if not hasattr(experiments, "__iter__"):
        experiments = [experiments]
    out_list = ExperimentList()
    # Save a map of object id to copies so shared objects remain shared
    id_memo = {}

    # Copy each experiment individually
    for experiment in experiments:
        # Be inclusive about the initial copy
        new_exp = copy.copy(experiment)

        # Ensure every 'refined' attribute is uniquely copied
        for model in ["beam", "goniometer", "detector", "crystal"]:
            original = getattr(experiment, model)
            if id(original) not in id_memo:
                id_memo[id(original)] = copy.deepcopy(original)
            # assign the new copy to the experiment
            setattr(new_exp, model, id_memo[id(original)])

        # Collect this together
        out_list.append(new_exp)

    return out_list


def _trim_scans_to_observations(experiments, reflections):
    """Check the range of each scan matches the range of observed data and
    trim the scan to match if it is too wide"""

    # Get observed image number (or at least observed phi)
    obs_phi = reflections["xyzobs.mm.value"].parts()[2]
    try:
        obs_z = reflections["xyzobs.px.value"].parts()[2]
    except KeyError:
        obs_z = None

    # Get z_min and z_max from shoeboxes if present
    try:
        shoebox = reflections["shoebox"]
        bb = shoebox.bounding_boxes()
        z_min, z_max = bb.parts()[4:]
        if z_min.all_eq(0):
            shoebox = None
    except KeyError:
        shoebox = None

    for iexp, exp in enumerate(experiments):
        sel = reflections["id"] == iexp
        isel = sel.iselection()
        if obs_z is not None:
            exp_z = obs_z.select(isel)
        else:
            exp_phi = obs_phi.select(isel)
            exp_z = exp.scan.get_array_index_from_angle(exp_phi, deg=False)

        start, stop = exp.scan.get_array_range()
        min_exp_z = flex.min(exp_z)
        max_exp_z = flex.max(exp_z)

        # If observed array range is correct, skip to next experiment
        if int(min_exp_z) == start and int(math.ceil(max_exp_z)) == stop:
            continue

        # Extend array range either by shoebox size, or 0.5 deg if shoebox not available
        if shoebox is not None:
            obs_start = flex.min(z_min.select(isel))
            obs_stop = flex.max(z_max.select(isel))
        else:
            obs_start = int(min_exp_z)
            obs_stop = int(math.ceil(max_exp_z))
            half_deg_in_images = int(math.ceil(0.5 / exp.scan.get_oscillation()[1]))
            obs_start -= half_deg_in_images
            obs_stop += half_deg_in_images

        # Convert obs_start, obs_stop from position in array range to integer image number
        if obs_start > start or obs_stop < stop:
            im_start = max(start, obs_start) + 1
            im_stop = min(obs_stop, stop)

            logger.warning(
                f"The reflections for experiment {iexp} do not fill the scan range. The scan will be trimmed "
                f"to images {{{im_start},{im_stop}}} to match the range of observed data"
            )

            # Ensure the scan is unique to this experiment and set trimmed limits
            exp.scan = copy.deepcopy(exp.scan)
            new_oscillation = (
                exp.scan.get_angle_from_image_index(im_start),
                exp.scan.get_oscillation()[1],
            )
            exp.scan.set_image_range((im_start, im_stop))
            exp.scan.set_oscillation(new_oscillation)

    return experiments


[docs] class RefinerFactory: """Factory class to create refiners""" @staticmethod def _filter_reflections(reflections): """Return a copy of the input reflections filtered to keep only those columns that are required by refinement""" cols = [ "id", "miller_index", "panel", "s1", "xyzobs.mm.value", "xyzobs.px.value", "xyzcal.px", "xyzobs.mm.variance", "flags", "shoebox", "delpsical.weights", "wavelength", "wavelength_cal", "s0", ] # NB xyzobs.px.value & xyzcal.px required by SauterPoon outlier rejector # NB delpsical.weights is used by ExternalDelPsiWeightingStrategy rt = flex.reflection_table() # copy columns to the new table. Could use the select method # for this except that 's1' is optional in the input so would want # to copy that in like this if present anyway for k in cols: if k in reflections: rt[k] = reflections[k] return rt
[docs] @classmethod def from_parameters_data_experiments(cls, params, reflections, experiments): # copy the experiments experiments = _copy_experiments_for_refining(experiments) # copy and filter the reflections reflections = cls._filter_reflections(reflections) ( experiments, refman, ref_predictor, do_stills, ) = cls._build_reflection_manager_and_predictor( params, reflections, experiments ) return cls._build_refiner(params, experiments, refman, ref_predictor, do_stills)
[docs] @classmethod def reflections_after_outlier_rejection(cls, params, reflections, experiments): # copy the experiments experiments = _copy_experiments_for_refining(experiments) # copy and filter the reflections reflections = cls._filter_reflections(reflections) experiments, refman, _, _ = cls._build_reflection_manager_and_predictor( params, reflections, experiments ) return refman.get_obs()
@classmethod def _build_reflection_manager_and_predictor(cls, params, reflections, experiments): # Currently a refinement job can only have one parameterisation of the # prediction equation. This can either be of the XYDelPsi (stills) type, the # XYPhi (scans) type or the scan-varying XYPhi type with a varying crystal # model single_as_still = params.refinement.parameterisation.treat_single_image_as_still exps_are_stills = [] for exp in experiments: if exp.scan is None: exps_are_stills.append(True) elif exp.scan.get_num_images() == 1: if single_as_still: exps_are_stills.append(True) elif exp.scan.is_still(): exps_are_stills.append(True) else: exps_are_stills.append(False) else: if exp.scan.is_still(): raise DialsRefineConfigError("Cannot refine a zero-width scan") exps_are_stills.append(False) # check experiment types are consistent if not all(exps_are_stills[0] == e for e in exps_are_stills): raise DialsRefineConfigError("Cannot refine a mixture of stills and scans") do_stills = exps_are_stills[0] # If experiments are stills, ensure scan-varying refinement won't be attempted if do_stills: params.refinement.parameterisation.scan_varying = False # Refiner does not accept scan_varying=Auto. This is a special case for # doing macrocycles of refinement in dials.refine. if params.refinement.parameterisation.scan_varying is libtbx.Auto: params.refinement.parameterisation.scan_varying = False # Calculate reflection block_width for scan-varying refinement. Trim scans # to the extent of the observations, if requested. if params.refinement.parameterisation.scan_varying: if params.refinement.parameterisation.trim_scan_to_observations: experiments = _trim_scans_to_observations(experiments, reflections) from dials.algorithms.refinement.reflection_manager import BlockCalculator block_calculator = BlockCalculator(experiments, reflections) if params.refinement.parameterisation.compose_model_per == "block": reflections = block_calculator.per_width( params.refinement.parameterisation.block_width, deg=True ) elif params.refinement.parameterisation.compose_model_per == "image": reflections = block_calculator.per_image() logger.debug("\nBuilding reflection manager") logger.debug("Input reflection list size = %d observations", len(reflections)) # create reflection manager refman = ReflectionManagerFactory.from_parameters_reflections_experiments( params.refinement.reflections, reflections, experiments, do_stills ) logger.debug( "Number of observations that pass initial inclusion criteria = %d", refman.get_accepted_refs_size(), ) sample_size = refman.get_sample_size() if sample_size > 0: logger.debug("Working set size = %d observations", sample_size) logger.debug("Reflection manager built\n") # configure use of sparse data types params = cls.config_sparse(params, experiments) # create managed reflection predictor ref_predictor = ExperimentsPredictorFactory.from_experiments( experiments, force_stills=do_stills, spherical_relp=params.refinement.parameterisation.spherical_relp_model, ) # Predict for the managed observations, set columns for residuals and set # the used_in_refinement flag to the predictions obs = refman.get_obs() ref_predictor(obs) x_obs, y_obs, phi_obs = obs["xyzobs.mm.value"].parts() x_calc, y_calc, phi_calc = obs["xyzcal.mm"].parts() obs["x_resid"] = x_calc - x_obs obs["y_resid"] = y_calc - y_obs obs["phi_resid"] = phi_calc - phi_obs refman.update_residuals() # determine whether to do basic centroid analysis to automatically # determine outlier rejection block if params.refinement.reflections.outlier.block_width is libtbx.Auto: ca = refman.get_centroid_analyser() analysis = ca(calc_average_residuals=False, calc_periodograms=False) else: analysis = None # Now predictions and centroid analysis are available, so we can finalise # the reflection manager refman.finalise(analysis) return (experiments, refman, ref_predictor, do_stills) @classmethod def _build_refiner(cls, params, experiments, refman, ref_predictor, do_stills): """low level build""" do_sparse = params.refinement.parameterisation.sparse # Create model parameterisations logger.debug("Building prediction equation parameterisation") pred_param = build_prediction_parameterisation( params.refinement.parameterisation, experiments, refman, do_stills ) # Build a constraints manager, if requested cmf = ConstraintManagerFactory(params, pred_param) constraints_manager = cmf() # Test for parameters that have too little data to refine and act accordingly autoreduce = AutoReduce( params.refinement.parameterisation.auto_reduction, pred_param, refman, constraints_manager, cmf, ) autoreduce() # if reduction was done, constraints_manager will have changed constraints_manager = autoreduce.constraints_manager # Build a restraints parameterisation (if requested). # Only unit cell restraints are supported at the moment. restraints_parameterisation = cls.config_restraints( params.refinement.parameterisation, pred_param ) # Parameter reporting logger.debug("Prediction equation parameterisation built") logger.debug("Parameter order : name mapping") for i, e in enumerate(pred_param.get_param_names()): logger.debug("Parameter %03d : %s", i + 1, e) param_reporter = ParameterReporter( pred_param.get_detector_parameterisations(), pred_param.get_beam_parameterisations(), pred_param.get_crystal_orientation_parameterisations(), pred_param.get_crystal_unit_cell_parameterisations(), pred_param.get_goniometer_parameterisations(), ) # Create target function logger.debug("Building target function") target = cls.config_target( params.refinement.target, experiments, refman, ref_predictor, pred_param, restraints_parameterisation, do_stills, do_sparse, ) logger.debug("Target function built") # create refinery logger.debug("Building refinement engine") refinery = cls.config_refinery(params, target, pred_param, constraints_manager) logger.debug("Refinement engine built") nparam = len(pred_param) ndim = target.dim nref = len(refman.get_matches()) logger.info( "There are %s parameters to refine against %s reflections in %s dimensions", nparam, nref, ndim, ) if not params.refinement.parameterisation.sparse and isinstance( refinery, AdaptLstbx ): dense_jacobian_gigabytes = ( nparam * nref * ndim * flex.double.element_size() ) / 1e9 avail_memory_gigabytes = MEMORY_LIMIT / 1e9 # Report if the Jacobian requires a large amount of storage if ( dense_jacobian_gigabytes > 0.2 * avail_memory_gigabytes or dense_jacobian_gigabytes > 0.5 ): logger.info( "Storage of the Jacobian matrix requires %.1f GB", dense_jacobian_gigabytes, ) # build refiner interface and return if params.refinement.parameterisation.scan_varying: refiner = ScanVaryingRefiner else: refiner = Refiner return refiner( experiments, pred_param, param_reporter, refman, target, refinery )
[docs] @staticmethod def config_sparse(params, experiments): """Configure whether to use sparse datatypes""" # Automatic selection for sparse parameter if params.refinement.parameterisation.sparse == libtbx.Auto: if len(experiments) > 1 or params.refinement.parameterisation.scan_varying: params.refinement.parameterisation.sparse = True else: params.refinement.parameterisation.sparse = False if params.refinement.refinery.engine == "SparseLevMar": params.refinement.parameterisation.sparse = True if params.refinement.mp.nproc > 1: if params.refinement.refinery.engine != "SparseLevMar": # sparse vectors cannot be pickled, so can't use easy_mp here params.refinement.parameterisation.sparse = False else: pass # but SparseLevMar requires sparse jacobian; does not implement mp # Check incompatible selection elif ( params.refinement.parameterisation.sparse and params.refinement.mp.nproc > 1 ): logger.warning( "Could not set sparse=True and nproc=%s", params.refinement.mp.nproc ) logger.warning("Resetting sparse=False") params.refinement.parameterisation.sparse = False return params
[docs] @staticmethod def config_restraints(params, pred_param): """Given a set of user parameters plus a model parameterisation, create restraints plus a parameterisation of these restraints Params: params: The input PHIL parameters pred_param: A PredictionParameters object Returns: A restraints parameterisation or None """ if not any( ( params.crystal.unit_cell.restraints.tie_to_target, params.crystal.unit_cell.restraints.tie_to_group, ) ): return None if params.scan_varying and not params.crystal.unit_cell.force_static: logger.warning("Restraints will be ignored for scan_varying=True") return None det_params = pred_param.get_detector_parameterisations() beam_params = pred_param.get_beam_parameterisations() xl_ori_params = pred_param.get_crystal_orientation_parameterisations() xl_uc_params = pred_param.get_crystal_unit_cell_parameterisations() gon_params = pred_param.get_goniometer_parameterisations() rp = RestraintsParameterisation( detector_parameterisations=det_params, beam_parameterisations=beam_params, xl_orientation_parameterisations=xl_ori_params, xl_unit_cell_parameterisations=xl_uc_params, goniometer_parameterisations=gon_params, ) # Shorten params path cell_r = params.crystal.unit_cell.restraints for tie in cell_r.tie_to_target: if len(tie.values) != 6: raise DialsRefineConfigError( "6 cell parameters must be provided as the tie_to_target.values." ) if len(tie.sigmas) != 6: raise DialsRefineConfigError( "6 sigmas must be provided as the tie_to_target.sigmas. " "Note that individual sigmas of 0.0 will remove " "the restraint for the corresponding cell parameter." ) if tie.id is None: # get one experiment id for each parameterisation to apply to all tie.id = [e.get_experiment_ids()[0] for e in xl_uc_params] for exp_id in tie.id: rp.add_restraints_to_target_xl_unit_cell(exp_id, tie.values, tie.sigmas) for tie in cell_r.tie_to_group: if len(tie.sigmas) != 6: raise DialsRefineConfigError( "6 sigmas must be provided as the tie_to_group.sigmas. " "Note that individual sigmas of 0.0 will remove " "the restraint for the corresponding cell parameter." ) if tie.id is None: rp.add_restraints_to_group_xl_unit_cell(tie.target, "all", tie.sigmas) else: rp.add_restraints_to_group_xl_unit_cell(tie.target, tie.id, tie.sigmas) return rp
[docs] @staticmethod def config_refinery(params, target, pred_param, constraints_manager): """Given a set of parameters, a target class, a prediction parameterisation class and a constraints_manager (which could be None), build a refinery Params: params The input parameters Returns: The refinery instance """ # Shorten parameter path options = params.refinement.refinery if options.engine == "SimpleLBFGS": from dials.algorithms.refinement.engine import SimpleLBFGS as refinery elif options.engine == "LBFGScurvs": from dials.algorithms.refinement.engine import LBFGScurvs as refinery elif options.engine == "GaussNewton": from dials.algorithms.refinement.engine import ( GaussNewtonIterations as refinery, ) elif options.engine == "LevMar": from dials.algorithms.refinement.engine import ( LevenbergMarquardtIterations as refinery, ) elif options.engine == "SparseLevMar": from dials.algorithms.refinement.sparse_engine import ( SparseLevenbergMarquardtIterations as refinery, ) else: raise RuntimeError( "Refinement engine " + options.engine + " not recognised" ) logger.debug("Selected refinement engine type: %s", options.engine) engine = refinery( target=target, prediction_parameterisation=pred_param, constraints_manager=constraints_manager, log=options.log, tracking=options.journal, max_iterations=options.max_iterations, ) if params.refinement.mp.nproc > 1: nproc = params.refinement.mp.nproc try: engine.set_nproc(nproc) except NotImplementedError: logger.warning( "Could not set nproc=%s for refinement engine of type %s", nproc, options.engine, ) return engine
# Overload to allow subclasses of RefinerFactory to use a different # TargetFactory
[docs] @staticmethod def config_target( params, experiments, reflection_manager, predictor, pred_param, restraints_param, do_stills, do_sparse, ): target = TargetFactory.from_parameters_and_experiments( params, experiments, reflection_manager, predictor, pred_param, restraints_param, do_stills, do_sparse, ) return target
[docs] class Refiner: """Public interface for performing DIALS refinement. Public methods: run rmsds get_experiments get_matches get_param_reporter parameter_correlation_plot selection_used_for_refinement predict_for_reflection_table predict_for_indexed Notes: * The return value of run is a recorded history of the refinement * The experiments accessor provides a copy of the experiments used by refinement * get_matches exposes the function of the same name from the privately stored reflection manager * The return value of selection_used_for_refinement is a flex.bool """
[docs] def __init__( self, experiments, pred_param, param_reporter, refman, target, refinery ): """ Mandatory arguments: experiments - a dxtbx ExperimentList object pred_param - An object derived from the PredictionParameterisation class param_reporter -A ParameterReporter object refman - A ReflectionManager object target - An object derived from the Target class refinery - An object derived from the Refinery class """ # the experimental models self._experiments = experiments # refinement module main objects self._pred_param = pred_param self._refman = refman self._target = target self._refinery = refinery # parameter reporter self._param_report = param_reporter # Keep track of whether this is stills or scans type refinement self.experiment_type = refman.experiment_type self._exp_rmsd_table_data = None return
[docs] def get_experiments(self): """Return a copy of the current refiner experiments""" return _copy_experiments_for_refining(self._experiments)
[docs] def rmsds(self): """Return rmsds of the current model""" # ensure predictions for the matches are up to date self._refinery.prepare_for_step() return self._target.rmsds()
[docs] def rmsds_for_reflection_table(self, reflections): """Calculate unweighted RMSDs for the specified reflections""" # ensure predictions for these reflections are up to date preds = self.predict_for_reflection_table(reflections) return self._target.rmsds_for_reflection_table(preds)
[docs] def get_matches(self): """Delegated to the reflection manager""" return self._refman.get_matches()
[docs] def get_free_reflections(self): """Delegated to the reflection manager""" return self._refman.get_free_reflections()
[docs] def get_param_reporter(self): """Get the ParameterReport object linked to this Refiner""" return self._param_report
[docs] def get_parameter_correlation_matrix(self, step, col_select=None): """Return the correlation matrix between columns of the Jacobian at the specified refinement step. The parameter col_select can be used to select subsets of the full number of columns. The column labels are also returned as a list of strings""" corrmats = self._refinery.get_correlation_matrix_for_step(step) if corrmats is None: return None, None all_labels = self._pred_param.get_param_names() if col_select is None: col_select = list(range(len(all_labels))) sel = string_sel(col_select, all_labels) labels = [e for e, s in zip(all_labels, sel) if s] num_cols = len(labels) if num_cols == 0: return None, None for k, corrmat in corrmats.items(): assert corrmat.is_square_matrix() idx = flex.bool(sel).iselection() sub_corrmat = flex.double(flex.grid(num_cols, num_cols)) for i, x in enumerate(idx): for j, y in enumerate(idx): sub_corrmat[i, j] = corrmat[x, y] corrmats[k] = sub_corrmat return (corrmats, labels)
@property def history(self): """Get the refinement engine's step history""" return self._refinery.history
[docs] def print_step_table(self): """print useful output about refinement steps in the form of a simple table""" logger.info("\nRefinement steps:") rmsd_multipliers = [] header = ["Step", "Nref"] for name, units in zip(self._target.rmsd_names, self._target.rmsd_units): if units == "mm": header.append(name + "\n(mm)") rmsd_multipliers.append(1.0) elif units == "A": header.append(name + "\n(A)") rmsd_multipliers.append(1.0) elif units == "rad": # convert radians to degrees for reporting header.append(name + "\n(deg)") rmsd_multipliers.append(RAD2DEG) else: # leave unknown units alone header.append(name + "\n(" + units + ")") rows = [] for i in range(self._refinery.history.get_nrows()): rmsds = [ r * m for (r, m) in zip(self._refinery.history["rmsd"][i], rmsd_multipliers) ] rows.append( [str(i), str(self._refinery.history["num_reflections"][i])] + [f"{r:.5g}" for r in rmsds] ) logger.info(dials.util.tabulate(rows, header)) logger.info(self._refinery.history.reason_for_termination) return
[docs] def print_out_of_sample_rmsd_table(self): """print out-of-sample RSMDs per step, if these were tracked""" # check if it makes sense to proceed if "out_of_sample_rmsd" not in self._refinery.history: return nref = len(self.get_free_reflections()) if nref < 10: return # don't do anything if very few refs logger.info("\nRMSDs for out-of-sample (free) reflections:") rmsd_multipliers = [] header = ["Step", "Nref"] for name, units in zip(self._target.rmsd_names, self._target.rmsd_units): if units == "mm": header.append(name + "\n(mm)") rmsd_multipliers.append(1.0) elif units == "rad": # convert radians to degrees for reporting header.append(name + "\n(deg)") rmsd_multipliers.append(RAD2DEG) else: # leave unknown units alone header.append(name + "\n(" + units + ")") rows = [] for i in range(self._refinery.history.get_nrows()): rmsds = [ r * m for r, m in zip( self._refinery.history["out_of_sample_rmsd"][i], rmsd_multipliers ) ] rows.append([str(i), str(nref)] + [f"{e:.5g}" for e in rmsds]) logger.info(dials.util.tabulate(rows, header)) return
[docs] def calc_exp_rmsd_table(self): if self._exp_rmsd_table_data: return self._exp_rmsd_table_data header = ["Exp\nid", "Nref"] for name, units in zip(self._target.rmsd_names, self._target.rmsd_units): if name == "RMSD_X" or name == "RMSD_Y" and units == "mm": header.append(name + "\n(px)") elif name == "RMSD_Phi" and units == "rad": # will convert radians to images for reporting of scans header.append("RMSD_Z" + "\n(images)") elif units == "rad": # will convert other angles in radians to degrees (e.g. for # RMSD_DeltaPsi and RMSD_2theta) header.append(name + "\n(deg)") elif name == "RMSD_wavelength" and units == "frame": header.append(name + "\n(frame)") else: # skip other/unknown RMSDs pass rows = [] for iexp, exp in enumerate(self._experiments): detector = exp.detector px_sizes = [p.get_pixel_size() for p in detector] it = iter(px_sizes) px_size = next(it) if not all(tst == px_size for tst in it): logger.info( "The detector in experiment %d does not have the same pixel " + "sizes on each panel. Skipping...", iexp, ) continue px_per_mm = [1.0 / e for e in px_size] scan = exp.scan try: images_per_rad = 1.0 / abs(scan.get_oscillation(deg=False)[1]) except (AttributeError, ZeroDivisionError, RuntimeError): images_per_rad = None raw_rmsds = self._target.rmsds_for_experiment(iexp) if raw_rmsds is None: continue # skip experiments where rmsd cannot be calculated num = self._target.get_num_matches_for_experiment(iexp) rmsds = [] for name, units, rmsd in zip( self._target.rmsd_names, self._target.rmsd_units, raw_rmsds ): if name == "RMSD_X" and units == "mm": rmsds.append(rmsd * px_per_mm[0]) elif name == "RMSD_Y" and units == "mm": rmsds.append(rmsd * px_per_mm[1]) elif name == "RMSD_Phi" and units == "rad": rmsds.append(rmsd * images_per_rad) elif name == "RMSD_wavelength" and units == "frame": rmsds.append(rmsd) elif units == "rad": rmsds.append(rmsd * RAD2DEG) rows.append([str(iexp), str(num)] + [f"{r:.5g}" for r in rmsds]) self._exp_rmsd_table_data = (header, rows) return self._exp_rmsd_table_data
[docs] def print_exp_rmsd_table(self): """print useful output about refinement steps in the form of a simple table""" logger.info("\nRMSDs by experiment:") header, rows = self.calc_exp_rmsd_table() if len(rows) > 0: logger.info(dials.util.tabulate(rows, header)) return
[docs] def print_panel_rmsd_table(self): """print useful output about refinement steps in the form of a simple table""" if len(self._experiments.scans()) > 1 and not self._experiments.all_tof(): logger.warning( "Multiple scans present. Only the first scan will be used " "to determine the image width for reporting RMSDs" ) scan = self._experiments.scans()[0] images_per_rad = None if scan and scan.has_property("oscillation"): if scan.get_oscillation(deg=False)[1] != 0.0: images_per_rad = 1.0 / abs(scan.get_oscillation(deg=False)[1]) for idetector, detector in enumerate(self._experiments.detectors()): if len(detector) == 1: continue logger.info("\nDetector %s RMSDs by panel:", idetector + 1) header = ["Panel\nid", "Nref"] for name, units in zip(self._target.rmsd_names, self._target.rmsd_units): if name == "RMSD_X" or name == "RMSD_Y" and units == "mm": header.append(name + "\n(px)") elif ( name == "RMSD_Phi" and units == "rad" ): # convert radians to images for reporting of scans header.append("RMSD_Z" + "\n(images)") elif ( name == "RMSD_DeltaPsi" and units == "rad" ): # convert radians to degrees for reporting of stills header.append(name + "\n(deg)") elif name == "RMSD_wavelength" and units == "frame": header.append(name + "\n(frame)") else: # skip RMSDs that cannot be expressed in image/scan space pass rows = [] for ipanel, panel in enumerate(detector): px_size = panel.get_pixel_size() px_per_mm = [1.0 / e for e in px_size] num = self._target.get_num_matches_for_panel(ipanel) if num <= 0: continue raw_rmsds = self._target.rmsds_for_panel(ipanel) if raw_rmsds is None: continue # skip panels where rmsd cannot be calculated rmsds = [] for name, units, rmsd in zip( self._target.rmsd_names, self._target.rmsd_units, raw_rmsds ): if name == "RMSD_X" and units == "mm": rmsds.append(rmsd * px_per_mm[0]) elif name == "RMSD_Y" and units == "mm": rmsds.append(rmsd * px_per_mm[1]) elif name == "RMSD_Phi" and units == "rad": rmsds.append(rmsd * images_per_rad) elif name == "RMSD_DeltaPsi" and units == "rad": rmsds.append(rmsd * RAD2DEG) elif name == "RMSD_wavelength" and units == "frame": rmsds.append(rmsd) rows.append([str(ipanel), str(num)] + [f"{r:.5g}" for r in rmsds]) if len(rows) > 0: logger.info(dials.util.tabulate(rows, header)) return
[docs] def run(self): """Run refinement""" #################################### # Do refinement and return history # #################################### logger.debug("\nExperimental models before refinement:") for i, beam in enumerate(self._experiments.beams()): logger.debug(ordinal_number(i) + " " + str(beam)) for i, detector in enumerate(self._experiments.detectors()): logger.debug(ordinal_number(i) + " " + str(detector)) for i, goniometer in enumerate(self._experiments.goniometers()): if goniometer is None: continue logger.debug(ordinal_number(i) + " " + str(goniometer)) for i, scan in enumerate(self._experiments.scans()): if scan is None: continue logger.debug(ordinal_number(i) + " " + str(scan)) for i, crystal in enumerate(self._experiments.crystals()): logger.debug(ordinal_number(i) + " " + str(crystal)) self._refinery.run() # These involve calculation, so skip them when output is quiet if logger.getEffectiveLevel() < logging.ERROR: self.print_step_table() self.print_out_of_sample_rmsd_table() self.print_exp_rmsd_table() det_npanels = [len(d) for d in self._experiments.detectors()] if any(n > 1 for n in det_npanels): self.print_panel_rmsd_table() # Perform post-run tasks to write the refined states back to the models self._update_models() logger.debug("\nExperimental models after refinement:") for i, beam in enumerate(self._experiments.beams()): logger.debug(ordinal_number(i) + " " + str(beam)) for i, detector in enumerate(self._experiments.detectors()): logger.debug(ordinal_number(i) + " " + str(detector)) for i, goniometer in enumerate(self._experiments.goniometers()): if goniometer is None: continue logger.debug(ordinal_number(i) + " " + str(goniometer)) for i, scan in enumerate(self._experiments.scans()): if scan is None: continue logger.debug(ordinal_number(i) + " " + str(scan)) for i, crystal in enumerate(self._experiments.crystals()): logger.debug(ordinal_number(i) + " " + str(crystal)) # Report on the refined parameters logger.debug(str(self._param_report)) # Return the refinement history return self._refinery.history
def _update_models(self): """Perform any extra tasks required to update the models after refinement. Does nothing here, but used by subclasses""" pass
[docs] def selection_used_for_refinement(self): """Return a selection as a flex.bool in terms of the input reflection data of those reflections that were used in the final step of refinement.""" matches = self._refman.get_matches() selection = flex.bool(len(self._refman.get_indexed()), False) try: # new reflection table format for matches isel = matches["iobs"] selection.set_selected(isel, True) except TypeError: # old ObsPredMatch format for matches for m in matches: selection[m.iobs] = True return selection
[docs] def predict_for_indexed(self): """perform prediction for all the indexed reflections passed into refinement and additionally set the used_in_refinement flag. Do not compose the derivatives of states of the model as this is expensive and they are not needed outside of a refinement run""" reflections = self.predict_for_reflection_table( self._refman.get_indexed(), skip_derivatives=True ) reflections.sort("iobs") mask = self.selection_used_for_refinement() reflections.set_flags(mask, reflections.flags.used_in_refinement) return reflections
[docs] def predict_for_reflection_table(self, reflections, skip_derivatives=False): """perform prediction for all reflections in the supplied table""" # delegate to the target object, which has access to the predictor return self._target.predict_for_reflection_table(reflections, skip_derivatives)
[docs] class ScanVaryingRefiner(Refiner): """Includes functionality to update the models with their states at scan-points after scan-varying refinement""" def _update_models(self): for iexp, exp in enumerate(self._experiments): ar_range = exp.scan.get_array_range() obs_image_numbers = list(range(ar_range[0], ar_range[1] + 1)) # write scan-varying s0 vectors back to beam models s0_list = self._pred_param.get_varying_s0(obs_image_numbers, iexp) if s0_list is not None: exp.beam.set_s0_at_scan_points(s0_list) # write scan-varying setting rotation matrices back to goniometer models S_list = self._pred_param.get_varying_setting_rotation( obs_image_numbers, iexp ) if S_list is not None: exp.goniometer.set_setting_rotation_at_scan_points(S_list) # write scan-varying crystal setting matrices back to crystal models A_list = self._pred_param.get_varying_UB(obs_image_numbers, iexp) if A_list is not None: exp.crystal.set_A_at_scan_points(A_list) # Calculate scan-varying errors if requested if self._pred_param.set_scan_varying_errors: # get state covariance matrices the whole range of images. We select # the first element of this at each image because crystal scan-varying # parameterisations are not multi-state state_cov_list = [ self._pred_param.calculate_model_state_uncertainties( obs_image_number=t, experiment_id=iexp ) for t in range(ar_range[0], ar_range[1] + 1) ] if "U_cov" in state_cov_list[0]: u_cov_list = [e["U_cov"] for e in state_cov_list] else: u_cov_list = None if "B_cov" in state_cov_list[0]: b_cov_list = [e["B_cov"] for e in state_cov_list] else: b_cov_list = None # return these to the model parameterisations to be set in the models self._pred_param.set_model_state_uncertainties( u_cov_list, b_cov_list, iexp )