Source code for ewoksid16a.tasks.fluo.spectral_regrid

# import h5py
# import hdf5plugin  # noqa: F401
import logging

import numpy as np
from ewokscore import Task
from ewoksfluo.io.hdf5 import ReadHdf5File
from ewoksfluo.tasks import nexus_utils
from silx.io import h5py_utils
from silx.io import url

logger = logging.getLogger(__name__)

# Fallback values for the optional inputs of ``SpectralRegrid``; any input
# given to the task with the same name overrides the value defined here.
DEFAULTS = {
    # Path, relative to the scan group, of the group holding the scan
    # parameters ("fast_motor", "slow_motor", "fast_motor_mode",
    # "fast_npoints", "slow_npoints").
    "fscan_parameters": "instrument/fscan_parameters",
    # Path template, relative to the scan group, of the detector dataset;
    # "{}" is filled with the ``detector_name`` input.
    "instrument_data_template": "instrument/{}/data",
    # A scan whose fast motor is one of these names is treated as a
    # vertical scan (the index grid gets its two axes swapped).
    "vertical_motor_names": ["spz", "spzp"],
}


class SpectralRegrid(
    Task,
    input_names=[
        "bliss_scan_uri",
        "output_root_uri",
        "output_root_group",
        "detector_name",
    ],
    optional_input_names=[
        "fscan_parameters",
        "instrument_data_template",
        "vertical_motor_names",
    ],
    output_names=[
        "bliss_scan_uri",
        "output_root_uri",
        "output_root_group",
        "detector_name",
    ],
):
    """Regrid the raw spectra of a 2D scan onto the scan grid.

    The detector dataset of the scan holds one spectrum per acquired point,
    in acquisition order. This task rearranges those spectra into a dataset
    of shape ``(n_channels, *grid_shape)`` and stores it in an NXdata group
    named ``gridspectrum`` of the ``spectral_regrid`` process group.

    Inputs:
        bliss_scan_uri: URL (file path + data path) of the scan group.
        output_root_uri: URL of the output root.
        output_root_group: name of the group below the output root.
        detector_name: name of the detector whose data is regridded.
        fscan_parameters (optional): path of the scan-parameter group,
            relative to the scan group.
        instrument_data_template (optional): path template of the detector
            dataset, formatted with ``detector_name``.
        vertical_motor_names (optional): fast-motor names for which the scan
            is considered vertical.

    Outputs:
        bliss_scan_uri, output_root_uri: the ``DataUrl`` objects built from
            the corresponding inputs.
        output_root_group, detector_name: the input values, passed through.
    """

    @staticmethod
    def _acquisition_index_grid(
        fast_n: int, slow_n: int, *, zigzag: bool, vertical: bool
    ) -> np.ndarray:
        """Return the grid of acquisition indices for a 2D scan.

        The grid is built in ``(slow_n, fast_n)`` layout, where entry
        ``[s, f]`` is the index of the point acquired on slow line ``s`` at
        fast position ``f``. For a vertical scan the two axes are swapped, so
        the result has shape ``(fast_n, slow_n)``.

        The zigzag correction must be applied *before* the axes are swapped:
        it reverses the fast direction on every other slow line, which is
        only "every other row, reversed along the columns" while the rows
        are still the slow lines.
        """
        grid = np.arange(fast_n * slow_n).reshape(slow_n, fast_n)
        if zigzag:
            # Every odd slow line was acquired in the opposite fast direction.
            # The copy avoids relying on overlapping in-place assignment.
            grid[1::2] = grid[1::2, ::-1].copy()
        if vertical:
            grid = grid.swapaxes(0, 1)
        return grid

    def run(self):
        """Read the scan, regrid the detector spectra and save the result."""
        start_time = nexus_utils.now()
        params = {
            **DEFAULTS,
            **self.get_input_values(),
        }
        bliss_scan_uri = url.DataUrl(params["bliss_scan_uri"])
        output_root_uri = url.DataUrl(params["output_root_uri"])
        output_root_group = params["output_root_group"]
        detector_name = params["detector_name"]
        fscan_parameters = params["fscan_parameters"]
        instrument_data_template = params["instrument_data_template"]
        vertical_motor_names = params["vertical_motor_names"]
        detector_data_path = instrument_data_template.format(detector_name)

        logger.warning(f"{detector_name=} {bliss_scan_uri=}")

        # First pass over the scan: only metadata (scan parameters plus the
        # shape and dtype of the detector dataset) is read here.
        with h5py_utils.open_item(
            bliss_scan_uri.file_path(), bliss_scan_uri.data_path()
        ) as scan:  # type: ignore[reportGeneralTypeIssues]
            fp = scan[fscan_parameters]
            fastmot = fp["fast_motor"][()].decode()
            slowmot = fp["slow_motor"][()].decode()
            mode = fp["fast_motor_mode"][()].decode()
            fast_n = int(fp["fast_npoints"][()].decode())
            slow_n = int(fp["slow_npoints"][()].decode())

            cnt = scan[detector_data_path]
            cnt_shape = cnt.shape
            cnt_dtype = cnt.dtype

        is_vertical_scan = fastmot in vertical_motor_names
        index_grid = self._acquisition_index_grid(
            fast_n,
            slow_n,
            zigzag=(mode == "ZIGZAG"),
            vertical=is_vertical_scan,
        )

        logger.warning(f"{detector_name=} {output_root_uri=}")

        with nexus_utils.save_in_ewoks_process(
            output_root_uri,
            start_time,
            process_config={
                "is_vertical_scan": is_vertical_scan,
                "mode": mode,
                "fast_motor": fastmot,
                "slow_motor": slowmot,
            },
            default_levels=(
                output_root_uri.data_path(),
                output_root_group,
                "spectral_regrid",
            ),
        ) as (process_group, already_existed):
            if not already_existed:
                nxdata = nexus_utils.create_nxdata(
                    process_group, "gridspectrum", signal="data"
                )
                nxdata.attrs["interpretation"] = "image"
                # One chunk per channel: each chunk is a full scan-grid image.
                dset = nxdata.create_dataset(
                    "data",
                    shape=(cnt_shape[1], *index_grid.shape),
                    dtype=cnt_dtype,
                    chunks=(1, *index_grid.shape),
                    shuffle=True,
                    compression="gzip",
                )

                with ReadHdf5File(bliss_scan_uri.file_path()) as fd:
                    # Maybe the file is already opened in append mode by the
                    # save_in_ewoks_process.
                    scan = fd[bliss_scan_uri.data_path()]
                    src = scan[detector_data_path][()]

                n_points = fast_n * slow_n
                if src.shape[0] < n_points:
                    # The scan was aborted by the user before all points were
                    # acquired: pad the missing points with zeros.
                    padded = np.zeros((n_points, src.shape[1]), src.dtype)
                    padded[: src.shape[0]] = src
                    src = padded

                # ``src[index_grid]`` has shape (*grid_shape, n_channels);
                # the channel axis is moved first to match the dataset layout.
                dset[:] = np.moveaxis(src[index_grid], -1, 0)

        self.outputs.bliss_scan_uri = bliss_scan_uri
        self.outputs.output_root_uri = output_root_uri
        self.outputs.output_root_group = output_root_group
        self.outputs.detector_name = detector_name