Source code for ewoksid16a.tasks.common.dataportal

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Jul  6 20:10:52 2025

@author: blissadm
"""

import os

from esrf_pathlib import ESRFPath
from ewokscore import Task
from pyicat_plus.client.main import IcatClient
from silx.io.url import DataUrl


[docs] class PublishProcessed( Task, input_names=["output_root_uri", "icat_url"], optional_input_names=["bliss_scan_uri", "metadata"], output_names=["output_root_uri"], ): """Publish processed data to data portal"""
[docs] def run(self): output_root_uri = DataUrl(self.inputs.output_root_uri) bliss_scan_uri = self.get_input_value("bliss_scan_uri", None) processed_data_folder = os.path.dirname(output_root_uri.file_path()) processed_data = ESRFPath(processed_data_folder) if bliss_scan_uri is None: raw_data = processed_data.raw_dataset_file else: bliss_scan_uri = DataUrl(bliss_scan_uri) raw_data = ESRFPath(bliss_scan_uri.file_path()) metadata = self.get_input_value("metadata", {}) if "Sample_name" not in metadata: metadata["Sample_name"] = raw_data.collection client = IcatClient(metadata_urls=self.inputs["icat_url"]) client.store_processed_data( beamline=raw_data.beamline, proposal=raw_data.proposal, dataset=f"{raw_data.collection}_{raw_data.dataset}", path=processed_data_folder, metadata=metadata, raw=raw_data.raw_dataset_path, ) self.outputs.output_root_uri = ( output_root_uri.file_path() + "::" + (output_root_uri.data_path() or "/") )
def _fix_data_path(self, path): if path.startswith("/mnt/multipath-shares/data"): path = path.replace("/mnt/multipath-shares/data", "/data") return path