from __future__ import annotations
from typing import Iterable
import natsort
import boost_adaptbx.boost.python
import dxtbx.format.image # noqa: F401, import dependency for unpickling
import dxtbx.format.Registry
from dxtbx.sequence_filenames import group_files_by_imageset, template_image_range
try:
from .dxtbx_imageset_ext import (
ExternalLookup,
ExternalLookupItemBool,
ExternalLookupItemDouble,
ImageGrid,
ImageSequence,
ImageSet,
ImageSetData,
)
except ModuleNotFoundError:
from dxtbx_imageset_ext import ( # type: ignore
ExternalLookup,
ExternalLookupItemBool,
ExternalLookupItemDouble,
ImageGrid,
ImageSequence,
ImageSet,
ImageSetData,
)
ext = boost_adaptbx.boost.python.import_ext("dxtbx_ext")
__all__ = (
"ExternalLookup",
"ExternalLookupItemBool",
"ExternalLookupItemDouble",
"ImageGrid",
"ImageSet",
"ImageSetData",
"ImageSetFactory",
"ImageSetLazy",
"ImageSequence",
"MemReader",
)
def _expand_template_to_sorted_filenames(
template: str, indices: Iterable[int]
) -> list[str]:
"""Expand a template string to a list of filenames.
Args:
template: The template string, with a block of '#' to replace
indices: The numeric indices to insert
"""
pfx = template.split("#")[0]
sfx = template.split("#")[-1]
count = template.count("#")
if count == 1:
# Special handling for a template with a single "#", which does not
# assume a zero-padded index.
filenames = [f"{pfx}{index}{sfx}" for index in indices]
else:
filenames = [f"{pfx}{index:0{count}}{sfx}" for index in indices]
return natsort.natsorted(filenames)
[docs]
class MemReader:
"""A reader for data already loaded in memory"""
[docs]
def __init__(self, images):
self._images = images
[docs]
def copy(self, paths):
"""
Experimental implementation where a copy of the reader also copies all
the data
"""
return MemReader(self._images)
[docs]
def paths(self):
return ["" for im in self._images]
[docs]
def identifiers(self):
return self.paths()
def __len__(self):
return len(self._images)
[docs]
def read(self, index):
format_instance = self._images[index]
return format_instance.get_raw_data()
[docs]
@staticmethod
def is_single_file_reader():
return False
[docs]
@staticmethod
def master_path():
return ""
@boost_adaptbx.boost.python.inject_into(ImageSet)
class _:
"""
A class to inject additional methods into the imageset class
"""
def __getitem__(self, item):
"""Get an item from the image set stream.
If the item is an index, read and return the image at the given index.
Otherwise, if the item is a slice, then create a new ImageSet object
with the given number of array indices from the slice.
Params:
item The index or slice
Returns:
An image or new ImageSet object
"""
if isinstance(item, slice):
start = item.start or 0
stop = item.stop or len(self)
if item.step is not None and item.step != 1:
raise IndexError("Step must be 1")
if self.data().has_single_file_reader():
reader = self.reader().copy(self.reader().paths(), stop - start)
else:
reader = self.reader().copy(self.reader().paths())
return self.partial_set(reader, start, stop)
else:
return self.get_corrected_data(item)
def __iter__(self):
"""Iterate over the array indices and read each image in turn."""
for i in range(len(self)):
yield self[i]
def get_vendortype(self, index):
"""Get the vendor information."""
return self.data().get_vendor()
def get_format_class(self):
"""Get format class name"""
return self.data().get_format_class()
def get_spectrum(self, index):
"""Get the spectrum if available"""
kwargs = self.params()
if self.data().has_single_file_reader():
format_instance = self.get_format_class().get_instance(
self.data().get_master_path(), **kwargs
)
else:
format_instance = self.get_format_class().get_instance(
self.get_path(index), **kwargs
)
try:
return format_instance.get_spectrum(self.indices()[index])
except TypeError:
return format_instance.get_spectrum()
def params(self):
"""Get the parameters"""
return self.data().get_params()
def get_detectorbase(self, index):
"""
A function to be injected into the imageset to get the detectorbase instance
"""
kwargs = self.params()
if self.data().has_single_file_reader():
format_instance = self.get_format_class().get_instance(
self.data().get_master_path(), **kwargs
)
return format_instance.get_detectorbase(self.indices()[index])
else:
format_instance = self.get_format_class().get_instance(
self.get_path(index), **kwargs
)
return format_instance.get_detectorbase()
def reader(self):
"""
Return the reader
"""
return self.data().reader()
def masker(self):
"""
Return the masker
"""
return self.data().masker()
def paths(self):
"""
Return the list of paths
"""
if self.data().has_single_file_reader():
return [self.get_path(i) for i in range(len(self))]
else:
return [self.reader().paths()[i] for i in self.indices()]
[docs]
class ImageSetLazy(ImageSet):
"""
Lazy ImageSet class that doesn't necessitate setting the models ahead of time.
Only when a particular model (like detector or beam) for an image is requested,
it sets the model using the format class and then returns the model
"""
def _get_item_from_parent_or_format(self, item_name, index):
"""
Obtain an $item_name (eg. detector, beam, ...) of the given index from
the parent class using get_detector, get_beam, ...
If the parent class returns None then lookup the item using the format
class (if defined) and store a local reference to the item using
self.set_detector, set_beam, ..
"""
if index is None:
index = 0
item = getattr(super(), "get_" + item_name)(index)
if item is None:
# If check_format=False was used, then _current_instance_ will not be set, so assume a None is correct
format_class = self.get_format_class()
if (
hasattr(format_class, "_current_instance_")
and format_class._current_instance_ is not None
):
format_instance = format_class._current_instance_
getter_function = getattr(format_instance, "get_" + item_name)
item = getter_function(self.indices()[index])
setter_function = getattr(self, "set_" + item_name)
setter_function(item, index)
return item
[docs]
def get_detector(self, index=None):
return self._get_item_from_parent_or_format("detector", index)
[docs]
def get_beam(self, index=None):
return self._get_item_from_parent_or_format("beam", index)
[docs]
def get_mask(self, index=None):
"""
ImageSet::get_mask internally dereferences a pointer to the _detector
member of ImageSetData, so we ensure the detector gets populated first.
"""
if getattr(super(), "get_detector")(index) is None:
self._load_models(index)
return self._get_item_from_parent_or_format("mask", index)
[docs]
def get_goniometer(self, index=None):
return self._get_item_from_parent_or_format("goniometer", index)
[docs]
def get_scan(self, index=None):
return self._get_item_from_parent_or_format("scan", index)
def _load_models(self, index):
if index is None:
index = 0
# Sets the list for detector, beam etc before being accessed by functions in imageset.h
self.get_detector(index)
self.get_beam(index)
self.get_goniometer(index)
self.get_scan(index)
def __getitem__(self, item):
if isinstance(item, slice):
start = item.start or 0
stop = item.stop or len(self)
if item.step is not None and item.step != 1:
raise IndexError("Step must be 1")
if self.data().has_single_file_reader():
reader = self.reader().copy(self.reader().paths(), stop - start)
else:
reader = self.reader().copy(self.reader().paths())
return ImageSetLazy(
self.data().partial_data(reader, start, stop),
indices=self.indices()[item],
)
self._load_models(item)
return super().__getitem__(item)
[docs]
def get_corrected_data(self, index):
self._load_models(index)
return super().get_corrected_data(index)
[docs]
def get_gain(self, index):
self._load_models(index)
return super().get_gain(index)
@boost_adaptbx.boost.python.inject_into(ImageSequence)
class _imagesequence:
def __getitem__(self, item):
"""Get an item from the sequence stream.
If the item is an index, read and return the image at the given index.
Otherwise, if the item is a slice, then create a new Sequence object
with the given number of array indices from the slice.
Params:
item The index or slice
Returns:
An image or new Sequence object
"""
if not isinstance(item, slice):
return self.get_corrected_data(item)
else:
if item.step is not None:
raise IndexError("Sequences must be sequential")
start = item.start or 0
stop = item.stop or len(self)
if self.data().has_single_file_reader():
reader = self.reader().copy(self.reader().paths(), stop - start)
else:
reader = self.reader().copy(self.reader().paths())
return self.partial_set(reader, start, stop)
def get_template(self):
"""Return the template"""
return self.data().get_template()
def _analyse_files(filenames):
"""Group images by filename into image sets.
Params:
filenames The list of filenames
Returns:
A list of (template, [indices], is_sequence)
"""
# Analyse filenames to figure out how many imagesets we have
filelist_per_imageset = group_files_by_imageset(filenames)
def _indices_sequential_ge_zero(indices):
"""Determine if indices are sequential."""
prev = indices[0]
if prev < 0:
return False
for curr in indices[1:]:
if curr != prev + 1:
return False
prev = curr
return True
def _is_imageset_a_sequence(template, indices):
"""Return True/False if the imageset is a sequence or not.
Where more than 1 image that follow sequential numbers are given
the images are catagorised as belonging to a sequence, otherwise they
belong to an image set.
"""
if len(indices) <= 1:
return False
indices = sorted(indices)
return _indices_sequential_ge_zero(indices)
# Label each group as either an imageset or a sequence.
file_groups = []
for template, indices in filelist_per_imageset.items():
# Check if this imageset is a sequence
is_sequence = _is_imageset_a_sequence(template, indices)
# Append the items to the group list
file_groups.append((template, indices, is_sequence))
return file_groups
# FIXME Lots of duplication in this class, need to tidy up
[docs]
class ImageSetFactory:
"""Factory to create imagesets and sequences."""
[docs]
@staticmethod
def new(filenames, check_headers=False, ignore_unknown=False):
"""Create an imageset or sequence
Params:
filenames A list of filenames
check_headers Check the headers to ensure all images are valid
ignore_unknown Ignore unknown formats
Returns:
A list of imagesets
"""
# Ensure we have enough images
if isinstance(filenames, list):
assert filenames
elif isinstance(filenames, str):
filenames = [filenames]
else:
raise RuntimeError("unknown argument passed to ImageSetFactory")
# Analyse the filenames and group the images into imagesets.
filelist_per_imageset = _analyse_files(filenames)
# For each file list denoting an image set, create the imageset
# and return as a list of imagesets. N.B sequences and image sets are
# returned in the same list.
imagesetlist = []
for filelist in filelist_per_imageset:
try:
if filelist[2] is True:
iset = ImageSetFactory._create_sequence(filelist, check_headers)
else:
iset = ImageSetFactory._create_imageset(filelist, check_headers)
imagesetlist.append(iset)
except Exception:
if not ignore_unknown:
raise
return imagesetlist
[docs]
@staticmethod
def from_template(
template,
image_range=None,
check_headers=False,
check_format=True,
beam=None,
detector=None,
goniometer=None,
scan=None,
):
"""Create a new sequence from a template.
Params:
template The template argument
image_range The image range
check_headers Check the headers to ensure all images are valid
Returns:
A list of sequences
"""
if not check_format:
assert not check_headers
# Check the template is valid
if "#" in template:
# Get the template image range
if image_range is None:
image_range = template_image_range(template)
# Set the image range
indices = range(image_range[0], image_range[1] + 1)
filenames = _expand_template_to_sorted_filenames(template, indices)
else:
if "master" not in template:
raise ValueError("Invalid template")
filenames = [template]
# Import here as Format and Imageset have cyclic dependencies
from dxtbx.format.Format import Format
# Get the format class
if check_format:
format_class = dxtbx.format.Registry.get_format_class_for_file(filenames[0])
else:
format_class = Format
# Create the sequence object
sequence = format_class.get_imageset(
filenames,
template=template,
as_sequence=True,
beam=beam,
detector=detector,
goniometer=goniometer,
scan=scan,
check_format=check_format,
)
return [sequence]
@staticmethod
def _create_imageset(filelist, check_headers):
"""Create an image set"""
# Extract info from filelist
template, indices, is_sequence = filelist
# Get the template format
if "#" in template:
filenames = _expand_template_to_sorted_filenames(template, indices)
else:
filenames = [template]
# Get the format object
format_class = dxtbx.format.Registry.get_format_class_for_file(filenames[0])
# Create and return the imageset
return format_class.get_imageset(filenames, as_imageset=True)
@staticmethod
def _create_sequence(filelist, check_headers):
"""Create a sequence"""
template, indices, is_sequence = filelist
# Expand the template if necessary
if "#" in template:
filenames = _expand_template_to_sorted_filenames(template, indices)
else:
filenames = [template]
# Get the format object
format_class = dxtbx.format.Registry.get_format_class_for_file(filenames[0])
return format_class.get_imageset(filenames, template=template, as_sequence=True)
[docs]
@staticmethod
def make_imageset(
filenames,
format_class=None,
check_format=True,
single_file_indices=None,
format_kwargs=None,
):
"""Create an image set"""
# Import here as Format and Imageset have cyclic dependencies
from dxtbx.format.Format import Format
# So does FormatMultiImage
from dxtbx.format.FormatMultiImage import FormatMultiImage
# Get the format object
if format_class is None:
if check_format:
format_class = dxtbx.format.Registry.get_format_class_for_file(
filenames[0]
)
else:
if single_file_indices is None or len(single_file_indices) == 0:
format_class = Format
else:
format_class = FormatMultiImage
return format_class.get_imageset(
filenames,
single_file_indices=single_file_indices,
as_imageset=True,
format_kwargs=format_kwargs,
check_format=check_format,
)
[docs]
@staticmethod
def make_sequence(
template,
indices,
format_class=None,
beam=None,
detector=None,
goniometer=None,
scan=None,
check_format=True,
format_kwargs=None,
):
"""Create a sequence"""
indices = sorted(indices)
# Get the template format
if "#" in template:
filenames = _expand_template_to_sorted_filenames(template, indices)
else:
filenames = [template]
# Set the image range
array_range = (min(indices) - 1, max(indices))
if scan is not None:
assert array_range == scan.get_array_range()
scan.set_batch_offset(array_range[0])
# Get the format object and reader
if format_class is None:
# Import here as Format and Imageset have cyclic dependencies
from dxtbx.format.Format import Format
if check_format:
format_class = dxtbx.format.Registry.get_format_class_for_file(
filenames[0]
)
else:
format_class = Format
return format_class.get_imageset(
filenames,
beam=beam,
detector=detector,
goniometer=goniometer,
scan=scan,
format_kwargs=format_kwargs,
template=template,
as_sequence=True,
check_format=check_format,
single_file_indices=list(range(*array_range)),
)
[docs]
@staticmethod
def imageset_from_anyset(imageset):
"""Create a new ImageSet object from an imageset object. Converts ImageSequence to ImageSet."""
if isinstance(imageset, ImageSetLazy):
return ImageSetLazy(imageset.data(), imageset.indices())
elif isinstance(imageset, ImageSequence) or isinstance(imageset, ImageSet):
return ImageSet(imageset.data(), imageset.indices())
else:
raise ValueError("Unrecognized imageset type: %s" % str(type(imageset)))