Source code for dials.util.export_mtz

from __future__ import annotations

import logging
import time
import warnings
from collections import Counter
from copy import deepcopy
from dataclasses import dataclass, field
from math import isclose
from typing import List, Optional

import gemmi
import numpy as np
import pandas as pd

from cctbx import uctbx
from dxtbx import flumpy
from iotbx import mtz
from libtbx import env
from rstbx.cftbx.coordinate_frame_helpers import align_reference_frame
from scitbx import matrix
from scitbx.math import r3_rotation_axis_and_angle_from_matrix

import dials.util.ext
from dials.algorithms.scaling.scaling_library import (
    MergedHalfDatasets,
    determine_best_unit_cell,
)
from dials.array_family import flex
from dials.util.batch_handling import (
    assign_batches_to_reflections,
    calculate_batch_offsets,
    get_image_ranges,
)
from dials.util.filter_reflections import filter_reflection_table
from dials.util.multi_dataset_handling import (
    assign_unique_identifiers,
    parse_multiple_datasets,
)
from dials.util.reindex import reindex_experiments, reindex_reflections
from dials.util.version import dials_version

logger = logging.getLogger(__name__)


class MTZWriterBase:
    """Helper for adding metadata, crystals and datasets to an mtz file object.

    Deprecated: constructing this class (or a subclass) emits a
    ``DeprecationWarning`` recommending ``MergedMTZCreator`` instead.
    """

    def __init__(self, space_group, unit_cell=None):
        """Create the mtz object and record its title, history and space group.

        Args:
            space_group: Space group whose ``info()`` is stored on the mtz object.
            unit_cell (Optional): Default unit cell, used by ``add_crystal``
                whenever no cell is specified for an individual crystal.
        """
        warnings.warn(
            "MTZWriterBase classes (MergedMTZWriter and MADMergedMTZWriter) are deprecated. Use MergedMTZCreator instead.\n",
            DeprecationWarning,
            stacklevel=2,
        )
        mtz_file = mtz.object()
        mtz_file.set_title(f"From {env.dispatcher_name}")
        date_str = time.strftime("%Y-%m-%d at %H:%M:%S %Z")
        if time.strftime("%Z") != "GMT":
            # Also record the GMT time when the local timezone is not GMT
            date_str += time.strftime(" (%Y-%m-%d at %H:%M:%S %Z)", time.gmtime())
        mtz_file.add_history(f"From {dials_version()}, run on {date_str}")
        mtz_file.set_space_group_info(space_group.info())
        self.mtz_file = mtz_file
        # Always define the attribute (possibly as None) so that add_crystal can
        # test it and raise its documented ValueError rather than AttributeError.
        self.unit_cell = unit_cell
        self.current_crystal = None
        self.current_dataset = None
        self.n_crystals = 0
        self.n_datasets = 0

    def add_crystal(self, crystal_name=None, project_name=None, unit_cell=None):
        """Add a crystal to the mtz file object and make it the current crystal.

        Args:
            crystal_name (Optional[str]): Defaults to ``crystal_<n>``, where n is
                one more than the number of crystals added so far.
            project_name (Optional[str]): Defaults to ``"DIALS"``.
            unit_cell (Optional): Cell for this crystal; falls back to the
                default cell given at construction.

        Raises:
            ValueError: If neither a per-crystal nor a default unit cell exists.
        """
        if not unit_cell:
            # getattr tolerates instances whose default cell was never set
            unit_cell = getattr(self, "unit_cell", None)
            if not unit_cell:
                raise ValueError("Unit cell must be provided.")
        if not crystal_name:
            crystal_name = f"crystal_{self.n_crystals + 1}"
        if not project_name:
            project_name = "DIALS"
        self.current_crystal = self.mtz_file.add_crystal(
            crystal_name, project_name, unit_cell.parameters()
        )
        self.n_crystals += 1

    def add_empty_dataset(self, wavelength, name=None):
        """Add an empty dataset to the current crystal and make it current.

        Args:
            wavelength: Wavelength passed through to the new dataset.
            name (Optional[str]): Dataset name, defaults to ``"FROMDIALS"``.
        """
        if not name:
            name = "FROMDIALS"
        self.current_dataset = self.current_crystal.add_dataset(name, wavelength)
        self.n_datasets += 1
class MergedMTZWriter(MTZWriterBase):
    """Mtz writer for merged data."""

    def add_dataset(
        self,
        merged_array=None,
        anom_array=None,
        amplitudes=None,
        anom_amplitudes=None,
        dano=None,
        multiplicities=None,
        anom_multiplicities=None,
        suffix=None,
        half_datasets: Optional[MergedHalfDatasets] = None,
        r_free_array=None,
    ):
        """Add merged data to the most recent dataset.

        Every argument is optional; an array is only written when it is truthy.

        Args:
            merged_array: A merged miller array of IMEAN intensities
            anom_array (Optional): An anomalous merged miller array
            amplitudes (Optional): A merged miller array of amplitudes
            anom_amplitudes (Optional): An anomalous merged array of amplitudes
            dano (Optional): Array written under the DANO label (types "DQ")
            multiplicities (Optional): Array written under the N label
            anom_multiplicities (Optional): Array written under the N label
            suffix (Optional[str]): Column name suffix to use for this dataset.
            half_datasets (Optional): Provides data1/data2 (written as
                IHALF1/IHALF2) and multiplicity1/multiplicity2 (NHALF1/NHALF2)
            r_free_array (Optional): Array written under the FreeR_flag label
        """
        label_suffix = suffix if suffix else ""

        # (array, column root label) pairs, listed in the order they are written
        plain_columns = (
            (merged_array, "IMEAN"),
            (multiplicities, "N"),
            (amplitudes, "F"),
            (anom_array, "I"),
            (anom_multiplicities, "N"),
            (anom_amplitudes, "F"),
        )
        for array, root_label in plain_columns:
            if array:
                self.current_dataset.add_miller_array(array, root_label + label_suffix)

        if dano:
            self.current_dataset.add_miller_array(
                dano, "DANO" + label_suffix, column_types="DQ"
            )

        if half_datasets:
            half_intensities = (
                (half_datasets.data1, "IHALF1"),
                (half_datasets.data2, "IHALF2"),
            )
            for array, root_label in half_intensities:
                self.current_dataset.add_miller_array(
                    array, root_label + label_suffix, column_types="JQ"
                )
            half_multiplicities = (
                (half_datasets.multiplicity1, "NHALF1"),
                (half_datasets.multiplicity2, "NHALF2"),
            )
            for array, root_label in half_multiplicities:
                self.current_dataset.add_miller_array(array, root_label + label_suffix)

        if r_free_array:
            # The free-R column keeps a fixed label; the suffix is not applied
            self.current_dataset.add_miller_array(
                r_free_array, column_root_label="FreeR_flag", column_types="I"
            )
class MADMergedMTZWriter(MergedMTZWriter):
    """Mtz writer for multi-wavelength merged data."""

    def add_dataset(
        self,
        merged_array=None,
        anom_array=None,
        amplitudes=None,
        anom_amplitudes=None,
        dano=None,
        multiplicities=None,
        anom_multiplicities=None,
        suffix=None,
        half_datasets: Optional[MergedHalfDatasets] = None,
        r_free_array=None,
    ):
        """Add merged data, using a per-wavelength column suffix by default.

        Takes the same arguments as ``MergedMTZWriter.add_dataset``. When no
        suffix is given, ``_WAVE<n>`` is used, where n is the current number
        of datasets.
        """
        wave_suffix = suffix if suffix else f"_WAVE{self.n_datasets}"
        super().add_dataset(
            merged_array=merged_array,
            anom_array=anom_array,
            amplitudes=amplitudes,
            anom_amplitudes=anom_amplitudes,
            dano=dano,
            multiplicities=multiplicities,
            anom_multiplicities=anom_multiplicities,
            suffix=wave_suffix,
            half_datasets=half_datasets,
            r_free_array=r_free_array,
        )
def add_batch_list(
    mtz,
    image_range,
    experiment,
    wavelength,
    dataset_id,
    batch_offset,
    force_static_model,
):
    """Add batch metadata to the gemmi mtz object.

    One batch header is appended to ``mtz.batches`` for every image in
    ``image_range``.

    Args:
        mtz: The gemmi Mtz object to append batches to.
        image_range: (first, last) image numbers, inclusive.
        experiment: Experiment supplying the crystal, detector and beam, and
            optionally a scan and goniometer.
        wavelength: Wavelength recorded in every batch header.
        dataset_id: Dataset id recorded in every batch header.
        batch_offset: Offset added to the image numbers to form batch numbers.
        force_static_model: If True, always use the static crystal model, even
            when a scan-varying model is available.
    """
    # Recalculate useful numbers and references here
    n_batches = image_range[1] - image_range[0] + 1
    phi_start = flex.float(n_batches, 0)
    phi_range = flex.float(n_batches, 0)
    umat_array = flex.float(flex.grid(n_batches, 9))
    cell_array = flex.float(flex.grid(n_batches, 6))

    # Reciprocal lattice vectors in the lab frame at zero scan angle
    if experiment.goniometer:
        S = matrix.sqr(experiment.goniometer.get_setting_rotation())
        F = matrix.sqr(experiment.goniometer.get_fixed_rotation())
        UBlab = S * F * matrix.sqr(experiment.crystal.get_A())
        axis = matrix.col(experiment.goniometer.get_rotation_axis())
        axis_datum = matrix.col(experiment.goniometer.get_rotation_axis_datum())
    else:
        UBlab = matrix.sqr(experiment.crystal.get_A())

    i0 = image_range[0]
    for i in range(n_batches):
        if experiment.scan and experiment.scan.get_oscillation()[1] != 0.0:
            phi_start[i], phi_range[i] = experiment.scan.get_image_oscillation(i + i0)

        # Unit cell and UB matrix for the centre of the image for scan-varying model
        if (
            not force_static_model
            and experiment.crystal.num_scan_points > 0
            and experiment.goniometer
        ):
            # Get the index of the image in the sequence e.g. first => 0, second => 1
            image_index = i + i0 - experiment.scan.get_image_range()[0]

            # Find the U matrix at the frame centre by calculating the linear transform
            # that goes from the start of the frame to the end, and then applying half of
            # that to the start value
            U0 = matrix.sqr(experiment.crystal.get_U_at_scan_point(image_index))
            U1 = matrix.sqr(experiment.crystal.get_U_at_scan_point(image_index + 1))
            M = U1 * U0.inverse()
            (
                angle_M,
                axis_M,
            ) = M.r3_rotation_matrix_as_unit_quaternion().unit_quaternion_as_axis_and_angle(
                deg=False
            )
            M_half = axis_M.axis_and_angle_as_r3_rotation_matrix(angle_M / 2, deg=False)
            Ucentre = M_half * U0

            # Find the B matrix at the frame centre by interpolation
            B0 = matrix.sqr(experiment.crystal.get_B_at_scan_point(image_index))
            B1 = matrix.sqr(experiment.crystal.get_B_at_scan_point(image_index + 1))
            Bcentre = (B0 + B1) / 2

            # Unit cell at the frame centre
            unit_cell = uctbx.unit_cell(
                orthogonalization_matrix=Bcentre.transpose().inverse()
            )

            # Get full lab frame UB then unwind to zero scan angle.
            # phi_start/phi_range are the same values written below as the
            # phistt/phiend batch header fields, so phi_centre is taken to be
            # in degrees and the rotations must be built with deg=True.
            # NOTE(review): assumes scan.get_image_oscillation returns degrees.
            phi_centre = phi_start[i] + phi_range[i] / 2
            R = matrix.sqr(
                axis_datum.axis_and_angle_as_r3_rotation_matrix(phi_centre, deg=True)
            )
            Rlab_inv = matrix.sqr(
                axis.axis_and_angle_as_r3_rotation_matrix(-phi_centre, deg=True)
            )
            _UBlab = Rlab_inv * S * R * F * Ucentre * Bcentre
        else:
            unit_cell = experiment.crystal.get_unit_cell()
            _UBlab = UBlab

        # We assume a single-axis goniometer as it is not clear that multi-
        # axis goniometry was ever fully supported in MTZ format. Orientation
        # will be taken from the laboratory frame for this image.
        U = matrix.sqr(dials.util.ext.ub_to_mosflm_u(_UBlab, unit_cell))

        # FIXME need to get what was refined and what was constrained from the
        # crystal model - see https://github.com/dials/dials/issues/355
        _unit_cell_params = unit_cell.parameters()
        for j in range(6):
            cell_array[i, j] = _unit_cell_params[j]

        # Transpose to put in column-major order for MTZ export
        U_t_elements = U.transpose().elems
        for j in range(9):
            umat_array[i, j] = U_t_elements[j]

    # We ignore panels beyond the first one, at the moment
    panel = experiment.detector[0]
    panel_size = panel.get_image_size()
    panel_distance = panel.get_directed_distance()

    if experiment.goniometer:
        axis = flex.float(experiment.goniometer.get_rotation_axis())
    else:
        axis = flex.float((0.0, 0.0, 0.0))

    source = flex.float(experiment.beam.get_sample_to_source_direction())

    # get the mosaic spread though today it may not actually be set - should
    # this be in the BATCH headers?
    try:
        mosaic = experiment.crystal.get_mosaicity()
    except AttributeError:
        mosaic = 0.0

    # Never number a new batch at or below the last batch already in the file
    max_batch_number = 0
    if mtz.batches:
        max_batch_number = mtz.batches[-1].number

    batch_offset += image_range[0] - 1
    if max_batch_number > batch_offset:
        batch_offset = max_batch_number

    batch = gemmi.Mtz.Batch()

    # Setting fields that are the same for all batches
    batch.dataset_id = dataset_id
    batch.wavelength = wavelength
    batch.ints[12] = 1  # ncryst
    batch.ints[14] = 2  # ldtype 3D
    batch.ints[15] = 1  # jsaxs - goniostat scan axis number
    batch.ints[17] = 1  # ngonax - number of goniostat axes
    batch.ints[19] = 1  # ndet
    batch.floats[21] = mosaic  # crydat[0]
    for j in range(3):
        batch.floats[38 + j] = axis[j]  # scanax
    batch.floats[43] = 1.0  # bscale (batch scale)
    for j in range(3):
        batch.floats[59 + j] = axis[j]  # e1
    batch.floats[80 + flex.min_index(source)] = -1.0  # idealised source vector
    for j in range(3):
        batch.floats[83 + j] = source[j]  # source including tilts
    batch.floats[111] = panel_distance  # dx
    batch.floats[114] = panel_size[0]  # NX
    batch.floats[116] = panel_size[1]  # NY
    batch.axes = ["AXIS"]  # gonlab[0]

    # Setting fields that differ
    for i_batch in range(n_batches):
        batch.number = batch_offset + i_batch + 1
        batch.title = f"Batch {batch.number}"
        for j in range(6):
            batch.floats[j] = cell_array[i_batch, j]  # cell
        for j in range(9):
            batch.floats[6 + j] = umat_array[i_batch, j]  # Umat
        batch.floats[36] = phi_start[i_batch]  # phistt
        batch.floats[37] = phi_start[i_batch] + phi_range[i_batch]  # phiend
        batch.floats[47] = phi_range[i_batch]  # phirange
        # Append this batch
        mtz.batches.append(batch)
def write_columns(mtz, reflection_table):
    """Write the column definitions AND data to the current dataset.

    Columns are added to ``mtz`` in this order: M/ISYM, BATCH, the intensity
    columns (scaled intensities if present, otherwise profile and/or summation
    intensities and background), FRACTIONCALC, XDET, YDET, ROT, LP (if
    present) and QE. The data are then set on the mtz object as one float32
    array.

    Args:
        mtz: The gemmi Mtz object to add columns and data to.
        reflection_table: Mapping of column name to flex array. It must hold
            "miller_index", "xyzobs.px.value", "batch", "fractioncalc" and
            "ROT"; other keys are used when present.
    """
    nref = len(reflection_table["miller_index"])
    assert nref
    xdet, ydet, _ = [
        flex.double(x) for x in reflection_table["xyzobs.px.value"].parts()
    ]

    type_table = {
        "H": "H",
        "K": "H",
        "L": "H",
        "I": "J",
        "SIGI": "Q",
        "IPR": "J",
        "SIGIPR": "Q",
        "BG": "R",
        "SIGBG": "R",
        "XDET": "R",
        "YDET": "R",
        "BATCH": "B",
        "BGPKRATIOS": "R",
        "WIDTH": "R",
        "MPART": "I",
        "M_ISYM": "Y",
        "FLAG": "I",
        "LP": "R",
        "FRACTIONCALC": "R",
        "ROT": "R",
        "QE": "R",
    }

    # H, K, L are in the base dataset; every other column is appended below
    mtz_data = pd.DataFrame(
        flumpy.to_numpy(reflection_table["miller_index"]).astype("float32"),
        columns=["H", "K", "L"],
    )

    def as_float32(flex_array):
        """Convert a flex array to a float32 numpy array."""
        return flumpy.to_numpy(flex_array).astype("float32")

    def append_column(label, column_type, values):
        """Declare a column on the mtz object and append its data to the table."""
        mtz.add_column(label, column_type)
        mtz_data.insert(len(mtz_data.columns), label, values)

    # M/ISYM is filled with zeros here
    append_column("M/ISYM", type_table["M_ISYM"], np.zeros(nref, dtype="float32"))
    append_column("BATCH", type_table["BATCH"], as_float32(reflection_table["batch"]))

    # if intensity values used in scaling exist, then just export these as I, SIGI
    if "intensity.scale.value" in reflection_table:
        I_scaling = reflection_table["intensity.scale.value"]
        V_scaling = reflection_table["intensity.scale.variance"]
        assert V_scaling.all_gt(0)  # Trap negative variances
        append_column("I", type_table["I"], as_float32(I_scaling))
        append_column("SIGI", type_table["SIGI"], as_float32(flex.sqrt(V_scaling)))
        append_column(
            "SCALEUSED", "R", as_float32(reflection_table["inverse_scale_factor"])
        )
        append_column(
            "SIGSCALEUSED",
            "R",
            as_float32(flex.sqrt(reflection_table["inverse_scale_factor_variance"])),
        )
    else:
        if "intensity.prf.value" in reflection_table:
            # Profile intensities take the I/SIGI labels only when there are
            # no summation intensities to claim them
            if "intensity.sum.value" in reflection_table:
                col_names = ("IPR", "SIGIPR")
            else:
                col_names = ("I", "SIGI")
            I_profile = reflection_table["intensity.prf.value"]
            V_profile = reflection_table["intensity.prf.variance"]
            assert V_profile.all_gt(0)  # Trap negative variances
            append_column(
                col_names[0], type_table["I"], as_float32(I_profile.as_float())
            )
            append_column(
                col_names[1], type_table["SIGI"], as_float32(flex.sqrt(V_profile))
            )
        if "intensity.sum.value" in reflection_table:
            I_sum = reflection_table["intensity.sum.value"]
            V_sum = reflection_table["intensity.sum.variance"]
            assert V_sum.all_gt(0)  # Trap negative variances
            append_column("I", type_table["I"], as_float32(I_sum))
            append_column("SIGI", type_table["SIGI"], as_float32(flex.sqrt(V_sum)))
        if (
            "background.sum.value" in reflection_table
            and "background.sum.variance" in reflection_table
        ):
            bg = reflection_table["background.sum.value"]
            varbg = reflection_table["background.sum.variance"]
            assert (varbg >= 0).count(False) == 0
            sigbg = flex.sqrt(varbg)
            append_column("BG", type_table["BG"], as_float32(bg))
            append_column("SIGBG", type_table["SIGBG"], as_float32(sigbg))

    append_column(
        "FRACTIONCALC",
        type_table["FRACTIONCALC"],
        as_float32(reflection_table["fractioncalc"]),
    )
    append_column("XDET", type_table["XDET"], as_float32(xdet))
    append_column("YDET", type_table["YDET"], as_float32(ydet))
    append_column("ROT", type_table["ROT"], as_float32(reflection_table["ROT"]))

    if "lp" in reflection_table:
        append_column("LP", type_table["LP"], as_float32(reflection_table["lp"]))

    # QE comes from "qe", else "dqe", else defaults to one for every reflection
    if "qe" in reflection_table:
        append_column("QE", type_table["QE"], as_float32(reflection_table["qe"]))
    elif "dqe" in reflection_table:
        append_column("QE", type_table["QE"], as_float32(reflection_table["dqe"]))
    else:
        append_column("QE", type_table["QE"], np.ones(nref).astype("float32"))

    mtz.switch_to_original_hkl()
    # Hand gemmi an explicit 2D float32 array instead of relying on implicit
    # conversion of the DataFrame.
    mtz.set_data(mtz_data.to_numpy(dtype="float32"))
def export_mtz(
    reflection_table,
    experiment_list,
    intensity_choice,
    filename,
    best_unit_cell=None,
    partiality_threshold=0.4,
    combine_partials=True,
    min_isigi=-5,
    filter_ice_rings=False,
    d_min=None,
    force_static_model=False,
    crystal_name=None,
    project_name=None,
    wavelength_tolerance=1e-4,
):
    """Export data from reflection_table corresponding to experiment_list to an MTZ file.

    Args:
        reflection_table: Reflection data to export.
        experiment_list: Experiments matching the reflection data.
        intensity_choice, partiality_threshold, combine_partials, min_isigi,
            filter_ice_rings, d_min: Passed through to filter_reflection_table.
        filename: Path the MTZ file is written to.
        best_unit_cell (Optional): Cell used for the resolution column and set
            on the MTZ file; determined from the experiments when None.
        force_static_model: Passed through to add_batch_list.
        crystal_name, project_name: Set on every dataset added to the file.
        wavelength_tolerance: Absolute tolerance passed to match_wavelengths.

    Returns:
        The gemmi Mtz object that was written to ``filename``.

    Raises:
        ValueError: If the experiments do not share a single space group, or
            if duplicate batch offsets are detected.
    """
    # First get the experiment identifier information out of the data
    expids_in_table = reflection_table.experiment_identifiers()
    if not list(expids_in_table.keys()):
        # No identifiers in the table: assign them and rebuild a single table
        reflection_tables = parse_multiple_datasets([reflection_table])
        experiment_list, refl_list = assign_unique_identifiers(
            experiment_list, reflection_tables
        )
        reflection_table = flex.reflection_table()
        for reflections in refl_list:
            reflection_table.extend(reflections)
        expids_in_table = reflection_table.experiment_identifiers()
    reflection_table.assert_experiment_identifiers_are_consistent(experiment_list)
    expids_in_list = list(experiment_list.identifiers())

    # Convert geometry to the Cambridge frame
    experiment_list = convert_to_cambridge(experiment_list)

    # Validate multi-experiment assumptions
    if len(experiment_list) > 1:
        # All experiments should match crystals, or else we need multiple crystals/datasets
        if not all(
            x.crystal == experiment_list[0].crystal for x in experiment_list[1:]
        ):
            logger.warning(
                "Experiment crystals differ. Using first experiment crystal for file-level data."
            )

        # At least, all experiments must have the same space group
        if len({x.crystal.get_space_group().make_tidy() for x in experiment_list}) != 1:
            raise ValueError("Experiments do not have a unique space group")

    # Reindex to a tabulated setting if necessary
    sg = experiment_list[0].crystal.get_space_group()
    if sg.match_tabulated_settings().number() == 0:
        logger.warning(
            "The data will be reindexed to a tabulated setting of the space group"
        )
        cb_op = sg.info().change_of_basis_op_to_reference_setting()
        experiment_list = reindex_experiments(experiment_list, cb_op)
        reflection_table = reindex_reflections(
            [
                reflection_table,
            ],
            cb_op,
        )

    # Convert experiment_list to a real python list or else identity assumptions
    # fail like:
    #   assert experiment_list[0] is experiment_list[0]
    # And assumptions about added attributes break
    experiment_list = list(experiment_list)

    # Group the experiments by wavelength and compute each group's weighted mean
    wavelengths = match_wavelengths(experiment_list, wavelength_tolerance)
    for w in wavelengths.values():
        w.calculate_weighted_mean([reflection_table])
    if len(wavelengths) > 1:
        identifiers_list = [e.identifier for e in experiment_list]
        logger.info(
            "Multiple wavelengths found: \n%s",
            "\n".join(
                " Wavelength: %.5f, experiment numbers: %s "
                % (
                    v.weighted_mean,
                    ",".join(
                        map(str, [identifiers_list.index(i) for i in v.identifiers])
                    ),
                )
                for v in wavelengths.values()
            ),
        )

    # also only work correctly with one panel (for the moment)
    if any(len(experiment.detector) != 1 for experiment in experiment_list):
        logger.warning("Ignoring multiple panels in output MTZ")

    if best_unit_cell is None:
        best_unit_cell = determine_best_unit_cell(experiment_list)
    reflection_table["d"] = best_unit_cell.d(reflection_table["miller_index"])

    # Clean up the data with the passed in options
    reflection_table = filter_reflection_table(
        reflection_table,
        intensity_choice=intensity_choice,
        partiality_threshold=partiality_threshold,
        combine_partials=combine_partials,
        min_isigi=min_isigi,
        filter_ice_rings=filter_ice_rings,
        d_min=d_min,
    )

    # get batch offsets and image ranges - even for scanless experiments
    batch_offsets = [
        expt.scan.get_batch_offset()
        for expt in experiment_list
        if expt.scan is not None
    ]
    unique_offsets = set(batch_offsets)
    if len(set(unique_offsets)) <= 1:
        # At most one distinct offset among the scans: calculate new offsets
        # for all experiments and check uniqueness on offset + first image
        logger.debug("Calculating new batches")
        batch_offsets = calculate_batch_offsets(experiment_list)
        batch_starts = [
            e.scan.get_image_range()[0] if e.scan else 0 for e in experiment_list
        ]
        effective_offsets = [o + s for o, s in zip(batch_offsets, batch_starts)]
        unique_offsets = set(effective_offsets)
    else:
        # NOTE(review): on this path batch_offsets only has entries for
        # experiments with a scan, yet it is indexed by experiment position
        # below - confirm scanless experiments cannot reach this branch.
        logger.debug("Keeping existing batches")
    image_ranges = get_image_ranges(experiment_list)
    if len(unique_offsets) != len(batch_offsets):
        raise ValueError(
            "Duplicate batch offsets detected: %s"
            % ", ".join(
                str(item) for item, count in Counter(batch_offsets).items() if count > 1
            )
        )

    # Create the mtz file
    mtz = gemmi.Mtz(with_base=True)
    mtz.title = f"From {env.dispatcher_name}"
    date_str = time.strftime("%Y-%m-%d at %H:%M:%S %Z")
    if time.strftime("%Z") != "GMT":
        # Also record the GMT time when the local timezone is not GMT
        date_str += time.strftime(" (%Y-%m-%d at %H:%M:%S %Z)", time.gmtime())
    mtz.history += [
        f"From {dials_version()}, run on {date_str}",
    ]

    # Create the right gemmi spacegroup from the crystal's cctbx space_group
    # via a Hall symbol
    hall = experiment_list[0].crystal.get_space_group().type().hall_symbol()
    ops = gemmi.symops_from_hall(hall)
    mtz.spacegroup = gemmi.find_spacegroup_by_ops(ops)

    # FIXME TODO for more than one experiment into an MTZ file:
    #
    # - add an epoch (or recover an epoch) from the scan and add this as an extra
    #   column to the MTZ file for scaling, so we know that the two lattices were
    #   integrated at the same time
    # ✓ decide a sensible BATCH increment to apply to the BATCH value between
    #   experiments and add this

    for id_ in expids_in_table.keys():
        # Grab our subset of the data
        loc = expids_in_list.index(
            expids_in_table[id_]
        )  # get strid and use to find loc in list
        experiment = experiment_list[loc]
        identifier = experiment.identifier
        # The dataset id is the 1-based position of the wavelength group
        if len(wavelengths) > 1:
            for i, wl in enumerate(wavelengths.values()):
                if identifier in wl.identifiers:
                    wavelength = wl.weighted_mean
                    dataset_id = i + 1
                    break
        else:
            wavelength = list(wavelengths.values())[0].weighted_mean
            dataset_id = 1
        reflections = reflection_table.select(reflection_table["id"] == id_)
        batch_offset = batch_offsets[loc]
        image_range = image_ranges[loc]
        reflections = assign_batches_to_reflections([reflections], [batch_offset])[0]
        # Keep this experiment's columns on the experiment object; they are
        # combined across experiments further down
        experiment.data = dict(reflections)

        s0n = matrix.col(experiment.beam.get_s0()).normalize().elems
        logger.debug("Beam vector: %.4f %.4f %.4f" % s0n)

        add_batch_list(
            mtz,
            image_range,
            experiment,
            wavelength,
            dataset_id,
            batch_offset=batch_offset,
            force_static_model=force_static_model,
        )

        # Create the batch offset array. This gives us an experiment (id)-dependent
        # batch offset to calculate the correct batch from image number.
        experiment.data["batch_offset"] = flex.int(
            len(experiment.data["id"]), batch_offset
        )

        # Calculate whether we have a ROT value for this experiment, and set the column
        _, _, z = experiment.data["xyzcal.px"].parts()
        if experiment.scan:
            experiment.data["ROT"] = experiment.scan.get_angle_from_array_index(z)
        else:
            experiment.data["ROT"] = z

    mtz.set_cell_for_all(gemmi.UnitCell(*best_unit_cell.parameters()))

    # For multi-wave unmerged mtz, we add an empty dataset for each wavelength,
    # but only write the data into the final dataset (for unmerged the batches
    # link the unmerged data to the individual wavelengths).
    for wavelength in wavelengths.values():
        ds = mtz.add_dataset("FROMDIALS")
        ds.crystal_name = crystal_name
        ds.project_name = project_name
        ds.wavelength = wavelength.weighted_mean

    # Combine all of the experiment data columns before writing
    combined_data = {k: v.deep_copy() for k, v in experiment_list[0].data.items()}
    for experiment in experiment_list[1:]:
        for k, v in experiment.data.items():
            combined_data[k].extend(v)
    # ALL columns must be the same length
    assert len({len(v) for v in combined_data.values()}) == 1, "Column length mismatch"
    assert len(combined_data["id"]) == len(
        reflection_table["id"]
    ), "Lost rows in split/combine"

    # Write all the data and columns to the mtz file
    write_columns(mtz, combined_data)

    # Switch to ASU indices and sort file in standard order
    mtz.switch_to_asu_hkl()
    mtz.sort(5)

    logger.info(
        "Saving %s integrated reflections to %s", len(combined_data["id"]), filename
    )
    mtz.write_to_file(filename)
    log_summary(mtz)
    return mtz
def log_summary(mtz):
    """Log a summary of an MTZ object, based on the output of `gemmi mtz --dump`

    Everything is logged at INFO level. ``mtz.update_reso()`` is called before
    the resolution limits are reported.
    """
    info = logger.info

    info("Title: " + mtz.title)
    n_datasets = len(mtz.datasets)
    info(f"Total Number of Datasets = {n_datasets}\n")

    # One header line, the cell and the wavelength for each dataset
    for dataset in mtz.datasets:
        info(
            f"Dataset {dataset.id:4d} {dataset.project_name} > {dataset.crystal_name} > {dataset.dataset_name}:"
        )
        dataset_cell = dataset.cell.parameters
        info(" cell {:7g} {:7g} {:7g} {:6g} {:6g} {:6g}".format(*dataset_cell))
        info(f" wavelength {dataset.wavelength:g}")

    # File-level counts and settings
    n_columns = len(mtz.columns)
    info(f"\nNumber of Columns = {n_columns}")
    info(f"Number of Reflections = {mtz.nreflections}")
    n_batches = len(mtz.batches)
    info(f"Number of Batches = {n_batches}")
    info(f"Missing values marked as: {mtz.valm}")
    global_cell = mtz.cell.parameters
    info(
        "Global Cell (obsolete): {:7g} {:7g} {:7g} {:6g} {:6g} {:6g}".format(
            *global_cell
        )
    )
    mtz.update_reso()
    info(f"Resolution: {mtz.resolution_high():.2f} - {mtz.resolution_low():.2f} A")
    info("Sort Order: {:d} {:d} {:d} {:d} {:d}".format(*mtz.sort_order))
    info(f"Space Group: {mtz.spacegroup.hm}")
    info(f"Space Group Number: {mtz.spacegroup.ccp4}")

    # Per-column table
    info("Header info:")
    info("Column Type Dataset Min Max")
    for column in mtz.columns:
        # col.min_value and col.max_value are not set, so we have to calculate them here
        lowest = np.nanmin(column.array)
        highest = np.nanmax(column.array)
        info(
            f"{column.label:<12s} {column.type} {column.dataset_id:2d} {lowest:12.6g} {highest:10.6g}"
        )

    n_history = len(mtz.history)
    info(f"History ({n_history} lines):")
    for history_line in mtz.history:
        info(history_line)
@dataclass
class WavelengthGroup:
    """A set of experiments whose wavelengths are treated as one dataset."""

    min_wl: float
    max_possible_wl: float
    # Parallel lists, one entry per experiment added to the group
    identifiers: list[str] = field(default_factory=list)
    exp_nos: list[int] = field(default_factory=list)
    wavelengths: list[float] = field(default_factory=list)
    # Stays 0 until calculate_weighted_mean finds integrated reflections
    weighted_mean: float = 0

    def add_experiment(self, identifier: str, loc_in_list: int, wl: float) -> None:
        """Record an experiment's identifier, list position and wavelength."""
        self.wavelengths.append(wl)
        self.exp_nos.append(loc_in_list)
        self.identifiers.append(identifier)

    def calculate_weighted_mean(
        self, reflection_tables: List[flex.reflection_table]
    ) -> None:
        """Set ``weighted_mean`` to the reflection-count-weighted mean wavelength.

        Each experiment is weighted by its number of integrated reflections,
        taken from the first table in which that number is non-zero.
        ``weighted_mean`` is left unchanged when no integrated reflections are
        found at all.
        """
        total_count = 0
        weighted_sum = 0
        for identifier, wl in zip(self.identifiers, self.wavelengths):
            for table in reflection_tables:
                subset = table.select_on_experiment_identifiers([identifier])
                is_integrated = subset.get_flags(subset.flags.integrated, all=False)
                count = subset.select(is_integrated).size()
                if not count:
                    continue
                total_count += count
                weighted_sum += count * wl
                break
        if total_count:
            self.weighted_mean = weighted_sum / total_count
def match_wavelengths(experiments, absolute_tolerance=1e-4):
    """Group experiments whose beam wavelengths agree within a tolerance.

    Args:
        experiments: Iterable of experiments, each with a beam and identifier.
        absolute_tolerance: Absolute tolerance used to compare a wavelength
            against the wavelength that started each existing group.

    Returns:
        A dict mapping the first wavelength seen in each group to its
        WavelengthGroup, in order of first appearance.
    """
    groups = {}
    for position, expt in enumerate(experiments):
        wl = expt.beam.get_wavelength()
        # The earliest-created group that is close enough wins
        key = next(
            (
                known
                for known in groups
                if isclose(wl, known, abs_tol=absolute_tolerance)
            ),
            None,
        )
        if key is None:
            key = wl
            groups[key] = WavelengthGroup(wl, wl + absolute_tolerance)
        groups[key].add_experiment(expt.identifier, position, wl)
    return groups
def convert_to_cambridge(experiments):
    """Rotate the geometry of an experiment list to match the Cambridge frame,
    in which X is along the idealized X-ray beam and Z is along the primary
    rotation axis

    The experiment models are modified in place (after copying any shared
    models) and the same experiment list is returned.
    """
    # First handle the potential for shared experiment models - we don't
    # want to apply multiple transformations to shared models, simplest way is
    # to make a copy for each experiment.
    n_expt = len(experiments)
    if len(experiments.crystals()) < n_expt:
        for expt in experiments:
            expt.crystal = deepcopy(expt.crystal)
    if len(experiments.beams()) < n_expt:
        for expt in experiments:
            expt.beam = deepcopy(expt.beam)
    if len(experiments.detectors()) < n_expt:
        for expt in experiments:
            expt.detector = deepcopy(expt.detector)
    if any(experiments.goniometers()) and len(experiments.goniometers()) < n_expt:
        for expt in experiments:
            expt.goniometer = deepcopy(expt.goniometer)

    for expt in experiments:
        # Without a goniometer, (1, 0, 0) stands in for the primary axis
        if expt.goniometer:
            primary_axis = matrix.col(expt.goniometer.get_rotation_axis_datum())
        else:
            primary_axis = matrix.col((1.0, 0.0, 0.0))
        us0 = expt.beam.get_unit_s0()

        # Rotation taking the primary axis to Z and the unit beam vector to X
        R = align_reference_frame(primary_axis, (0, 0, 1), us0, (1, 0, 0))
        axis_angle = r3_rotation_axis_and_angle_from_matrix(R)
        axis = matrix.col(axis_angle.axis)
        angle = axis_angle.angle()
        # Pluralise only for more than one experiment; testing the bare length
        # was always true inside this loop.
        logger.debug(
            f"Rotating experiment{'s' if n_expt > 1 else ''} about axis {axis.elems} by {np.degrees(angle):.2f}°"
        )

        expt.detector.rotate_around_origin(axis, angle, deg=False)
        expt.beam.rotate_around_origin(axis, angle, deg=False)

        # For the goniometer, each component needs transformation
        if expt.goniometer:
            F = matrix.sqr(expt.goniometer.get_fixed_rotation())
            expt.goniometer.set_fixed_rotation(R * F * R.transpose())
            expt.goniometer.set_rotation_axis_datum(R * primary_axis)
            S = matrix.sqr(expt.goniometer.get_setting_rotation())
            expt.goniometer.set_setting_rotation(R * S * R.transpose())

        if expt.crystal is not None:
            expt.crystal = rotate_crystal(expt.crystal, R, axis, angle)

    return experiments
def rotate_crystal(crystal, Rmat, axis, angle):
    """Rotate a crystal model about the origin, including scan-varying settings.

    Args:
        crystal: Crystal model, rotated in place.
        Rmat: Rotation matrix applied to the A matrix at each scan point.
        axis: Rotation axis passed to ``rotate_around_origin``.
        angle: Rotation angle in radians (passed with ``deg=False``).

    Returns:
        The same crystal object.
    """
    n_points = crystal.num_scan_points

    # Compute the rotated per-scan-point A = R * U * B before the static model
    # is rotated; they are written back afterwards.
    rotated_settings = []
    if n_points > 0:
        for t in range(n_points):
            U_t = matrix.sqr(crystal.get_U_at_scan_point(t))
            B_t = matrix.sqr(crystal.get_B_at_scan_point(t))
            rotated_settings.append(Rmat * U_t * B_t)

    crystal.rotate_around_origin(axis, angle, deg=False)
    if rotated_settings:
        crystal.set_A_at_scan_points(rotated_settings)
    return crystal