Source code for ewoksid16a.tasks.fluo.strip_background

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Feb 10 21:46:16 2026

@author: blissadm
"""

# import time

import numpy as np
from ewokscore import Task

# from ewoksfluo.io.hdf5 import split_h5uri
from ewoksfluo.tasks import nexus_utils
from ewoksfluo.tasks.hdf5_utils import create_hdf5_link
from PyMca5.PyMcaIO import ConfigDict
from silx.io import h5py_utils
from silx.io import url

DEFAULTS = {
    "stripped_detector_name": None,
    "xrf_spectra_uri_template": "instrument/{}/data",
    "photon_counter_name": None,
}


[docs] class StripBackgroundFromSumfit( Task, input_names=[ "bliss_scan_uri", "output_root_uri", "detector_name", "config_file", ], optional_input_names=[ "stripped_detector_name", "xrf_spectra_uri_template", "photon_counter_name", ], output_names=[ "bliss_scan_uri", "output_root_uri", "detector_name", "config_file", ], ):
[docs] def run(self): params = {**DEFAULTS, **self.get_input_values()} bliss_scan_uri = url.DataUrl(params["bliss_scan_uri"]) output_root_uri = url.DataUrl(params["output_root_uri"]) detector_name = params["detector_name"] config_file = params["config_file"] stripped_detector_name = params["stripped_detector_name"] photon_counter_name = params["photon_counter_name"] if stripped_detector_name is None: stripped_detector_name = f"{detector_name}_stripped" xrf_spectra_uri_template = params["xrf_spectra_uri_template"] # Read min and max from config cfg = ConfigDict.ConfigDict(filelist=config_file) xmin = cfg["fit"]["xmin"] xmax = cfg["fit"]["xmax"] photon_counter = None with h5py_utils.open_item(bliss_scan_uri.file_path(), bliss_scan_uri.data_path()) as scan: # type: ignore[reportGeneralTypeIssues] data = np.array(scan[xrf_spectra_uri_template.format(detector_name)]) bkg = np.array(scan["sumfit/{}/spectrum/background".format(detector_name)]) if photon_counter_name is not None: photon_counter = np.array( scan[xrf_spectra_uri_template.format(photon_counter_name)] ) # Generate background as long as data py expending head and tail values lbkg = np.zeros((1, data.shape[1]), dtype=np.float32) lbkg[:, xmin : xmax + 1] = bkg lbkg[:, :xmin] = bkg[0] lbkg[:, xmax + 1 :] = bkg[-1] if photon_counter is not None: # Read photon count from config and scale the background accordingly flux = float(cfg["concentrations"]["flux"]) time = float(cfg["concentrations"]["time"]) cfg_photons = flux * time photon_factor = photon_counter / cfg_photons photon_factor.shape = photon_factor.shape[0], 1 lbkg = lbkg * photon_factor print(lbkg.shape, data.shape) newdata = ( data - lbkg ) # maybe needed to weight according to the flux at each pixel. with nexus_utils.save_in_ewoks_process( output_root_uri, nexus_utils.now(), {}, default_levels=( output_root_uri.data_path(), "stripped_spectra", stripped_detector_name, ), ) as (process_group, already_existed): outentry = process_group.parent.parent if "mcastripped" in process_group: del process_group["mcastripped"] nxdata = nexus_utils.create_nxdata( process_group, "mcastripped", signal="data" ) nxdata.attrs["interpretation"] = "spectrum" if "data" in nxdata: del nxdata["data"] dset = nxdata.create_dataset( "data", data=newdata, chunks=(1, newdata.shape[1]), compression="gzip", shuffle=True, ) if not already_existed: nxdetector = outentry["instrument"].create_group(stripped_detector_name) nxdetector.attrs["NX_class"] = "NXdetector" create_hdf5_link(nxdetector, "data", dset) create_hdf5_link(outentry["measurement"], stripped_detector_name, dset) self.outputs.output_root_uri = ( f"{output_root_uri.file_path()}::{output_root_uri.data_path()}" ) self.outputs.bliss_scan_uri = ( f"{bliss_scan_uri.file_path()}::{bliss_scan_uri.data_path()}" ) self.outputs.detector_name = stripped_detector_name self.outputs.config_file = config_file