Click here to go to the corresponding page for the latest version of DIALS
Source code for dxtbx.imageset
from __future__ import absolute_import, division, print_function
from builtins import range
import boost.python
import dxtbx.format.image # noqa: F401, import dependency for unpickling
import dxtbx.format.Registry
from dxtbx.sequence_filenames import group_files_by_imageset, template_image_range
from dxtbx_imageset_ext import (
ExternalLookup,
ExternalLookupItemBool,
ExternalLookupItemDouble,
ImageGrid,
ImageSequence,
ImageSet,
ImageSetData,
)
# Load the compiled "dxtbx_ext" extension module through boost.python.
# NOTE(review): `ext` is not referenced anywhere else in the visible part of
# this module -- presumably kept for its import side effects or for external
# users of `dxtbx.imageset.ext`; confirm before removing.
ext = boost.python.import_ext("dxtbx_ext")

# Names exported by `from dxtbx.imageset import *`: the classes re-exported
# from dxtbx_imageset_ext plus the pure-Python classes defined in this module.
__all__ = (
    "ExternalLookup",
    "ExternalLookupItemBool",
    "ExternalLookupItemDouble",
    "ImageGrid",
    "ImageSet",
    "ImageSetData",
    "ImageSetFactory",
    "ImageSetLazy",
    "ImageSequence",
    "MemReader",
)
class MemReader(object):
    """Reader serving raw data from format instances already held in memory."""

    def __init__(self, images):
        """Store the indexable collection of format instances.

        Params:
            images An indexable collection whose items provide get_raw_data()
        """
        self._images = images

    def __len__(self):
        """Return the number of images held by the reader."""
        return len(self._images)

    def read(self, index):
        """Return the raw data of the image stored at the given index.

        Params:
            index The position of the image in the stored collection

        Returns:
            Whatever the stored item's get_raw_data() returns
        """
        return self._images[index].get_raw_data()
@boost.python.inject_into(ImageSet)
class _(object):
    """
    Holder for the pure-Python methods that are injected into ImageSet
    """

    def __getitem__(self, item):
        """Read a single image, or build a sub-set from a slice.

        An integer item returns the corrected data of the image at that
        index. A slice returns a new ImageSet covering the requested range
        of array indices; only a step of 1 (or no step) is accepted.

        Params:
            item The index or slice

        Returns:
            An image or new ImageSet object
        """
        if not isinstance(item, slice):
            return self.get_corrected_data(item)

        # A missing (or falsy) bound falls back to the full extent of the set.
        first = item.start or 0
        last = item.stop or len(self)
        if item.step is not None and item.step != 1:
            raise IndexError("Step must be 1")
        return self.partial_set(first, last)

    def __iter__(self):
        """Yield every image in order by indexing the set one at a time."""
        for position in range(len(self)):
            yield self[position]

    def get_vendortype(self, index):
        """Get the vendor information (the index argument is not used)."""
        data = self.data()
        return data.get_vendor()

    def get_format_class(self):
        """Get the format class stored with the imageset data."""
        data = self.data()
        return data.get_format_class()

    def params(self):
        """Get the parameters stored with the imageset data."""
        data = self.data()
        return data.get_params()

    def get_detectorbase(self, index):
        """
        Get the detectorbase instance for the image at the given index.

        With a single-file reader the format instance is created from the
        master path and queried with the image's entry in self.indices();
        otherwise it is created from the image's own path.
        """
        kwargs = self.params()
        if not self.data().has_single_file_reader():
            per_image_format = self.get_format_class().get_instance(
                self.get_path(index), **kwargs
            )
            return per_image_format.get_detectorbase()

        master_format = self.get_format_class().get_instance(
            self.data().get_master_path(), **kwargs
        )
        return master_format.get_detectorbase(self.indices()[index])

    def reader(self):
        """
        Return the reader held by the imageset data
        """
        data = self.data()
        return data.reader()

    def masker(self):
        """
        Return the masker held by the imageset data
        """
        data = self.data()
        return data.masker()

    def paths(self):
        """
        Return the list of paths, one per image in the set
        """
        collected = []
        for position in range(len(self)):
            collected.append(self.get_path(position))
        return collected
class ImageSetLazy(ImageSet):
    """
    ImageSet variant whose experimental models need not be set up front.

    A model is looked up through the format class the first time it is
    requested for an image, stored on the imageset, and then returned.
    """

    def _get_item_from_parent_or_format(self, item_name, index):
        """
        Return the $item_name model (eg. detector, goniometer, ...) for index.

        The parent class getter (get_detector, ...) is consulted first. If it
        yields None and the format class carries a live _current_instance_,
        the model is read from that instance and cached on this imageset via
        the matching setter (set_detector, ...). A missing index means 0.
        """
        if index is None:
            index = 0

        parent_getter = getattr(super(ImageSetLazy, self), "get_" + item_name)
        model = parent_getter(index)
        if model is not None:
            return model

        # If check_format=False was used, then _current_instance_ will not be
        # set, so a None result is assumed to be correct.
        live_instance = getattr(self.get_format_class(), "_current_instance_", None)
        if live_instance is None:
            return model

        format_getter = getattr(live_instance, "get_" + item_name)
        model = format_getter(self.indices()[index])
        store = getattr(self, "set_" + item_name)
        store(model, index)
        return model

    def get_detector(self, index=None):
        """Return the detector for the image, loading it lazily if needed."""
        return self._get_item_from_parent_or_format("detector", index)

    def get_goniometer(self, index=None):
        """Return the goniometer for the image, loading it lazily if needed."""
        return self._get_item_from_parent_or_format("goniometer", index)

    def _load_models(self, index):
        """Request every model for the image so they are set before the
        functions in imageset.h access them. A missing index means 0."""
        if index is None:
            index = 0
        for model_name in ("detector", "beam", "goniometer", "scan"):
            getattr(self, "get_" + model_name)(index)

    def __getitem__(self, item):
        """Return a lazy sub-set for a slice, else the image at the index."""
        if not isinstance(item, slice):
            self._load_models(item)
            return super(ImageSetLazy, self).__getitem__(item)
        return ImageSetLazy(self.data(), indices=self.indices()[item])

    def get_corrected_data(self, index):
        """Load the models for the image, then return its corrected data."""
        self._load_models(index)
        parent = super(ImageSetLazy, self)
        return parent.get_corrected_data(index)

    def get_gain(self, index):
        """Load the models for the image, then return its gain."""
        self._load_models(index)
        parent = super(ImageSetLazy, self)
        return parent.get_gain(index)
@boost.python.inject_into(ImageSequence)
class _(object):
    # Holder for the pure-Python methods that are injected into ImageSequence.

    def __getitem__(self, item):
        """Get an item from the sequence stream.

        If the item is an index, read and return the image at the given index.
        Otherwise, if the item is a slice, then create a new Sequence object
        with the given number of array indices from the slice. Slice bounds
        are shifted by the scan's batch offset before the sub-sequence is
        requested, and any explicit step (including 1) is rejected.

        Params:
            item The index or slice

        Returns:
            An image or new Sequence object

        Raises:
            IndexError If the slice carries a step
        """
        if isinstance(item, slice):
            offset = self.get_scan().get_batch_offset()
            if item.step is not None:
                raise IndexError("Sequences must be sequential")

            # nasty workaround for https://github.com/dials/dials/issues/1153
            # slices with -1 in them are meaningful :-/ so grab the original
            # constructor arguments of the slice object.
            # item.start and item.stop may have been compromised at this point.
            if offset < 0:
                # step is unpacked only to match the 3-tuple; it is known to be
                # None here because of the check above.
                start, stop, step = item.__reduce__()[1]
                # An omitted bound maps straight to the array extent (no offset
                # applied); an explicit bound has the offset subtracted.
                if start is None:
                    start = 0
                else:
                    start -= offset
                if stop is None:
                    stop = len(self)
                else:
                    stop -= offset
                return self.partial_set(start, stop)
            else:
                # Falsy bounds (None or 0) are replaced by defaults first, then
                # the offset is subtracted from both bounds.
                start = item.start or 0
                stop = item.stop or (len(self) + offset)
                return self.partial_set(start - offset, stop - offset)
        else:
            return self.get_corrected_data(item)

    def get_template(self):
        """Return the template stored with the sequence data"""
        return self.data().get_template()
def _analyse_files(filenames):
    """Group images by filename into image sets and label each group.

    Params:
        filenames The list of filenames

    Returns:
        A list of (template, [indices], is_sequence)
    """

    def _is_run_from_non_negative(ordered):
        """True when the ordered indices start at >= 0 and step by exactly 1."""
        if ordered[0] < 0:
            return False
        return all(
            following == current + 1
            for current, following in zip(ordered, ordered[1:])
        )

    def _looks_like_sequence(indices):
        """Decide whether a group of image indices forms a sequence.

        Two or more images with consecutive numbers are categorised as a
        sequence; anything else is treated as a plain image set.
        """
        if len(indices) <= 1:
            return False
        return _is_run_from_non_negative(sorted(indices))

    # One (template, indices, is_sequence) entry per group found in the
    # filenames; the indices are passed through as grouped, not sorted.
    return [
        (template, indices, _looks_like_sequence(indices))
        for template, indices in group_files_by_imageset(filenames).items()
    ]
# FIXME Template expansion is now shared through private helpers, but the
# format class lookup is still duplicated in this class, need to tidy up
class ImageSetFactory(object):
    """Factory to create imagesets and sequences."""

    @staticmethod
    def _template_to_format(template):
        """Convert a '#' style template into a %-style format string.

        The text before the first '#' and after the last '#' is kept (with any
        literal '%' escaped) and the total number of '#' characters becomes
        the zero-padded field width, e.g. "img_####.cbf" -> "img_%04d.cbf".

        Params:
            template The filename template

        Returns:
            The format string taking a single integer image number
        """
        pieces = template.split("#")
        return "%s%%0%dd%s" % (
            pieces[0].replace("%", "%%"),
            template.count("#"),
            pieces[-1].replace("%", "%%"),
        )

    @staticmethod
    def _filenames_for_indices(template, indices):
        """Expand a template for the given image numbers.

        Params:
            template The filename template
            indices The image numbers to substitute into the template

        Returns:
            The sorted list of filenames; just [template] when the template
            contains no '#'
        """
        if template.count("#") > 0:
            template_format = ImageSetFactory._template_to_format(template)
            filenames = [template_format % index for index in indices]
        else:
            filenames = [template]
        return sorted(filenames)

    @staticmethod
    def new(filenames, check_headers=False, ignore_unknown=False):
        """Create an imageset or sequence

        Params:
            filenames A list of filenames (or a single filename string)
            check_headers Check the headers to ensure all images are valid
            ignore_unknown Ignore unknown formats

        Returns:
            A list of imagesets

        Raises:
            RuntimeError If filenames is neither a list nor a string
            AssertionError If filenames is an empty list
        """
        # Ensure we have enough images
        if isinstance(filenames, list):
            assert filenames
        elif isinstance(filenames, str):
            filenames = [filenames]
        else:
            raise RuntimeError("unknown argument passed to ImageSetFactory")

        # Analyse the filenames and group the images into imagesets.
        filelist_per_imageset = _analyse_files(filenames)

        # For each file list denoting an image set, create the imageset
        # and return as a list of imagesets. N.B sequences and image sets are
        # returned in the same list.
        imagesetlist = []
        for filelist in filelist_per_imageset:
            try:
                if filelist[2] is True:
                    iset = ImageSetFactory._create_sequence(filelist, check_headers)
                else:
                    iset = ImageSetFactory._create_imageset(filelist, check_headers)
                imagesetlist.append(iset)
            except Exception:
                # Deliberate best effort: groups that cannot be turned into an
                # imageset are skipped only when the caller asked for it.
                if not ignore_unknown:
                    raise

        # Return the imageset list
        return imagesetlist

    @staticmethod
    def from_template(
        template,
        image_range=None,
        check_headers=False,
        check_format=True,
        beam=None,
        detector=None,
        goniometer=None,
        scan=None,
    ):
        """Create a new sequence from a template.

        Params:
            template The template argument
            image_range The (first, last) image numbers, both inclusive; taken
                from template_image_range(template) when None
            check_headers Check the headers to ensure all images are valid
            check_format Look up the format class from the first file; when
                False the base Format class is used
            beam, detector, goniometer, scan Models passed through to the
                format class' get_imageset

        Returns:
            A list of sequences

        Raises:
            ValueError If the template has no '#' and does not contain "master"
        """
        if not check_format:
            assert not check_headers

        # Check the template is valid
        if template.count("#") == 0:
            if "master" not in template:
                raise ValueError("Invalid template")
            filenames = [template]
        else:
            # Get the template format
            template_format = ImageSetFactory._template_to_format(template)

            # Get the template image range
            if image_range is None:
                image_range = template_image_range(template)

            # Create the sequence file list; image_range is inclusive at both ends
            filenames = [
                template_format % number
                for number in range(image_range[0], image_range[1] + 1)
            ]

        # Import here as Format and Imageset have cyclic dependencies
        from dxtbx.format.Format import Format

        # Get the format class
        if check_format:
            format_class = dxtbx.format.Registry.get_format_class_for_file(filenames[0])
        else:
            format_class = Format

        # Create the sequence object
        sequence = format_class.get_imageset(
            filenames,
            template=template,
            as_sequence=True,
            beam=beam,
            detector=detector,
            goniometer=goniometer,
            scan=scan,
            check_format=check_format,
        )

        # Return the sequence
        return [sequence]

    @staticmethod
    def _create_imageset(filelist, check_headers):
        """Create an image set

        Params:
            filelist A (template, indices, is_sequence) tuple
            check_headers Accepted for symmetry with _create_sequence; unused
        """
        # Extract info from filelist
        template, indices, _is_sequence = filelist

        # Expand the template into the sorted list of filenames
        filenames = ImageSetFactory._filenames_for_indices(template, indices)

        # Get the format object
        format_class = dxtbx.format.Registry.get_format_class_for_file(filenames[0])

        # Create and return the imageset
        return format_class.get_imageset(filenames, as_imageset=True)

    @staticmethod
    def _create_sequence(filelist, check_headers):
        """Create a sequence

        Params:
            filelist A (template, indices, is_sequence) tuple
            check_headers Accepted for symmetry with _create_imageset; unused
        """
        # Extract info from filelist
        template, indices, _is_sequence = filelist

        # The format class is looked up from the first of the sorted filenames
        # generated for the indices that were actually given.
        filenames = ImageSetFactory._filenames_for_indices(template, indices)
        format_class = dxtbx.format.Registry.get_format_class_for_file(filenames[0])

        # The sequence itself spans every image number from the smallest to
        # the largest index. A template without '#' cannot be expanded, so the
        # single-entry file list is kept in that case.
        if template.count("#") > 0:
            template_format = ImageSetFactory._template_to_format(template)
            filenames = [
                template_format % number
                for number in range(min(indices), max(indices) + 1)
            ]

        sequence = format_class.get_imageset(
            filenames, template=template, as_sequence=True
        )

        # Return the sequence
        return sequence

    @staticmethod
    def make_imageset(
        filenames,
        format_class=None,
        check_format=True,
        single_file_indices=None,
        format_kwargs=None,
    ):
        """Create an image set

        Params:
            filenames The list of filenames
            format_class The format class to use; looked up when None
            check_format Look up the format class from the first file; when
                False a base class is chosen from single_file_indices
            single_file_indices Image indices within a single file, if any
            format_kwargs Passed through to the format class' get_imageset
        """
        # Import here as Format and Imageset have cyclic dependencies
        from dxtbx.format.Format import Format

        # So does FormatMultiImage
        from dxtbx.format.FormatMultiImage import FormatMultiImage

        # Get the format object
        if format_class is None:
            if check_format:
                format_class = dxtbx.format.Registry.get_format_class_for_file(
                    filenames[0]
                )
            elif single_file_indices is None or len(single_file_indices) == 0:
                format_class = Format
            else:
                format_class = FormatMultiImage

        return format_class.get_imageset(
            filenames,
            single_file_indices=single_file_indices,
            as_imageset=True,
            format_kwargs=format_kwargs,
            check_format=check_format,
        )

    @staticmethod
    def make_sequence(
        template,
        indices,
        format_class=None,
        beam=None,
        detector=None,
        goniometer=None,
        scan=None,
        check_format=True,
        format_kwargs=None,
    ):
        """Create a sequence

        Params:
            template The filename template
            indices The image numbers belonging to the sequence
            format_class The format class to use; looked up when None
            beam, detector, goniometer, scan Models passed through to the
                format class' get_imageset
            check_format Look up the format class from the first file; when
                False the base Format class is used
            format_kwargs Passed through to the format class' get_imageset

        Returns:
            The sequence
        """
        indices = sorted(indices)

        # Expand the template into the sorted list of filenames
        filenames = ImageSetFactory._filenames_for_indices(template, indices)

        # Set the image range
        array_range = (min(indices) - 1, max(indices))
        if scan is not None:
            assert array_range == scan.get_array_range()
            scan.set_batch_offset(array_range[0])

        # Get the format object and reader
        if format_class is None:
            # Import here as Format and Imageset have cyclic dependencies
            from dxtbx.format.Format import Format

            if check_format:
                format_class = dxtbx.format.Registry.get_format_class_for_file(
                    filenames[0]
                )
            else:
                format_class = Format

        sequence = format_class.get_imageset(
            filenames,
            beam=beam,
            detector=detector,
            goniometer=goniometer,
            scan=scan,
            format_kwargs=format_kwargs,
            template=template,
            as_sequence=True,
            check_format=check_format,
            single_file_indices=list(range(*array_range)),
        )

        # Return the sequence
        return sequence

    @staticmethod
    def imageset_from_anyset(imageset):
        """Create a new ImageSet object from an imageset object. Converts ImageSequence to ImageSet.

        Raises:
            ValueError If the object is not a recognised imageset type
        """
        # ImageSetLazy derives from ImageSet, so it has to be tested first.
        if isinstance(imageset, ImageSetLazy):
            return ImageSetLazy(imageset.data(), imageset.indices())
        elif isinstance(imageset, (ImageSequence, ImageSet)):
            return ImageSet(imageset.data(), imageset.indices())
        else:
            raise ValueError("Unrecognized imageset type: %s" % str(type(imageset)))