#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Feb 25 16:19:55 2026
@author: blissadm
"""
import os
from datetime import datetime as dt
import numpy as np
import pint
from esrf_pathlib import ESRFPath
from ewokscore import Task
from nxtomo import NXtomo
from nxtomo.nxobject.nxdetector import ImageKey
from silx.io import h5py_utils
ureg = pint.UnitRegistry()
def _tostr(val):
if isinstance(val, str):
return val
elif isinstance(val, bytes):
return val.decode()
else:
return str(val)
DEFAULTS = {
"include_positioners": True,
"scan_list": None,
"motors_mapping": {
"x_translation": ["isy", "spy"],
"y_translation": ["isz", "spz"],
"z_translation": [
"sx",
],
"rotation_angle": [
"somega",
],
},
"motors_units": {
"isy": "nm",
"isz": "nm",
},
"wait_scan": None,
}
[docs]
class Fluo2Nx(
Task,
input_names=[
"masterfile_path",
"output_file",
"detector_name",
],
optional_input_names=[
"include_positioners",
"scan_list",
"motors_mapping",
"motors_units",
"wait_scan",
],
output_names=["output_file", "detector_name"],
):
"""
This task takes a fulltomo sequence and creates the associated NXtomo file.
"""
[docs]
def run(self):
pars = {**DEFAULTS, **self.get_input_values()}
masterfile_path = pars["masterfile_path"]
scan_list = pars["scan_list"]
output_file = pars["output_file"]
motors_mapping = pars["motors_mapping"]
detector_name = pars["detector_name"]
include_positioners = pars["include_positioners"]
motors_units = pars["motors_units"]
wait_scan = pars["wait_scan"]
output_path = os.path.abspath(os.path.dirname(output_file))
esrf_path = ESRFPath(masterfile_path)
if not os.path.isdir(output_path):
os.makedirs(output_path, exist_ok=True)
os.chmod(
output_path, 0o770
) # nosec B103: group write required for shared pipeline
shape = None
egy = None
if wait_scan is not None:
with h5py_utils.open_item(
masterfile_path,
f"{wait_scan}.1_fluofit",
retry_invalid=True,
retry_timeout=3600,
) as fd:
pass
with h5py_utils.open_item(masterfile_path, "/") as fd:
# Look for the most represented set of shapes
shapes = {}
for k in fd:
if not k.endswith("_fluofit"):
continue
scan_no = int(k.split(".")[0])
if scan_list is not None and scan_no not in scan_list:
continue
scan = fd[k]
regrid = scan[f"{detector_name}/regrid/results/massfractions"]
for elm in regrid:
shp = regrid[elm].shape
if len(shp) == 2:
break
if shp not in shapes:
shapes[shp] = {"N": 0, "scans": []}
shapes[shp]["N"] += 1
shapes[shp]["scans"] += [
scan_no,
]
nmax = 0
for k, v in shapes.items():
if v["N"] > nmax:
shape = k
scan_list = v["scans"]
# At this point, we have a consistent list of scans with the same shape.
scan_list = sorted(scan_list)
# Get positioners
positioners = {}
positioners_units = {}
rays = set()
fast_pixel_size = None
slow_pixel_size = None
fast_motor_name = None
slow_motor_name = None
count_time = []
for i, scan_no in enumerate(scan_list):
scan = fd[f"{scan_no}.1_fluofit"]
if egy is None:
if (
"metadata" in scan["instrument"]
and "InstrumentMonochromator_energy"
in scan["instrument/metadata"]
):
egy = ureg.Quantity(
float(
_tostr(
scan[
"instrument/metadata/InstrumentMonochromator_energy"
][()]
)
),
"keV",
)
elif "InstrumentMonochromator" in scan["instrument"]:
egy = ureg.Quantity(
float(
_tostr(
scan["instrument/InstrumentMonochromator/energy"][
()
]
)
),
"keV",
)
for k in scan["instrument/positioners_start"]:
ds = scan["instrument/positioners_start"][k]
if k not in positioners:
positioners[k] = (
np.empty((len(scan_list),), dtype=np.float64) * np.nan
)
positioners_units[k] = motors_units.get(
k, ds.attrs.get("units", None)
)
positioners[k][i] = ds[()]
regrid = scan[f"{detector_name}/regrid/results/massfractions"]
for elm in regrid:
if regrid[elm].shape == shape:
rays.add(elm)
count_time += [
float(scan["instrument/fscan_parameters/acq_time"][()].decode()),
]
if fast_pixel_size is None:
fast_pixel_size = float(
scan["instrument/fscan_parameters/fast_step_size"][()].decode()
)
if slow_pixel_size is None:
slow_pixel_size = float(
scan["instrument/fscan_parameters/slow_step_size"][()].decode()
)
if fast_motor_name is None:
fast_motor_name = scan["instrument/fscan_parameters/fast_motor"][
()
].decode()
if slow_motor_name is None:
slow_motor_name = scan["instrument/fscan_parameters/slow_motor"][
()
].decode()
if fast_motor_name == "spzp":
x_pixel_size = ureg.Quantity(
slow_pixel_size, positioners_units[slow_motor_name]
)
y_pixel_size = ureg.Quantity(
fast_pixel_size, positioners_units[fast_motor_name]
)
else:
x_pixel_size = ureg.Quantity(
fast_pixel_size, positioners_units[fast_motor_name]
)
y_pixel_size = ureg.Quantity(
slow_pixel_size, positioners_units[slow_motor_name]
)
# Start populating the NXtomo object
nxtomo = NXtomo()
nxtomo.start_time = dt.now()
nxtomo.bliss_original_files = tuple(
f"{masterfile_path}::{s}.1_fluofit" for s in scan_list
)
nxtomo.energy = egy
# TODO
nxtomo.instrument.detector.tomo_n = len(scan_list)
# nxtomo.instrument.detector.distance =
nxtomo.instrument.detector.x_pixel_size = x_pixel_size
nxtomo.instrument.detector.y_pixel_size = y_pixel_size
nxtomo.sample.x_pixel_size = x_pixel_size
nxtomo.sample.y_pixel_size = y_pixel_size
# nxtomo.instrument.source.distance =
# nxtomo.sample.propagation_distance =
nxtomo.sample.name = esrf_path.collection
nxtomo.title = esrf_path.dataset
# nxtomo.control.data =
nxtomo.instrument.name = esrf_path.beamline
nxtomo.instrument.detector.count_time = pint.Quantity(
np.array(count_time), "s"
)
nxtomo.instrument.detector.image_key_control = [
ImageKey.PROJECTION,
] * len(scan_list)
for k, mots in motors_mapping.items():
values = pint.Quantity(
np.zeros((len(scan_list),), dtype=np.float64),
"degree" if k == "rotation_angle" else "mm",
)
for m in mots:
# print(m, positioners_units[m])
values += pint.Quantity(positioners[m], positioners_units[m])
setattr(nxtomo.sample, k, values)
# Read data and save
data = np.empty((len(scan_list), *shape), dtype=np.float32)
for elm in rays:
data *= np.nan
for i, scan_no in enumerate(scan_list):
scan = fd[f"{scan_no}.1_fluofit"]
regrid = scan[f"{detector_name}/regrid/results/massfractions"]
if elm in regrid:
data[i] = regrid[elm][()]
data[np.isnan(data)] = 0
nxtomo.instrument.detector.data = (
data * 1e-7
) # Conversion from ng/mm2 to g/cm2
nxtomo.end_time = dt.now()
nxtomo.save(output_file, f"{detector_name}_{elm}", overwrite=True)
print(f"{detector_name}_{elm} saved")
if include_positioners:
for elm in rays:
with h5py_utils.open_item(
output_file, f"{detector_name}_{elm}", mode="a"
) as fd:
grp = fd.require_group("instrument/positioners")
grp.attrs["NX_class"] = "NXcollection"
for k, v in positioners.items():
if k in grp:
del grp[k]
grp[k] = v
if positioners_units[k] is not None:
grp[k].attrs["units"] = positioners_units[k]
self.outputs.output_file = output_file
self.outputs.detector_name = detector_name