Source code for ewoksid16a.tasks.common.dataportal
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Jul 6 20:10:52 2025
@author: blissadm
"""
import os
from esrf_pathlib import ESRFPath
from ewokscore import Task
from pyicat_plus.client.main import IcatClient
from silx.io.url import DataUrl
[docs]
class PublishProcessed(
Task,
input_names=["output_root_uri", "icat_url"],
optional_input_names=["bliss_scan_uri", "metadata"],
output_names=["output_root_uri"],
):
"""Publish processed data to data portal"""
[docs]
def run(self):
output_root_uri = DataUrl(self.inputs.output_root_uri)
bliss_scan_uri = self.get_input_value("bliss_scan_uri", None)
processed_data_folder = os.path.dirname(output_root_uri.file_path())
processed_data = ESRFPath(processed_data_folder)
if bliss_scan_uri is None:
raw_data = processed_data.raw_dataset_file
else:
bliss_scan_uri = DataUrl(bliss_scan_uri)
raw_data = ESRFPath(bliss_scan_uri.file_path())
metadata = self.get_input_value("metadata", {})
if "Sample_name" not in metadata:
metadata["Sample_name"] = raw_data.collection
client = IcatClient(metadata_urls=self.inputs["icat_url"])
client.store_processed_data(
beamline=raw_data.beamline,
proposal=raw_data.proposal,
dataset=f"{raw_data.collection}_{raw_data.dataset}",
path=processed_data_folder,
metadata=metadata,
raw=raw_data.raw_dataset_path,
)
self.outputs.output_root_uri = (
output_root_uri.file_path() + "::" + (output_root_uri.data_path() or "/")
)
def _fix_data_path(self, path):
if path.startswith("/mnt/multipath-shares/data"):
path = path.replace("/mnt/multipath-shares/data", "/data")
return path