# Source code for ewoksid16a.tasks.fluo.fscan_regrid

# -*- coding: utf-8 -*-


import hdf5plugin  # noqa: F401
import numpy as np
from ewokscore import Task
from ewoksfluo.tasks import nexus_utils
from silx.io import h5py_utils
from silx.io import url

# Default values for the optional inputs of ``FscanRegrid``; ``run`` merges
# the task's input values on top of this mapping.
DEFAULTS = {
    # Optional extra group level inserted between the output data path and
    # the "regrid" process group; ``None`` means no extra level.
    "output_root_group": None,
    # Path, relative to the scan group, of the group holding the fscan
    # parameters (motor names, number of points, start positions, step sizes).
    "fscan_parameters": "instrument/fscan_parameters",
    # Template formatted with "axis_<motor>" or "<motor>" to locate the
    # positioner dataset whose "units" attribute is read.
    "instrument_positioners_template": "instrument/positioners/{}",
}


class FscanRegrid(
    Task,
    input_names=[
        "bliss_scan_uri",
        "output_root_uri",
        "xrf_results_uri",
    ],
    optional_input_names=[
        "output_root_group",
        "fscan_parameters",
        "instrument_positioners_template",
    ],
    output_names=[
        "xrf_results_uri",
        "bliss_scan_uri",
        "output_root_uri",
        "output_root_group",
    ],
):
    """Regrid XRF results from fscan parameters (No taking care of units of slow motor).

    The flat, per-point XRF result datasets are rearranged into 2D
    ``(slow_npoints, fast_npoints)`` images using the scan geometry stored in
    the fscan parameters of the BLISS scan, and saved as NXdata groups under
    ``<output_root_uri data path>[/<output_root_group>]/regrid/results``.

    Inputs:
        bliss_scan_uri: URL of the scan group holding the fscan parameters
            and the positioners.
        output_root_uri: URL of the location under which results are saved.
        xrf_results_uri: URL of the group holding the ``massfractions``,
            ``parameters`` and/or ``uncertainties`` result groups.
        output_root_group: optional extra group level (see ``DEFAULTS``).
        fscan_parameters: optional path of the fscan parameters group,
            relative to the scan group (see ``DEFAULTS``).
        instrument_positioners_template: optional template used to locate a
            motor's positioner dataset (see ``DEFAULTS``).

    Outputs:
        xrf_results_uri: location of the regridded ``results`` group.
        bliss_scan_uri, output_root_uri, output_root_group: the inputs,
            passed through unchanged.
    """

    @staticmethod
    def _as_text(value):
        """Return an HDF5 scalar as ``str``.

        ``bytes`` are decoded; any other value (``str`` or a numeric scalar)
        is converted with ``str`` so that ``int``/``float`` parsing of the
        result keeps working whatever type the reader returned.
        """
        if isinstance(value, bytes):
            return value.decode()
        return str(value)

    def _get_motor_unit(self, scan, motor, instrument_data_template):
        """Return the ``units`` attribute of the positioner dataset of *motor*.

        The dataset is searched in *scan* at the path obtained by formatting
        *instrument_data_template* with ``axis_<motor>`` first and with
        ``<motor>`` second. An empty string is returned when neither dataset
        exists or when the dataset found has no ``units`` attribute.
        """
        for name in (f"axis_{motor}", motor):
            path = instrument_data_template.format(name)
            if path in scan:
                return scan[path].attrs.get("units", "")
        return ""

    def run(self):
        """Read the scan geometry, regrid the XRF results and save them.

        Raises whatever the underlying HDF5 access raises when a required
        scan parameter is missing, and ``IndexError`` when a result dataset
        holds fewer than ``slow_npoints * fast_npoints`` points.
        """
        start_time = nexus_utils.now()
        params = {
            **DEFAULTS,
            **self.get_input_values(),
        }

        bliss_scan_uri = url.DataUrl(params["bliss_scan_uri"])
        output_root_uri = url.DataUrl(params["output_root_uri"])
        xrf_results_uri = url.DataUrl(params["xrf_results_uri"])
        output_root_group = params["output_root_group"]
        fscan_parameters = params["fscan_parameters"]
        instrument_positioners_template = params["instrument_positioners_template"]

        with h5py_utils.open_item(bliss_scan_uri.file_path(), bliss_scan_uri.data_path()) as scan:  # type: ignore[reportGeneralTypeIssues]
            fp = scan[fscan_parameters]

            def read(name):
                return self._as_text(fp[name][()])

            fastmot = read("fast_motor")
            slowmot = read("slow_motor")
            mode = read("fast_motor_mode")
            fast_n = int(read("fast_npoints"))
            slow_n = int(read("slow_npoints"))
            fast_start = float(read("fast_start_pos"))
            slow_start = float(read("slow_start_pos"))
            fast_step_size = float(read("fast_step_size"))
            slow_step_size = float(read("slow_step_size"))

            fastmotu = self._get_motor_unit(
                scan, fastmot, instrument_positioners_template
            )
            slowmotu = self._get_motor_unit(
                scan, slowmot, instrument_positioners_template
            )

        # Fast-axis positions are offset by half a step, slow-axis positions
        # are not (presumably bin centres of the continuous fast motion).
        fast_motpos = fast_start + (np.arange(fast_n) + 0.5) * fast_step_size
        slow_motpos = slow_start + np.arange(slow_n) * slow_step_size

        # Load data. A tuple (not a set) keeps the processing order, and
        # therefore the layout of the output file, identical between runs.
        result_groups = ("massfractions", "parameters", "uncertainties")
        datas = {}
        with h5py_utils.open_item(xrf_results_uri.file_path(), xrf_results_uri.data_path()) as res:  # type: ignore[reportGeneralTypeIssues]
            for group_name in result_groups:
                if group_name not in res:
                    continue
                group = res[group_name]
                datas[group_name] = {name: group[name][()] for name in group}

        # order[i, j] is the flat acquisition index of the point on slow row i
        # and fast column j.
        order = np.arange(fast_n * slow_n).reshape(slow_n, fast_n)
        if mode == "ZIGZAG":
            # Every other row was acquired in the reverse fast direction.
            order[1::2] = order[1::2, ::-1].copy()

        lvls = (
            (
                output_root_uri.data_path(),
                output_root_group,
                "regrid",
            )
            if output_root_group is not None
            else (
                output_root_uri.data_path(),
                "regrid",
            )
        )

        with nexus_utils.save_in_ewoks_process(
            output_root_uri,
            start_time,
            process_config={
                "mode": mode,
                "fast_motor": fastmot,
                "slow_motor": slowmot,
            },
            default_levels=lvls,
        ) as (process_group, already_existed):
            # Start from a clean "results" group when overwriting; the guard
            # avoids a KeyError if a previous run left no results behind.
            if already_existed and "results" in process_group:
                del process_group["results"]
            results = process_group.require_group("results")
            results.attrs["NX_class"] = "NXcollection"

            for group_name, signals in datas.items():
                names = list(signals)
                if not names:
                    # An empty result group has no signal to plot.
                    continue

                nxdata = nexus_utils.create_nxdata(
                    results,
                    group_name,
                    signal=names[0],
                )
                nxdata.attrs["auxiliary_signals"] = names[1:]
                nxdata.attrs["axes"] = [slowmot, fastmot]
                nxdata.attrs["interpretation"] = "image"

                dsfm = nxdata.create_dataset(fastmot, data=fast_motpos)
                dssm = nxdata.create_dataset(slowmot, data=slow_motpos)
                dsfm.attrs["long_name"] = f"{fastmot} ({fastmotu})"
                dsfm.attrs["units"] = fastmotu
                dssm.attrs["long_name"] = f"{slowmot} ({slowmotu})"
                dssm.attrs["units"] = slowmotu
                nxdata.create_dataset(f"{fastmot}_indices", data=np.int64(1))
                nxdata.create_dataset(f"{slowmot}_indices", data=np.int64(0))

                for name, values in signals.items():
                    # Each dataset keeps its own dtype instead of inheriting
                    # the dtype of the first dataset that was loaded.
                    nxdata.create_dataset(name, data=values[order])

        self.outputs.bliss_scan_uri = params["bliss_scan_uri"]
        self.outputs.xrf_results_uri = (
            output_root_uri.file_path() + "::" + ("/".join(lvls)) + "/results"
        )
        self.outputs.output_root_uri = params["output_root_uri"]
        self.outputs.output_root_group = params["output_root_group"]