Click here to go to the corresponding page for the latest version of DIALS
Source code for dials.algorithms.scaling.model.model
"""
Definitions of scaling models.
A scaling model is a collection of scaling model components with appropriate
methods to define how these are composed into one model.
"""
from __future__ import absolute_import, division, print_function
import logging
from collections import OrderedDict
from libtbx import phil
from dials.array_family import flex
from dials.algorithms.scaling.model.components.scale_components import (
SingleScaleFactor,
SingleBScaleFactor,
SHScaleComponent,
)
from dials.algorithms.scaling.model.components.smooth_scale_components import (
SmoothScaleComponent1D,
SmoothBScaleComponent1D,
SmoothScaleComponent2D,
SmoothScaleComponent3D,
)
from dials.algorithms.scaling.scaling_utilities import sph_harm_table
from dials.algorithms.scaling.plots import (
plot_absorption_parameters,
plot_absorption_surface,
plot_smooth_scales,
plot_array_decay_plot,
plot_array_modulation_plot,
plot_array_absorption_plot,
)
from dials_scaling_ext import (
calc_theta_phi,
calc_lookup_index,
create_sph_harm_lookup_table,
)
import six
logger = logging.getLogger("dials")
import pkg_resources
kb_model_phil_str = """\
decay_correction = True
.type = bool
.help = "Option to turn off decay correction (for physical/array/KB
default models)."
.expert_level = 1
"""
physical_model_phil_str = """\
scale_interval = 15.0
.type = float(value_min=1.0)
.help = "Rotation (phi) interval between model parameters for the scale"
"component."
.expert_level = 1
decay_correction = True
.type = bool
.help = "Option to turn off decay correction."
.expert_level = 1
decay_interval = 20.0
.type = float(value_min=1.0)
.help = "Rotation (phi) interval between model parameters for the decay"
"component."
.expert_level = 1
decay_restraint = 1e-1
.type = float(value_min=0.0)
.help = "Weight to weakly restrain B-values to 0."
.expert_level = 2
absorption_correction = True
.type = bool
.help = "Option to turn off absorption correction."
.expert_level = 1
lmax = 4
.type = int(value_min=2)
.help = "Number of spherical harmonics to include for absorption"
"correction, recommended to be no more than 6."
.expert_level = 1
surface_weight = 1e6
.type = float(value_min=0.0)
.help = "Restraint weight applied to spherical harmonic terms in the"
"absorption correction."
.expert_level = 1
fix_initial = True
.type = bool
.help = "If performing full matrix minimisation, in the final cycle,"
"constrain the initial parameter for more reliable parameter and"
"scale factor error estimates."
.expert_level = 2
"""
array_model_phil_str = """\
decay_correction = True
.type = bool
.help = "Option to turn off decay correction (a 2D grid of parameters as"
"a function of rotation and resolution (d-value))."
.expert_level = 1
decay_interval = 20.0
.type = float(value_min=1.0)
.help = "Rotation (phi) interval between model parameters for the decay"
"and absorption corrections."
.expert_level = 1
n_resolution_bins = 10
.type = int(value_min=1)
.help = "Number of resolution bins to use for the decay term."
.expert_level = 1
absorption_correction = True
.type = bool
.help = "Option to turn off absorption correction (a 3D grid of"
"parameters as a function of rotation angle, detector-x and"
"detector-y position)."
.expert_level = 1
n_absorption_bins = 3
.type = int(value_min=1)
.help = "Number of bins in each dimension (applied to both x and y) for"
"binning the detector position for the absorption term of the"
"array model."
.expert_level = 1
modulation_correction = False
.type = bool
.help = "Option to turn on a detector correction for the array default"
"model."
.expert_level = 2
n_modulation_bins = 20
.type = int(value_min=1)
.help = "Number of bins in each dimension (applied to both x and y) for"
"binning the detector position for the modulation correction."
.expert_level = 2
"""
[docs]class ScalingModelBase(object):
"""Abstract base class for scaling models."""
id_ = None
def __init__(self, configdict, is_scaled=False):
"""Initialise the model with no components and a :obj:`configdict`."""
if not configdict["corrections"]:
raise ValueError("No model components created.")
self._components = OrderedDict()
self._configdict = configdict
self._is_scaled = is_scaled
self._error_model = None
@property
def is_scaled(self):
""":obj:`bool`: Indicte whether this model has previously been refined."""
return self._is_scaled
[docs] def fix_initial_parameter(self, params):
"""Fix a parameter of the scaling model."""
return False
[docs] def limit_image_range(self, new_image_range):
"""Modify the model if necessary due to reducing the image range.
Args:
new_image_range (tuple): The (start, end) of the new image range.
"""
pass
[docs] def set_scaling_model_as_scaled(self):
"""Set the boolean 'is_scaled' flag as True."""
self._is_scaled = True
[docs] def set_scaling_model_as_unscaled(self):
"""Set the boolean 'is_scaled' flag as False."""
self._is_scaled = False
[docs] def configure_components(self, reflection_table, experiment, params):
"""Add the required reflection table data to the model components."""
raise NotImplementedError()
[docs] def plot_model_components(self):
"""Return a dict of plots for plotting model components with plotly."""
return {}
[docs] @classmethod
def from_data(cls, params, experiment, reflection_table):
"""Create the model from input data."""
raise NotImplementedError()
[docs] def set_valid_image_range(self, image_range):
"""Set the valid image range for the model in the :obj:`configdict`."""
self._configdict["valid_image_range"] = image_range
@property
def error_model(self):
""":obj:`error_model`: The error model associated with the scaling model."""
return self._error_model
@property
def configdict(self):
""":obj:`dict`: a dictionary of the model configuration parameters."""
return self._configdict
@property
def components(self):
""":obj:`dict`: a dictionary of the model components."""
return self._components
@property
def n_params(self):
""":obj:`dict`: a dictionary of the model components."""
return sum(c.parameters.size() for c in self._components.values())
[docs] def consecutive_refinement_order(self):
""":obj:`list`: a nested list of component names.
This list indicates to the scaler the order to perform scaling in
consecutive scaling mode.
e.g. [['scale', 'decay'], ['absorption']] would cause the first cycle to
refine scale and decay, and then absorption in a subsequent cycle.
"""
raise NotImplementedError()
[docs] def to_dict(self):
"""Serialize the model to a dictionary.
Returns:
dict: A dictionary representation of the model.
"""
dictionary = OrderedDict({"__id__": self.id_})
dictionary["is_scaled"] = self._is_scaled
for key in self.components:
dictionary[key] = OrderedDict(
[
("n_parameters", self._components[key].n_params),
("parameters", list(self._components[key].parameters)),
(
"null_parameter_value",
self._components[key].null_parameter_value,
),
]
)
if self._components[key].parameter_esds:
dictionary[key]["est_standard_devs"] = list(
self._components[key].parameter_esds
)
dictionary["configuration_parameters"] = self._configdict
return dictionary
[docs] @classmethod
def from_dict(cls, obj):
"""Create a scaling model from a dictionary."""
raise NotImplementedError()
[docs] def set_error_model(self, error_model):
"""Associate an error model with the dataset."""
self._error_model = error_model
self._configdict.update(
{
"error_model_type": self.error_model.__class__.__name__,
"error_model_parameters": list(error_model.parameters),
}
)
[docs] def record_intensity_combination_Imid(self, Imid):
"""Record the intensity combination Imid value."""
self._configdict["Imid"] = Imid
def __str__(self):
""":obj:`str`: Return a string representation of a scaling model."""
msg = ["Scaling model:"]
msg.append(" type : " + str(self.id_))
for name, component in six.iteritems(self.components):
msg.append(" " + str(name).capitalize() + " component:")
if component.parameter_esds:
msg.append(" parameters (sigma)")
for p, e in zip(component.parameters, component.parameter_esds):
if p < 0.0:
msg.append(" %.4f (%.4f)" % (p, e))
else:
msg.append(" %.4f (%.4f)" % (p, e))
else:
msg.append(" parameters")
for p in component.parameters:
if p < 0.0:
msg.append(" %.4f" % p)
else:
msg.append(" %.4f" % p)
msg.append("")
return "\n".join(msg)
[docs]class PhysicalScalingModel(ScalingModelBase):
"""A scaling model for a physical parameterisation."""
id_ = "physical"
phil_scope = phil.parse(physical_model_phil_str)
def __init__(self, parameters_dict, configdict, is_scaled=False):
"""Create the phyiscal scaling model components."""
super(PhysicalScalingModel, self).__init__(configdict, is_scaled)
if "scale" in configdict["corrections"]:
scale_setup = parameters_dict["scale"]
self._components["scale"] = SmoothScaleComponent1D(
scale_setup["parameters"], scale_setup["parameter_esds"]
)
if "decay" in configdict["corrections"]:
decay_setup = parameters_dict["decay"]
self._components["decay"] = SmoothBScaleComponent1D(
decay_setup["parameters"], decay_setup["parameter_esds"]
)
if "absorption" in configdict["corrections"]:
absorption_setup = parameters_dict["absorption"]
self._components["absorption"] = SHScaleComponent(
absorption_setup["parameters"], absorption_setup["parameter_esds"]
)
@property
def consecutive_refinement_order(self):
""":obj:`list`: a nested list of component names to indicate scaling order."""
return [["scale", "decay"], ["absorption"]]
[docs] def fix_initial_parameter(self, params):
if "scale" in self.components and params.physical.fix_initial:
self.components["scale"].fix_initial_parameter()
return True
[docs] def configure_components(self, reflection_table, experiment, params):
"""Add the required reflection table data to the model components."""
if "scale" in self.components:
norm = (
reflection_table["xyzobs.px.value"].parts()[2]
* self._configdict["s_norm_fac"]
)
self.components["scale"].data = {"x": norm}
if "decay" in self.components:
norm = (
reflection_table["xyzobs.px.value"].parts()[2]
* self._configdict["d_norm_fac"]
)
self.components["decay"].parameter_restraints = flex.double(
self.components["decay"].parameters.size(),
params.physical.decay_restraint,
)
self.components["decay"].data = {"x": norm, "d": reflection_table["d"]}
if "absorption" in self.components:
lmax = self._configdict["lmax"]
if reflection_table.size() > 100000:
assert "s0c" in reflection_table
assert "s1c" in reflection_table
theta_phi_0 = calc_theta_phi(
reflection_table["s0c"]
) # array of tuples in radians
theta_phi_1 = calc_theta_phi(reflection_table["s1c"])
s0_lookup_index = calc_lookup_index(theta_phi_0, points_per_degree=2)
s1_lookup_index = calc_lookup_index(theta_phi_1, points_per_degree=2)
if SHScaleComponent.coefficients_list is None:
SHScaleComponent.coefficients_list = create_sph_harm_lookup_table(
lmax, points_per_degree=2
) # set the class variable and share
elif len(SHScaleComponent.coefficients_list) < (lmax * (2.0 + lmax)):
# this (rare) case can happen if adding a new dataset with a larger lmax!
SHScaleComponent.coefficients_list = create_sph_harm_lookup_table(
lmax, points_per_degree=2
) # set the class variable and share
self.components["absorption"].data = {
"s0_lookup": s0_lookup_index,
"s1_lookup": s1_lookup_index,
}
# here just pass in good reflections
else:
self.components["absorption"].data = {
"sph_harm_table": sph_harm_table(reflection_table, lmax)
}
surface_weight = self._configdict["abs_surface_weight"]
parameter_restraints = flex.double([])
for i in range(1, lmax + 1):
parameter_restraints.extend(flex.double([1.0] * ((2 * i) + 1)))
parameter_restraints *= surface_weight
self.components["absorption"].parameter_restraints = parameter_restraints
[docs] def limit_image_range(self, new_image_range):
"""Modify the model to be suitable for a reduced image range.
For this model, this involves determining whether the number of parameters
should be reduced and may reduce the number of parameters in the scale and
decay components.
Args:
new_image_range (tuple): The (start, end) of the new image range.
"""
conf = self.configdict
current_image_range = conf["valid_image_range"]
current_osc_range = conf["valid_osc_range"]
# calculate one osc as don't have access to scan object here
one_osc = (conf["valid_osc_range"][1] - conf["valid_osc_range"][0]) / (
conf["valid_image_range"][1] - (conf["valid_image_range"][0] - 1)
)
new_osc_range = (
(new_image_range[0] - current_image_range[0]) * one_osc,
(new_image_range[1] - current_image_range[0] + 1) * one_osc,
)
if "scale" in self.components:
n_param, s_norm_fac, scale_rot_int = initialise_smooth_input(
new_osc_range, one_osc, conf["scale_rot_interval"]
)
n_old_params = len(self.components["scale"].parameters)
conf["scale_rot_interval"] = scale_rot_int
conf["s_norm_fac"] = s_norm_fac
offset = calculate_new_offset(
current_image_range[0],
new_image_range[0],
s_norm_fac,
n_old_params,
n_param,
)
new_params = self.components["scale"].parameters[offset : offset + n_param]
self.components["scale"].set_new_parameters(new_params)
if "decay" in self.components:
n_param, d_norm_fac, decay_rot_int = initialise_smooth_input(
new_osc_range, one_osc, conf["decay_rot_interval"]
)
n_old_params = len(self.components["decay"].parameters)
conf["decay_rot_interval"] = decay_rot_int
conf["d_norm_fac"] = d_norm_fac
offset = calculate_new_offset(
current_image_range[0],
new_image_range[0],
d_norm_fac,
n_old_params,
n_param,
)
new_params = self.components["decay"].parameters[offset : offset + n_param]
self.components["decay"].set_new_parameters(new_params)
new_osc_range_0 = current_osc_range[0] + (
(new_image_range[0] - current_image_range[0]) * one_osc
)
new_osc_range_1 = current_osc_range[1] + (
(new_image_range[1] - current_image_range[1]) * one_osc
)
self._configdict["valid_osc_range"] = (new_osc_range_0, new_osc_range_1)
self.set_valid_image_range(new_image_range)
[docs] @classmethod
def from_data(cls, params, experiment, reflection_table):
"""Create the scaling model defined by the params."""
params = params.physical
configdict = OrderedDict({"corrections": []})
parameters_dict = {}
osc_range = experiment.scan.get_oscillation_range()
one_osc_width = experiment.scan.get_oscillation()[1]
configdict.update({"valid_osc_range": osc_range})
configdict["corrections"].append("scale")
n_scale_param, s_norm_fac, scale_rot_int = initialise_smooth_input(
osc_range, one_osc_width, params.scale_interval
)
configdict.update(
{"s_norm_fac": s_norm_fac, "scale_rot_interval": scale_rot_int}
)
parameters_dict["scale"] = {
"parameters": flex.double(n_scale_param, 1.0),
"parameter_esds": None,
}
if params.decay_correction:
configdict["corrections"].append("decay")
n_decay_param, d_norm_fac, decay_rot_int = initialise_smooth_input(
osc_range, one_osc_width, params.decay_interval
)
configdict.update(
{"d_norm_fac": d_norm_fac, "decay_rot_interval": decay_rot_int}
)
parameters_dict["decay"] = {
"parameters": flex.double(n_decay_param, 0.0),
"parameter_esds": None,
}
if params.absorption_correction:
configdict["corrections"].append("absorption")
lmax = params.lmax
n_abs_param = (2 * lmax) + (lmax ** 2) # arithmetic sum formula (a1=3, d=2)
configdict.update({"lmax": lmax})
surface_weight = params.surface_weight
configdict.update({"abs_surface_weight": surface_weight})
parameters_dict["absorption"] = {
"parameters": flex.double(n_abs_param, 0.0),
"parameter_esds": None,
}
return cls(parameters_dict, configdict)
[docs] @classmethod
def from_dict(cls, obj):
"""Create a :obj:`PhysicalScalingModel` from a dictionary."""
if obj["__id__"] != cls.id_:
raise RuntimeError("expected __id__ %s, got %s" % (cls.id_, obj["__id__"]))
(s_params, d_params, abs_params) = (None, None, None)
(s_params_sds, d_params_sds, a_params_sds) = (None, None, None)
configdict = obj["configuration_parameters"]
is_scaled = obj["is_scaled"]
if "scale" in configdict["corrections"]:
s_params = flex.double(obj["scale"]["parameters"])
if "est_standard_devs" in obj["scale"]:
s_params_sds = flex.double(obj["scale"]["est_standard_devs"])
if "decay" in configdict["corrections"]:
d_params = flex.double(obj["decay"]["parameters"])
if "est_standard_devs" in obj["decay"]:
d_params_sds = flex.double(obj["decay"]["est_standard_devs"])
if "absorption" in configdict["corrections"]:
abs_params = flex.double(obj["absorption"]["parameters"])
if "est_standard_devs" in obj["absorption"]:
a_params_sds = flex.double(obj["absorption"]["est_standard_devs"])
parameters_dict = {
"scale": {"parameters": s_params, "parameter_esds": s_params_sds},
"decay": {"parameters": d_params, "parameter_esds": d_params_sds},
"absorption": {"parameters": abs_params, "parameter_esds": a_params_sds},
}
return cls(parameters_dict, configdict, is_scaled)
[docs] def plot_model_components(self):
d = OrderedDict()
d.update(plot_smooth_scales(self))
if "absorption" in self.components:
d.update(plot_absorption_parameters(self))
d.update(plot_absorption_surface(self))
return d
[docs]class ArrayScalingModel(ScalingModelBase):
"""A scaling model for an array-based parameterisation."""
id_ = "array"
phil_scope = phil.parse(array_model_phil_str)
def __init__(self, parameters_dict, configdict, is_scaled=False):
"""Create the array scaling model components."""
super(ArrayScalingModel, self).__init__(configdict, is_scaled)
if not any(i in configdict["corrections"] for i in ["decay", "absorption"]):
raise ValueError(
"Array model must have at least one of decay or absorption corrections"
)
if "decay" in configdict["corrections"]:
decay_setup = parameters_dict["decay"]
self._components["decay"] = SmoothScaleComponent2D(
decay_setup["parameters"],
shape=(configdict["n_res_param"], configdict["n_time_param"]),
parameter_esds=decay_setup["parameter_esds"],
)
if "absorption" in configdict["corrections"]:
abs_setup = parameters_dict["absorption"]
self._components["absorption"] = SmoothScaleComponent3D(
abs_setup["parameters"],
shape=(
configdict["n_x_param"],
configdict["n_y_param"],
configdict["n_time_param"],
),
parameter_esds=abs_setup["parameter_esds"],
)
if "modulation" in configdict["corrections"]:
mod_setup = parameters_dict["modulation"]
self._components["modulation"] = SmoothScaleComponent2D(
mod_setup["parameters"],
shape=(configdict["n_x_mod_param"], configdict["n_y_mod_param"]),
parameter_esds=mod_setup["parameter_esds"],
)
@property
def consecutive_refinement_order(self):
""":obj:`list`: a nested list of component names to indicate scaling order."""
return [["decay"], ["absorption"], ["modulation"]]
[docs] def configure_components(self, reflection_table, experiment, params):
"""Add the required reflection table data to the model components."""
xyz = reflection_table["xyzobs.px.value"].parts()
norm_time = xyz[2] * self.configdict["time_norm_fac"]
if "decay" in self.components:
d = reflection_table["d"]
norm_res = (
(1.0 / flex.pow2(d)) - self.configdict["resmin"]
) / self.configdict["res_bin_width"]
self.components["decay"].data = {"x": norm_res, "y": norm_time}
if "absorption" in self.components:
norm_x_abs = (xyz[0] - self.configdict["xmin"]) / self.configdict[
"x_bin_width"
]
norm_y_abs = (xyz[1] - self.configdict["ymin"]) / self.configdict[
"y_bin_width"
]
self.components["absorption"].data = {
"x": norm_x_abs,
"y": norm_y_abs,
"z": norm_time,
}
if "modulation" in self.components:
norm_x_det = (xyz[0] - self.configdict["xmin"]) / self.configdict[
"x_det_bin_width"
]
norm_y_det = (xyz[1] - self.configdict["ymin"]) / self.configdict[
"y_det_bin_width"
]
self.components["modulation"].data = {"x": norm_x_det, "y": norm_y_det}
[docs] def limit_image_range(self, new_image_range):
"""Modify the model to be suitable for a reduced image range.
For this model, this involves determining whether the number of parameters
should be reduced and may reduce the number of parameters in the absorption
and decay components.
Args:
new_image_range (tuple): The (start, end) of the new image range.
"""
conf = self.configdict
current_image_range = conf["valid_image_range"]
current_osc_range = conf["valid_osc_range"]
# calculate one osc as don't have access to scan object here
one_osc = (conf["valid_osc_range"][1] - conf["valid_osc_range"][0]) / (
conf["valid_image_range"][1] - (conf["valid_image_range"][0] - 1)
)
new_osc_range = (
(new_image_range[0] - current_image_range[0]) * one_osc,
(new_image_range[1] - current_image_range[0] + 1) * one_osc,
)
n_param, time_norm_fac, time_rot_int = initialise_smooth_input(
new_osc_range, one_osc, conf["time_rot_interval"]
)
if "decay" in self.components:
n_old_time_params = int(
len(self.components["decay"].parameters)
/ self.components["decay"].n_x_params
)
else:
n_old_time_params = int(
len(self.components["absorption"].parameters)
/ (
self.components["absorption"].n_x_params
* self.components["absorption"].n_y_params
)
)
offset = calculate_new_offset(
current_image_range[0],
new_image_range[0],
time_norm_fac,
n_old_time_params,
n_param,
)
if "absorption" in self.components:
params = self.components["absorption"].parameters
n_x_params = self.components["absorption"].n_x_params
n_y_params = self.components["absorption"].n_y_params
# can't do simple slice as 3-dim array
time_offset = offset * n_x_params * n_y_params
new_params = params[
time_offset : time_offset + (n_param * n_x_params * n_y_params)
]
self.components["absorption"].set_new_parameters(
new_params, shape=(n_x_params, n_y_params, n_param)
)
if "decay" in self.components:
params = self.components["decay"].parameters
n_decay_params = self.components["decay"].n_x_params
# can't do simple slice as 2-dim array
decay_offset = offset * n_decay_params
new_params = params[
decay_offset : decay_offset + (n_param * n_decay_params)
]
self.components["decay"].set_new_parameters(
new_params, shape=(n_decay_params, n_param)
)
self._configdict["n_time_param"] = n_param
self._configdict["time_norm_fac"] = time_norm_fac
self._configdict["time_rot_interval"] = time_rot_int
new_osc_range_0 = current_osc_range[0] + (
(new_image_range[0] - current_image_range[0]) * one_osc
)
new_osc_range_1 = current_osc_range[1] + (
(new_image_range[1] - current_image_range[1]) * one_osc
)
self._configdict["valid_osc_range"] = (new_osc_range_0, new_osc_range_1)
self.set_valid_image_range(new_image_range)
[docs] @classmethod
def from_data(cls, params, experiment, reflection_table):
"""create an array-based scaling model."""
params = params.array
reflections = reflection_table.select(reflection_table["d"] > 0.0)
configdict = OrderedDict({"corrections": []})
# First initialise things common to more than one correction.
one_osc_width = experiment.scan.get_oscillation()[1]
osc_range = experiment.scan.get_oscillation_range()
configdict.update({"valid_osc_range": osc_range})
n_time_param, time_norm_fac, time_rot_int = initialise_smooth_input(
osc_range, one_osc_width, params.decay_interval
)
(xvalues, yvalues, _) = reflections["xyzobs.px.value"].parts()
(xmax, xmin) = (flex.max(xvalues) + 0.001, flex.min(xvalues) - 0.001)
(ymax, ymin) = (flex.max(yvalues) + 0.001, flex.min(yvalues) - 0.001)
parameters_dict = {}
if params.decay_correction:
configdict["corrections"].append("decay")
resmax = (1.0 / (flex.min(reflections["d"]) ** 2)) + 1e-6
resmin = (1.0 / (flex.max(reflections["d"]) ** 2)) - 1e-6
n_res_bins = params.n_resolution_bins
n_res_param, res_bin_width = calc_n_param_from_bins(
resmin, resmax, n_res_bins
)
configdict.update(
{
"n_res_param": n_res_param,
"n_time_param": n_time_param,
"resmin": resmin,
"res_bin_width": res_bin_width,
"time_norm_fac": time_norm_fac,
"time_rot_interval": time_rot_int,
}
)
parameters_dict["decay"] = {
"parameters": flex.double((n_time_param * n_res_param), 1.0),
"parameter_esds": None,
}
if params.absorption_correction:
configdict["corrections"].append("absorption")
nxbins = nybins = params.n_absorption_bins
n_x_param, x_bin_width = calc_n_param_from_bins(xmin, xmax, nxbins)
n_y_param, y_bin_width = calc_n_param_from_bins(ymin, ymax, nybins)
configdict.update(
{
"n_x_param": n_x_param,
"n_y_param": n_y_param,
"xmin": xmin,
"ymin": ymin,
"x_bin_width": x_bin_width,
"y_bin_width": y_bin_width,
"n_time_param": n_time_param,
"time_norm_fac": time_norm_fac,
"time_rot_interval": time_rot_int,
}
)
parameters_dict["absorption"] = {
"parameters": flex.double((n_x_param * n_y_param * n_time_param), 1.0),
"parameter_esds": None,
}
if params.modulation_correction:
configdict["corrections"].append("modulation")
nx_det_bins = ny_det_bins = params.n_modulation_bins
n_x_mod_param, x_det_bw = calc_n_param_from_bins(xmin, xmax, nx_det_bins)
n_y_mod_param, y_det_bw = calc_n_param_from_bins(ymin, ymax, ny_det_bins)
configdict.update(
{
"n_x_mod_param": n_x_mod_param,
"n_y_mod_param": n_y_mod_param,
"xmin": xmin,
"ymin": ymin,
"x_det_bin_width": x_det_bw,
"y_det_bin_width": y_det_bw,
}
)
parameters_dict["modulation"] = {
"parameters": flex.double((n_x_mod_param * n_y_mod_param), 1.0),
"parameter_esds": None,
}
return cls(parameters_dict, configdict)
[docs] @classmethod
def from_dict(cls, obj):
"""Create an :obj:`ArrayScalingModel` from a dictionary."""
if obj["__id__"] != cls.id_:
raise RuntimeError("expected __id__ %s, got %s" % (cls.id_, obj["__id__"]))
configdict = obj["configuration_parameters"]
is_scaled = obj["is_scaled"]
(dec_params, abs_params, mod_params) = (None, None, None)
(d_params_sds, a_params_sds, m_params_sds) = (None, None, None)
if "decay" in configdict["corrections"]:
dec_params = flex.double(obj["decay"]["parameters"])
if "est_standard_devs" in obj["decay"]:
d_params_sds = flex.double(obj["decay"]["est_standard_devs"])
if "absorption" in configdict["corrections"]:
abs_params = flex.double(obj["absorption"]["parameters"])
if "est_standard_devs" in obj["absorption"]:
a_params_sds = flex.double(obj["absorption"]["est_standard_devs"])
if "modulation" in configdict["corrections"]:
mod_params = flex.double(obj["modulation"]["parameters"])
if "est_standard_devs" in obj["modulation"]:
m_params_sds = flex.double(obj["modulation"]["est_standard_devs"])
parameters_dict = {
"decay": {"parameters": dec_params, "parameter_esds": d_params_sds},
"absorption": {"parameters": abs_params, "parameter_esds": a_params_sds},
"modulation": {"parameters": mod_params, "parameter_esds": m_params_sds},
}
return cls(parameters_dict, configdict, is_scaled)
[docs] def plot_model_components(self):
d = OrderedDict()
if "absorption" in self.components:
d.update(plot_array_absorption_plot(self))
if "decay" in self.components:
d.update(plot_array_decay_plot(self))
if "modulation" in self.components:
d.update(plot_array_modulation_plot(self))
return d
[docs]class KBScalingModel(ScalingModelBase):
"""A scaling model for a KB parameterisation."""
id_ = "KB"
phil_scope = phil.parse(kb_model_phil_str)
def __init__(self, parameters_dict, configdict, is_scaled=False):
"""Create the KB scaling model components."""
super(KBScalingModel, self).__init__(configdict, is_scaled)
if "scale" in configdict["corrections"]:
self._components["scale"] = SingleScaleFactor(
parameters_dict["scale"]["parameters"],
parameters_dict["scale"]["parameter_esds"],
)
if "decay" in configdict["corrections"]:
self._components["decay"] = SingleBScaleFactor(
parameters_dict["decay"]["parameters"],
parameters_dict["decay"]["parameter_esds"],
)
[docs] def configure_components(self, reflection_table, experiment, params):
"""Add the required reflection table data to the model components."""
if "scale" in self.components:
self.components["scale"].data = {"id": reflection_table["id"]}
if "decay" in self.components:
self.components["decay"].data = {
"d": reflection_table["d"],
"id": reflection_table["id"],
}
@property
def consecutive_refinement_order(self):
""":obj:`list`: a nested list of component names to indicate scaling order."""
return [["scale", "decay"]]
[docs] @classmethod
def from_dict(cls, obj):
"""Create an :obj:`KBScalingModel` from a dictionary."""
if obj["__id__"] != cls.id_:
raise RuntimeError("expected __id__ %s, got %s" % (cls.id_, obj["__id__"]))
configdict = obj["configuration_parameters"]
is_scaled = obj["is_scaled"]
(s_params, d_params) = (None, None)
(s_params_sds, d_params_sds) = (None, None)
if "scale" in configdict["corrections"]:
s_params = flex.double(obj["scale"]["parameters"])
if "est_standard_devs" in obj["scale"]:
s_params_sds = flex.double(obj["scale"]["est_standard_devs"])
if "decay" in configdict["corrections"]:
d_params = flex.double(obj["decay"]["parameters"])
if "est_standard_devs" in obj["decay"]:
d_params_sds = flex.double(obj["decay"]["est_standard_devs"])
parameters_dict = {
"scale": {"parameters": s_params, "parameter_esds": s_params_sds},
"decay": {"parameters": d_params, "parameter_esds": d_params_sds},
}
return cls(parameters_dict, configdict, is_scaled)
[docs] @classmethod
def from_data(cls, params, experiment, reflection_table):
"""Create the :obj:`KBScalingModel` from data."""
configdict = OrderedDict({"corrections": []})
parameters_dict = {}
if params.KB.decay_correction:
configdict["corrections"].append("decay")
parameters_dict["decay"] = {
"parameters": flex.double([0.0]),
"parameter_esds": None,
}
configdict["corrections"].append("scale")
parameters_dict["scale"] = {
"parameters": flex.double([1.0]),
"parameter_esds": None,
}
return cls(parameters_dict, configdict)
[docs]def calculate_new_offset(
current_image_0, new_image_0, new_norm_fac, n_old_param, n_new_param
):
"""Calculate the parameter offset for the new image range.
Returns:
int: An offset to apply when selecting the new parameters from the
existing parameters.
"""
if n_old_param == 2:
return 0 # cant have less than two params
batch_difference = (new_image_0 - current_image_0) * new_norm_fac
n_to_shift = int(batch_difference // 1)
if batch_difference % 1 > 0.5:
n_to_shift += 1
return min(n_old_param - n_new_param, n_to_shift) # cant shift by more
# than difference between old and new
[docs]def initialise_smooth_input(osc_range, one_osc_width, interval):
"""Calculate the required smoother parameters.
Using information about the sequence and the chosen parameterisation
interval, the required parameters for the smoother are determined.
Args:
osc_range (tuple): The (start, stop) of an oscillation in degrees.
one_osc_width (float): The oscillation width of a single image in degrees.
interval (float): The required maximum separation between parameters
in degrees.
Returns:
tuple: 3-element tuple containing;
n_params (:obj:`int`): The number of parameters to use.
norm_fac (:obj:`float`): The degrees to parameters space normalisation factor.
interval (:obj:`float`): The actual interval in degrees between the parameters.
"""
interval += 0.00001
if (osc_range[1] - osc_range[0]) < (2.0 * interval):
if (osc_range[1] - osc_range[0]) <= interval:
rot_int = osc_range[1] - osc_range[0]
n_param = 2
else:
rot_int = (osc_range[1] - osc_range[0]) / 2.0
n_param = 3
else:
n_bins = max(int((osc_range[1] - osc_range[0]) / interval) + 1, 3)
rot_int = (osc_range[1] - osc_range[0]) / float(n_bins)
n_param = n_bins + 2
norm_fac = 0.999 * one_osc_width / rot_int # to make sure normalise values
# fall within range of smoother.
return n_param, norm_fac, rot_int
[docs]def calc_n_param_from_bins(value_min, value_max, n_bins):
"""Return the correct number of bins for initialising the gaussian
smoothers."""
assert n_bins > 0
assert isinstance(n_bins, int)
bin_width = (value_max - value_min) / n_bins
if n_bins == 1:
n_param = 2
elif n_bins == 2:
n_param = 3
else:
n_param = n_bins + 2
return n_param, bin_width
model_phil_scope = phil.parse("")
_dxtbx_scaling_models = {
ep.name: ep for ep in pkg_resources.iter_entry_points("dxtbx.scaling_model_ext")
}
assert (
_dxtbx_scaling_models
), "No models registered with dxtbx.scaling_model_ext entry point"
model_phil_scope.adopt_scope(
phil.parse(
"model ="
+ " ".join(_dxtbx_scaling_models)
+ "\n .type = choice"
+ "\n .help = Set scaling model to be applied to input datasets"
+ "\n .expert_level = 0"
)
)
for entry_point_name, entry_point in _dxtbx_scaling_models.items():
ext_master_scope = phil.parse("%s .expert_level=1 {}" % entry_point_name)
ext_phil_scope = ext_master_scope.get_without_substitution(entry_point_name)
assert len(ext_phil_scope) == 1
ext_phil_scope = ext_phil_scope[0]
ext_phil_scope.adopt_scope(entry_point.load().phil_scope)
model_phil_scope.adopt_scope(ext_master_scope)
[docs]def plot_scaling_models(model_dict):
"""Return a dict of component plots for the model for plotting with plotly."""
entry_point = _dxtbx_scaling_models.get(model_dict["__id__"])
if entry_point:
model = entry_point.load().from_dict(model_dict)
return model.plot_model_components()
return OrderedDict()