Source code for ewoksid16a.tasks.fluo.weight

from collections.abc import Iterable

import hdf5plugin  # noqa: F401
import numpy as np
from ewokscore import Task
from ewoksfluo.io import hdf5
from ewoksfluo.io import output_uri
from ewoksfluo.tasks import xrf_results
from silx.io import h5py_utils
from silx.io import url

# from ewokscore import Task


# from ewoksfluo.io.hdf5 import ReadHdf5File
# from ewoksfluo.tasks import xrf_results

DEFAULTS = {"output_root_group": "weighted"}


[docs] class WeightedSumResults( Task, input_names=[ "bliss_scan_uris", "output_root_uris", "xrf_results_uris", "output_root_groups", "output_root_uri", ], optional_input_names=["output_root_group"], output_names=[ "xrf_results_uri", "bliss_scan_uri", "output_root_uri", "output_root_group", ], ): """Add single-scan XRF results of multiple detectors""" def _read_xrf_results(self, uri): params, massfr = {}, {} uri = url.DataUrl(uri) with h5py_utils.open_item(uri.file_path(), uri.data_path()) as results: for group in results["massfractions"]: params[group] = np.array(results["parameters"][group]) massfr[group] = np.array(results["massfractions"][group]) return massfr, params def _dict_op(self, fun, a, b): ka = set(a.keys()) kb = set(b.keys()) s = dict() for k in set.intersection(ka, kb): s[k] = fun(a[k], b[k]) return s
[docs] def dict_op(self, fun, *op): if len(op) <= 1: return op res = self._dict_op(fun, op[0], op[1]) for i in range(2, len(op)): res = self._dict_op(fun, res, op[i]) return res
[docs] def dict_plus(self, *op): return self.dict_op(lambda a, b: a + b, *op)
[docs] def dict_mult(self, *op): return self.dict_op(lambda a, b: a * b, *op)
[docs] def run(self) -> None: params = {**DEFAULTS, **self.get_input_values()} xrf_results_uris = params["xrf_results_uris"] output_root_uri = params["output_root_uri"] output_root_group = params["output_root_group"] summed_fit = None summed_prod = None for xrf_results_uri in xrf_results_uris: massfraction, parameters = self._read_xrf_results(xrf_results_uri) if summed_fit is None: summed_fit = parameters else: summed_fit = self.dict_plus(summed_fit, parameters) if summed_prod is None: summed_prod = self.dict_mult(massfraction, parameters) else: summed_prod = self.dict_plus( summed_prod, self.dict_mult(massfraction, parameters) ) weighted = self._dict_op(lambda a, b: a / b, summed_prod, summed_fit) for group in weighted: weighted[group][np.isnan(weighted[group])] = 0 config = { "xrf_results_uris": xrf_results_uris, "bliss_scan_uris": params["bliss_scan_uris"], } _, scan_h5path = hdf5.split_h5uri(output_root_uri) output_root_uri = output_uri.compose_full_output_uri( self.inputs.output_root_uri, default_output_data_path=scan_h5path, extra_data_paths=(self.inputs.output_root_group, "weighted"), ) xrf_results.save_xrf_results( output_root_uri, config, summed_fit, None, weighted, ) self.outputs.bliss_scan_uri = self.inputs.output_root_uri self.outputs.output_root_uri = self.inputs.output_root_uri self.outputs.output_root_group = output_root_group self.outputs.xrf_results_uri = output_root_uri + "/results"
[docs] class UrisAggregator( Task, optional_input_names=[ "bliss_scan_uri_0", "output_root_uri_0", "output_root_group_0", "detector_name_0", "xrf_results_uri_0", "bliss_scan_uri_1", "output_root_uri_1", "output_root_group_1", "detector_name_1", "xrf_results_uri_1", ], output_names=[ "bliss_scan_uri", "output_root_uri", "output_root_group", "detector_name", "xrf_results_uri", ], ):
[docs] def run(self): names = ( "bliss_scan_uri", "output_root_uri", "detector_name", "xrf_results_uri", "output_root_group", ) for n in names: output = [] for i in range(2): val = self.get_input_value(f"{n}_{i}", []) if isinstance(val, str): output += [ val, ] elif isinstance(val, Iterable): output += [*val] else: raise RuntimeError(f"{val} must be string or iterable of strings") print(n, output) setattr(self.outputs, f"{n}", output)