import os
import time
# import time
from threading import Lock
import matplotlib.pyplot as plt
import numpy as np
from ewokscore import Task
# from ewoksfluo.io.hdf5 import split_h5uri
from ewoksfluo.tasks import nexus_utils
from PyMca5.PyMcaIO import ConfigDict
from PyMca5.PyMcaPhysics.xrf.ClassMcaTheory import ClassMcaTheory
from PyMca5.PyMcaPhysics.xrf.ConcentrationsTool import ConcentrationsTool
from silx.io import h5py_utils
from silx.io import url
# from ewoksfluo.tasks.hdf5_utils import create_hdf5_link
# from ewoksfluo.tasks.hdf5_utils import link_bliss_scan
# from esrf_pathlib import ESRFPath
_mcaLock = Lock()
DEFAULTS = {
"gallery_filename": None,
"instrument_data_template": "instrument/{}/data",
"config_file_timeout": 3600,
}
[docs]
class AdvancedFitSumSingleDetector(
Task,
input_names=[
"bliss_scan_uri",
"detector_name",
"config_file",
"output_root_uri",
"photon_counter_name",
"output_root_group",
],
optional_input_names=[
"gallery_filename",
"instrument_data_template",
"config_file_timeout",
],
output_names=[
"bliss_scan_uri",
"detector_name",
"output_root_uri",
"config_file",
"output_root_group",
],
):
[docs]
def run(self):
pars = {**DEFAULTS, **self.get_input_values()}
bliss_scan_uri = url.DataUrl(pars["bliss_scan_uri"])
output_root_uri = url.DataUrl(pars["output_root_uri"])
detector_name = pars["detector_name"]
config_file = pars["config_file"]
dettmpl = pars["instrument_data_template"]
gallery_filename = pars["gallery_filename"]
photon_counter = pars["photon_counter_name"]
config_timeout = pars["config_file_timeout"]
output_root_group = pars["output_root_group"]
# Wait for config file
t0 = time.perf_counter()
while not os.path.isfile(config_file):
t1 = time.perf_counter()
if t1 - t0 > config_timeout:
raise TimeoutError("Timeout waiting for config file.")
time.sleep(5)
with h5py_utils.open_item(
bliss_scan_uri.file_path(), bliss_scan_uri.data_path()
) as scan:
data = np.array(scan[dettmpl.format(detector_name)], dtype=np.float64)
photons = np.array(scan[dettmpl.format(photon_counter)], dtype=np.float64)
avg_dwtime = float(scan["instrument/fscan_parameters/acq_time"][()])
avg_photons = np.mean(photons)
avg_flux = avg_photons / avg_dwtime
rel_factor = photons / avg_photons
rel_factor.shape = rel_factor.shape[0], 1
ndata = min(rel_factor.shape[0], data.shape[0])
rel_factor = rel_factor[:ndata]
data = data[:ndata]
data = np.sum(data * rel_factor, keepdims=True, axis=0) / data.shape[0]
output_config_file = os.path.join(
os.path.dirname(output_root_uri.file_path()), f"{detector_name}.cfg"
)
cfg = ConfigDict.ConfigDict(filelist=config_file)
cfg["concentrations"]["flux"] = avg_flux
cfg["concentrations"]["time"] = avg_dwtime
cfg.write(output_config_file)
matdat = cfg["attenuators"]["Matrix"]
# print(matrix)
# matdat = matrix.replace(" ", "").split(",")
matrix_density = float(matdat[2]) # g/cm3
matrix_thickness = float(matdat[3]) # cm
matrix_factor = matrix_density * matrix_thickness * 1e7 # ng/mm**2
with open(output_config_file, "r") as fd:
config_txt = fd.read()
with _mcaLock:
mcafit = ClassMcaTheory(output_config_file)
mcafit.setData(x=np.arange(data.shape[1]), y=data)
mcafit.estimate()
p, fit = mcafit.startfit(digest=1)
# ct = ConcentrationsTool(mcafit.config, {"result": fit})
# conc = ct.processFitResult()
# print(conc)
concentrationsTool = ConcentrationsTool()
# the concentrations tool has its own configuration
concentrationsConfig = concentrationsTool.configure()
# however, the simplest way to use it is to use the configuration
# set when sending the fit configuration
concentrationsConfig = fit["config"]["concentrations"]
# or the one used with the fit
# concentrationsConfig = mcafit.configure()["concentrations"]
conc, info = concentrationsTool.processFitResult(
config=concentrationsConfig,
fitresult={"result": fit},
elementsfrommatrix=False,
fluorates=mcafit._fluoRates, # internal speed up
addinfo=True,
) # additional important info
# print(p)
# import yaml
# with open("/tmp_14_days/chevremo/ewoksfit.yml", 'w') as fd:
# yaml.dump(fit, fd)
egy = fit["energy"]
bkg = fit["continuum"]
grps = fit["groups"]
yfit = fit["yfit"]
ydata = fit["ydata"]
with nexus_utils.save_in_ewoks_process(
output_root_uri,
nexus_utils.now(),
{"fit_config": config_txt, "matrix_factor": matrix_factor},
default_levels=(output_root_uri.data_path(), output_root_group, "sumfit"),
) as (process_group, already_existed):
data_grp = process_group.require_group("fitted_par")
data_grp.attrs["NX_class"] = "NXcollection"
for k, v in zip(fit["parameters"], fit["fittedpar"]):
if k in data_grp:
del data_grp[k]
data_grp[k] = v
data_grp = process_group.require_group("sigma_par")
data_grp.attrs["NX_class"] = "NXcollection"
for k, v in zip(fit["parameters"], fit["sigmapar"]):
if k in data_grp:
del data_grp[k]
data_grp[k] = v
data_grp = process_group.require_group("areal_density")
data_grp.attrs["NX_class"] = "NXcollection"
for k, v in conc["mass fraction"].items():
if k in data_grp:
del data_grp[k]
data_grp[k] = v * matrix_factor
data_grp[k].attrs["units"] = "ng/mm2"
data_grp = process_group.require_group("fitarea")
data_grp.attrs["NX_class"] = "NXcollection"
for k, v in conc["fitarea"].items():
if k in data_grp:
del data_grp[k]
data_grp[k] = v
_res = process_group.require_group("spectrum")
_res.attrs["NX_class"] = "NXdata"
_res.attrs["signal"] = "fit"
_res.attrs["auxiliary_signals"] = [
"background",
"spectrum",
] + grps
_res.attrs["axes"] = ["energy"]
# Remove previous result if any
addnames = ("fit", "spectrum", "background", "energy", "pileup")
for n in addnames:
if n in _res:
del _res[n]
egyds = _res.create_dataset("energy", data=egy)
egyds.attrs["units"] = "keV"
_res.create_dataset("fit", data=yfit)
_res.create_dataset("spectrum", data=ydata)
_res.create_dataset("background", data=bkg)
if "pileup" in fit:
_res.create_dataset("pileup", data=fit["pileup"])
for g in grps:
if g in _res:
del _res[g]
_res.create_dataset(g, data=fit[f"y{g}"])
# Set defaults
process_group.attrs["default"] = "spectrum"
process_group.parent.attrs["default"] = "sumfit"
process_group.parent.parent.attrs["default"] = output_root_group
if gallery_filename is not None:
figpath = os.path.join(
os.path.dirname(output_root_uri.file_path()), "gallery"
)
gallery_filename = os.path.join(figpath, os.path.basename(gallery_filename))
os.makedirs(figpath, exist_ok=True)
os.chmod(figpath, 0o770) # nosec B103: group write required!
fig, ax = plt.subplots(figsize=(8, 5), layout="constrained")
colors = plt.get_cmap("nipy_spectral")(np.linspace(0.1, 0.9, len(grps)))
ax.plot(egy, ydata, "-", color="0.7", label="Data", linewidth=4)
# ax.plot(egy, yfit, 'r-', label="Fit", linewidth=2)
for c, g in zip(colors, sorted(grps)):
ax.plot(egy, fit[f"y{g}"] + bkg, "--", label=g, linewidth=1.5, color=c)
if "pileup" in fit:
ax.plot(
egy,
fit["pileup"] + bkg,
":",
color="tab:blue",
label="Pileup",
linewidth=1.5,
)
ax.plot(egy, bkg, "b", label="Background", linewidth=2)
fig.legend(loc="outside right upper")
ax.set_yscale("log")
ax.set_xlabel("Energy (keV)")
fig.savefig(gallery_filename)
self.outputs.output_root_uri = (
f"{output_root_uri.file_path()}::{output_root_uri.data_path()}"
)
self.outputs.bliss_scan_uri = (
f"{bliss_scan_uri.file_path()}::{bliss_scan_uri.data_path()}"
)
self.outputs.detector_name = detector_name
self.outputs.config_file = config_file
self.outputs.output_root_group = output_root_group