Source code for ewoksid16a.tasks.fluotomo.nxtomo_sinogram

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Feb 25 16:19:55 2026

@author: blissadm
"""

import os
from datetime import datetime as dt

import numpy as np
import pint
from esrf_pathlib import ESRFPath
from ewokscore import Task
from nxtomo import NXtomo
from nxtomo.nxobject.nxdetector import ImageKey
from silx.io import h5py_utils
from silx.io import url

ureg = pint.UnitRegistry()


def _tostr(val):
    if isinstance(val, str):
        return val
    elif isinstance(val, bytes):
        return val.decode()
    else:
        return str(val)


DEFAULTS = {
    "include_positioners": True,
    "scan_list": None,
    "motors_mapping": {
        "x_translation": ["isy", "spy"],
        "y_translation": ["isz", "spz"],
        "z_translation": [
            "sx",
        ],
        "rotation_angle": [
            "axis_somega",
        ],
    },
    "motors_units": {
        "isy": "nm",
        "isz": "nm",
    },
}


[docs] class FluoSinogram2Nx( Task, input_names=[ "output_root_uri", "output_root_group", ], optional_input_names=[ "include_positioners", "scan_list", "motors_mapping", "motors_units", ], output_names=["output_file", "output_root_uri", "output_root_group"], ): """ This task takes a fulltomo sequence and creates the associated NXtomo file. """
[docs] def run(self): pars = {**DEFAULTS, **self.get_input_values()} output_root_uri = url.DataUrl(pars["output_root_uri"]) output_root_group = pars["output_root_group"] motors_mapping = pars["motors_mapping"] include_positioners = pars["include_positioners"] motors_units = pars["motors_units"] esrf_path = ESRFPath(output_root_uri.file_path()) scan_no = int(output_root_uri.data_path().split("/")[1].split(".")[0]) output_path = os.path.join(esrf_path.processed_dataset_path, "projections") output_file = os.path.join( output_path, f"{esrf_path.collection}_{esrf_path.dataset}_{scan_no:04d}.nx" ) if not os.path.isdir(output_path): os.makedirs(output_path, exist_ok=True) os.chmod( output_path, 0o770 ) # nosec B103: group write required for shared pipeline egy = None print(output_root_uri) with h5py_utils.open_item( output_root_uri.file_path(), output_root_uri.data_path() ) as scan: if ( "metadata" in scan["instrument"] and "InstrumentMonochromator_energy" in scan["instrument/metadata"] ): egy = ureg.Quantity( float( _tostr( scan["instrument/metadata/InstrumentMonochromator_energy"][ () ] ) ), "keV", ) elif "InstrumentMonochromator" in scan["instrument"]: egy = ureg.Quantity( float( _tostr(scan["instrument/InstrumentMonochromator/energy"][()]) ), "keV", ) positioners = {} positioners_units = {} rays = set() count_time = float( scan["instrument/fscan_parameters/acq_time"][()].decode() ) fast_pixel_size = float( scan["instrument/fscan_parameters/fast_step_size"][()].decode() ) slow_n = int(scan["instrument/fscan_parameters/slow_npoints"][()].decode()) fast_n = int(scan["instrument/fscan_parameters/slow_npoints"][()].decode()) fast_motor_name = scan["instrument/fscan_parameters/fast_motor"][ () ].decode() slow_motor_name = scan["instrument/fscan_parameters/slow_motor"][ () ].decode() for k in scan["instrument/positioners"]: ds = scan["instrument/positioners"][k] positioners[k] = np.empty((slow_n,), dtype=np.float64) * np.nan positioners_units[k] = motors_units.get(k, ds.attrs.get("units", None)) vals = ds[()] if isinstance(vals, np.ndarray) and len(vals) == slow_n * fast_n: vals = vals[::fast_n] positioners[k][:] = vals if slow_motor_name != "somega": raise RuntimeError("Expected rotation as slow motor!") fast_pixel_size = ureg.Quantity( fast_pixel_size, positioners_units[fast_motor_name] ) # Start populating the NXtomo object nxtomo = NXtomo() nxtomo.start_time = dt.now() nxtomo.bliss_original_files = (pars["output_root_uri"],) nxtomo.energy = egy # TODO nxtomo.instrument.detector.tomo_n = slow_n # nxtomo.instrument.detector.distance = nxtomo.instrument.detector.x_pixel_size = fast_pixel_size nxtomo.instrument.detector.y_pixel_size = fast_pixel_size nxtomo.sample.x_pixel_size = fast_pixel_size nxtomo.sample.y_pixel_size = fast_pixel_size # nxtomo.instrument.source.distance = # nxtomo.sample.propagation_distance = nxtomo.sample.name = esrf_path.collection nxtomo.title = esrf_path.dataset # nxtomo.control.data = nxtomo.instrument.name = esrf_path.beamline nxtomo.instrument.detector.count_time = pint.Quantity( np.array(count_time), "s" ) nxtomo.instrument.detector.image_key_control = [ ImageKey.PROJECTION, ] * slow_n for k, mots in motors_mapping.items(): values = pint.Quantity( np.zeros((slow_n,), dtype=np.float64), "degree" if k == "rotation_angle" else "mm", ) for m in mots: # print(m, positioners_units[m]) values += pint.Quantity(positioners[m], positioners_units[m]) setattr(nxtomo.sample, k, values) regrid = scan[f"{output_root_group}/regrid/results/massfractions"] rays = set() rays.add(regrid.attrs["signal"]) for sig in regrid.attrs["auxiliary_signals"]: rays.add(sig) for elm in rays: data = regrid[elm][()] data[np.isnan(data)] = 0 data.shape = data.shape[0], 1, data.shape[1] nxtomo.instrument.detector.data = ( data * 1e-7 ) # Conversion from ng/mm2 to g/cm2 nxtomo.end_time = dt.now() nxtomo.save(output_file, f"{output_root_group}_{elm}", overwrite=True) if include_positioners: for elm in rays: with h5py_utils.open_item( output_file, f"/{output_root_group}_{elm}", mode="a" ) as fd: grp = fd.require_group("instrument/positioners") grp.attrs["NX_class"] = "NXcollection" for k, v in positioners.items(): if k in grp: del grp[k] grp[k] = v if positioners_units[k] is not None: grp[k].attrs["units"] = positioners_units[k] self.outputs.output_root_uri = pars["output_root_uri"] self.outputs.output_root_group = pars["output_root_group"] self.outputs.output_file = output_file