from configparser import ConfigParser
import numpy as np
from ewokscore import Task
from ewoksfluo.tasks import nexus_utils
from ewoksfluo.tasks.hdf5_utils import create_hdf5_link
from ewoksfluo.tasks.hdf5_utils import link_bliss_scan
from silx.io import h5py_utils
DEFAULTS = {
"counter_uri_template": "instrument/%s/data",
"output_counter_name": "photons_i0",
"max_ref": 1e15,
"min_ref": 1e8,
"gamma_coef": 2.0,
"bliss_slow_subscan": 2,
"output_root_group": None,
}
[docs]
class NormalizeCurrent(
Task,
input_names=[
"bliss_scan_uri",
"output_root_uri",
"fastscan_counter_name",
"slowscan_counter_name",
"dwelltime_counter_name",
"diode_factor",
],
optional_input_names=[
"gamma_coef",
"min_ref",
"max_ref",
"counter_uri_template",
"output_counter_name",
"bliss_slow_subscan",
"output_root_group",
],
output_names=[
"bliss_scan_uri",
"output_root_uri",
"output_root_group",
"counter_name",
],
):
[docs]
def run(self):
start_time = nexus_utils.now()
params = {**DEFAULTS, **self.get_input_values()}
bliss_scan_uri = params["bliss_scan_uri"]
bliss_slow_subscan = params["bliss_slow_subscan"]
output_root_uri = params["output_root_uri"]
counter_template = params["counter_uri_template"]
min_ref = params["min_ref"]
max_ref = params["max_ref"]
gamma_coef = params["gamma_coef"]
diode_factor = params["diode_factor"] * 1e12 # From ph/pA to ph/A
output_root_group = params["output_root_group"]
slow_counter_name = params["slowscan_counter_name"]
fast_counter_name = params["fastscan_counter_name"]
dwelltime_counter_name = params["dwelltime_counter_name"]
output_counter_name = params["output_counter_name"]
fast_scan_uris = bliss_scan_uri.split("::")
filename = fast_scan_uris[0]
fast_scanno = fast_scan_uris[1]
slow_scanno = fast_scanno.split(".")[0] + f".{bliss_slow_subscan}"
# Read data from slow scan
with h5py_utils.open_item(
filename, slow_scanno
) as slow_scan: # type: ignore[reportGeneralTypeIssues]
ref_values = np.array(slow_scan[counter_template % slow_counter_name])
# Read data from fast scan
with h5py_utils.open_item(filename, fast_scanno) as fast_scan: # type: ignore[reportGeneralTypeIssues]
# epoch = np.array(fd[ref_counter_template%self.inputs.get("data_epoch_counter", "epoch_trig")])
values = np.array(fast_scan[counter_template % fast_counter_name])
dwelltime = np.mean(fast_scan[counter_template % dwelltime_counter_name])
# Filter slow scan values
ref_msk = np.logical_and(
ref_values * diode_factor > min_ref, ref_values * diode_factor < max_ref
) # When shutter was open and not saturated
ref_values = ref_values[ref_msk]
# Compute the conversion factor
gamma = np.mean(values) / (np.mean(ref_values) * dwelltime)
real_gamma_exp = np.log10(gamma / gamma_coef)
gamma_exp = np.round(real_gamma_exp)
gamma = gamma_coef * 10**gamma_exp
if output_root_group is not None:
lvls = (fast_scanno, output_root_group, output_counter_name)
else:
lvls = (fast_scanno, output_counter_name)
# Save in process group
with nexus_utils.save_in_ewoks_process(
output_root_uri,
start_time,
process_config={
"real_gamma_exp": real_gamma_exp,
"gamma_exp": gamma_exp,
"diode_factor": diode_factor / 1e12,
"gamma_coef": gamma_coef,
},
default_levels=lvls,
) as (process_group, already_existed):
outentry = process_group
for i in range(len(lvls) - 1):
outentry = outentry.parent
if not already_existed:
link_bliss_scan(outentry, bliss_scan_uri)
nxdata = nexus_utils.create_nxdata(
process_group, "normcounter", signal="data"
)
dset = nxdata.create_dataset("data", data=values / gamma * diode_factor)
dset.attrs["interpretation"] = "spectrum"
nxdetector = outentry["instrument"].create_group(output_counter_name)
nxdetector.attrs["NX_class"] = "NXdetector"
create_hdf5_link(nxdetector, "data", dset)
create_hdf5_link(outentry["measurement"], output_counter_name, dset)
output_root_uri = f"{outentry.file.filename}::{outentry.name}"
self.outputs.bliss_scan_uri = output_root_uri
self.outputs.output_root_uri = output_root_uri
self.outputs.output_root_group = output_root_group
self.outputs.counter_name = output_counter_name
[docs]
class NormalizationFactorFromConfig(
Task,
input_names=["bliss_scan_uri", "output_root_uri", "config_file"],
optional_input_names=[
"xrf_results_uri",
"detector_name",
"output_root_group",
],
output_names=[
"bliss_scan_uri",
"detector_name",
"output_root_uri",
"xrf_results_uri",
"output_root_group",
"counter_normalization_template",
"counter_normalization_factor",
"config_file",
],
):
[docs]
def get_from_cfg(self, cfg):
cp = ConfigParser()
cp.read(cfg)
flux = cp.getfloat("concentrations", "flux")
dwelltime = cp.getfloat("concentrations", "time")
matrix = cp.get("attenuators", "Matrix")
matdat = matrix.replace(" ", "").split(",")
matrix_composition = matdat[1]
matrix_density = float(matdat[2])
matrix_thickness = float(matdat[3])
print("INFO FROM CONFIG FILE")
print("Config input flux: {0:g} (ph/s)".format(flux))
print("Config dwell time: {0} (s)".format(dwelltime))
print("Matrix composition: {0}".format(matrix_composition))
print("Matrix density: {0} (g/cm**3)".format(matrix_density))
print("Matrix thickness: {0} (cm)".format(matrix_thickness))
areal_dens_ratio = matrix_density * matrix_thickness * 1e7 # ng/mm**2
return flux, areal_dens_ratio, dwelltime
[docs]
def run(self):
self.outputs.bliss_scan_uri = self.inputs.bliss_scan_uri
self.outputs.xrf_results_uri = self.get_input_value("xrf_results_uri", None)
self.outputs.output_root_uri = self.inputs.output_root_uri
self.outputs.detector_name = self.get_input_value("detector_name", None)
self.outputs.config_file = self.inputs.config_file
self.outputs.output_root_group = self.get_input_value("output_root_group", None)
cfg_flux, areal_dens_ratio, cfg_dwelltime = self.get_from_cfg(
self.inputs.config_file
)
factor = cfg_flux * cfg_dwelltime * areal_dens_ratio
self.outputs.counter_normalization_factor = factor
self.outputs.counter_normalization_template = (
f"{factor:.04e}/<instrument/{{}}/data>"
)