Source code for ewoksid16a.tasks.fluo.sumspectrumfit

import os
import time

# import time
from threading import Lock

import matplotlib.pyplot as plt
import numpy as np
from ewokscore import Task

# from ewoksfluo.io.hdf5 import split_h5uri
from ewoksfluo.tasks import nexus_utils
from PyMca5.PyMcaIO import ConfigDict
from PyMca5.PyMcaPhysics.xrf.ClassMcaTheory import ClassMcaTheory
from PyMca5.PyMcaPhysics.xrf.ConcentrationsTool import ConcentrationsTool
from silx.io import h5py_utils
from silx.io import url

# from ewoksfluo.tasks.hdf5_utils import create_hdf5_link
# from ewoksfluo.tasks.hdf5_utils import link_bliss_scan
# from esrf_pathlib import ESRFPath

_mcaLock = Lock()

DEFAULTS = {
    "gallery_filename": None,
    "instrument_data_template": "instrument/{}/data",
    "config_file_timeout": 3600,
}


[docs] class AdvancedFitSumSingleDetector( Task, input_names=[ "bliss_scan_uri", "detector_name", "config_file", "output_root_uri", "photon_counter_name", "output_root_group", ], optional_input_names=[ "gallery_filename", "instrument_data_template", "config_file_timeout", ], output_names=[ "bliss_scan_uri", "detector_name", "output_root_uri", "config_file", "output_root_group", ], ):
[docs] def run(self): pars = {**DEFAULTS, **self.get_input_values()} bliss_scan_uri = url.DataUrl(pars["bliss_scan_uri"]) output_root_uri = url.DataUrl(pars["output_root_uri"]) detector_name = pars["detector_name"] config_file = pars["config_file"] dettmpl = pars["instrument_data_template"] gallery_filename = pars["gallery_filename"] photon_counter = pars["photon_counter_name"] config_timeout = pars["config_file_timeout"] output_root_group = pars["output_root_group"] # Wait for config file t0 = time.perf_counter() while not os.path.isfile(config_file): t1 = time.perf_counter() if t1 - t0 > config_timeout: raise TimeoutError("Timeout waiting for config file.") time.sleep(5) with h5py_utils.open_item( bliss_scan_uri.file_path(), bliss_scan_uri.data_path() ) as scan: data = np.array(scan[dettmpl.format(detector_name)], dtype=np.float64) photons = np.array(scan[dettmpl.format(photon_counter)], dtype=np.float64) avg_dwtime = float(scan["instrument/fscan_parameters/acq_time"][()]) avg_photons = np.mean(photons) avg_flux = avg_photons / avg_dwtime rel_factor = photons / avg_photons rel_factor.shape = rel_factor.shape[0], 1 ndata = min(rel_factor.shape[0], data.shape[0]) rel_factor = rel_factor[:ndata] data = data[:ndata] data = np.sum(data * rel_factor, keepdims=True, axis=0) / data.shape[0] output_config_file = os.path.join( os.path.dirname(output_root_uri.file_path()), f"{detector_name}.cfg" ) cfg = ConfigDict.ConfigDict(filelist=config_file) cfg["concentrations"]["flux"] = avg_flux cfg["concentrations"]["time"] = avg_dwtime cfg.write(output_config_file) matdat = cfg["attenuators"]["Matrix"] # print(matrix) # matdat = matrix.replace(" ", "").split(",") matrix_density = float(matdat[2]) # g/cm3 matrix_thickness = float(matdat[3]) # cm matrix_factor = matrix_density * matrix_thickness * 1e7 # ng/mm**2 with open(output_config_file, "r") as fd: config_txt = fd.read() with _mcaLock: mcafit = ClassMcaTheory(output_config_file) mcafit.setData(x=np.arange(data.shape[1]), y=data) mcafit.estimate() p, fit = mcafit.startfit(digest=1) # ct = ConcentrationsTool(mcafit.config, {"result": fit}) # conc = ct.processFitResult() # print(conc) concentrationsTool = ConcentrationsTool() # the concentrations tool has its own configuration concentrationsConfig = concentrationsTool.configure() # however, the simplest way to use it is to use the configuration # set when sending the fit configuration concentrationsConfig = fit["config"]["concentrations"] # or the one used with the fit # concentrationsConfig = mcafit.configure()["concentrations"] conc, info = concentrationsTool.processFitResult( config=concentrationsConfig, fitresult={"result": fit}, elementsfrommatrix=False, fluorates=mcafit._fluoRates, # internal speed up addinfo=True, ) # additional important info # print(p) # import yaml # with open("/tmp_14_days/chevremo/ewoksfit.yml", 'w') as fd: # yaml.dump(fit, fd) egy = fit["energy"] bkg = fit["continuum"] grps = fit["groups"] yfit = fit["yfit"] ydata = fit["ydata"] with nexus_utils.save_in_ewoks_process( output_root_uri, nexus_utils.now(), {"fit_config": config_txt, "matrix_factor": matrix_factor}, default_levels=(output_root_uri.data_path(), output_root_group, "sumfit"), ) as (process_group, already_existed): data_grp = process_group.require_group("fitted_par") data_grp.attrs["NX_class"] = "NXcollection" for k, v in zip(fit["parameters"], fit["fittedpar"]): if k in data_grp: del data_grp[k] data_grp[k] = v data_grp = process_group.require_group("sigma_par") data_grp.attrs["NX_class"] = "NXcollection" for k, v in zip(fit["parameters"], fit["sigmapar"]): if k in data_grp: del data_grp[k] data_grp[k] = v data_grp = process_group.require_group("areal_density") data_grp.attrs["NX_class"] = "NXcollection" for k, v in conc["mass fraction"].items(): if k in data_grp: del data_grp[k] data_grp[k] = v * matrix_factor data_grp[k].attrs["units"] = "ng/mm2" data_grp = process_group.require_group("fitarea") data_grp.attrs["NX_class"] = "NXcollection" for k, v in conc["fitarea"].items(): if k in data_grp: del data_grp[k] data_grp[k] = v _res = process_group.require_group("spectrum") _res.attrs["NX_class"] = "NXdata" _res.attrs["signal"] = "fit" _res.attrs["auxiliary_signals"] = [ "background", "spectrum", ] + grps _res.attrs["axes"] = ["energy"] # Remove previous result if any addnames = ("fit", "spectrum", "background", "energy", "pileup") for n in addnames: if n in _res: del _res[n] egyds = _res.create_dataset("energy", data=egy) egyds.attrs["units"] = "keV" _res.create_dataset("fit", data=yfit) _res.create_dataset("spectrum", data=ydata) _res.create_dataset("background", data=bkg) if "pileup" in fit: _res.create_dataset("pileup", data=fit["pileup"]) for g in grps: if g in _res: del _res[g] _res.create_dataset(g, data=fit[f"y{g}"]) # Set defaults process_group.attrs["default"] = "spectrum" process_group.parent.attrs["default"] = "sumfit" process_group.parent.parent.attrs["default"] = output_root_group if gallery_filename is not None: figpath = os.path.join( os.path.dirname(output_root_uri.file_path()), "gallery" ) gallery_filename = os.path.join(figpath, os.path.basename(gallery_filename)) os.makedirs(figpath, exist_ok=True) os.chmod(figpath, 0o770) # nosec B103: group write required! fig, ax = plt.subplots(figsize=(8, 5), layout="constrained") colors = plt.get_cmap("nipy_spectral")(np.linspace(0.1, 0.9, len(grps))) ax.plot(egy, ydata, "-", color="0.7", label="Data", linewidth=4) # ax.plot(egy, yfit, 'r-', label="Fit", linewidth=2) for c, g in zip(colors, sorted(grps)): ax.plot(egy, fit[f"y{g}"] + bkg, "--", label=g, linewidth=1.5, color=c) if "pileup" in fit: ax.plot( egy, fit["pileup"] + bkg, ":", color="tab:blue", label="Pileup", linewidth=1.5, ) ax.plot(egy, bkg, "b", label="Background", linewidth=2) fig.legend(loc="outside right upper") ax.set_yscale("log") ax.set_xlabel("Energy (keV)") fig.savefig(gallery_filename) self.outputs.output_root_uri = ( f"{output_root_uri.file_path()}::{output_root_uri.data_path()}" ) self.outputs.bliss_scan_uri = ( f"{bliss_scan_uri.file_path()}::{bliss_scan_uri.data_path()}" ) self.outputs.detector_name = detector_name self.outputs.config_file = config_file self.outputs.output_root_group = output_root_group