#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Feb 25 16:19:55 2026
@author: blissadm
"""
import os
from datetime import datetime as dt
import numpy as np
import pint
from esrf_pathlib import ESRFPath
from ewokscore import Task
from nxtomo import NXtomo
from nxtomo.nxobject.nxdetector import ImageKey
from silx.io import h5py_utils
from silx.io import url
# Unit registry used by this module to build the energy and pixel-size quantities.
ureg = pint.UnitRegistry()
def _tostr(val):
if isinstance(val, str):
return val
elif isinstance(val, bytes):
return val.decode()
else:
return str(val)
# Fallback values for the optional task inputs; ``run`` merges the actual
# task inputs on top of this mapping.
DEFAULTS = dict(
    include_positioners=True,
    scan_list=None,
    # NXtomo sample attribute -> positioner names whose values are summed.
    motors_mapping=dict(
        x_translation=["isy", "spy"],
        y_translation=["isz", "spz"],
        z_translation=["sx"],
        rotation_angle=["axis_somega"],
    ),
    # Positioner name -> unit that takes precedence over the dataset's
    # ``units`` attribute.
    motors_units=dict(isy="nm", isz="nm"),
)
# [docs]
class FluoSinogram2Nx(
    Task,
    input_names=[
        "output_root_uri",
        "output_root_group",
    ],
    optional_input_names=[
        "include_positioners",
        "scan_list",
        "motors_mapping",
        "motors_units",
    ],
    output_names=["output_file", "output_root_uri", "output_root_group"],
):
    """Export regridded fluorescence mass-fraction maps as NXtomo entries.

    Inputs:
        output_root_uri: silx data URL (file path + data path) of the scan
            group to read. The first component of the data path must look
            like ``<scan>.<subscan>``; its integer part is used in the output
            file name.
        output_root_group: group, relative to the scan, that contains
            ``regrid/results/massfractions``. It is also the prefix of every
            NXtomo entry written (``<output_root_group>_<signal>``).
        include_positioners: when true, the per-projection positioner values
            are stored under ``instrument/positioners`` of each entry.
        scan_list: accepted for interface compatibility; not used by ``run``.
        motors_mapping: NXtomo sample attribute name -> list of positioner
            names whose values are summed to obtain that attribute.
        motors_units: positioner name -> unit overriding the ``units``
            attribute stored with the positioner dataset.

    Outputs:
        output_file: path of the ``.nx`` file that was written.
        output_root_uri, output_root_group: passed through unchanged.
    """

    @staticmethod
    def _read_energy(scan):
        """Return the monochromator energy as a keV quantity, or ``None``."""
        instrument = scan["instrument"]
        if (
            "metadata" in instrument
            and "InstrumentMonochromator_energy" in scan["instrument/metadata"]
        ):
            raw = scan["instrument/metadata/InstrumentMonochromator_energy"][()]
        elif "InstrumentMonochromator" in instrument:
            raw = scan["instrument/InstrumentMonochromator/energy"][()]
        else:
            return None
        return ureg.Quantity(float(_tostr(raw)), "keV")

    @staticmethod
    def _read_positioners(scan, slow_n, fast_n, motors_units):
        """Read every positioner as one value per slow-axis step.

        Returns ``(values, units)``: two dicts keyed by positioner name.
        ``values[name]`` is a float64 array of length ``slow_n``;
        ``units[name]`` is the override from ``motors_units`` if present,
        else the dataset's ``units`` attribute, else ``None``.
        """
        values = {}
        units = {}
        group = scan["instrument/positioners"]
        for name in group:
            dataset = group[name]
            units[name] = motors_units.get(name, dataset.attrs.get("units", None))
            raw = dataset[()]
            if isinstance(raw, np.ndarray) and len(raw) == slow_n * fast_n:
                # One sample per acquired point: keep the first point of
                # each fast-axis line.
                raw = raw[::fast_n]
            per_step = np.full((slow_n,), np.nan, dtype=np.float64)
            per_step[:] = raw  # scalars broadcast over all steps
            values[name] = per_step
        return values, units

    def run(self):
        """Read the scan, build the NXtomo object and save one entry per signal.

        Raises:
            RuntimeError: if the slow motor of the scan is not ``somega``.
        """
        pars = {**DEFAULTS, **self.get_input_values()}
        output_root_uri = url.DataUrl(pars["output_root_uri"])
        output_root_group = pars["output_root_group"]
        motors_mapping = pars["motors_mapping"]
        include_positioners = pars["include_positioners"]
        motors_units = pars["motors_units"]

        esrf_path = ESRFPath(output_root_uri.file_path())
        # The data path is expected to start with "/<scan>.<subscan>".
        scan_no = int(output_root_uri.data_path().split("/")[1].split(".")[0])
        output_path = os.path.join(esrf_path.processed_dataset_path, "projections")
        output_file = os.path.join(
            output_path, f"{esrf_path.collection}_{esrf_path.dataset}_{scan_no:04d}.nx"
        )
        if not os.path.isdir(output_path):
            os.makedirs(output_path, exist_ok=True)
            os.chmod(
                output_path, 0o770
            )  # nosec B103: group write required for shared pipeline

        print(output_root_uri)
        with h5py_utils.open_item(
            output_root_uri.file_path(), output_root_uri.data_path()
        ) as scan:
            egy = self._read_energy(scan)

            def fscan_parameter(name):
                # _tostr copes with bytes, str and numeric storage alike.
                return _tostr(scan[f"instrument/fscan_parameters/{name}"][()])

            count_time = float(fscan_parameter("acq_time"))
            fast_step = float(fscan_parameter("fast_step_size"))
            slow_n = int(fscan_parameter("slow_npoints"))
            # Previously read from "slow_npoints" (copy-paste slip), which
            # broke the positioner subsampling for non-square maps.
            fast_n = int(fscan_parameter("fast_npoints"))
            fast_motor_name = fscan_parameter("fast_motor")
            slow_motor_name = fscan_parameter("slow_motor")
            if slow_motor_name != "somega":
                raise RuntimeError("Expected rotation as slow motor!")

            positioners, positioners_units = self._read_positioners(
                scan, slow_n, fast_n, motors_units
            )
            # The fast step size is expressed in the unit of the fast motor.
            fast_pixel_size = ureg.Quantity(
                fast_step, positioners_units[fast_motor_name]
            )

            # NOTE(review): quantities below are built both from the module
            # registry (``ureg``) and from ``pint.Quantity``; these may belong
            # to different registries -- confirm this is what nxtomo expects.
            nxtomo = NXtomo()
            nxtomo.start_time = dt.now()
            nxtomo.bliss_original_files = (pars["output_root_uri"],)
            nxtomo.energy = egy
            # TODO
            nxtomo.instrument.detector.tomo_n = slow_n
            # nxtomo.instrument.detector.distance =
            nxtomo.instrument.detector.x_pixel_size = fast_pixel_size
            nxtomo.instrument.detector.y_pixel_size = fast_pixel_size
            nxtomo.sample.x_pixel_size = fast_pixel_size
            nxtomo.sample.y_pixel_size = fast_pixel_size
            # nxtomo.instrument.source.distance =
            # nxtomo.sample.propagation_distance =
            nxtomo.sample.name = esrf_path.collection
            nxtomo.title = esrf_path.dataset
            # nxtomo.control.data =
            nxtomo.instrument.name = esrf_path.beamline
            nxtomo.instrument.detector.count_time = pint.Quantity(
                np.array(count_time), "s"
            )
            nxtomo.instrument.detector.image_key_control = [
                ImageKey.PROJECTION,
            ] * slow_n

            # Each sample attribute is the sum of the mapped positioners,
            # accumulated in degrees (rotation) or millimetres (translations).
            for attr_name, motor_names in motors_mapping.items():
                total = pint.Quantity(
                    np.zeros((slow_n,), dtype=np.float64),
                    "degree" if attr_name == "rotation_angle" else "mm",
                )
                for motor in motor_names:
                    total += pint.Quantity(
                        positioners[motor], positioners_units[motor]
                    )
                setattr(nxtomo.sample, attr_name, total)

            regrid = scan[f"{output_root_group}/regrid/results/massfractions"]
            # Main signal first, then the auxiliary ones (the attribute is
            # absent when there is a single signal); de-duplicated while
            # keeping a deterministic order.
            signal_names = [_tostr(regrid.attrs["signal"])]
            signal_names.extend(
                _tostr(name)
                for name in np.atleast_1d(regrid.attrs.get("auxiliary_signals", ()))
            )
            rays = list(dict.fromkeys(signal_names))

            for elm in rays:
                data = regrid[elm][()]
                data[np.isnan(data)] = 0
                # Insert a singleton axis between the two map axes.
                data = data.reshape(data.shape[0], 1, data.shape[1])
                nxtomo.instrument.detector.data = (
                    data * 1e-7
                )  # Conversion from ng/mm2 to g/cm2
                nxtomo.end_time = dt.now()
                nxtomo.save(output_file, f"{output_root_group}_{elm}", overwrite=True)

        if include_positioners:
            for elm in rays:
                with h5py_utils.open_item(
                    output_file, f"/{output_root_group}_{elm}", mode="a"
                ) as fd:
                    grp = fd.require_group("instrument/positioners")
                    grp.attrs["NX_class"] = "NXcollection"
                    for name, per_step in positioners.items():
                        if name in grp:
                            del grp[name]
                        grp[name] = per_step
                        if positioners_units[name] is not None:
                            grp[name].attrs["units"] = positioners_units[name]

        self.outputs.output_root_uri = pars["output_root_uri"]
        self.outputs.output_root_group = pars["output_root_group"]
        self.outputs.output_file = output_file