import logging
from collections.abc import Iterable

import hdf5plugin  # noqa: F401
import numpy as np
from ewokscore import Task
from ewoksfluo.io import hdf5
from ewoksfluo.io import output_uri
from ewoksfluo.tasks import xrf_results
from silx.io import h5py_utils
from silx.io import url
# from ewokscore import Task
# from ewoksfluo.io.hdf5 import ReadHdf5File
# from ewoksfluo.tasks import xrf_results
# Fallback values merged underneath the task inputs in ``WeightedSumResults.run``
# so that the optional ``output_root_group`` input always has a value.
DEFAULTS = {"output_root_group": "weighted"}
class WeightedSumResults(
    Task,
    input_names=[
        "bliss_scan_uris",
        "output_root_uris",
        "xrf_results_uris",
        "output_root_groups",
        "output_root_uri",
    ],
    optional_input_names=["output_root_group"],
    output_names=[
        "xrf_results_uri",
        "bliss_scan_uri",
        "output_root_uri",
        "output_root_group",
    ],
):
    """Add single-scan XRF results of multiple detectors

    For every group present in all of the ``xrf_results_uris`` the task
    computes

    * the sum of the ``parameters`` datasets, and
    * the parameter-weighted mean of the ``massfractions`` datasets, i.e.
      ``sum(massfractions * parameters) / sum(parameters)`` with NaN
      (0/0) entries replaced by 0.

    The result is written with ``xrf_results.save_xrf_results``.

    Inputs read by :meth:`run`: ``xrf_results_uris``, ``bliss_scan_uris``,
    ``output_root_uri`` and the optional ``output_root_group`` (defaults to
    ``DEFAULTS["output_root_group"]``).  ``output_root_uris`` and
    ``output_root_groups`` are declared as required inputs but are not read
    by this task.
    """

    def _read_xrf_results(self, uri):
        """Load the mass fractions and fit parameters stored under *uri*.

        :param uri: data URL (parsed with ``silx.io.url.DataUrl``) of an HDF5
            group containing ``massfractions`` and ``parameters`` sub-groups.
        :returns: tuple ``(massfractions, parameters)`` of dictionaries that
            map each name found in ``massfractions`` to a numpy array.
        """
        params, massfr = {}, {}
        uri = url.DataUrl(uri)
        with h5py_utils.open_item(uri.file_path(), uri.data_path()) as results:
            # The names in "massfractions" drive the iteration: each of them
            # must also exist in "parameters".
            for group in results["massfractions"]:
                params[group] = np.array(results["parameters"][group])
                massfr[group] = np.array(results["massfractions"][group])
        return massfr, params

    def _dict_op(self, fun, a, b):
        """Apply the binary function *fun* to the values of the keys shared
        by the dictionaries *a* and *b*.

        Keys present in only one of the two dictionaries are dropped.

        :returns: new dictionary ``{key: fun(a[key], b[key])}``.
        """
        return {k: fun(a[k], b[k]) for k in a.keys() & b.keys()}

    def dict_op(self, fun, *op):
        """Left-fold the binary function *fun* over the dictionaries *op*.

        Only keys common to all operands are kept.

        :param fun: function of two values returning their combination.
        :param op: dictionaries to combine.
        :returns: a new dictionary. With no operand the result is empty, with
            a single operand it is a shallow copy of that operand.
        """
        if not op:
            return {}
        # Start from a copy so that the single-operand case also yields a new
        # dictionary, like the multi-operand case does.
        res = dict(op[0])
        for other in op[1:]:
            res = self._dict_op(fun, res, other)
        return res

    def dict_plus(self, *op):
        """Key-wise sum of the dictionaries *op* (see :meth:`dict_op`)."""
        return self.dict_op(lambda a, b: a + b, *op)

    def dict_mult(self, *op):
        """Key-wise product of the dictionaries *op* (see :meth:`dict_op`)."""
        return self.dict_op(lambda a, b: a * b, *op)

    def run(self) -> None:
        """Compute the summed parameters and weighted mass fractions and save
        them.

        :raises ValueError: when ``xrf_results_uris`` is empty.
        """
        params = {**DEFAULTS, **self.get_input_values()}
        xrf_results_uris = params["xrf_results_uris"]
        output_root_uri = params["output_root_uri"]
        output_root_group = params["output_root_group"]

        summed_fit = None  # sum of the parameters over all results
        summed_prod = None  # sum of massfractions * parameters over all results
        for xrf_results_uri in xrf_results_uris:
            massfraction, parameters = self._read_xrf_results(xrf_results_uri)
            product = self.dict_mult(massfraction, parameters)
            if summed_fit is None:
                summed_fit = parameters
                summed_prod = product
            else:
                summed_fit = self.dict_plus(summed_fit, parameters)
                summed_prod = self.dict_plus(summed_prod, product)

        if summed_fit is None:
            raise ValueError("'xrf_results_uris' must contain at least one URI")

        # 0/0 is expected wherever the summed parameters vanish: silence the
        # numpy warnings and replace the resulting NaN's by zero.
        with np.errstate(divide="ignore", invalid="ignore"):
            weighted = self._dict_op(lambda a, b: a / b, summed_prod, summed_fit)
        for group in weighted:
            weighted[group][np.isnan(weighted[group])] = 0

        config = {
            "xrf_results_uris": xrf_results_uris,
            "bliss_scan_uris": params["bliss_scan_uris"],
        }

        _, scan_h5path = hdf5.split_h5uri(output_root_uri)
        # Use the resolved `output_root_group` (input value or default) rather
        # than the raw optional input, which has no usable value when omitted.
        output_root_uri = output_uri.compose_full_output_uri(
            params["output_root_uri"],
            default_output_data_path=scan_h5path,
            extra_data_paths=(output_root_group, "weighted"),
        )
        xrf_results.save_xrf_results(
            output_root_uri,
            config,
            summed_fit,
            None,
            weighted,
        )

        # NOTE(review): `bliss_scan_uri` is set to the *output root* URI, not
        # to one of the `bliss_scan_uris` inputs -- confirm this is intended.
        self.outputs.bliss_scan_uri = self.inputs.output_root_uri
        self.outputs.output_root_uri = self.inputs.output_root_uri
        self.outputs.output_root_group = output_root_group
        self.outputs.xrf_results_uri = output_root_uri + "/results"
class UrisAggregator(
    Task,
    optional_input_names=[
        "bliss_scan_uri_0",
        "output_root_uri_0",
        "output_root_group_0",
        "detector_name_0",
        "xrf_results_uri_0",
        "bliss_scan_uri_1",
        "output_root_uri_1",
        "output_root_group_1",
        "detector_name_1",
        "xrf_results_uri_1",
    ],
    output_names=[
        "bliss_scan_uri",
        "output_root_uri",
        "output_root_group",
        "detector_name",
        "xrf_results_uri",
    ],
):
    """Merge the ``<name>_0`` and ``<name>_1`` inputs into one list per name.

    Each input may be a single string or an iterable of strings; a missing
    input contributes nothing.  Every output ``<name>`` is the list of the
    values of ``<name>_0`` followed by the values of ``<name>_1``.
    """

    def run(self):
        """Fill every output with the concatenated values of its two inputs.

        :raises RuntimeError: when an input is neither a string nor an
            iterable.
        """
        logger = logging.getLogger(__name__)
        names = (
            "bliss_scan_uri",
            "output_root_uri",
            "detector_name",
            "xrf_results_uri",
            "output_root_group",
        )
        for n in names:
            output = []
            for i in range(2):
                # A missing input falls back to an empty list.
                val = self.get_input_value(f"{n}_{i}", [])
                if isinstance(val, str):
                    # Strings are iterable too: test for them first so they
                    # are kept whole instead of being split into characters.
                    output.append(val)
                elif isinstance(val, Iterable):
                    output.extend(val)
                else:
                    raise RuntimeError(f"{val} must be string or iterable of strings")
            logger.debug("%s: %s", n, output)
            setattr(self.outputs, n, output)