Source code for dxtbx.model.scan

from __future__ import annotations

import os

import pycbf

import libtbx.phil
from scitbx.array_family import flex

from dxtbx.model.scan_helpers import scan_helper_image_files

try:
    from ..dxtbx_model_ext import Scan
except ModuleNotFoundError:
    from dxtbx_model_ext import Scan  # type: ignore

scan_phil_scope = libtbx.phil.parse(
    """
  scan
    .expert_level = 1
    .short_caption = "Scan overrides"
  {

    image_range = None
      .type = ints(size=2)
      .help = "Override the image range"
      .short_caption = "Image range"

    extrapolate_scan = False
      .type = bool
      .help = "When overriding the image range, extrapolate exposure and epoch information from existing images"
      .short_caption = "Extrapolate scan"

    oscillation = None
      .type = floats(size=2)
      .help = "Override the image oscillation"
      .short_caption = "Oscillation"

    batch_offset = None
      .type = int(value_min=0)
      .help = "Override the batch offset"
      .short_caption = "Batch offset"
  }
"""
)


[docs] class ScanFactory: """A factory for scan instances, to help with constructing the classes in a set of common circumstances."""
[docs] @staticmethod def from_phil(params, reference=None): """ Generate a scan model from phil parameters """ if reference is None: if params.scan.image_range is None and params.scan.oscillation is None: return None if params.scan.image_range is None: raise RuntimeError("No image range set") if params.scan.oscillation is None: raise RuntimeError("No oscillation set") scan = Scan(params.scan.image_range, params.scan.oscillation) else: scan = reference if params.scan.image_range is not None: most_recent_image_index = ( scan.get_image_range()[1] - scan.get_image_range()[0] ) scan.set_oscillation( scan.get_image_oscillation(params.scan.image_range[0]) ) scan.set_image_range(params.scan.image_range) if ( params.scan.extrapolate_scan and (params.scan.image_range[1] - params.scan.image_range[0]) > most_recent_image_index ): exposure_times = scan.get_exposure_times() epochs = scan.get_epochs() exposure_time = exposure_times[most_recent_image_index] epoch_correction = epochs[most_recent_image_index] for i in range( most_recent_image_index + 1, params.scan.image_range[1] - params.scan.image_range[0] + 1, ): exposure_times[i] = exposure_time epoch_correction += exposure_time epochs[i] = epoch_correction scan.set_epochs(epochs) scan.set_exposure_times(exposure_times) if params.scan.oscillation is not None: scan.set_oscillation(params.scan.oscillation) if params.scan.batch_offset is not None: scan.set_batch_offset(params.scan.batch_offset) return scan
[docs] @staticmethod def from_dict(d, t=None): """Convert the dictionary to a scan model Params: d The dictionary of parameters t The template dictionary to use Returns: The scan model """ def add_properties_table(scan_dict, num_images): """ Handles legacy case before Scan had a properties table. Moves oscillation, epochs, and exposure times to a properties table and adds this to scan_dict. """ properties = {} if scan_dict: if "oscillation" in scan_dict: if num_images == 1: properties["oscillation_width"] = [scan_dict["oscillation"][1]] properties["oscillation"] = [scan_dict["oscillation"][0]] else: osc = scan_dict["oscillation"] properties["oscillation"] = [ osc[0] + (osc[1] - osc[0]) * i for i in range(num_images) ] del scan_dict["oscillation"] if "exposure_time" in scan_dict: properties["exposure_time"] = scan_dict["exposure_time"] del scan_dict["exposure_time"] if "epochs" in scan_dict: properties["epochs"] = scan_dict["epochs"] del scan_dict["epochs"] scan_dict["properties"] = make_properties_table_consistent( properties, num_images ) return scan_dict def make_properties_table_consistent(properties, num_images): """ Handles legacy case before Scan had a properties table. Ensures oscillation, epochs, and exposure times have the same length. """ if not properties: return properties property_length = len(next(iter(properties.values()))) all_same_length = all( len(lst) == property_length for lst in properties.values() ) if all_same_length and property_length == num_images: return properties if "oscillation" in properties: assert len(properties["oscillation"]) > 0 if num_images == 1 and "oscillation_width" not in properties: assert len(properties["oscillation"]) > 1 properties["oscillation_width"] = [properties["oscillation"][1]] properties["oscillation"] = [properties["oscillation"][0]] elif num_images > 1: osc_0 = properties["oscillation"][0] if "oscillation_width" in properties: osc_1 = properties["oscillation_width"][0] del properties["oscillation_width"] else: assert len(properties["oscillation"]) > 1 osc_1 = ( properties["oscillation"][1] - properties["oscillation"][0] ) properties["oscillation"] = [ osc_0 + osc_1 * i for i in range(num_images) ] if "exposure_time" in properties: assert len(properties["exposure_time"]) > 0 # Assume same exposure time for each image properties["exposure_time"] = [ properties["exposure_time"][0] for i in range(num_images) ] if "epochs" in properties: assert len(properties["epochs"]) > 0 # If 1 epoch, assume increasing by epochs[0] # Else assume increasing as epochs[1] - epochs[0] if len(properties["epochs"]) == 1: properties["epochs"] = [ properties["epochs"][0] + properties["epochs"][0] * i for i in range(num_images) ] else: diff = properties["epochs"][1] - properties["epochs"][0] properties["epochs"] = [ properties["epochs"][0] + i * diff for i in range(num_images) ] return properties if d is None and t is None: return None joint = t.copy() if t else {} # Accounting for legacy cases where t or d does not # contain properties dict num_images = None if "image_range" in d: num_images = 1 + d["image_range"][1] - d["image_range"][0] elif "image_range" in joint: num_images = 1 + joint["image_range"][1] - joint["image_range"][0] if "properties" in joint and "properties" in d: properties = t["properties"].copy() properties.update(d["properties"]) joint.update(d) joint["properties"] = properties elif "properties" in d: joint = add_properties_table(joint, num_images) d_copy = d.copy() joint["properties"].update(d_copy["properties"]) joint["properties"] = make_properties_table_consistent( joint["properties"], num_images ) del d_copy["properties"] joint.update(d_copy) elif "properties" in joint: d = add_properties_table(d, num_images) d_copy = d.copy() joint["properties"].update(d_copy["properties"]) joint["properties"] = make_properties_table_consistent( joint["properties"], num_images ) del d_copy["properties"] joint.update(d_copy) else: joint.update(d) if "properties" not in d and not isinstance(joint["exposure_time"], list): joint["exposure_time"] = [joint["exposure_time"]] joint.setdefault("batch_offset", 0) # backwards compatibility 20180205 joint.setdefault("valid_image_ranges", {}) # backwards compatibility 20181113 return Scan.from_dict(joint)
[docs] @staticmethod def make_scan( image_range, exposure_times, oscillation, epochs, batch_offset=0, deg=True ): if not isinstance(exposure_times, list): num_images = image_range[1] - image_range[0] + 1 exposure_times = [exposure_times for i in range(num_images)] else: num_images = image_range[1] - image_range[0] + 1 num_exp = len(exposure_times) if num_exp != num_images: if num_exp == 0: exposure_times = [0 for i in range(num_images)] else: exposure_times = exposure_times.extend( [exposure_times[-1] for i in range(num_images - num_exp)] ) epoch_list = [epochs[j] for j in sorted(epochs)] return Scan( tuple(map(int, image_range)), tuple(map(float, oscillation)), flex.double(list(map(float, exposure_times))), flex.double(list(map(float, epoch_list))), batch_offset, deg, )
[docs] @staticmethod def make_scan_from_properties(image_range, properties, batch_offset=0, deg=True): return Scan(tuple(map(int, image_range)), properties, batch_offset, deg)
[docs] @staticmethod def single_file(filename, exposure_times, osc_start, osc_width, epoch): """Construct an scan instance for a single image.""" index = scan_helper_image_files.image_to_index(os.path.split(filename)[-1]) if epoch is None: epoch = 0.0 # if the oscillation width is negative at this stage it is almost # certainly an artefact of the omega end being 0 when the omega start # angle was ~ 360 so it would be ~ -360 - see dxtbx#378 if osc_width < -180: osc_width += 360 return ScanFactory.make_scan( (index, index), exposure_times, (osc_start, osc_width), {index: epoch} )
[docs] @staticmethod def imgCIF(cif_file): """Initialize a scan model from an imgCIF file.""" cbf_handle = pycbf.cbf_handle_struct() cbf_handle.read_file(cif_file, pycbf.MSG_DIGEST) return ScanFactory.imgCIF_H(cif_file, cbf_handle)
[docs] @staticmethod def imgCIF_H(cif_file, cbf_handle): """Initialize a scan model from an imgCIF file handle, where it is assumed that the file has already been read.""" exposure = cbf_handle.get_integration_time() timestamp = cbf_handle.get_timestamp()[0] gonio = cbf_handle.construct_goniometer() try: angles = tuple(gonio.get_rotation_range()) except Exception as e: if str(e).strip() == "CBFlib Error(s): CBF_NOTFOUND": # probably a still shot -> no scan object return None raise # xia2-56 handle gracefully reverse turning goniometers - this assumes the # rotation axis is correctly inverted in the goniometer factory if angles[1] < 0: angles = -angles[0], -angles[1] index = scan_helper_image_files.image_to_index(cif_file) gonio.__swig_destroy__(gonio) return ScanFactory.make_scan( (index, index), exposure, angles, {index: timestamp} )
[docs] @staticmethod def add(scans): """Sum a list of scans wrapping the slightly clumsy idiomatic method: sum(scans[1:], scans[0]).""" return sum(scans[1:], scans[0])
[docs] @staticmethod def search(filename): """Get a list of files which appear to match the template and directory implied by the input filename. This could well be used to get a list of image headers to read and hence construct scans from.""" template, directory = scan_helper_image_files.image_to_template_directory( filename ) indices = scan_helper_image_files.template_directory_to_indices( template, directory ) return [ scan_helper_image_files.template_directory_index_to_image( template, directory, index ) for index in indices ]