# import h5py
# import hdf5plugin # noqa: F401
import logging
import numpy as np
from ewokscore import Task
from ewoksfluo.io.hdf5 import ReadHdf5File
from ewoksfluo.tasks import nexus_utils
from silx.io import h5py_utils
from silx.io import url
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Fallback values for the optional task inputs; any value supplied as a task
# input with the same key takes precedence (see ``SpectralRegrid.run``).
DEFAULTS = {
    # Path, relative to the scan group, of the group holding the fscan
    # parameters (fast/slow motor names, mode, number of points).
    "fscan_parameters": "instrument/fscan_parameters",
    # ``str.format`` template; ``{}`` is replaced by the detector name to get
    # the path of the detector dataset relative to the scan group.
    "instrument_data_template": "instrument/{}/data",
    # Fast-motor names for which the scan is treated as vertical.
    "vertical_motor_names": ["spz", "spzp"],
}
class SpectralRegrid(
    Task,
    input_names=[
        "bliss_scan_uri",
        "output_root_uri",
        "output_root_group",
        "detector_name",
    ],
    optional_input_names=[
        "fscan_parameters",
        "instrument_data_template",
        "vertical_motor_names",
    ],
    output_names=[
        "bliss_scan_uri",
        "output_root_uri",
        "output_root_group",
        "detector_name",
    ],
):
    """Regrid raw spectra.

    Reads the per-point data of one detector from a scan group and stores it,
    rearranged on the 2D scan grid, in a ``gridspectrum`` NXdata group of a
    ``spectral_regrid`` process.

    Inputs:
        bliss_scan_uri: URL (parsed with ``silx.io.url.DataUrl``) of the scan
            group to read.
        output_root_uri: URL (parsed with ``silx.io.url.DataUrl``) under which
            the result is saved.
        output_root_group: name used as intermediate level between the data
            path of ``output_root_uri`` and ``spectral_regrid``.
        detector_name: name substituted in ``instrument_data_template``.
        fscan_parameters (optional): path, relative to the scan group, of the
            group holding ``fast_motor``, ``slow_motor``, ``fast_motor_mode``,
            ``fast_npoints`` and ``slow_npoints``.
        instrument_data_template (optional): ``str.format`` template giving
            the detector dataset path relative to the scan group.
        vertical_motor_names (optional): fast-motor names for which the grid
            axes are swapped.

    Missing optional inputs fall back to ``DEFAULTS``.

    Outputs:
        bliss_scan_uri and output_root_uri as ``DataUrl`` objects;
        output_root_group and detector_name passed through unchanged.
    """

    @staticmethod
    def _read_text(dataset):
        """Return the scalar content of an HDF5 dataset as ``str``.

        Bytes are decoded; anything else (already a ``str``, or a numeric
        scalar) is converted with ``str`` so that the result can always be
        compared to a string or passed to ``int``.
        """
        value = dataset[()]
        if isinstance(value, bytes):
            return value.decode()
        return str(value)

    def run(self):
        """Read the scan, build the point-index grid and save the regridded data."""
        start_time = nexus_utils.now()
        params = {
            **DEFAULTS,
            **self.get_input_values(),
        }
        bliss_scan_uri = url.DataUrl(params["bliss_scan_uri"])
        output_root_uri = url.DataUrl(params["output_root_uri"])
        output_root_group = params["output_root_group"]
        detector_name = params["detector_name"]
        fscan_parameters = params["fscan_parameters"]
        instrument_data_template = params["instrument_data_template"]
        vertical_motor_names = params["vertical_motor_names"]

        # Path of the detector dataset relative to the scan group.
        detector_data_path = instrument_data_template.format(detector_name)

        logger.warning(
            "detector_name=%r bliss_scan_uri=%r", detector_name, bliss_scan_uri
        )

        # First pass: only scan metadata and the detector dataset layout.
        with h5py_utils.open_item(bliss_scan_uri.file_path(), bliss_scan_uri.data_path()) as scan:  # type: ignore[reportGeneralTypeIssues]
            fp = scan[fscan_parameters]
            fastmot = self._read_text(fp["fast_motor"])
            slowmot = self._read_text(fp["slow_motor"])
            mode = self._read_text(fp["fast_motor_mode"])
            fast_n = int(self._read_text(fp["fast_npoints"]))
            slow_n = int(self._read_text(fp["slow_npoints"]))
            cnt = scan[detector_data_path]
            cnt_shape = cnt.shape
            cnt_dtype = cnt.dtype

        n_points = fast_n * slow_n

        # index_grid[row, col] is the acquisition index of the point that
        # belongs at that grid cell. Points are numbered slow line by slow
        # line, fast_n points per line.
        index_grid = np.arange(n_points).reshape(slow_n, fast_n)

        is_vertical_scan = fastmot in vertical_motor_names
        if is_vertical_scan:
            # Fast motor becomes the first grid axis: shape (fast_n, slow_n).
            index_grid = index_grid.swapaxes(0, 1)

        if mode == "ZIGZAG":
            # Every other grid row is reversed. The copy makes the
            # overlapping read/write of the same rows explicit.
            # NOTE(review): this runs after the axis swap, so for vertical
            # scans it reverses every other row of the swapped grid (i.e.
            # along the slow axis) -- confirm this is the intended unfolding.
            index_grid[1::2, :] = index_grid[1::2, ::-1].copy()

        logger.warning(
            "detector_name=%r output_root_uri=%r", detector_name, output_root_uri
        )

        with nexus_utils.save_in_ewoks_process(
            output_root_uri,
            start_time,
            process_config={
                "is_vertical_scan": is_vertical_scan,
                "mode": mode,
                "fast_motor": fastmot,
                "slow_motor": slowmot,
            },
            default_levels=(
                output_root_uri.data_path(),
                output_root_group,
                "spectral_regrid",
            ),
        ) as (process_group, already_existed):
            if not already_existed:
                nxdata = nexus_utils.create_nxdata(
                    process_group, "gridspectrum", signal="data"
                )
                nxdata.attrs["interpretation"] = "image"
                # Second axis of the detector data first, then the scan grid;
                # one chunk per index along that first axis.
                dset = nxdata.create_dataset(
                    "data",
                    shape=(cnt_shape[1], *index_grid.shape),
                    dtype=cnt_dtype,
                    chunks=(1, *index_grid.shape),
                    shuffle=True,
                    compression="gzip",
                )
                # Maybe the file is already opened in append mode by
                # save_in_ewoks_process, hence the dedicated read helper.
                with ReadHdf5File(bliss_scan_uri.file_path()) as fd:
                    scan = fd[bliss_scan_uri.data_path()]
                    src = scan[detector_data_path][()]

                if src.shape[0] < n_points:
                    # Fewer points than the full grid (e.g. the user aborted
                    # the scan): pad the missing points with zeros.
                    padded = np.zeros((n_points, src.shape[1]), src.dtype)
                    padded[: src.shape[0]] = src
                    src = padded

                # Vectorized form of, for every grid cell (r, c):
                #     ds[:, r, c] = src[index_grid[r, c]]
                ds = np.moveaxis(src[index_grid], -1, 0)
                dset[:] = ds

        self.outputs.bliss_scan_uri = bliss_scan_uri
        self.outputs.output_root_uri = output_root_uri
        self.outputs.output_root_group = output_root_group
        self.outputs.detector_name = detector_name