#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Feb 10 21:46:16 2026
@author: blissadm
"""
# import time
import numpy as np
from ewokscore import Task
# from ewoksfluo.io.hdf5 import split_h5uri
from ewoksfluo.tasks import nexus_utils
from ewoksfluo.tasks.hdf5_utils import create_hdf5_link
from PyMca5.PyMcaIO import ConfigDict
from silx.io import h5py_utils
from silx.io import url
DEFAULTS = {
"stripped_detector_name": None,
"xrf_spectra_uri_template": "instrument/{}/data",
"photon_counter_name": None,
}
[docs]
class StripBackgroundFromSumfit(
Task,
input_names=[
"bliss_scan_uri",
"output_root_uri",
"detector_name",
"config_file",
],
optional_input_names=[
"stripped_detector_name",
"xrf_spectra_uri_template",
"photon_counter_name",
],
output_names=[
"bliss_scan_uri",
"output_root_uri",
"detector_name",
"config_file",
],
):
[docs]
def run(self):
params = {**DEFAULTS, **self.get_input_values()}
bliss_scan_uri = url.DataUrl(params["bliss_scan_uri"])
output_root_uri = url.DataUrl(params["output_root_uri"])
detector_name = params["detector_name"]
config_file = params["config_file"]
stripped_detector_name = params["stripped_detector_name"]
photon_counter_name = params["photon_counter_name"]
if stripped_detector_name is None:
stripped_detector_name = f"{detector_name}_stripped"
xrf_spectra_uri_template = params["xrf_spectra_uri_template"]
# Read min and max from config
cfg = ConfigDict.ConfigDict(filelist=config_file)
xmin = cfg["fit"]["xmin"]
xmax = cfg["fit"]["xmax"]
photon_counter = None
with h5py_utils.open_item(bliss_scan_uri.file_path(), bliss_scan_uri.data_path()) as scan: # type: ignore[reportGeneralTypeIssues]
data = np.array(scan[xrf_spectra_uri_template.format(detector_name)])
bkg = np.array(scan["sumfit/{}/spectrum/background".format(detector_name)])
if photon_counter_name is not None:
photon_counter = np.array(
scan[xrf_spectra_uri_template.format(photon_counter_name)]
)
# Generate background as long as data py expending head and tail values
lbkg = np.zeros((1, data.shape[1]), dtype=np.float32)
lbkg[:, xmin : xmax + 1] = bkg
lbkg[:, :xmin] = bkg[0]
lbkg[:, xmax + 1 :] = bkg[-1]
if photon_counter is not None:
# Read photon count from config and scale the background accordingly
flux = float(cfg["concentrations"]["flux"])
time = float(cfg["concentrations"]["time"])
cfg_photons = flux * time
photon_factor = photon_counter / cfg_photons
photon_factor.shape = photon_factor.shape[0], 1
lbkg = lbkg * photon_factor
print(lbkg.shape, data.shape)
newdata = (
data - lbkg
) # maybe needed to weight according to the flux at each pixel.
with nexus_utils.save_in_ewoks_process(
output_root_uri,
nexus_utils.now(),
{},
default_levels=(
output_root_uri.data_path(),
"stripped_spectra",
stripped_detector_name,
),
) as (process_group, already_existed):
outentry = process_group.parent.parent
if "mcastripped" in process_group:
del process_group["mcastripped"]
nxdata = nexus_utils.create_nxdata(
process_group, "mcastripped", signal="data"
)
nxdata.attrs["interpretation"] = "spectrum"
if "data" in nxdata:
del nxdata["data"]
dset = nxdata.create_dataset(
"data",
data=newdata,
chunks=(1, newdata.shape[1]),
compression="gzip",
shuffle=True,
)
if not already_existed:
nxdetector = outentry["instrument"].create_group(stripped_detector_name)
nxdetector.attrs["NX_class"] = "NXdetector"
create_hdf5_link(nxdetector, "data", dset)
create_hdf5_link(outentry["measurement"], stripped_detector_name, dset)
self.outputs.output_root_uri = (
f"{output_root_uri.file_path()}::{output_root_uri.data_path()}"
)
self.outputs.bliss_scan_uri = (
f"{bliss_scan_uri.file_path()}::{bliss_scan_uri.data_path()}"
)
self.outputs.detector_name = stripped_detector_name
self.outputs.config_file = config_file