Source code for dials.algorithms.integration.integrator

from __future__ import annotations

import collections
import functools
import logging
import math
import pickle
import random

import dials.extensions
from dials.algorithms.integration import TimingInfo, processor
from dials.algorithms.integration.filtering import IceRingFilter
from dials.algorithms.integration.parallel_integrator import (
    IntegratorProcessor,
    ReferenceCalculatorProcessor,
)
from dials.algorithms.integration.processor import (
    Processor2D,
    Processor3D,
    ProcessorFlat3D,
    ProcessorSingle2D,
    ProcessorStills,
    build_processor,
    job,
)
from dials.algorithms.integration.report import (
    IntegrationReport,
    ProfileModelReport,
    ProfileValidationReport,
)
from dials.algorithms.integration.validation import ValidatedMultiExpProfileModeller
from dials.algorithms.profile_model.modeller import MultiExpProfileModeller
from dials.algorithms.shoebox import MaskCode
from dials.array_family import flex

# constants
from dials.constants import EPS, FULL_PARTIALITY
from dials.util import Sorry, phil, pprint, tabulate
from dials.util.command_line import heading
from dials.util.report import Report
from dials.util.system import MEMORY_LIMIT
from dials_algorithms_integration_integrator_ext import (
    Executor,
    JobList,
    ReflectionManager,
    max_memory_needed,
)

logger = logging.getLogger(__name__)

__all__ = [
    "Executor",
    "Integrator",
    "Integrator2D",
    "Integrator3D",
    "Integrator3DThreaded",
    "IntegratorExecutor",
    "IntegratorFlat3D",
    "IntegratorSingle2D",
    "IntegratorStills",
    "JobList",
    "Parameters",
    "Processor2D",
    "Processor3D",
    "ProcessorFlat3D",
    "ProcessorSingle2D",
    "ProcessorStills",
    "ProfileModellerExecutor",
    "ProfileValidatorExecutor",
    "ReflectionManager",
    "frame_hist",
    "generate_phil_scope",
    "hist",
    "job",
    "nframes_hist",
    "phil_scope",
]


[docs] def generate_phil_scope(): """ Generate the integration phil scope. :return: The phil scope """ phil_scope = phil.parse( """ integration { include scope dials.data.lookup.phil_scope block { size = auto .type = float .help = "The block size in rotation angle (degrees)." units = *degrees radians frames .type = choice .help = "The units of the block size" threshold = 0.95 .type = float(value_min=0.0, value_max=1.0) .help = "For block size auto the block size is calculated by sorting" "reflections by the number of frames they cover and then" "selecting the block size to be 2*nframes[threshold] such" "that 100*threshold % of reflections are guaranteed to be" "fully contained in 1 block" force = False .type = bool .help = "If the number of processors is 1 and force is False, then the" "number of blocks may be set to 1. If force is True then the" "block size is always calculated." max_memory_usage = 0.90 .type = float(value_min=0.0,value_max=1.0) .help = "The maximum percentage of available memory to use for" "allocating shoebox arrays." } use_dynamic_mask = True .type = bool .help = "Use dynamic mask if available" debug { reference { filename = "reference_profiles.refl" .type = str .help = "The filename for the reference profiles" output = False .type = bool .help = "Save the reference profiles" } during = modelling *integration .type = choice .help = "Do debugging during modelling or integration" output = False .type = bool .help = "Save shoeboxes after each processing task." separate_files = True .type = bool .help = "If this is true, the shoeboxes are saved in separate files" "from the output integrated.refl file. This is necessary" "in most cases since the amount of memory used by the" "shoeboxes is typically greater than the available system" "memory. If, however, you know that memory is not an issue," "you can saved the shoeboxes in the integrated.refl file" "by setting this option to False. This only works if the debug" "output is during integrated and not modelling." delete_shoeboxes = False .type = bool .help = "Delete shoeboxes immediately before saving files. This option" "in combination with debug.output=True enables intermediate" "processing steps to make use of shoeboxes." select = None .type = reflection_table_selector .help = "A string specifying the selection. The string should be of the" "form: select=${COLUMN}[<|<=|==|!=|>=|>]${VALUE}. In addition" "to the items in the reflection table, the following implicit" "columns are defined if the necessary data is there:" " intensity.sum.i_over_sigma" " intensity.prf.i_over_sigma" split_experiments = True .type = bool .help = "Split shoeboxes into different files" } integrator = *auto 3d flat3d 2d single2d stills 3d_threaded .type = choice .help = "The integrator to use." .expert_level=3 profile { fitting = True .type = bool .help = "Use profile fitting if available" valid_foreground_threshold = 0.75 .type = float(value_min=0, value_max=1) .help = "The minimum fraction of foreground pixels that must be valid" "in order for a reflection to be integrated by profile fitting." .expert_level = 2 sigma_b_multiplier = 2.0 .type = float(value_min=1.0) .help = "Background box expansion factor" .expert_level = 3 validation { number_of_partitions = 1 .type = int(value_min=1) .help = "The number of subsamples to take from the reference spots." "If the value is 1, then no validation is performed." min_partition_size = 100 .type = int(value_min=1) .help = "The minimum number of spots to use in each subsample." } } filter .expert_level = 1 { min_zeta = 0.05 .help = "Filter the reflections by the value of zeta. A value of less" "than or equal to zero indicates that this will not be used. A" "positive value is used as the minimum permissible value." .type = float(value_min=0.0, value_max=1.0) ice_rings = False .help = "Set the ice ring flags" .type = bool } include scope dials.algorithms.integration.overlaps_filter.phil_scope mp { method = *multiprocessing drmaa sge lsf pbs .type = choice .help = "The multiprocessing method to use" njobs = 1 .type = int(value_min=1) .help = "The number of cluster jobs to use" nproc = 1 .type = int(value_min=1) .help = "The number of processes to use per cluster job" multiprocessing.n_subset_split = None .type = int(value_min=1) .help = "Number of subsets to split the reflection table for integration." } summation { detector_gain = 1 .type = float .help = "Multiplier for variances after integration of still images." "See Leslie 1999." } } """, process_includes=True, ) main_scope = phil_scope.get_without_substitution("integration") assert len(main_scope) == 1 main_scope = main_scope[0] main_scope.adopt_scope(dials.extensions.Background.phil_scope()) main_scope.adopt_scope(dials.extensions.Centroid.phil_scope()) return phil_scope
# The integration phil scope phil_scope = generate_phil_scope()
[docs] def hist(data, width=80, symbol="#", prefix=""): """ A utility function to print a histogram of reflections on frames. :param data: The data to histogram :param width: The number of characters in each line :param symbol: The plot symbol :param prefix: String to prefix to each line :return: The histogram string """ assert len(data) > 0, "Need > 0 reflections" assert width > 0, "Width should be > 0" count = collections.Counter(data) count = sorted(count.items()) frame, count = zip(*count) max_frame = max(frame) min_count = min(count) max_count = max(count) assert max_count > 0, "Max should be > 0" assert min_count >= 0, "Min should be >= 0" if max_frame == 0: num_frame_zeros = 1 else: num_frame_zeros = int(math.floor(math.log10(max_frame))) + 1 num_count_zeros = int(math.floor(math.log10(max_count))) + 1 assert num_frame_zeros > 0, "Num should be > 0" assert num_count_zeros > 0, "Num should be > 0" num_hist = width - (num_frame_zeros + num_count_zeros + 5) - len(prefix) assert num_hist > 0, "num_hist should be > 0" fmt = "%s%%-%dd [%%-%dd]: %%s" % (prefix, num_frame_zeros, num_count_zeros) scale = float(num_hist) / max_count return "\n".join( ( fmt % (key, value, int(value * scale) * symbol) for key, value in zip(frame, count) ) )
[docs] def frame_hist(bbox, width=80, symbol="#", prefix=""): """ A utility function to print a histogram of reflections on frames. :param bbox: The bounding boxes :param width: The width of each line :param symbol: The histogram symbol :param prefix: A string to prefix to each line :return: The histogram string """ return hist( [(z + 1) for b in bbox for z in range(b[4], b[5])], width=width, symbol=symbol, prefix=prefix, )
[docs] def nframes_hist(bbox, width=80, symbol="#", prefix=""): """ A utility function to print a histogram of number of frames. :param bbox: The bounding boxes :param width: The width of each line :param symbol: The histogram symbol :param prefix: A string to prefix to each line :return: The histogram string """ return hist([b[5] - b[4] for b in bbox], width=width, symbol=symbol, prefix=prefix)
[docs] class Parameters: """ A stack of classes to represent the integration parameters """
[docs] class Filter: """ Filter parameters """
[docs] def __init__(self): self.min_zeta = 0.05 self.powder_filter = None
[docs] class Profile: """ Profile parameters """
[docs] class Validation:
[docs] def __init__(self): self.number_of_partitions = 2 self.min_partition_size = 100
[docs] def __init__(self): self.fitting = True self.sigma_b_multiplier = 2.0 self.valid_foreground_threshold = 0.75 self.validation = Parameters.Profile.Validation()
[docs] def __init__(self): """ Initialize """ self.modelling = processor.Parameters() self.integration = processor.Parameters() self.filter = Parameters.Filter() self.profile = Parameters.Profile() self.debug_reference_filename = "reference_profiles.pickle" self.debug_reference_output = False
[docs] @staticmethod def from_phil(params): """ Convert the phil parameters """ # Init the parameters result = Parameters() # Create the multi processing parameters mp = processor.MultiProcessing() mp.method = params.mp.method mp.nproc = params.mp.nproc mp.njobs = params.mp.njobs mp.n_subset_split = params.mp.multiprocessing.n_subset_split # Set the lookup parameters lookup = processor.Lookup() lookup.mask = params.lookup.mask # Set the block parameters block = processor.Block() block.size = params.block.size block.units = params.block.units block.threshold = params.block.threshold block.force = params.block.force block.max_memory_usage = params.block.max_memory_usage # Set the modelling processor parameters result.modelling.mp = mp result.modelling.lookup = lookup result.modelling.block = block if params.debug.during == "modelling": result.modelling.debug.output = params.debug.output result.modelling.debug.select = params.debug.select result.modelling.debug.separate_files = True # Set the integration processor parameters result.integration.mp = mp result.integration.lookup = lookup result.integration.block = block if params.debug.during == "integration": result.integration.debug.output = params.debug.output result.integration.debug.select = params.debug.select result.integration.debug.separate_files = params.debug.separate_files result.integration.summation = params.summation result.integration.integrator = params.integrator result.debug_reference_filename = params.debug.reference.filename result.debug_reference_output = params.debug.reference.output # Profile parameters result.profile.sigma_b_multiplier = params.profile.sigma_b_multiplier result.profile.valid_foreground_threshold = ( params.profile.valid_foreground_threshold ) # Get the min zeta filter result.filter.min_zeta = params.filter.min_zeta if params.filter.ice_rings is True: result.filter.powder_filter = IceRingFilter() # Get post-integration overlap filtering parameters result.integration.overlaps_filter = params.overlaps_filter # Set the profile fitting parameters result.profile.fitting = params.profile.fitting result.profile.validation.number_of_partitions = ( params.profile.validation.number_of_partitions ) result.profile.validation.min_partition_size = ( params.profile.validation.min_partition_size ) # Return the result return result
def _initialize_rotation(experiments, params, reflections): """ A pre-processing class for oscillation data. """ # Compute some reflection properties reflections.compute_zeta_multi(experiments) reflections.compute_d(experiments) reflections.compute_bbox( experiments, sigma_b_multiplier=params.profile.sigma_b_multiplier ) # Filter the reflections by zeta mask = flex.abs(reflections["zeta"]) < params.filter.min_zeta reflections.set_flags(mask, reflections.flags.dont_integrate) # Filter the reflections by powder ring if params.filter.powder_filter is not None: mask = params.filter.powder_filter(reflections["d"]) reflections.set_flags(mask, reflections.flags.in_powder_ring) def _initialize_stills(experiments, params, reflections): """ A pre-processing class for stills data. """ # Compute some reflection properties reflections.compute_d(experiments) reflections.compute_bbox( experiments, sigma_b_multiplier=params.profile.sigma_b_multiplier ) # Check the bounding boxes are all 1 frame in width z0, z1 = reflections["bbox"].parts()[4:6] assert (z1 - z0).all_eq(1), "bbox is invalid" # Filter the reflections by powder ring if params.filter.powder_filter is not None: mask = params.filter.powder_filter(reflections["d"]) reflections.set_flags(mask, reflections.flags.in_powder_ring) def _finalize(reflections, experiments, params): """ A generic post-processing function. """ overlaps_scope = params.integration.overlaps_filter if True in [ overlaps_scope.foreground_foreground.enable, overlaps_scope.foreground_background.enable, ]: from dials.algorithms.integration.overlaps_filter import OverlapsFilterMultiExpt overlaps_filter = OverlapsFilterMultiExpt(reflections, experiments) if overlaps_scope.foreground_foreground.enable: overlaps_filter.remove_foreground_foreground_overlaps() if overlaps_scope.foreground_background.enable: overlaps_filter.remove_foreground_background_overlaps() reflections = overlaps_filter.refl return reflections, experiments def _finalize_rotation(reflections, experiments, params): """ A post-processing function for oscillation data. """ reflections, experiments = _finalize(reflections, experiments, params) # Compute the corrections reflections.compute_corrections(experiments) return reflections, experiments def _finalize_stills(reflections, experiments, params): """ A post-processing function for stills data. """ reflections, experiments = _finalize(reflections, experiments, params) integrated = reflections # Select only those reflections which were integrated if "intensity.prf.variance" in integrated: selection = integrated.get_flags(integrated.flags.integrated, all=True) else: selection = integrated.get_flags(integrated.flags.integrated_sum) integrated = integrated.select(selection) len_all = len(integrated) integrated = integrated.select( ~integrated.get_flags(integrated.flags.foreground_includes_bad_pixels) ) logger.info( "Filtering %d reflections with at least one bad foreground pixel out of %d", len_all - len(integrated), len_all, ) # verify sigmas are sensible if "intensity.prf.value" in integrated: if (integrated["intensity.prf.variance"] < 0).count(True) > 0: raise Sorry( "Found negative variances (prf). Are bad pixels properly masked out?" ) if "intensity.sum.value" in integrated: if (integrated["intensity.sum.variance"] <= 0).count(True) > 0: if (integrated["intensity.sum.variance"] < 0).count(True) > 0: raise Sorry( "Found negative variances (sum). Are bad pixels properly masked out?" ) n = (integrated["intensity.sum.variance"] == 0).count(True) sel = (integrated["intensity.sum.variance"] == 0) & ( integrated["intensity.sum.value"] == 0 ) if n == sel.count(True): logger.info( "Filtering %d reflections with no integrated signal (sum and variance = 0) out of %d", n, len(integrated), ) integrated = integrated.select(integrated["intensity.sum.variance"] > 0) else: raise Sorry( "Found reflections with variances == 0 but summed signal != 0" ) # apply detector gain to summation variances integrated["intensity.sum.variance"] *= ( params.integration.summation.detector_gain ) if "background.sum.value" in integrated: if (integrated["background.sum.variance"] < 0).count(True) > 0: raise Sorry( "Found negative variances (background sum). Are bad pixels properly masked out?" ) if (integrated["background.sum.variance"] == 0).count(True) > 0: logger.info( "Filtering %d reflections with zero background variance", (integrated["background.sum.variance"] == 0).count(True), ) integrated = integrated.select(integrated["background.sum.variance"] > 0) # apply detector gain to background summation variances integrated["background.sum.variance"] *= ( params.integration.summation.detector_gain ) reflections = integrated return reflections, experiments
[docs] class ProfileModellerExecutor(Executor): """ The class to do profile modelling calculations """ __getstate_manages_dict__ = 1
[docs] def __init__(self, experiments, profile_fitter): """ Initialise the executor :param experiments: The experiment list """ self.experiments = experiments self.profile_fitter = profile_fitter super().__init__()
[docs] def initialize(self, frame0, frame1, reflections): """ Initialise the processing for a job :param frame0: The first frame in the job :param frame1: The last frame in the job :param reflections: The reflections that will be processed """ # Count "effectively full" and partial reflections N.B. # here effectively full is equivalent to area under a normal # curve to +/- 3 sigma full_value = FULL_PARTIALITY - EPS fully_recorded = reflections["partiality"] > full_value npart = fully_recorded.count(False) nfull = fully_recorded.count(True) nice = reflections.get_flags(reflections.flags.in_powder_ring).count(True) ntot = len(reflections) # Write some output logger.debug("") logger.debug(" Beginning modelling job %d", job.index) logger.info("") logger.info(" Frames: %d -> %d", frame0 + 1, frame1) logger.info("") logger.info(" Number of reflections") logger.info(" Partial: %d", npart) logger.info(" Full: %d", nfull) logger.info(" In ice ring: %d", nice) logger.info(" Total: %d", ntot) logger.info("") # Print a histogram of reflections on frames if frame1 - frame0 > 1: logger.debug( " The following histogram shows the number of reflections predicted" ) logger.debug(" to have all or part of their intensity on each frame.") logger.debug("") logger.debug(frame_hist(reflections["bbox"], prefix=" ", symbol="*")) logger.debug("")
[docs] def process(self, frame, reflections): """ Process the data :param frame: The frame being processed :param reflections: The reflections to process """ # Check if pixels are overloaded reflections.is_overloaded(self.experiments) # Compute the shoebox mask reflections.compute_mask(self.experiments) # Process the data reflections.compute_background(self.experiments) reflections.compute_centroid(self.experiments) reflections.compute_summed_intensity() # Do the profile modelling self.profile_fitter.model(reflections) # Print some info fmt = " Modelled % 5d / % 5d reflection profiles on image %d" nmod = reflections.get_flags(reflections.flags.used_in_modelling).count(True) ntot = len(reflections) logger.debug(fmt, nmod, ntot, frame + 1)
[docs] def finalize(self): """ Finalize the processing """ pass
[docs] def data(self): """ :return: the modeller """ return self.profile_fitter
def __getinitargs__(self): """ Support for pickling """ return (self.experiments, self.profile_fitter)
[docs] class ProfileValidatorExecutor(Executor): """ The class to do profile validation calculations """ __getstate_manages_dict__ = 1
[docs] def __init__(self, experiments, profile_fitter): """ Initialise the executor :param experiments: The experiment list """ self.experiments = experiments self.profile_fitter = profile_fitter super().__init__()
[docs] def initialize(self, frame0, frame1, reflections): """ Initialise the processing for a job :param frame0: The first frame in the job :param frame1: The last frame in the job :param reflections: The reflections that will be processed """ # Get some info full_value = FULL_PARTIALITY - EPS fully_recorded = reflections["partiality"] > full_value npart = fully_recorded.count(False) nfull = fully_recorded.count(True) nice = reflections.get_flags(reflections.flags.in_powder_ring).count(True) ntot = len(reflections) # Write some output logger.debug("") logger.debug(" Beginning modelling job %d", job.index) logger.info("") logger.info(" Frames: %d -> %d", frame0, frame1) logger.info("") logger.info(" Number of reflections") logger.info(" Partial: %d", npart) logger.info(" Full: %d", nfull) logger.info(" In ice ring: %d", nice) logger.info(" Total: %d", ntot) logger.info("") # Print a histogram of reflections on frames if frame1 - frame0 > 1: logger.debug( " The following histogram shows the number of reflections predicted" ) logger.debug(" to have all or part of their intensity on each frame.") logger.debug("") logger.debug(frame_hist(reflections["bbox"], prefix=" ", symbol="*")) logger.debug("") self.results = None
[docs] def process(self, frame, reflections): """ Process the data :param frame: The frame being processed :param reflections: The reflections to process """ # Check if pixels are overloaded reflections.is_overloaded(self.experiments) # Compute the shoebox mask reflections.compute_mask(self.experiments) # Process the data reflections.compute_background(self.experiments) reflections.compute_centroid(self.experiments) reflections.compute_summed_intensity() # Do the profile validation self.results = self.profile_fitter.validate(reflections) # Print some info fmt = " Validated % 5d / % 5d reflection profiles on image %d" nmod = reflections.get_flags(reflections.flags.used_in_modelling).count(True) ntot = len(reflections) logger.debug(fmt, nmod, ntot, frame + 1)
[docs] def finalize(self): """ Finalize the processing """ pass
[docs] def data(self): """ :return: the modeller """ return self.results
def __getinitargs__(self): """ Support for pickling """ return (self.experiments, self.profile_fitter)
[docs] class IntegratorExecutor(Executor): """ The class to process the integration data """ __getstate_manages_dict__ = 1
[docs] def __init__( self, experiments, profile_fitter=None, valid_foreground_threshold=0.75 ): """ Initialize the executor :param experiments: The experiment list """ self.experiments = experiments self.overlaps = None self.profile_fitter = profile_fitter self.valid_foreground_threshold = valid_foreground_threshold super().__init__()
[docs] def initialize(self, frame0, frame1, reflections): """ Initialize the processing for the job :param frame0: The first frame to process :param frame1: The last frame to process :param reflections: The reflections to process """ # Get some info full_value = FULL_PARTIALITY - EPS fully_recorded = reflections["partiality"] > full_value npart = fully_recorded.count(False) nfull = fully_recorded.count(True) nice = reflections.get_flags(reflections.flags.in_powder_ring).count(True) nint = reflections.get_flags(reflections.flags.dont_integrate).count(False) ntot = len(reflections) # Write some output logger.debug("") logger.debug(" Beginning integration job %d", job.index) logger.info("") logger.info(" Frames: %d -> %d", frame0, frame1) logger.info("") logger.info(" Number of reflections") logger.info(" Partial: %d", npart) logger.info(" Full: %d", nfull) logger.info(" In ice ring: %d", nice) logger.info(" Integrate: %d", nint) logger.info(" Total: %d", ntot) logger.info("") # Print a histogram of reflections on frames if frame1 - frame0 > 1: logger.debug( " The following histogram shows the number of reflections predicted" ) logger.debug(" to have all or part of their intensity on each frame.") logger.debug("") logger.debug(frame_hist(reflections["bbox"], prefix=" ", symbol="*")) logger.debug("") # Find any overlaps self.overlaps = reflections.find_overlaps(self.experiments)
[docs] def process(self, frame, reflections): """ Process the reflections on a frame :param frame: The frame to process :param reflections: The reflections to process """ # Check if pixels are overloaded reflections.is_overloaded(self.experiments) # Compute the shoebox mask reflections.compute_mask(self.experiments) # Check for invalid pixels in foreground/background reflections.contains_invalid_pixels() # Exclude reflections where a high fraction of the foreground is masked # e.g. due to a panel edge, as this will make the fitting unreliable. sbox = reflections["shoebox"] nvalfg = sbox.count_mask_values(MaskCode.Valid | MaskCode.Foreground) nforeg = sbox.count_mask_values(MaskCode.Foreground) fraction_valid = nvalfg.as_double() / nforeg.as_double() selection = fraction_valid < self.valid_foreground_threshold reflections.set_flags(selection, reflections.flags.dont_integrate) logger.debug( f"{selection.count(True)} reflections have" " a fraction of valid pixels below the valid foreground threshold" ) # Process the data reflections.compute_background(self.experiments) reflections.compute_centroid(self.experiments) reflections.compute_summed_intensity() if self.profile_fitter: reflections.compute_fitted_intensity(self.profile_fitter) # Compute the number of background/foreground pixels reflections["num_pixels.valid"] = sbox.count_mask_values(MaskCode.Valid) reflections["num_pixels.background"] = sbox.count_mask_values( MaskCode.Valid | MaskCode.Background ) reflections["num_pixels.background_used"] = sbox.count_mask_values( MaskCode.Valid | MaskCode.Background | MaskCode.BackgroundUsed ) reflections["num_pixels.foreground"] = nvalfg # Print some info fmt = " Integrated % 5d (sum) + % 5d (prf) / %5d reflections on image %d" nsum = reflections.get_flags(reflections.flags.integrated_sum).count(True) nprf = reflections.get_flags(reflections.flags.integrated_prf).count(True) ntot = len(reflections) logger.debug(fmt, nsum, nprf, ntot, frame + 1)
[docs] def finalize(self): """ Finalize the processing """ pass
[docs] def data(self): """ Return data """ return None
def __getinitargs__(self): """ Support for pickling """ return (self.experiments, self.profile_fitter)
[docs] class Integrator: """ The integrator class """
[docs] def __init__(self, experiments, reflections, params): """ Initialize the integrator :param experiments: The experiment list :param reflections: The reflections to process :param params: The parameters to use """ # Save some stuff self.experiments = experiments self.reflections = reflections self.params = Parameters.from_phil(params.integration) self.profile_model_report = None self.integration_report = None
[docs] def fit_profiles(self): """Do profile fitting if appropriate. Sets self.profile_validation_report and self.profile_model_report. Returns profile_fitter (may be none) """ fitting_class = [e.profile.fitting_class() for e in self.experiments] fitting_avail = all(c is not None for c in fitting_class) if self.params.profile.fitting and fitting_avail: profile_fitting = True profile_fitter = None else: profile_fitting = False profile_fitter = None # Do profile modelling if profile_fitting: logger.info("=" * 80) logger.info("") logger.info(heading("Modelling reflection profiles")) logger.info("") # Get the selection selection = self.reflections.get_flags( self.reflections.flags.reference_spot ) # Get the reference spots reference = self.reflections.select(selection) # Check if we need to skip if len(reference) == 0: logger.info( "** Skipping profile modelling - no reference profiles given **" ) else: # Try to set up the validation if self.params.profile.validation.number_of_partitions > 1: n = len(reference) k_max = int( math.floor( n / self.params.profile.validation.min_partition_size ) ) if k_max < self.params.profile.validation.number_of_partitions: num_folds = k_max else: num_folds = self.params.profile.validation.number_of_partitions if num_folds > 1: indices = ( list(range(num_folds)) * int(math.ceil(n / num_folds)) )[0:n] random.shuffle(indices) reference["profile.index"] = flex.size_t(indices) if num_folds < 1: num_folds = 1 else: num_folds = 1 # Create the profile fitter profile_modellers = [] for i in range(num_folds): profile_fitter_single = MultiExpProfileModeller() # (num_folds) for expr in self.experiments: profile_fitter_single.add(expr.profile.fitting_class()(expr)) profile_modellers.append(profile_fitter_single) # Create the data processor executor = ProfileModellerExecutor( self.experiments, ValidatedMultiExpProfileModeller(profile_modellers), ) processor = build_processor( self.ProcessorClass, self.experiments, reference, self.params.modelling, ) processor.executor = executor # Process the reference profiles reference, profile_fitter_list, time_info = processor.process() # Set the reference spots info # self.reflections.set_selected(selection, reference) # Finalize the profile models for validation assert len(profile_fitter_list) > 0, "No profile fitters" profile_fitter = None for pf in profile_fitter_list.values(): if pf is None: continue if profile_fitter is None: profile_fitter = pf else: profile_fitter.accumulate(pf) profile_fitter.finalize() # Get the finalized modeller finalized_profile_fitter = profile_fitter.finalized_model() # Dump reference profiles if self.params.debug_reference_output: reference_debug = [] for i in range(len(finalized_profile_fitter)): m = finalized_profile_fitter[i] p = [] for j in range(len(m)): try: p.append( { "data": m.data(j), "mask": m.mask(j), "coord": m.coord(j), "n_reflections": m.n_reflections(j), } ) except Exception: p.append(None) reference_debug.append(p) with open(self.params.debug_reference_filename, "wb") as outfile: pickle.dump(reference_debug, outfile) # Print profiles for i in range(len(finalized_profile_fitter)): m = finalized_profile_fitter[i] logger.debug("") logger.debug("Profiles for experiment %d", i) for j in range(len(m)): logger.debug("Profile %d", j) try: logger.debug(pprint.profile3d(m.data(j))) except Exception: logger.debug("** NO PROFILE **") # Print the modeller report self.profile_model_report = ProfileModelReport( self.experiments, finalized_profile_fitter, reference ) logger.info("") logger.info(self.profile_model_report.as_str(prefix=" ")) # Print the time info logger.info("") logger.info("Timing information for reference profile formation") logger.info(str(time_info)) logger.info("") # If we have more than 1 fold then do the validation if num_folds > 1: # Create the data processor executor = ProfileValidatorExecutor( self.experiments, profile_fitter ) processor = build_processor( self.ProcessorClass, self.experiments, reference, self.params.modelling, ) processor.executor = executor # Process the reference profiles reference, validation, time_info = processor.process() # Print the modeller report self.profile_validation_report = ProfileValidationReport( self.experiments, profile_fitter, reference, num_folds ) logger.info("") logger.info(self.profile_validation_report.as_str(prefix=" ")) # Print the time info logger.info("") logger.info("Timing information for reference profile validation") logger.info(str(time_info)) logger.info("") # Set to the finalized fitter profile_fitter = finalized_profile_fitter return profile_fitter
[docs] def integrate(self): """ Integrate the data """ # Ensure we get the same random sample each time random.seed(0) # Init the report self.profile_model_report = None self.integration_report = None # Heading logger.info("=" * 80) logger.info("") logger.info(heading("Processing reflections")) logger.info("") # Print the summary logger.info( " Processing the following experiments:\n" "\n" " Experiments: %d\n" " Beams: %d\n" " Detectors: %d\n" " Goniometers: %d\n" " Scans: %d\n" " Crystals: %d\n" " Imagesets: %d\n", len(self.experiments), len(self.experiments.beams()), len(self.experiments.detectors()), len(self.experiments.goniometers()), len(self.experiments.scans()), len(self.experiments.crystals()), len(self.experiments.imagesets()), ) # Initialize the reflections self.initialize_reflections(self.experiments, self.params, self.reflections) # Check if we want to do some profile fitting profile_fitter = self.fit_profiles() logger.info("=" * 80) logger.info("") logger.info(heading("Integrating reflections")) logger.info("") # Create the data processor executor = IntegratorExecutor( self.experiments, profile_fitter, self.params.profile.valid_foreground_threshold, ) # determine the max memory needed during integration def _determine_max_memory_needed(experiments, reflections): max_needed = 0 for imageset in experiments.imagesets(): # find all experiments belonging to that imageset, as each # imageset is processed as a whole for integration. if all(experiments.identifiers()): expt_ids = [ experiment.identifier for experiment in experiments if experiment.imageset == imageset ] subset = reflections.select_on_experiment_identifiers(expt_ids) else: subset = flex.reflection_table() for j, experiment in enumerate(experiments): if experiment.imageset == imageset: subset.extend(reflections.select(reflections["id"] == j)) if not imageset.get_scan() or imageset.get_scan().is_still(): frame0, frame1 = (0, len(imageset)) else: frame0, frame1 = imageset.get_scan().get_array_range() flatten = self.params.integration.integrator == "flat3d" max_needed = max( max_memory_needed(subset, frame0, frame1, flatten), max_needed, ) assert max_needed > 0, "Could not determine memory requirements" return max_needed def _iterative_table_split(tables, experiments, available_memory): split_tables = [] for table in tables: mem_needed = _determine_max_memory_needed(experiments, table) if mem_needed > available_memory: n_to_split = int(math.ceil(mem_needed / available_memory)) flex.set_random_seed(0) split_tables.extend(table.random_split(n_to_split)) else: split_tables.append(table) if len(split_tables) == len(tables): # nothing was split, all passed memory check return split_tables # some tables were split - so need to check again that all are ok return _iterative_table_split(split_tables, experiments, available_memory) def _run_processor(reflections): processor = build_processor( self.ProcessorClass, self.experiments, reflections, self.params.integration, ) processor.executor = executor # Process the reflections reflections, _, time_info = processor.process() return reflections, time_info if self.params.integration.mp.method != "multiprocessing": self.reflections, time_info = _run_processor(self.reflections) else: # Here, don't consider nproc as the processor will reduce nproc to 1 if # necessary. Only want to split if we can't even process with nproc = 1 if self.params.integration.mp.n_subset_split: tables = self.reflections.random_split( self.params.integration.mp.n_subset_split ) else: # Need to do a memory check and decide whether to split table. # Split if its size in memory exceeds the fraction of available memory # specified by the PHIL parameter integration.block.max_memory_usage. tables = _iterative_table_split( [self.reflections], self.experiments, MEMORY_LIMIT * self.params.integration.block.max_memory_usage, ) if len(tables) == 1: # will not fail a memory check in the processor, so proceed self.reflections, time_info = _run_processor(self.reflections) else: # Split the reflections and process by performing multiple # passes over each imageset time_info = TimingInfo() reflections = flex.reflection_table() logger.info( """Predicted maximum memory needed exceeds available memory. Splitting reflection table into %s subsets for processing """, len(tables), ) for i, table in enumerate(tables): logger.info("Processing subset %s of reflection table", i + 1) processed, this_time_info = _run_processor(table) reflections.extend(processed) time_info += this_time_info self.reflections = reflections # Finalize the reflections self.reflections, self.experiments = self.finalize_reflections( self.reflections, self.experiments, self.params ) # Create the integration report self.integration_report = IntegrationReport(self.experiments, self.reflections) logger.info("") logger.info(self.integration_report.as_str(prefix=" ")) # Print the time info logger.info("Timing information for integration") logger.info(str(time_info)) logger.info("") # Return the reflections return self.reflections
[docs] def report(self): """ Return the report of the processing """ result = Report() if self.profile_model_report is not None: result.combine(self.profile_model_report) result.combine(self.integration_report) return result
[docs] def summary(self, block_size, block_size_units): """Print a summary of the integration stuff.""" # Compute the task table if self._experiments.all_stills(): rows = [["#", "Group", "Frame From", "Frame To"]] for i in range(len(self)): job = self._manager.job(i) group = job.index() f0, f1 = job.frames() rows.append([str(i), str(group), str(f0), str(f1)]) elif self._experiments.all_sequences(): rows = [["#", "Group", "Frame From", "Frame To", "Angle From", "Angle To"]] for i in range(len(self)): job = self._manager.job(i) group = job.index() expr = job.expr() f0, f1 = job.frames() scan = self._experiments[expr[0]].scan p0 = scan.get_angle_from_array_index(f0) p1 = scan.get_angle_from_array_index(f1) rows.append( [str(i), str(group), str(f0 + 1), str(f1), str(p0), str(p1)] ) else: raise RuntimeError("Experiments must be all sequences or all stills") return tabulate(rows, headers="firstrow")
[docs] class Integrator3D(Integrator): """ Integrator for 3D algorithms """ initialize_reflections = staticmethod(_initialize_rotation) ProcessorClass = Processor3D finalize_reflections = staticmethod(_finalize_rotation)
[docs] class IntegratorFlat3D(Integrator): """ Integrator for flattened 3D algorithms """ initialize_reflections = staticmethod(_initialize_rotation) ProcessorClass = ProcessorFlat3D finalize_reflections = staticmethod(_finalize_rotation)
[docs] class Integrator2D(Integrator): """ Integrator for 2D algorithms """ initialize_reflections = staticmethod(_initialize_rotation) ProcessorClass = Processor2D finalize_reflections = staticmethod(_finalize_rotation)
[docs] class IntegratorSingle2D(Integrator): """ Integrator for 2D algorithms on a single image """ initialize_reflections = staticmethod(_initialize_rotation) ProcessorClass = ProcessorSingle2D finalize_reflections = staticmethod(_finalize_rotation)
[docs] class IntegratorStills(Integrator): """ Integrator for still algorithms """ initialize_reflections = staticmethod(_initialize_stills) ProcessorClass = ProcessorStills finalize_reflections = staticmethod(_finalize_stills)
[docs] class Integrator3DThreaded: """ Integrator for 3D algorithms """
[docs] def __init__(self, experiments, reflections, params): """ Initialize the integrator :param experiments: The experiment list :param reflections: The reflections to process :param params: The parameters to use """ # Save some stuff self.experiments = experiments self.reflections = reflections self.params = params self.profile_model_report = None self.integration_report = None
[docs] def initialise(self): """ Initialise the integrator """ # Compute some reflection properties self.reflections.compute_zeta_multi(self.experiments) self.reflections.compute_d(self.experiments) self.reflections.compute_bbox( self.experiments, sigma_b_multiplier=self.params.integration.profile.sigma_b_multiplier, ) # Filter the reflections by zeta mask = ( flex.abs(self.reflections["zeta"]) < self.params.integration.filter.min_zeta ) self.reflections.set_flags(mask, self.reflections.flags.dont_integrate)
# Filter the reflections by powder ring # if self.params.integration.filter.powder_filter is not None: # mask = self.params.integration.filter.powder_filter(self.reflections['d']) # self.reflections.set_flags(mask, self.reflections.flags.in_powder_ring)
[docs] def finalise(self): """ Finalise the integrator """ # Compute the corrections self.reflections.compute_corrections(self.experiments)
[docs] def integrate(self): """ Integrate the data """ # Init the report self.profile_model_report = None self.integration_report = None # Heading logger.info("=" * 80) logger.info("") logger.info(heading("Processing reflections")) logger.info("") # Create summary format fmt = ( " Processing the following experiments:\n" "\n" " Experiments: %d\n" " Beams: %d\n" " Detectors: %d\n" " Goniometers: %d\n" " Scans: %d\n" " Crystals: %d\n" " Imagesets: %d\n" ) # Print the summary logger.info( fmt, len(self.experiments), len(self.experiments.beams()), len(self.experiments.detectors()), len(self.experiments.goniometers()), len(self.experiments.scans()), len(self.experiments.crystals()), len(self.experiments.imagesets()), ) # Do the initialisation self.initialise() # Do profile modelling if self.params.integration.profile.fitting: logger.info("=" * 80) logger.info("") logger.info(heading("Modelling reflection profiles")) logger.info("") # Compute the reference profiles reference_calculator = ReferenceCalculatorProcessor( experiments=self.experiments, reflections=self.reflections, params=self.params, ) # Get the reference profiles self.reference_profiles = reference_calculator.profiles() else: self.reference_profiles = None logger.info("=" * 80) logger.info("") logger.info(heading("Integrating reflections")) logger.info("") integrator = IntegratorProcessor( experiments=self.experiments, reflections=self.reflections, reference=self.reference_profiles, params=self.params, ) # Process the reflections self.reflections = integrator.reflections() # Do the finalisation self.finalise() # Create the integration report self.integration_report = IntegrationReport(self.experiments, self.reflections) logger.info("") logger.info(self.integration_report.as_str(prefix=" ")) # Print the time info # logger.info("Timing information for integration") # logger.info(str(time_info)) # logger.info("") # Return the reflections return self.reflections
[docs] def report(self): """ Return the report of the processing """ result = Report() if self.profile_model_report is not None: result.combine(self.profile_model_report) result.combine(self.integration_report) return result
[docs] def summary(self, block_size, block_size_units): """Print a summary of the integration stuff.""" # Compute the task table if self._experiments.all_stills(): rows = [["#", "Group", "Frame From", "Frame To"]] for i in range(len(self)): job = self._manager.job(i) group = job.index() f0, f1 = job.frames() rows.append([str(i), str(group), str(f0), str(f1)]) elif self._experiments.all_sequences(): rows = [["#", "Group", "Frame From", "Frame To", "Angle From", "Angle To"]] for i in range(len(self)): job = self._manager.job(i) group = job.index() expr = job.expr() f0, f1 = job.frames() scan = self._experiments[expr[0]].scan p0 = scan.get_angle_from_array_index(f0) p1 = scan.get_angle_from_array_index(f1) rows.append( [str(i), str(group), str(f0 + 1), str(f1), str(p0), str(p1)] ) else: raise RuntimeError("Experiments must be all sequences or all stills") return tabulate(rows, headers="firstrow")
def create_integrator(params, experiments, reflections): """ Create an integrator object with a given configuration. :param params: The input phil parameters :param experiments: The list of experiments :param reflections: The reflections to integrate :return: An integrator object """ # Check each experiment has an imageset for exp in experiments: if exp.imageset is None: raise Sorry( """ One or more experiment does not contain an imageset. Access to the image data is crucial for integration. """ ) # Read the mask in if necessary if params.integration.lookup.mask and isinstance( params.integration.lookup.mask, str ): with open(params.integration.lookup.mask, "rb") as infile: params.integration.lookup.mask = pickle.load(infile, encoding="bytes") # Set algorithms as reflection table defaults BackgroundAlgorithm = dials.extensions.Background.load( params.integration.background.algorithm ) flex.reflection_table.background_algorithm = functools.partial( BackgroundAlgorithm, params ) CentroidAlgorithm = dials.extensions.Centroid.load( params.integration.centroid.algorithm ) flex.reflection_table.centroid_algorithm = functools.partial( CentroidAlgorithm, params ) # Get the classes we need if params.integration.integrator == "auto": if experiments.all_stills(): params.integration.integrator = "stills" else: params.integration.integrator = "3d" IntegratorClass = { "3d": Integrator3D, "flat3d": IntegratorFlat3D, "2d": Integrator2D, "single2d": IntegratorSingle2D, "stills": IntegratorStills, "3d_threaded": Integrator3DThreaded, }.get(params.integration.integrator) if not IntegratorClass: raise ValueError(f"Unknown integration type {params.integration.integrator}") # Remove scan if stills if experiments.all_stills(): for experiment in experiments: experiment.scan = None # Return an instantiation of the class return IntegratorClass(experiments, reflections, params)