Click here to go to the corresponding page for the latest version of DIALS
Source code for dxtbx.model.experiment_list
from __future__ import absolute_import, division, print_function
import pkg_resources
from dxtbx.model import Experiment, ExperimentList
from dxtbx.datablock import AutoEncoder
from dxtbx.datablock import BeamComparison
from dxtbx.datablock import DetectorComparison
from dxtbx.datablock import GoniometerComparison
from dxtbx.datablock import DataBlockFactory
from dxtbx.datablock import SweepDiff
class InvalidExperimentListError(RuntimeError):
    """Raised when input cannot be interpreted as a well-formed experiment list.

    This signals a structural problem with the supplied data (for example, an
    object of the wrong kind was passed in). It does not indicate a problem
    with the data values themselves or with the consistency of the models.
    """
class ExperimentListDict(object):
    """Helper that decodes an experiment list from its dictionary form.

    The dictionary form is what gets stored when an experiment list is saved
    as JSON. Models and imagesets may be given inline (experiments refer to
    them by list index) or in separate files (experiments refer to them by
    path).
    """

    def __init__(self, obj, check_format=True, directory=None):
        """Store a private deep copy of the dictionary to decode.

        :param obj: dict-like experiment list description.
        :param check_format: passed through when imagesets are constructed;
            when False, mask/gain/pedestal lookup data are not loaded.
        :param directory: directory passed to ``load_path`` when resolving
            file paths found in the dictionary.
        :raises InvalidExperimentListError: if ``obj`` is a list or has no
            ``get`` attribute.
        """
        from copy import deepcopy

        # Basic check: this must be a dict-like object. A list can turn up if
        # e.g. we were passed a DataBlock list instead of an ExperimentList
        # dictionary.
        if isinstance(obj, list) or not hasattr(obj, "get"):
            raise InvalidExperimentListError(
                "Expected dictionary, not {}".format(type(obj))
            )

        # Decoding rewrites path references in place, so work on a copy
        self._obj = deepcopy(obj)
        self._check_format = check_format
        self._directory = directory

    def decode(self):
        """Decode the dictionary into a list of experiments.

        :returns: the ExperimentList built from the dictionary.
        :raises InvalidExperimentListError: if the dictionary's ``__id__`` is
            not ``"ExperimentList"``.
        """
        # If this doesn't claim to be an ExperimentList, don't even try
        if self._obj.get("__id__", None) != "ExperimentList":
            raise InvalidExperimentListError(
                "Expected __id__ 'ExperimentList', but found {}".format(
                    repr(self._obj.get("__id__"))
                )
            )

        # Extract lists of models referenced by experiments
        self._blist = self._extract_models("beam")
        self._dlist = self._extract_models("detector")
        self._glist = self._extract_models("goniometer")
        self._slist = self._extract_models("scan")
        self._clist = self._extract_models("crystal")
        self._plist = self._extract_models("profile")
        self._scalelist = self._extract_models("scaling_model")

        # Make sure every experiment references its imageset by index rather
        # than by file path. The entries stay as dictionaries at this point;
        # the imageset objects are built per (imageset, scan) pair later.
        self._ilist = self._extract_imagesets()

        # Extract all the experiments
        return self._extract_experiments()

    def _resolve_references(self, name, mlist, convert=None):
        """Replace path references to ``name`` in each experiment by indices.

        For every experiment whose ``name`` entry is a string, the string is
        treated as a file path: the file is loaded once, appended to
        ``mlist`` (after ``convert`` if given), and the experiment entry is
        rewritten to the index of the appended item.

        :param name: experiment dictionary key to process.
        :param mlist: list to append loaded items to (modified in place).
        :param convert: optional callable applied to each loaded dictionary.
        :returns: ``mlist``.
        :raises TypeError: if an entry is neither None, a str nor an int.
        """
        path_to_index = {}
        for eobj in self._obj["experiment"]:
            value = eobj.get(name)
            if value is None:
                continue
            if isinstance(value, str):
                if value not in path_to_index:
                    path_to_index[value] = len(mlist)
                    loaded = ExperimentListDict._from_file(value, self._directory)
                    mlist.append(loaded if convert is None else convert(loaded))
                eobj[name] = path_to_index[value]
            elif not isinstance(value, int):
                raise TypeError("expected int or str, got %s" % type(value))
        return mlist

    def _extract_models(self, name):
        """Return the list of ``name`` models as model objects.

        Inline model dictionaries are converted with the matching
        ``_<name>_from_dict`` method; models referenced by path from an
        experiment are loaded, converted and appended to the list.
        """
        from_dict = getattr(self, "_%s_from_dict" % name)
        mlist = [from_dict(d) for d in self._obj.get(name, [])]
        return self._resolve_references(name, mlist, convert=from_dict)

    def _extract_imagesets(self):
        """Return the list of imageset dictionaries.

        Imagesets referenced by path from an experiment are loaded and
        appended to the list. The entries are left as dictionaries.
        """
        return self._resolve_references("imageset", self._obj.get("imageset", []))

    def _load_external_lookup(self, imageset, name, require_check_format):
        """Resolve one external lookup entry of an imageset dictionary.

        :param imageset: imageset dictionary.
        :param name: lookup key, e.g. ``"mask"``.
        :param require_check_format: if True the data file is only read when
            ``check_format`` was requested; if False it is read regardless.
        :returns: ``(filename, data)``; both are None when the entry is absent
            or None, and ``data`` is None when the file was not read.
        """
        from dxtbx.serialize.filename import load_path
        import six.moves.cPickle as pickle

        if name not in imageset or imageset[name] is None:
            return None, None

        filename = load_path(imageset[name], directory=self._directory)
        data = None
        # Compare by value: identity comparison against a string literal is
        # implementation-dependent.
        if filename != "" and (self._check_format or not require_check_format):
            # NOTE(review): this unpickles a file named by the input
            # dictionary; only use with trusted experiment lists.
            with open(filename, "rb") as fh:
                data = pickle.load(fh)
        return filename, data

    def _imageset_from_dict(self, imageset, beam, detector, goniometer, scan):
        """Build an imageset object from its dictionary and attach the models.

        :param imageset: imageset dictionary; its ``__id__`` selects the kind.
        :returns: the constructed imageset, or None for ``MemImageSet``.
        :raises RuntimeError: if ``__id__`` is not a known imageset type.
        """
        from dxtbx.imageset import ImageSweep, ImageSet, ImageGrid
        from dxtbx.format.image import ImageBool, ImageDouble

        format_kwargs = imageset["params"] if "params" in imageset else {}

        # Load the lookup tables before constructing the imageset. The mask,
        # gain and pedestal data are only read when check_format is set; dx
        # and dy are always read.
        lookups = [
            (name, wrapper) + self._load_external_lookup(imageset, name, gated)
            for name, wrapper, gated in (
                ("mask", ImageBool, True),
                ("gain", ImageDouble, True),
                ("pedestal", ImageDouble, True),
                ("dx", ImageDouble, False),
                ("dy", ImageDouble, False),
            )
        ]

        kind = imageset["__id__"]
        if kind == "ImageSet":
            result = self._make_stills(imageset, format_kwargs=format_kwargs)
        elif kind == "ImageGrid":
            result = self._make_grid(imageset, format_kwargs=format_kwargs)
        elif kind == "ImageSweep":
            result = self._make_sweep(
                imageset,
                beam=beam,
                detector=detector,
                goniometer=goniometer,
                scan=scan,
                format_kwargs=format_kwargs,
            )
        elif kind == "MemImageSet":
            result = self._make_mem_imageset(imageset)
        else:
            raise RuntimeError("Unknown imageset type")

        if result is None:
            return None

        # Set the external lookup; missing data becomes an empty image and a
        # missing filename becomes an empty string
        for name, wrapper, filename, data in lookups:
            lookup = getattr(result.external_lookup, name)
            lookup.data = wrapper() if data is None else wrapper(data)
            lookup.filename = "" if filename is None else filename

        # Update the imageset models: once for a sweep, per image otherwise
        if isinstance(result, ImageSweep):
            result.set_beam(beam)
            result.set_detector(detector)
            result.set_goniometer(goniometer)
            result.set_scan(scan)
        elif isinstance(result, (ImageSet, ImageGrid)):
            for i in range(len(result)):
                result.set_beam(beam, i)
                result.set_detector(detector, i)
                result.set_goniometer(goniometer, i)
                result.set_scan(scan, i)

        result.update_detector_px_mm_data()
        return result

    def _extract_experiments(self):
        """Build the ExperimentList from the already-extracted model lists.

        Experiments that share the same (imageset index, scan index) pair
        share a single imageset object.
        """
        # Map of imageset/scan index pairs to constructed imagesets
        imagesets = {}

        el = ExperimentList()
        for eobj in self._obj["experiment"]:
            # Get the models
            identifier = eobj.get("identifier", "")
            beam = ExperimentListDict.model_or_none(self._blist, eobj, "beam")
            detector = ExperimentListDict.model_or_none(self._dlist, eobj, "detector")
            goniometer = ExperimentListDict.model_or_none(
                self._glist, eobj, "goniometer"
            )
            scan = ExperimentListDict.model_or_none(self._slist, eobj, "scan")
            crystal = ExperimentListDict.model_or_none(self._clist, eobj, "crystal")
            profile = ExperimentListDict.model_or_none(self._plist, eobj, "profile")
            scaling_model = ExperimentListDict.model_or_none(
                self._scalelist, eobj, "scaling_model"
            )

            key = (eobj.get("imageset"), eobj.get("scan"))
            if key in imagesets:
                imageset = imagesets[key]
            else:
                imageset_dict = ExperimentListDict.model_or_none(
                    self._ilist, eobj, "imageset"
                )
                imageset = None
                if imageset_dict is not None:
                    imageset = self._imageset_from_dict(
                        imageset_dict, beam, detector, goniometer, scan
                    )
                # Remember the result (even None) for later experiments
                imagesets[key] = imageset

            # Append the experiment
            el.append(
                Experiment(
                    imageset=imageset,
                    beam=beam,
                    detector=detector,
                    goniometer=goniometer,
                    scan=scan,
                    crystal=crystal,
                    profile=profile,
                    scaling_model=scaling_model,
                    identifier=identifier,
                )
            )

        # Return the experiment list
        return el

    def _make_mem_imageset(self, imageset):
        """Return None: a mem imageset can't be made from a dictionary."""
        return None

    def _make_stills(self, imageset, format_kwargs=None):
        """Make a still imageset from the ``images`` listed in the dictionary."""
        from dxtbx.imageset import ImageSetFactory
        from dxtbx.serialize.filename import load_path

        filenames = [
            load_path(p, directory=self._directory) for p in imageset["images"]
        ]
        indices = None
        if "single_file_indices" in imageset:
            indices = imageset["single_file_indices"]
            # There must be one index per filename
            assert len(indices) == len(filenames)
        return ImageSetFactory.make_imageset(
            filenames,
            None,
            check_format=self._check_format,
            single_file_indices=indices,
            format_kwargs=format_kwargs,
        )

    def _make_grid(self, imageset, format_kwargs=None):
        """Make an image grid: a still imageset arranged with ``grid_size``."""
        from dxtbx.imageset import ImageGrid

        grid_size = imageset["grid_size"]
        return ImageGrid.from_imageset(
            self._make_stills(imageset, format_kwargs=format_kwargs), grid_size
        )

    def _make_sweep(
        self,
        imageset,
        beam=None,
        detector=None,
        goniometer=None,
        scan=None,
        format_kwargs=None,
    ):
        """Make an image sweep from the ``template`` in the dictionary."""
        from dxtbx.sweep_filenames import template_image_range
        from dxtbx.imageset import ImageSetFactory
        from dxtbx.serialize.filename import load_path
        from dxtbx.format.FormatMultiImage import FormatMultiImage

        # Get the template format
        template = load_path(imageset["template"], directory=self._directory)

        # Get the image range; if no scan is given, try to find all the
        # images matching the template
        if scan is None:
            i0, i1 = template_image_range(template)
        else:
            i0, i1 = scan.get_image_range()

        # When the format is not being checked, a dictionary carrying
        # single_file_indices is given the multi-image format class explicitly
        format_class = None
        if self._check_format is False:
            if "single_file_indices" in imageset:
                format_class = FormatMultiImage

        # Make a sweep from the input data
        return ImageSetFactory.make_sweep(
            template,
            list(range(i0, i1 + 1)),
            format_class=format_class,
            check_format=self._check_format,
            beam=beam,
            detector=detector,
            goniometer=goniometer,
            scan=scan,
            format_kwargs=format_kwargs,
        )

    @staticmethod
    def model_or_none(mlist, eobj, name):
        """Return ``mlist[eobj[name]]``, or None if ``eobj`` has no ``name``."""
        index = eobj.get(name)
        if index is not None:
            return mlist[index]
        return None

    @staticmethod
    def _beam_from_dict(obj):
        """Get a beam from a dictionary."""
        from dxtbx.model import BeamFactory

        return BeamFactory.from_dict(obj)

    @staticmethod
    def _detector_from_dict(obj):
        """Get the detector from a dictionary."""
        from dxtbx.model import DetectorFactory

        return DetectorFactory.from_dict(obj)

    @staticmethod
    def _goniometer_from_dict(obj):
        """Get the goniometer from a dictionary."""
        from dxtbx.model import GoniometerFactory

        return GoniometerFactory.from_dict(obj)

    @staticmethod
    def _scan_from_dict(obj):
        """Get the scan from a dictionary."""
        from dxtbx.model import ScanFactory

        return ScanFactory.from_dict(obj)

    @staticmethod
    def _crystal_from_dict(obj):
        """Get the crystal from a dictionary."""
        from dxtbx.model import CrystalFactory

        return CrystalFactory.from_dict(obj)

    @staticmethod
    def _profile_from_dict(obj):
        """Get the profile from a dictionary."""
        from dxtbx.model import ProfileModelFactory

        return ProfileModelFactory.from_dict(obj)

    @staticmethod
    def _scaling_model_from_dict(obj):
        """Get the scaling model from a dictionary.

        The ``dxtbx.scaling_model_ext`` entry point whose name equals the
        dictionary's ``__id__`` is used. Returns None if none matches.
        """
        for entry_point in pkg_resources.iter_entry_points("dxtbx.scaling_model_ext"):
            if entry_point.name == obj["__id__"]:
                return entry_point.load().from_dict(obj)
        return None

    @staticmethod
    def _from_file(filename, directory=None):
        """Load a model dictionary from a JSON file.

        :raises IOError: if the file cannot be read.
        """
        from dxtbx.serialize.load import _decode_dict
        from dxtbx.serialize.filename import load_path
        import json

        filename = load_path(filename, directory=directory)
        try:
            with open(filename, "r") as infile:
                return json.load(infile, object_hook=_decode_dict)
        except IOError:
            raise IOError("unable to read file, %s" % filename)
class ExperimentListDumper(object):
    """Helper for writing an experiment list to JSON or pickle."""

    # Keys of the per-model lists in the experiment list dictionary, in the
    # order their files are written when the output is split.
    _SPLIT_MODEL_NAMES = (
        "imageset",
        "beam",
        "detector",
        "goniometer",
        "scan",
        "crystal",
        "profile",
        "scaling_model",
    )

    def __init__(self, experiment_list):
        """Remember the experiment list to dump."""
        self._experiment_list = experiment_list

    def as_json(self, filename=None, compact=False, split=False):
        """Dump the experiment list as JSON.

        :param filename: file to write; if None the JSON text is returned.
        :param compact: if True use the most compact separators and no
            indentation, otherwise indent by 2.
        :param split: if True (and a filename is given) write each model to
            its own ``<base>_<model>_<index>.json`` file and make the main
            file refer to those paths instead of indices.
        :returns: the JSON text when ``filename`` is None, otherwise None.
        """
        import json
        from os.path import splitext
        from libtbx.containers import OrderedDict

        # Get the dictionary form of the experiment list
        dictionary = self._experiment_list.to_dict()

        if filename is not None and split:
            # One (path, model dictionary) list per kind of model
            basepath = splitext(filename)[0]
            model_files = [
                (
                    name,
                    [
                        ("%s_%s_%d.json" % (basepath, name, i), d)
                        for i, d in enumerate(dictionary[name])
                    ],
                )
                for name in self._SPLIT_MODEL_NAMES
            ]

            # The main file only holds the experiments
            edict = OrderedDict(
                [("__id__", "ExperimentList"), ("experiment", dictionary["experiment"])]
            )

            # Set paths rather than indices
            for e in edict["experiment"]:
                for name, files in model_files:
                    if name in e:
                        e[name] = files[e[name]][0]

            to_write = [entry for _, files in model_files for entry in files]
            to_write.append((filename, edict))
        else:
            to_write = [(filename, dictionary)]

        if compact:
            separators, indent = (",", ":"), None
        else:
            separators, indent = None, 2

        for fname, obj in to_write:
            text = json.dumps(
                obj,
                separators=separators,
                indent=indent,
                ensure_ascii=True,
                cls=AutoEncoder,
            )

            # If a filename is set then dump to file otherwise return string
            if fname is None:
                return text
            with open(fname, "w") as outfile:
                outfile.write(text)

    def as_pickle(self, filename=None, **kwargs):
        """Dump the experiment list as a pickle.

        :param filename: file to write; if not given the pickle bytes are
            returned instead.
        :param kwargs: accepted for call compatibility with ``as_json`` and
            ignored.
        """
        import six.moves.cPickle as pickle

        # Get the pickle string
        text = pickle.dumps(self._experiment_list, protocol=pickle.HIGHEST_PROTOCOL)

        # Write the file, or hand the bytes back
        if filename:
            with open(filename, "wb") as outfile:
                outfile.write(text)
        else:
            return text

    def as_file(self, filename, **kwargs):
        """Dump the experiment list in the format implied by the extension.

        ``.json`` selects JSON; ``.p``, ``.pkl`` and ``.pickle`` select
        pickle. The extension is compared case-insensitively.

        :raises RuntimeError: for any other extension.
        """
        from os.path import splitext

        ext = splitext(filename)[1]
        j_ext = [".json"]
        p_ext = [".p", ".pkl", ".pickle"]
        if ext.lower() in j_ext:
            return self.as_json(filename, **kwargs)
        elif ext.lower() in p_ext:
            return self.as_pickle(filename, **kwargs)
        else:
            ext_str = "|".join(j_ext + p_ext)
            raise RuntimeError("expected extension {%s}, got %s" % (ext_str, ext))
class ExperimentListFactory(object):
    """A collection of static constructors for experiment lists."""

    @staticmethod
    def from_args(args, verbose=False, unhandled=None):
        """Try to load experiments from each argument as a serialized file.

        Only serialized formats (JSON, then pickle) are attempted; arguments
        are not interpreted as image files.

        :param args: iterable of file names.
        :param verbose: if True print a line for every success and failure.
        :param unhandled: optional list; file names that could not be loaded
            are appended to it.
        :returns: ExperimentList of everything that could be loaded.
        """
        # Create a list for unhandled arguments
        if unhandled is None:
            unhandled = []

        experiments = ExperimentList()

        # Try to load from serialized formats
        for filename in args:
            try:
                experiments.extend(
                    ExperimentListFactory.from_serialized_format(filename)
                )
                if verbose:
                    print("Loaded experiments from %s" % filename)
            except Exception as e:
                # Best effort: anything that fails to load is reported back
                # through `unhandled` rather than raised
                if verbose:
                    print("Could not load experiments from %s: %s" % (filename, str(e)))
                unhandled.append(filename)

        # Return the experiments
        return experiments

    @staticmethod
    def from_filenames(
        filenames,
        verbose=False,
        unhandled=None,
        compare_beam=None,
        compare_detector=None,
        compare_goniometer=None,
        scan_tolerance=None,
        format_kwargs=None,
    ):
        """Create an experiment list from a list of directory or file names.

        The names are turned into data blocks by
        ``DataBlockFactory.from_filenames`` (all keyword arguments are passed
        through) and each data block is converted with no crystal.
        """
        experiments = ExperimentList()
        for db in DataBlockFactory.from_filenames(
            filenames,
            verbose=verbose,
            unhandled=unhandled,
            compare_beam=compare_beam,
            compare_detector=compare_detector,
            compare_goniometer=compare_goniometer,
            scan_tolerance=scan_tolerance,
            format_kwargs=format_kwargs,
        ):
            experiments.extend(
                ExperimentListFactory.from_datablock_and_crystal(db, None)
            )
        return experiments

    @staticmethod
    def from_imageset_and_crystal(imageset, crystal):
        """Load an experiment list from an imageset and crystal.

        An ImageSweep is handled as a sweep; anything else as stills.
        """
        from dxtbx.imageset import ImageSweep

        if isinstance(imageset, ImageSweep):
            return ExperimentListFactory.from_sweep_and_crystal(imageset, crystal)
        else:
            return ExperimentListFactory.from_stills_and_crystal(imageset, crystal)

    @staticmethod
    def from_sweep_and_crystal(imageset, crystal):
        """Create a one-experiment list from a sweep and crystal.

        The beam, detector, goniometer and scan are taken from the sweep.
        """
        return ExperimentList(
            [
                Experiment(
                    imageset=imageset,
                    beam=imageset.get_beam(),
                    detector=imageset.get_detector(),
                    goniometer=imageset.get_goniometer(),
                    scan=imageset.get_scan(),
                    crystal=crystal,
                )
            ]
        )

    @staticmethod
    def from_stills_and_crystal(imageset, crystal):
        """Create an experiment list from stills and crystal.

        One experiment is created for every run of consecutive images that
        have equal (beam, detector, goniometer, scan) models; its imageset is
        the corresponding slice of ``imageset``.
        """
        from itertools import groupby

        def model_or_none(getter_name, index):
            # A model that cannot be obtained is recorded as None
            try:
                return getattr(imageset, getter_name)(index)
            except Exception:
                return None

        # Get a tuple of models for each image. This must be a real list
        # (not a lazy zip object) because it is indexed and measured below.
        getters = ("get_beam", "get_detector", "get_goniometer", "get_scan")
        models = [
            tuple(model_or_none(name, i) for name in getters)
            for i in range(len(imageset))
        ]

        # Find subsets where all the models are the same
        experiments = ExperimentList()
        for m, indices in groupby(range(len(models)), lambda i: models[i]):
            indices = list(indices)
            beam, detector, goniometer, scan = m
            experiments.append(
                Experiment(
                    imageset=imageset[indices[0] : indices[-1] + 1],
                    beam=beam,
                    detector=detector,
                    goniometer=goniometer,
                    scan=scan,
                    crystal=crystal,
                )
            )

        # Return experiments
        return experiments

    @staticmethod
    def from_datablock_and_crystal(datablock, crystal):
        """Load an experiment list from a datablock, or a list of datablocks.

        Every imageset extracted from the datablock is combined with
        ``crystal``. For a single datablock the result is asserted to be
        consistent.
        """
        # Initialise the experiment list
        experiments = ExperimentList()

        # If we have a list, loop through
        if isinstance(datablock, list):
            for db in datablock:
                experiments.extend(
                    ExperimentListFactory.from_datablock_and_crystal(db, crystal)
                )
            return experiments

        # Add all the imagesets
        for imageset in datablock.extract_imagesets():
            experiments.extend(
                ExperimentListFactory.from_imageset_and_crystal(imageset, crystal)
            )

        # Check the list is consistent
        assert experiments.is_consistent()

        # Return the experiments
        return experiments

    @staticmethod
    def from_dict(obj, check_format=True, directory=None):
        """Load an experiment list from a dictionary.

        The dictionary is first tried as a datablock description; if that
        fails for any reason it is decoded as an experiment list dictionary.
        """
        # Deliberately broad: any failure here just means "not a datablock"
        try:
            experiments = ExperimentList()
            for db in DataBlockFactory.from_dict(
                obj, check_format=check_format, directory=directory
            ):
                experiments.extend(
                    ExperimentListFactory.from_datablock_and_crystal(db, None)
                )
        except Exception:
            experiments = None

        # Decode the experiments from the dictionary
        if experiments is None:
            experiments = ExperimentListDict(
                obj, check_format=check_format, directory=directory
            ).decode()

        # Check the list is consistent
        assert experiments.is_consistent()

        # Return the experiments
        return experiments

    @staticmethod
    def from_json(text, check_format=True, directory=None):
        """Load an experiment list from JSON text."""
        from dxtbx.serialize.load import _decode_dict
        import json

        return ExperimentListFactory.from_dict(
            json.loads(text, object_hook=_decode_dict),
            check_format=check_format,
            directory=directory,
        )

    @staticmethod
    def from_json_file(filename, check_format=True):
        """Load an experiment list from a JSON file.

        The directory containing the file is passed on as ``directory`` when
        the contents are decoded.
        """
        from os.path import dirname, abspath

        filename = abspath(filename)
        directory = dirname(filename)
        with open(filename, "r") as infile:
            return ExperimentListFactory.from_json(
                infile.read(), check_format=check_format, directory=directory
            )

    @staticmethod
    def from_pickle_file(filename):
        """Decode an experiment list from a pickle file.

        The unpickled object is asserted to be an ExperimentList.
        """
        import six.moves.cPickle as pickle

        # NOTE(review): unpickling executes arbitrary code from the file;
        # only use with trusted files.
        with open(filename, "rb") as infile:
            obj = pickle.load(infile)
            assert isinstance(obj, ExperimentList)
            return obj

    @staticmethod
    def from_xds(xds_inp, xds_other):
        """Generate an experiment list from XDS files.

        :param xds_inp: passed to ``xds.to_imageset`` with ``xds_other``.
        :param xds_other: also passed to ``xds.to_crystal``.
        """
        from dxtbx.serialize import xds

        # Get the sweep from the XDS files
        sweep = xds.to_imageset(xds_inp, xds_other)

        # Get the crystal from the XDS files
        crystal = xds.to_crystal(xds_other)

        # Create the experiment list
        experiments = ExperimentListFactory.from_imageset_and_crystal(sweep, crystal)

        # Exactly one experiment is expected
        assert len(experiments) == 1

        # Return the experiment list
        return experiments

    @staticmethod
    def from_serialized_format(filename, check_format=True):
        """Try to load the experiment list from a serialized format.

        JSON is tried first; if that fails for any reason the file is loaded
        as a pickle, and any error from that attempt propagates.
        """
        # First try as a JSON file
        try:
            return ExperimentListFactory.from_json_file(filename, check_format)
        except Exception:
            pass

        # Now try as a pickle file
        return ExperimentListFactory.from_pickle_file(filename)
class ExperimentListTemplateImporter(object):
    """Import an experiment list from filename templates.

    After construction the result is available as ``self.experiments``.
    """

    def __init__(self, templates, verbose=False, **kwargs):
        """Import the templates as data blocks and convert them.

        :param templates: passed to ``DataBlockTemplateImporter``.
        :param verbose: passed to ``DataBlockTemplateImporter``.
        :param kwargs: further keyword arguments passed to
            ``DataBlockTemplateImporter`` unchanged.
        """
        from dxtbx.datablock import DataBlockTemplateImporter

        importer = DataBlockTemplateImporter(templates, verbose=verbose, **kwargs)

        # Each data block is converted with no crystal model
        self.experiments = ExperimentList()
        for db in importer.datablocks:
            self.experiments.extend(
                ExperimentListFactory.from_datablock_and_crystal(db, None)
            )