This documentation page refers to a previous release of DIALS (2.2).
Click here to go to the corresponding page for the latest version of DIALS

Source code for dials.algorithms.integration.processor

# coding: utf-8

from __future__ import absolute_import, division, print_function

import itertools
import logging
import math
from dials.util import tabulate
from time import time

import psutil

import boost.python
import dials.algorithms.integration
import libtbx
from dials.array_family import flex
from dials_algorithms_integration_integrator_ext import (
    Executor,
    Group,
    GroupList,
    Job,
    JobList,
    ReflectionManager,
    ReflectionManagerPerImage,
    ShoeboxProcessor,
)

try:
    import resource
except ImportError:
    # resource does not exist on non-Linux, so can't float the import
    resource = None

__all__ = [
    "Block",
    "build_processor",
    "Debug",
    "ExecuteParallelTask",
    "Executor",
    "Group",
    "GroupList",
    "Job",
    "job",
    "JobList",
    "Lookup",
    "Manager",
    "ManagerRot",
    "ManagerStills",
    "MultiProcessing",
    "NullTask",
    "Parameters",
    "Processor",
    "Processor2D",
    "Processor3D",
    "ProcessorFlat3D",
    "ProcessorSingle2D",
    "ProcessorStills",
    "ReflectionManager",
    "ReflectionManagerPerImage",
    "Result",
    "Shoebox",
    "ShoeboxProcessor",
    "Task",
]

logger = logging.getLogger(__name__)


def _average_bbox_size(reflections):
    """Calculate the average bbox size for debugging"""

    bbox = reflections["bbox"]
    sel = flex.random_selection(len(bbox), min(len(bbox), 1000))
    subset_bbox = bbox.select(sel)
    xmin, xmax, ymin, ymax, zmin, zmax = subset_bbox.parts()
    xsize = flex.mean((xmax - xmin).as_double())
    ysize = flex.mean((ymax - ymin).as_double())
    zsize = flex.mean((zmax - zmin).as_double())
    return xsize, ysize, zsize


@boost.python.inject_into(Executor)
class _(object):
    @staticmethod
    def __getinitargs__():
        return ()


class _Job(object):
    def __init__(self):
        self.index = 0
        self.nthreads = 1


job = _Job()


class MultiProcessing(object):
    """
    Multi processing parameters
    """

    def __init__(self):
        self.method = "multiprocessing"
        self.nproc = 1
        self.njobs = 1
        self.nthreads = 1

    def update(self, other):
        self.method = other.method
        self.nproc = other.nproc
        self.njobs = other.njobs
        self.nthreads = other.nthreads


class Lookup(object):
    """
    Lookup parameters
    """

    def __init__(self):
        self.mask = None

    def update(self, other):
        self.mask = other.mask


class Block(object):
    """
    Block parameters
    """

    def __init__(self):
        self.size = libtbx.Auto
        self.units = "degrees"
        self.threshold = 0.99
        self.force = False
        self.max_memory_usage = 0.90

    def update(self, other):
        self.size = other.size
        self.units = other.units
        self.threshold = other.threshold
        self.force = other.force
        self.max_memory_usage = other.max_memory_usage


class Shoebox(object):
    """
    Shoebox parameters
    """

    def __init__(self):
        self.flatten = False
        self.partials = False

    def update(self, other):
        self.flatten = other.flatten
        self.partials = other.partials


class Debug(object):
    """
    Debug parameters
    """

    def __init__(self):
        self.output = False
        self.select = None
        self.split_experiments = True
        self.separate_files = True

    def update(self, other):
        self.output = other.output
        self.select = other.select
        self.split_experiments = other.split_experiments
        self.separate_files = other.separate_files


class Parameters(object):
    """
    Class to handle parameters for the processor
    """

    def __init__(self):
        """
        Initialize the parameters
        """
        self.mp = MultiProcessing()
        self.lookup = Lookup()
        self.block = Block()
        self.shoebox = Shoebox()
        self.debug = Debug()

    def update(self, other):
        """
        Update the parameters
        """
        self.mp.update(other.mp)
        self.lookup.update(other.lookup)
        self.block.update(other.block)
        self.shoebox.update(other.shoebox)
        self.debug.update(other.debug)


class ExecuteParallelTask(object):
    """
    Helper class to run things on cluster
    """

    def __call__(self, task):
        from dials.util import log

        log.config_simple_cached()
        result = task()
        handlers = logging.getLogger("dials").handlers
        assert len(handlers) == 1, "Invalid number of logging handlers"
        return result, handlers[0].messages()


class Processor(object):
    """Processor interface class."""

    def __init__(self, manager):
        """
        Initialise the processor.

        The processor requires a manager class implementing the Manager interface.
        This class executes all the workers in separate threads and accumulates the
        results to expose to the user.

        :param manager: The processing manager
        :param params: The phil parameters
        """
        self.manager = manager

    @property
    def executor(self):
        """
        Get the executor

        :return: The executor
        """
        return self.manager.executor

    @executor.setter
    def executor(self, function):
        """
        Set the executor

        :param function: The executor
        """
        self.manager.executor = function

    def process(self):
        """
        Do all the processing tasks.

        :return: The processing results
        """
        from dials.util.mp import multi_node_parallel_map
        import platform

        start_time = time()
        self.manager.initialize()
        mp_method = self.manager.params.mp.method
        mp_njobs = self.manager.params.mp.njobs
        mp_nproc = self.manager.params.mp.nproc
        if (
            mp_njobs * mp_nproc
        ) > 1 and platform.system() == "Windows":  # platform.system() forks which is bad for MPI, so don't use it unless nproc > 1
            logger.warning(
                "Multiprocessing is not available on windows. Setting nproc = 1\n"
            )
            mp_nproc = 1
            mp_njobs = 1
        assert mp_nproc > 0, "Invalid number of processors"
        if mp_nproc * mp_njobs > len(self.manager):
            mp_nproc = min(mp_nproc, len(self.manager))
            mp_njobs = int(math.ceil(len(self.manager) / mp_nproc))
        logger.info(self.manager.summary())
        if mp_njobs > 1:
            assert mp_method != "none" and mp_method is not None
            logger.info(
                " Using %s with %d parallel job(s) and %d processes per node\n"
                % (mp_method, mp_njobs, mp_nproc)
            )
        else:
            logger.info(" Using multiprocessing with %d parallel job(s)\n" % (mp_nproc))
        if mp_njobs * mp_nproc > 1:

            def process_output(result):
                for message in result[1]:
                    logger.log(message.levelno, message.msg)
                self.manager.accumulate(result[0])
                result[0].reflections = None
                result[0].data = None

            multi_node_parallel_map(
                func=ExecuteParallelTask(),
                iterable=list(self.manager.tasks()),
                njobs=mp_njobs,
                nproc=mp_nproc,
                callback=process_output,
                cluster_method=mp_method,
                preserve_order=True,
                preserve_exception_message=True,
            )
        else:
            for task in self.manager.tasks():
                self.manager.accumulate(task())
        self.manager.finalize()
        end_time = time()
        self.manager.time.user_time = end_time - start_time
        result1, result2 = self.manager.result()
        return result1, result2, self.manager.time


class Result(object):
    """
    A class representing a processing result.
    """

    def __init__(self, index, reflections, data=None):
        """
        Initialise the data.

        :param index: The processing job index
        :param reflections: The processed reflections
        :param data: Other processed data
        """
        self.index = index
        self.reflections = reflections
        self.data = data


class NullTask(object):
    """
    A class to perform a null task.
    """

    def __init__(self, index, reflections):
        """
        Initialise the task

        :param index: The index of the processing job
        :param experiments: The list of experiments
        :param reflections: The list of reflections
        """
        self.index = index
        self.reflections = reflections

    def __call__(self):
        """
        Do the processing.

        :return: The processed data
        """
        result = Result(self.index, self.reflections, None)
        result.read_time = 0
        result.extract_time = 0
        result.process_time = 0
        result.total_time = 0
        return result


class Task(object):
    """
    A class to perform a processing task.
    """

    def __init__(self, index, job, experiments, reflections, params, executor=None):
        """
        Initialise the task.

        :param index: The index of the processing job
        :param experiments: The list of experiments
        :param reflections: The list of reflections
        :param params: The processing parameters
        :param job: The frames to integrate
        :param flatten: Flatten the shoeboxes
        :param save_shoeboxes: Save the shoeboxes to file
        :param executor: The executor class
        """
        assert executor is not None, "No executor given"
        assert len(reflections) > 0, "Zero reflections given"
        self.index = index
        self.job = job
        self.experiments = experiments
        self.reflections = reflections
        self.params = params
        self.executor = executor

    def __call__(self):
        """
        Do the processing.

        :return: The processed data
        """
        from dials.model.data import make_image

        # Get the start time
        start_time = time()

        # Set the global process ID
        job.index = self.index

        # Check all reflections have same imageset and get it
        exp_id = list(set(self.reflections["id"]))
        imageset = self.experiments[exp_id[0]].imageset
        for i in exp_id[1:]:
            assert (
                self.experiments[i].imageset == imageset
            ), "Task can only handle 1 imageset"

        # Get the sub imageset
        frame0, frame1 = self.job

        try:
            allowed_range = imageset.get_array_range()
        except Exception:
            allowed_range = 0, len(imageset)

        try:
            # range increasing
            assert frame0 < frame1

            # within an increasing range
            assert allowed_range[1] > allowed_range[0]

            # we are processing data which is within range
            assert frame0 >= allowed_range[0]
            assert frame1 <= allowed_range[1]

            # I am 99% sure this is implied by all the code above
            assert (frame1 - frame0) <= len(imageset)
            imageset = imageset[frame0:frame1]
        except Exception as e:
            raise RuntimeError("Programmer Error: bad array range: %s" % str(e))

        try:
            frame0, frame1 = imageset.get_array_range()
        except Exception:
            frame0, frame1 = (0, len(imageset))

        self.executor.initialize(frame0, frame1, self.reflections)

        # Set the shoeboxes (don't allocate)
        self.reflections["shoebox"] = flex.shoebox(
            self.reflections["panel"],
            self.reflections["bbox"],
            allocate=False,
            flatten=self.params.shoebox.flatten,
        )

        # Create the processor
        processor = ShoeboxProcessor(
            self.reflections,
            len(imageset.get_detector()),
            frame0,
            frame1,
            self.params.debug.output,
        )

        # Loop through the imageset, extract pixels and process reflections
        read_time = 0.0
        for i in range(len(imageset)):
            st = time()
            image = imageset.get_corrected_data(i)
            if imageset.is_marked_for_rejection(i):
                mask = tuple(flex.bool(im.accessor(), False) for im in image)
            else:
                mask = imageset.get_mask(i)
                if self.params.lookup.mask is not None:
                    assert len(mask) == len(self.params.lookup.mask), (
                        "Mask/Image are incorrect size %d %d"
                        % (len(mask), len(self.params.lookup.mask))
                    )
                    mask = tuple(
                        m1 & m2 for m1, m2 in zip(self.params.lookup.mask, mask)
                    )

            read_time += time() - st
            processor.next(make_image(image, mask), self.executor)
            del image
            del mask
        assert processor.finished(), "Data processor is not finished"

        # Optionally save the shoeboxes
        if self.params.debug.output and self.params.debug.separate_files:
            output = self.reflections
            if self.params.debug.select is not None:
                output = output.select(self.params.debug.select(output))
            if self.params.debug.split_experiments:
                output = output.split_by_experiment_id()
                for table in output:
                    i = table["id"][0]
                    table.as_file("shoeboxes_%d_%d.refl" % (self.index, i))
            else:
                output.as_file("shoeboxes_%d.refl" % self.index)

        # Delete the shoeboxes
        if self.params.debug.separate_files or not self.params.debug.output:
            del self.reflections["shoebox"]

        # Finalize the executor
        self.executor.finalize()

        # Return the result
        result = Result(self.index, self.reflections, self.executor.data())
        result.read_time = read_time
        result.extract_time = processor.extract_time()
        result.process_time = processor.process_time()
        result.total_time = time() - start_time
        return result


class Manager(object):
    """
    A class to manage processing book-keeping
    """

    def __init__(self, experiments, reflections, params):
        """
        Initialise the manager.

        :param experiments: The list of experiments
        :param reflections: The list of reflections
        :param params: The phil parameters
        """

        # Initialise the callbacks
        self.executor = None

        # Save some data
        self.experiments = experiments
        self.reflections = reflections

        # Other data
        self.data = {}

        # Save some parameters
        self.params = params

        # Set the finalized flag to False
        self.finalized = False

        # Initialise the timing information
        self.time = dials.algorithms.integration.TimingInfo()

    def initialize(self):
        """
        Initialise the processing
        """
        # Get the start time
        start_time = time()

        # Ensure the reflections contain bounding boxes
        assert "bbox" in self.reflections, "Reflections have no bbox"

        # Compute the block size and processors
        self.compute_blocks()
        self.compute_jobs()
        self.split_reflections()
        self.compute_processors()

        # Create the reflection manager
        self.manager = ReflectionManager(self.jobs, self.reflections)

        # Parallel reading of HDF5 from the same handle is not allowed. Python
        # multiprocessing is a bit messed up and used fork on linux so need to
        # close and reopen file.
        for exp in self.experiments:
            if exp.imageset.reader().is_single_file_reader():
                exp.imageset.reader().nullify_format_instance()

        # Set the initialization time
        self.time.initialize = time() - start_time

    def task(self, index):
        """
        Get a task.
        """
        job = self.manager.job(index)
        frames = job.frames()
        expr_id = job.expr()
        assert expr_id[1] > expr_id[0], "Invalid experiment id"
        assert expr_id[0] >= 0, "Invalid experiment id"
        assert expr_id[1] <= len(self.experiments), "Invalid experiment id"
        experiments = self.experiments  # [expr_id[0]:expr_id[1]]
        reflections = self.manager.split(index)
        if len(reflections) == 0:
            logger.warning("No reflections in job %d ***", index)
            task = NullTask(index=index, reflections=reflections)
        else:
            task = Task(
                index=index,
                job=frames,
                experiments=experiments,
                reflections=reflections,
                params=self.params,
                executor=self.executor,
            )
        return task

    def tasks(self):
        """
        Iterate through the tasks.
        """
        for i in range(len(self)):
            yield self.task(i)

    def accumulate(self, result):
        """Accumulate the results."""
        self.data[result.index] = result.data
        self.manager.accumulate(result.index, result.reflections)
        self.time.read += result.read_time
        self.time.extract += result.extract_time
        self.time.process += result.process_time
        self.time.total += result.total_time

    def finalize(self):
        """
        Finalize the processing and finish.
        """
        # Get the start time
        start_time = time()

        # Check manager is finished
        assert self.manager.finished(), "Manager is not finished"

        # Update the time and finalized flag
        self.time.finalize = time() - start_time
        self.finalized = True

    def result(self):
        """
        Return the result.

        :return: The result
        """
        assert self.finalized, "Manager is not finalized"
        return self.manager.data(), self.data

    def finished(self):
        """
        Return if all tasks have finished.

        :return: True/False all tasks have finished
        """
        return self.finalized and self.manager.finished()

    def __len__(self):
        """
        Return the number of tasks.

        :return: the number of tasks
        """
        return len(self.manager)

    def compute_blocks(self):
        """
        Compute the processing block size.
        """

        if self.params.block.size == libtbx.Auto:
            if (
                self.params.mp.nproc * self.params.mp.njobs == 1
                and not self.params.debug.output
                and not self.params.block.force
            ):
                self.params.block.size = None
            else:
                assert self.params.block.threshold > 0, "Threshold must be > 0"
                assert self.params.block.threshold <= 1.0, "Threshold must be < 1"
                nframes = sorted([b[5] - b[4] for b in self.reflections["bbox"]])
                cutoff = int(self.params.block.threshold * len(nframes))
                block_size = nframes[cutoff] * 2
                self.params.block.size = block_size
                self.params.block.units = "frames"

    def compute_jobs(self):
        """
        Compute the jobs
        """
        groups = itertools.groupby(
            range(len(self.experiments)),
            lambda x: (id(self.experiments[x].imageset), id(self.experiments[x].scan)),
        )
        self.jobs = JobList()
        for key, indices in groups:
            indices = list(indices)
            i0 = indices[0]
            i1 = indices[-1] + 1
            expr = self.experiments[i0]
            scan = expr.scan
            imgs = expr.imageset
            array_range = (0, len(imgs))
            if scan is not None:
                assert len(imgs) >= len(scan), "Invalid scan range"
                array_range = scan.get_array_range()
            if self.params.block.size is None:
                block_size_frames = array_range[1] - array_range[0]
            elif self.params.block.units == "radians":
                phi0, dphi = scan.get_oscillation(deg=False)
                block_size_frames = int(math.ceil(self.params.block.size / dphi))
            elif self.params.block.units == "degrees":
                phi0, dphi = scan.get_oscillation()
                block_size_frames = int(math.ceil(self.params.block.size / dphi))
            elif self.params.block.units == "frames":
                block_size_frames = int(math.ceil(self.params.block.size))
            else:
                raise RuntimeError(
                    "Unknown block_size units %r" % self.params.block.units
                )
            self.jobs.add((i0, i1), array_range, block_size_frames)
        assert len(self.jobs) > 0, "Invalid number of jobs"

    def split_reflections(self):
        """
        Split the reflections into partials or over job boundaries
        """

        # Optionally split the reflection table into partials, otherwise,
        # split over job boundaries
        if self.params.shoebox.partials:
            num_full = len(self.reflections)
            self.reflections.split_partials()
            num_partial = len(self.reflections)
            assert num_partial >= num_full, "Invalid number of partials"
            if num_partial > num_full:
                logger.info(
                    " Split %d reflections into %d partial reflections\n"
                    % (num_full, num_partial)
                )
        else:
            num_full = len(self.reflections)
            self.jobs.split(self.reflections)
            num_partial = len(self.reflections)
            assert num_partial >= num_full, "Invalid number of partials"
            if num_partial > num_full:
                num_split = num_partial - num_full
                logger.info(
                    " Split %d reflections overlapping job boundaries\n" % num_split
                )

        # Compute the partiality
        self.reflections.compute_partiality(self.experiments)

    def compute_processors(self):
        """
        Compute the number of processors
        """

        # Get the maximum shoebox memory to estimate memory use for one process
        memory_required_per_process = flex.max(
            self.jobs.shoebox_memory(self.reflections, self.params.shoebox.flatten)
        )

        # Obtain information about system memory
        available_memory = psutil.virtual_memory().available
        available_swap = psutil.swap_memory().free
        available_incl_swap = available_memory + available_swap
        available_limit = available_incl_swap * self.params.block.max_memory_usage
        available_immediate_limit = (
            available_memory * self.params.block.max_memory_usage
        )

        # Compile a memory report
        report = [
            "Memory situation report:",
        ]

        def _report(description, value):
            report.append("  %-50s:%5.1f GB" % (description, value))

        _report("Available system memory (excluding swap)", available_memory / 1e9)
        _report("Available swap memory", available_swap / 1e9)
        _report("Available system memory (including swap)", available_incl_swap / 1e9)
        _report(
            "Maximum memory for processing (including swap)", available_limit / 1e9,
        )
        _report(
            "Maximum memory for processing (excluding swap)",
            available_immediate_limit / 1e9,
        )
        _report("Memory required per process", memory_required_per_process / 1e9)

        # Check if a ulimit applies
        # Note that resource may be None on non-Linux platforms.
        # We can't use psutil as platform-independent solution in this instance due to
        # https://github.com/conda-forge/psutil-feedstock/issues/47
        rlimit = getattr(resource, "RLIMIT_VMEM", getattr(resource, "RLIMIT_AS", None))
        if rlimit:
            try:
                ulimit = resource.getrlimit(rlimit)[0]
                if ulimit <= 0:
                    report.append("  no memory ulimit set")
                else:
                    ulimit_used = psutil.Process().memory_info().rss
                    _report("Memory ulimit detected", ulimit / 1e9)
                    _report("Memory ulimit in use", ulimit_used / 1e9)
                    available_memory = max(
                        0, min(available_memory, ulimit - ulimit_used)
                    )
                    available_incl_swap = max(
                        0, min(available_incl_swap, ulimit - ulimit_used)
                    )
                    available_immediate_limit = (
                        available_memory * self.params.block.max_memory_usage
                    )
                    _report("Available system memory (limited)", available_memory / 1e9)
                    _report(
                        "Available system memory (incl. swap; limited)",
                        available_incl_swap / 1e9,
                    )
                    _report(
                        "Maximum memory for processing (exc. swap; limited)",
                        available_immediate_limit / 1e9,
                    )
            except Exception as e:
                logger.debug(
                    "Could not obtain ulimit values due to %s", str(e), exc_info=True
                )

        output_level = logging.INFO

        # Limit the number of parallel processes by amount of available memory
        if self.params.mp.method == "multiprocessing" and self.params.mp.nproc > 1:

            # Compute expected memory usage and warn if not enough
            njobs = available_immediate_limit / memory_required_per_process
            if njobs >= self.params.mp.nproc:
                # There is enough memory. Take no action
                pass
            elif njobs >= 1:
                # There is enough memory to run, but not as many processes as requested
                output_level = logging.WARNING
                report.append(
                    "Reducing number of processes from %d to %d due to memory constraints."
                    % (self.params.mp.nproc, int(njobs))
                )
                self.params.mp.nproc = int(njobs)
            elif (
                available_incl_swap * self.params.block.max_memory_usage
                >= memory_required_per_process
            ):
                # There is enough memory to run, but only if we count swap.
                output_level = logging.WARNING
                report.append(
                    "Reducing number of processes from %d to 1 due to memory constraints."
                    % self.params.mp.nproc,
                )
                report.append("Running this process will rely on swap usage!")
                self.params.mp.nproc = 1
            else:
                # There is not enough memory to run
                output_level = logging.ERROR

        report.append("")
        logger.log(output_level, "\n".join(report))

        if output_level >= logging.ERROR:
            raise RuntimeError(
                """
          Not enough memory to run integration jobs.  This could be caused by a
          highly mosaic crystal model.  Possible solutions include increasing the
          percentage of memory allowed for shoeboxes or decreasing the block size.
          The average shoebox size is %d x %d pixels x %d images - is your crystal
          really this mosaic?
          """
                % _average_bbox_size(self.reflections)
            )

    def summary(self):
        """
        Get a summary of the processing
        """
        # Compute the task table
        if self.experiments.all_stills():
            rows = [["#", "Group", "Frame From", "Frame To", "# Reflections"]]
            for i in range(len(self)):
                job = self.manager.job(i)
                group = job.index()
                f0, f1 = job.frames()
                n = self.manager.num_reflections(i)
                rows.append([str(i), str(group), str(f0), str(f1), str(n)])
        elif self.experiments.all_sequences():
            rows = [
                [
                    "#",
                    "Group",
                    "Frame From",
                    "Frame To",
                    "Angle From",
                    "Angle To",
                    "# Reflections",
                ]
            ]
            for i in range(len(self)):
                job = self.manager.job(i)
                group = job.index()
                expr = job.expr()
                f0, f1 = job.frames()
                scan = self.experiments[expr[0]].scan
                p0 = scan.get_angle_from_array_index(f0)
                p1 = scan.get_angle_from_array_index(f1)
                n = self.manager.num_reflections(i)
                rows.append(
                    [str(i), str(group), str(f0), str(f1), str(p0), str(p1), str(n)]
                )
        else:
            raise RuntimeError("Experiments must be all sequences or all stills")

        # The job table
        task_table = tabulate(rows, headers="firstrow")

        # The format string
        if self.params.block.size is None:
            block_size = "auto"
        else:
            block_size = str(self.params.block.size)
        fmt = (
            "Processing reflections in the following blocks of images:\n"
            "\n"
            " block_size: %s %s\n"
            "\n"
            "%s\n"
        )
        return fmt % (block_size, self.params.block.units, task_table)


class ManagerRot(Manager):
    """Specialize the manager for oscillation data using the oscillation pre and
    post processors."""

    def __init__(self, experiments, reflections, params):
        """Initialise the pre-processor, post-processor and manager."""

        # Ensure we have the correct type of data
        if not experiments.all_sequences():
            raise RuntimeError(
                """
        An inappropriate processing algorithm may have been selected!
         Trying to perform rotation processing when not all experiments
         are indicated as rotation experiments.
      """
            )

        # Initialise the manager
        super(ManagerRot, self).__init__(experiments, reflections, params)


class ManagerStills(Manager):
    """Specialize the manager for stills data using the stills pre and post
    processors."""

    def __init__(self, experiments, reflections, params):
        """Initialise the pre-processor, post-processor and manager."""

        # Ensure we have the correct type of data
        if not experiments.all_stills():
            raise RuntimeError(
                """
        An inappropriate processing algorithm may have been selected!
         Trying to perform stills processing when not all experiments
         are indicated as stills experiments.
      """
            )

        # Initialise the manager
        super(ManagerStills, self).__init__(experiments, reflections, params)


[docs]class Processor3D(Processor): """Top level processor for 3D processing.""" def __init__(self, experiments, reflections, params): """Initialise the manager and the processor.""" # Set some parameters params.shoebox.partials = False params.shoebox.flatten = False # Create the processing manager manager = ManagerRot(experiments, reflections, params) # Initialise the processor super(Processor3D, self).__init__(manager)
[docs]class ProcessorFlat3D(Processor): """Top level processor for flat 3D processing.""" def __init__(self, experiments, reflections, params): """Initialise the manager and the processor.""" # Set some parameters params.shoebox.flatten = True params.shoebox.partials = False # Create the processing manager manager = ManagerRot(experiments, reflections, params) # Initialise the processor super(ProcessorFlat3D, self).__init__(manager)
[docs]class Processor2D(Processor): """Top level processor for 2D processing.""" def __init__(self, experiments, reflections, params): """Initialise the manager and the processor.""" # Set some parameters params.shoebox.partials = True # Create the processing manager manager = ManagerRot(experiments, reflections, params) # Initialise the processor super(Processor2D, self).__init__(manager)
[docs]class ProcessorSingle2D(Processor): """Top level processor for still image processing.""" def __init__(self, experiments, reflections, params): """Initialise the manager and the processor.""" # Set some of the parameters params.block.size = 1 params.block.units = "frames" params.shoebox.partials = True params.shoebox.flatten = False # Create the processing manager manager = ManagerRot(experiments, reflections, params) # Initialise the processor super(ProcessorSingle2D, self).__init__(manager)
[docs]class ProcessorStills(Processor): """Top level processor for still image processing.""" def __init__(self, experiments, reflections, params): """Initialise the manager and the processor.""" # Set some parameters params.block.size = 1 params.block.units = "frames" params.shoebox.partials = False params.shoebox.flatten = False # Create the processing manager manager = ManagerStills(experiments, reflections, params) # Initialise the processor super(ProcessorStills, self).__init__(manager)
def build_processor(Class, experiments, reflections, params=None): """ A function to simplify building the processor :param Class: The input class :param experiments: The input experiments :param reflections: The reflections :param params: Optional input parameters """ _params = Parameters() if params is not None: _params.update(params) return Class(experiments, reflections, _params)