# -*- coding: utf-8 -*-
import hdf5plugin # noqa: F401
import numpy as np
from ewokscore import Task
from ewoksfluo.tasks import nexus_utils
from silx.io import h5py_utils
from silx.io import url
DEFAULTS = {
"output_root_group": None,
"fscan_parameters": "instrument/fscan_parameters",
"instrument_positioners_template": "instrument/positioners/{}",
}
[docs]
class FscanRegrid(
Task,
input_names=[
"bliss_scan_uri",
"output_root_uri",
"xrf_results_uri",
],
optional_input_names=[
"output_root_group",
"fscan_parameters",
"instrument_positioners_template",
],
output_names=[
"xrf_results_uri",
"bliss_scan_uri",
"output_root_uri",
"output_root_group",
],
):
"""Regrid XRF results from fscan parameters (No taking care of units of slow motor)."""
def _get_motor_unit(self, scan, motor, instrument_data_template):
if instrument_data_template.format(f"axis_{motor}") in scan:
ds = scan[instrument_data_template.format(f"axis_{motor}")]
# print(ds.attrs)
return ds.attrs.get("units", "")
elif instrument_data_template.format(motor) in scan:
ds = scan[instrument_data_template.format(motor)]
# print(ds.attrs)
return ds.attrs.get("units", "")
return ""
[docs]
def run(self):
start_time = nexus_utils.now()
params = {
**DEFAULTS,
**self.get_input_values(),
}
bliss_scan_uri = url.DataUrl(params["bliss_scan_uri"])
output_root_uri = url.DataUrl(params["output_root_uri"])
xrf_results_uri = url.DataUrl(params["xrf_results_uri"])
output_root_group = params["output_root_group"]
fscan_parameters = params["fscan_parameters"]
instrument_positioners_template = params["instrument_positioners_template"]
with h5py_utils.open_item(bliss_scan_uri.file_path(), bliss_scan_uri.data_path()) as scan: # type: ignore[reportGeneralTypeIssues]
fp = scan[fscan_parameters]
fastmot = fp["fast_motor"][()].decode()
slowmot = fp["slow_motor"][()].decode()
mode = fp["fast_motor_mode"][()].decode()
fast_n = int(fp["fast_npoints"][()].decode())
slow_n = int(fp["slow_npoints"][()].decode())
fast_start = float(fp["fast_start_pos"][()].decode())
slow_start = float(fp["slow_start_pos"][()].decode())
fast_step_size = float(fp["fast_step_size"][()].decode())
slow_step_size = float(fp["slow_step_size"][()].decode())
fastmotu = self._get_motor_unit(
scan, fastmot, instrument_positioners_template
)
slowmotu = self._get_motor_unit(
scan, slowmot, instrument_positioners_template
)
fast_motpos = fast_start + (np.arange(fast_n) + 0.5) * fast_step_size
slow_motpos = slow_start + (np.arange(slow_n)) * slow_step_size
# Load data
datas = {}
shape = None
dtype = None
dats = {"massfractions", "parameters", "uncertainties"}
with h5py_utils.open_item(xrf_results_uri.file_path(), xrf_results_uri.data_path()) as res: # type: ignore[reportGeneralTypeIssues]
for dd in dats:
if dd in res:
datas[dd] = {}
for k in res[dd]:
datas[dd][k] = res[dd][k][()]
if shape is None:
shape = datas[dd][k].shape
if dtype is None:
dtype = datas[dd][k].dtype
A = np.arange(fast_n * slow_n)
# A.shape = (fast_n, slow_n)
A.shape = (slow_n, fast_n)
if mode == "ZIGZAG":
A[1::2, :] = A[1::2, :][:, ::-1]
lvls = (
(
output_root_uri.data_path(),
output_root_group,
"regrid",
)
if output_root_group is not None
else (
output_root_uri.data_path(),
"regrid",
)
)
with nexus_utils.save_in_ewoks_process(
output_root_uri,
start_time,
process_config={
"mode": mode,
"fast_motor": fastmot,
"slow_motor": slowmot,
},
default_levels=lvls,
) as (process_group, already_existed):
if already_existed:
del process_group["results"]
results = process_group.require_group("results")
results.attrs["NX_class"] = "NXcollection"
for dd in datas:
groups = [k for k in datas[dd].keys()]
nxdata = nexus_utils.create_nxdata(
results,
dd,
signal=groups[0],
)
nxdata.attrs["auxiliary_signals"] = groups[1:]
nxdata.attrs["axes"] = [slowmot, fastmot]
nxdata.attrs["interpretation"] = "image"
dsfm = nxdata.create_dataset(fastmot, data=fast_motpos)
dssm = nxdata.create_dataset(slowmot, data=slow_motpos)
dsfm.attrs["long_name"] = f"{fastmot} ({fastmotu})"
dsfm.attrs["units"] = fastmotu
dssm.attrs["long_name"] = f"{slowmot} ({slowmotu})"
dssm.attrs["units"] = slowmotu
nxdata.create_dataset(f"{fastmot}_indices", data=np.int64(1))
nxdata.create_dataset(f"{slowmot}_indices", data=np.int64(0))
for k in datas[dd]:
dset = nxdata.create_dataset(k, shape=A.shape, dtype=dtype)
dset[:] = datas[dd][k][A]
self.outputs.bliss_scan_uri = params["bliss_scan_uri"]
self.outputs.xrf_results_uri = (
output_root_uri.file_path() + "::" + ("/".join(lvls)) + "/results"
)
self.outputs.output_root_uri = params["output_root_uri"]
self.outputs.output_root_group = params["output_root_group"]