Source code for ewoksid16a.tasks.fluotomo.nxtomo

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Feb 25 16:19:55 2026

@author: blissadm
"""

import os
from datetime import datetime as dt

import numpy as np
import pint
from esrf_pathlib import ESRFPath
from ewokscore import Task
from nxtomo import NXtomo
from nxtomo.nxobject.nxdetector import ImageKey
from silx.io import h5py_utils

ureg = pint.UnitRegistry()


def _tostr(val):
    if isinstance(val, str):
        return val
    elif isinstance(val, bytes):
        return val.decode()
    else:
        return str(val)


DEFAULTS = {
    "include_positioners": True,
    "scan_list": None,
    "motors_mapping": {
        "x_translation": ["isy", "spy"],
        "y_translation": ["isz", "spz"],
        "z_translation": [
            "sx",
        ],
        "rotation_angle": [
            "somega",
        ],
    },
    "motors_units": {
        "isy": "nm",
        "isz": "nm",
    },
    "wait_scan": None,
}


[docs] class Fluo2Nx( Task, input_names=[ "masterfile_path", "output_file", "detector_name", ], optional_input_names=[ "include_positioners", "scan_list", "motors_mapping", "motors_units", "wait_scan", ], output_names=["output_file", "detector_name"], ): """ This task takes a fulltomo sequence and creates the associated NXtomo file. """
[docs] def run(self): pars = {**DEFAULTS, **self.get_input_values()} masterfile_path = pars["masterfile_path"] scan_list = pars["scan_list"] output_file = pars["output_file"] motors_mapping = pars["motors_mapping"] detector_name = pars["detector_name"] include_positioners = pars["include_positioners"] motors_units = pars["motors_units"] wait_scan = pars["wait_scan"] output_path = os.path.abspath(os.path.dirname(output_file)) esrf_path = ESRFPath(masterfile_path) if not os.path.isdir(output_path): os.makedirs(output_path, exist_ok=True) os.chmod( output_path, 0o770 ) # nosec B103: group write required for shared pipeline shape = None egy = None if wait_scan is not None: with h5py_utils.open_item( masterfile_path, f"{wait_scan}.1_fluofit", retry_invalid=True, retry_timeout=3600, ) as fd: pass with h5py_utils.open_item(masterfile_path, "/") as fd: # Look for the most represented set of shapes shapes = {} for k in fd: if not k.endswith("_fluofit"): continue scan_no = int(k.split(".")[0]) if scan_list is not None and scan_no not in scan_list: continue scan = fd[k] regrid = scan[f"{detector_name}/regrid/results/massfractions"] for elm in regrid: shp = regrid[elm].shape if len(shp) == 2: break if shp not in shapes: shapes[shp] = {"N": 0, "scans": []} shapes[shp]["N"] += 1 shapes[shp]["scans"] += [ scan_no, ] nmax = 0 for k, v in shapes.items(): if v["N"] > nmax: shape = k scan_list = v["scans"] # At this point, we have a consistent list of scans with the same shape. scan_list = sorted(scan_list) # Get positioners positioners = {} positioners_units = {} rays = set() fast_pixel_size = None slow_pixel_size = None fast_motor_name = None slow_motor_name = None count_time = [] for i, scan_no in enumerate(scan_list): scan = fd[f"{scan_no}.1_fluofit"] if egy is None: if ( "metadata" in scan["instrument"] and "InstrumentMonochromator_energy" in scan["instrument/metadata"] ): egy = ureg.Quantity( float( _tostr( scan[ "instrument/metadata/InstrumentMonochromator_energy" ][()] ) ), "keV", ) elif "InstrumentMonochromator" in scan["instrument"]: egy = ureg.Quantity( float( _tostr( scan["instrument/InstrumentMonochromator/energy"][ () ] ) ), "keV", ) for k in scan["instrument/positioners_start"]: ds = scan["instrument/positioners_start"][k] if k not in positioners: positioners[k] = ( np.empty((len(scan_list),), dtype=np.float64) * np.nan ) positioners_units[k] = motors_units.get( k, ds.attrs.get("units", None) ) positioners[k][i] = ds[()] regrid = scan[f"{detector_name}/regrid/results/massfractions"] for elm in regrid: if regrid[elm].shape == shape: rays.add(elm) count_time += [ float(scan["instrument/fscan_parameters/acq_time"][()].decode()), ] if fast_pixel_size is None: fast_pixel_size = float( scan["instrument/fscan_parameters/fast_step_size"][()].decode() ) if slow_pixel_size is None: slow_pixel_size = float( scan["instrument/fscan_parameters/slow_step_size"][()].decode() ) if fast_motor_name is None: fast_motor_name = scan["instrument/fscan_parameters/fast_motor"][ () ].decode() if slow_motor_name is None: slow_motor_name = scan["instrument/fscan_parameters/slow_motor"][ () ].decode() if fast_motor_name == "spzp": x_pixel_size = ureg.Quantity( slow_pixel_size, positioners_units[slow_motor_name] ) y_pixel_size = ureg.Quantity( fast_pixel_size, positioners_units[fast_motor_name] ) else: x_pixel_size = ureg.Quantity( fast_pixel_size, positioners_units[fast_motor_name] ) y_pixel_size = ureg.Quantity( slow_pixel_size, positioners_units[slow_motor_name] ) # Start populating the NXtomo object nxtomo = NXtomo() nxtomo.start_time = dt.now() nxtomo.bliss_original_files = tuple( f"{masterfile_path}::{s}.1_fluofit" for s in scan_list ) nxtomo.energy = egy # TODO nxtomo.instrument.detector.tomo_n = len(scan_list) # nxtomo.instrument.detector.distance = nxtomo.instrument.detector.x_pixel_size = x_pixel_size nxtomo.instrument.detector.y_pixel_size = y_pixel_size nxtomo.sample.x_pixel_size = x_pixel_size nxtomo.sample.y_pixel_size = y_pixel_size # nxtomo.instrument.source.distance = # nxtomo.sample.propagation_distance = nxtomo.sample.name = esrf_path.collection nxtomo.title = esrf_path.dataset # nxtomo.control.data = nxtomo.instrument.name = esrf_path.beamline nxtomo.instrument.detector.count_time = pint.Quantity( np.array(count_time), "s" ) nxtomo.instrument.detector.image_key_control = [ ImageKey.PROJECTION, ] * len(scan_list) for k, mots in motors_mapping.items(): values = pint.Quantity( np.zeros((len(scan_list),), dtype=np.float64), "degree" if k == "rotation_angle" else "mm", ) for m in mots: # print(m, positioners_units[m]) values += pint.Quantity(positioners[m], positioners_units[m]) setattr(nxtomo.sample, k, values) # Read data and save data = np.empty((len(scan_list), *shape), dtype=np.float32) for elm in rays: data *= np.nan for i, scan_no in enumerate(scan_list): scan = fd[f"{scan_no}.1_fluofit"] regrid = scan[f"{detector_name}/regrid/results/massfractions"] if elm in regrid: data[i] = regrid[elm][()] data[np.isnan(data)] = 0 nxtomo.instrument.detector.data = ( data * 1e-7 ) # Conversion from ng/mm2 to g/cm2 nxtomo.end_time = dt.now() nxtomo.save(output_file, f"{detector_name}_{elm}", overwrite=True) print(f"{detector_name}_{elm} saved") if include_positioners: for elm in rays: with h5py_utils.open_item( output_file, f"{detector_name}_{elm}", mode="a" ) as fd: grp = fd.require_group("instrument/positioners") grp.attrs["NX_class"] = "NXcollection" for k, v in positioners.items(): if k in grp: del grp[k] grp[k] = v if positioners_units[k] is not None: grp[k].attrs["units"] = positioners_units[k] self.outputs.output_file = output_file self.outputs.detector_name = detector_name