Content-Length: 623692 | pFad | http://github.com/crim-ca/weaver/commit/2a5a3eeeb167a176171b666d6576bc19ebe112cd

E6 [wip] add CWL PROV support - basic only (no extra weaver-specific tra… · crim-ca/weaver@2a5a3ee · GitHub
Skip to content

Commit

Permalink
[wip] add CWL PROV support - basic only (no extra weaver-specific tra…
Browse files Browse the repository at this point in the history
…cking) (relates to #673)
  • Loading branch information
fmigneault committed Dec 6, 2024
1 parent b2dbe38 commit 2a5a3ee
Showing 8 changed files with 275 additions and 15 deletions.
5 changes: 5 additions & 0 deletions config/weaver.ini.example
Original file line number Diff line number Diff line change
@@ -100,6 +100,11 @@ weaver.cwl_egid =
weaver.cwl_processes_dir =
weaver.cwl_processes_register_error = false

# provenance functionality
# if disabled, provenance details will not be collected when running Application Packages and Workflows
# if disabled, the '/jobs/{jobId}/prov' endpoint will always report missing information since unavailable
weaver.cwl_prov = true

# --- Weaver WPS settings ---
weaver.wps = true
weaver.wps_url =
16 changes: 16 additions & 0 deletions docs/source/processes.rst
Original file line number Diff line number Diff line change
@@ -2424,6 +2424,22 @@ Job Provenance
implement ``GET /jobs/{jobID}/run`` and/or ``GET /jobs/{jobID}/prov``
(see https://github.com/crim-ca/weaver/issues/673)


Configure ``PROV`` runtime options.

Provenance is information about entities, activities, and people involved in producing a
piece of data or thing, which can be used to form assessments about its quality, reliability or trustworthiness.

.. seealso::
- https://www.w3.org/TR/prov-overview/
- https://cwltool.readthedocs.io/en/latest/CWLProv.html
- https://docs.ogc.org/DRAFTS/24-051.html#_requirements_class_provenance

.. |prov-o-resources| image:: https://www.w3.org/TR/2013/REC-prov-o-20130430/diagrams/starting-points.svg
:alt: |prov-ontology| Resources
:target: `prov-ontology`_


.. _proc_op_job_stats:

Job Statistics
2 changes: 2 additions & 0 deletions docs/source/references.rst
Original file line number Diff line number Diff line change
@@ -172,6 +172,8 @@
.. _openeo-api: https://openeo.org/documentation/1.0/developers/api/reference.html
.. |OpenAPI-spec| replace:: OpenAPI Specification
.. _OpenAPI-spec: https://spec.openapis.org/oas/v3.1.0
.. |prov-ontology| replace:: PROV-O: The PROV Ontology
.. _prov-ontology: https://www.w3.org/TR/2013/REC-prov-o-20130430/
.. |pywps| replace:: PyWPS
.. _pywps: https://github.com/geopython/pywps/
.. |pywps-status| replace:: Progress and Status Report
53 changes: 44 additions & 9 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
@@ -76,7 +76,7 @@
)
from weaver.visibility import Visibility
from weaver.warning import NonBreakingExceptionWarning, UnsupportedOperationWarning
from weaver.wps.utils import get_wps_client, get_wps_url
from weaver.wps.utils import get_wps_client, get_wps_url, get_wps_output_dir
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.utils import get_wps_restapi_base_url

@@ -94,6 +94,7 @@
AnyExecuteReturnPreference,
AnyExecuteTransmissionMode
)
from weaver.formats import AnyContentType
from weaver.processes.constants import ProcessSchemaType
from weaver.processes.types import AnyProcessType
from weaver.quotation.status import AnyQuoteStatus
@@ -1403,18 +1404,20 @@ def response(self, response):
response = xml_util.tostring(response)
self["response"] = response

def _job_url(self, base_url):
# type: (str) -> str
def process_url(self, container=None):
# type: (Optional[AnySettingsContainer], Optional[str]) -> str
settings = get_settings(container)
base_url = get_wps_restapi_base_url(settings)
if self.service is not None:
base_url += sd.provider_service.path.format(provider_id=self.service)
job_path = sd.process_job_service.path.format(process_id=self.process, job_id=self.id)
return base_url + job_path
proc_url = sd.process_service.path.format(process_id=self.process)
return base_url + proc_url

def job_url(self, container=None, extra_path=None):
# type: (Optional[AnySettingsContainer], Optional[str]) -> str
settings = get_settings(container)
base_url = get_wps_restapi_base_url(settings)
return self._job_url(base_url) + (extra_path or "")
proc_url = self.process_url(container)
job_url = sd.jobs_service.path.format(job_id=self.id)
return proc_url + job_url + (extra_path or "")

def status_url(self, container=None):
# type: (Optional[AnySettingsContainer]) -> str
@@ -1466,6 +1469,36 @@ def result_path(self, job_id=None, output_id=None, file_name=None):
result_job_path = os.path.join(result_job_path, file_name)
return result_job_path

def prov_url(self, container=None, extra_path=None):
# type: (Optional[AnySettingsContainer], Optional[str]) -> str
extra_path = "/prov" + (extra_path or "")
return self.job_url(container=container, extra_path=extra_path)

def prov_path(self, container=None, extra_path=None, prov_format=None):
# type: (Optional[AnySettingsContainer], Optional[str], AnyContentType) -> str
"""
Obtain the relative path of the ``PROV`` contents.
"""
job_path = self.result_path()
prov_path = f"{job_path}-prov"
_prov_path_mapping = {
(None, None): prov_path, # the directory itself with all metadata
("/prov", None): f"{prov_path}/metadata/provenance/primary.cwlprov.json",
("/prov", ContentType.APP_JSON): f"{prov_path}/metadata/provenance/primary.cwlprov.json",
("/prov", ContentType.APP_JSON_LD): f"{prov_path}/metadata/provenance/primary.cwlprov.jsonld",
("/prov", ContentType.APP_XML): f"{prov_path}/metadata/provenance/primary.cwlprov.xml",
("/prov", ContentType.TEXT_XML): f"{prov_path}/metadata/provenance/primary.cwlprov.xml",
("/prov", ContentType.TEXT_PROVN): f"{prov_path}/metadata/provenance/primary.cwlprov.provn",
("/prov", ContentType.TEXT_TURTLE): f"{prov_path}/metadata/provenance/primary.cwlprov.ttl",
("/prov", ContentType.APP_NT): f"{prov_path}/metadata/provenance/primary.cwlprov.nt",
}
key = (extra_path, prov_format)
resolved_path = _prov_path_mapping.get(key)
if resolved_path:
out_dir = get_wps_output_dir(container)
return os.path.join(out_dir, resolved_path)
return resolved_path

def links(self, container=None, self_link=None):
# type: (Optional[AnySettingsContainer], Optional[str]) -> List[Link]
"""
@@ -1480,7 +1513,7 @@ def links(self, container=None, self_link=None):
settings = get_settings(container)
html_on = settings.get("weaver.wps_restapi_html", True)
base_url = get_wps_restapi_base_url(settings)
job_url = self._job_url(base_url) # full URL
job_url = self.job_url(settings) # full URL
job_path = base_url + sd.job_service.path.format(job_id=self.id)
job_exec = f"{job_url.rsplit('/', 1)[0]}/execution"
job_list = base_url + sd.jobs_service.path
@@ -1519,6 +1552,8 @@ def links(self, container=None, self_link=None):
"title": "Job results of successful process execution (direct output values mapping)."},
{"href": f"{job_url}/statistics", "rel": "statistics", # unofficial
"title": "Job statistics collected following process execution."},
{"href": f"{job_url}/prov", "rel": "provenance", # unofficial
"title": "Job provenance collected following process execution."},
])
else:
job_links.append({
4 changes: 4 additions & 0 deletions weaver/formats.py
Original file line number Diff line number Diff line change
@@ -96,11 +96,13 @@ class ContentType(Constants):
APP_GZIP = "application/gzip"
APP_HDF5 = "application/x-hdf5"
APP_JSON = "application/json"
APP_JSON_LD = "application/ld+json"
APP_RAW_JSON = "application/raw+json"
APP_OAS_JSON = "application/vnd.oai.openapi+json; version=3.0"
APP_OGC_PKG_JSON = "application/ogcapppkg+json"
APP_OGC_PKG_YAML = "application/ogcapppkg+yaml"
APP_NETCDF = "application/x-netcdf"
APP_NT = "application/n-triples"
APP_OCTET_STREAM = "application/octet-stream"
APP_PDF = "application/pdf"
APP_TAR = "application/x-tar" # map to existing gzip for CWL
@@ -125,6 +127,8 @@ class ContentType(Constants):
TEXT_PLAIN = "text/plain"
TEXT_RICHTEXT = "text/richtext"
TEXT_XML = "text/xml"
TEXT_PROVN = "text/provenance-notation"
TEXT_TURTLE = "text/turtle"
VIDEO_MPEG = "video/mpeg"

# special handling
151 changes: 146 additions & 5 deletions weaver/processes/wps_package.py
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
- `WPS-REST schemas <https://github.com/opengeospatial/wps-rest-binding>`_
- :mod:`weaver.wps_restapi.api` conformance details
"""
import hashlib

import copy
import json
@@ -22,18 +23,24 @@
import time
import uuid
from typing import TYPE_CHECKING, cast, overload
from urllib.parse import parse_qsl, urlparse
from urllib.parse import parse_qsl

import colander
import cwltool
import cwltool.docker
import cwltool.process
import yaml
import prov.constants
from cwltool.context import LoadingContext, RuntimeContext
from cwltool.cwlprov import provenance_constants as cwl_prov_const
from cwltool.cwlprov.ro import ResearchObject
from cwltool.cwlprov.writablebagfile import close_ro
from cwltool.factory import Factory as CWLFactory, WorkflowStatus as CWLException
from cwltool.process import shortname, use_custom_schema
from cwltool.secrets import SecretStore
from cwltool.stdfsaccess import StdFsAccess
from pyramid.httpexceptions import HTTPOk, HTTPServiceUnavailable
from pyramid.settings import asbool
from pywps import Process
from pywps.inout.basic import SOURCE_TYPE, DataHandler, FileHandler, IOHandler, NoneIOHandler
from pywps.inout.formats import Format
@@ -46,7 +53,9 @@
from pywps.validator.base import emptyvalidator
from pywps.validator.mode import MODE
from requests.structures import CaseInsensitiveDict
from urllib.parse import urlparse

from weaver.__meta__ import __version__ as weaver_version
from weaver.compat import cache
from weaver.config import WeaverConfiguration, WeaverFeature, get_weaver_configuration
from weaver.database import get_db
@@ -150,6 +159,7 @@
get_sane_name,
get_secure_directory_name,
get_settings,
get_weaver_url,
list_directory_recursive,
null,
open_module_resource_file,
@@ -1811,6 +1821,130 @@ def setup_runtime(self):
}
return runtime_params

def setup_provenance(self, loading_context, runtime_context):
# type: (LoadingContext, RuntimeContext) -> None
"""
Configure ``PROV`` runtime options.
.. seealso::
- https://www.w3.org/TR/prov-overview/
- https://cwltool.readthedocs.io/en/latest/CWLProv.html
- https://docs.ogc.org/DRAFTS/24-051.html#_requirements_class_provenance
"""
weaver_cwl_prov = asbool(self.settings.get("weaver.cwl_prov", True))
if not weaver_cwl_prov:
return

loading_context.user_provenance = True
loading_context.host_provenance = True

fs = runtime_context.make_fs_access or StdFsAccess
if not runtime_context.research_obj:
ro = ResearchObject(
fs(""),
temp_prefix_ro=runtime_context.tmpdir_prefix,
orcid=runtime_context.orcid,
full_name=runtime_context.cwl_full_name,
)

# rewrite auto-initialized random UUIDs with Weaver-specific references
ro.ro_uuid = self.job.uuid
ro.base_uri = f"arcp://uuid,{ro.ro_uuid}/"

loading_context.research_obj = ro
runtime_context.research_obj = ro

def finalize_provenance(self, runtime_context):
# type: (RuntimeContext) -> None
if runtime_context.research_obj:
ro = runtime_context.research_obj
prov_obj = runtime_context.prov_obj

# FIXME: all in try/except fails because 'prov_obj' is unset
# (operation already performed before we reach here! - find a way to hook ourselves during the operation)
# the actual creation of 'cwltool.cwlprov.provenance_profile.ProvenanceProfile'
# happens within one of the 'cwltool.executors.JobExecutor', which ends up
# calling 'process.parent_wf.finalize_prov_profile' directly before the end
# of 'cwltool.executors.JobExecutor.execute', which in turns generates all the PROV files
try:
prov_obj.document.add_namespace("doi", "https://doi.org/")
sha1_ns = prov_obj.document._namespaces.get_namespace("sha1")

crim_name = "Computer Research Institute of Montréal"
crim_entity = prov_obj.document.entity(
"_:crim",
{
prov.constants.PROV_TYPE: prov.constants.PROV["Organization"],
"foaf:name": crim_name,
"schema:name": crim_name,
}
)

weaver_url = get_weaver_url(self.settings)
weaver_sha1 = hashlib.sha1(weaver_url)
weaver_agent = prov_obj.document.agent(
sha1_ns.qname(weaver_sha1),
{
prov.constants.PROV_TYPE: prov.constants.PROV["SoftwareAgent"],
prov.constants.PROV_LOCATION: weaver_url,
prov.constants.PROV_LABEL: f"crim-ca/weaver {weaver_version}",
# "prov:qualifiedPrimarySource":
# "prov:Organization": "Computer Research Institute of Montréal (CRIM).",
# "foaf:Project": "https://github.com/crim-ca/weaver",
# "doi": "10.5281/zenodo.14210717" # see CITATION.cff
}
)

# cross-ref: https://wf4ever.github.io/ro/wfprov.owl
job_entity = prov_obj.document.entity(
self.job.uuid,
{
prov.constants.PROV_TYPE: cwl_prov_const.WFDESC["ProcessRun"],
prov.constants.PROV_LOCATION: self.job.job_url(self.settings),
prov.constants.PROV_LABEL: "Job Information",
}
)
proc_entity = prov_obj.document.entity(
self.job.uuid,
{
prov.constants.PROV_TYPE: cwl_prov_const.WFDESC["Process"],
prov.constants.PROV_LOCATION: self.job.process_url(self.settings),
prov.constants.PROV_LABEL: "Process Description",
}
)

cwl_agent = prov_obj.document.get_record(cwl_prov_const.ACCOUNT_UUID) # cwltool
usr_agent = prov_obj.document.get_record(cwl_prov_const.USER_UUID) # pseudo-user (machine user)
wf_agent = prov_obj.document.get_record(ro.engine_uuid) # current job run aligned with cwl workflow

# FIXME: patch override of 'host_provenance' since access through RO it is not possible
# (private function in cwltool.cwlprov.provenance_profile.ProvenanceProfile.generate_prov_doc
# cwl_agent.extend()
# document.agent(
# ACCOUNT_UUID,
# {
# PROV_TYPE: FOAF["OnlineAccount"],
# "prov:location": hostname,
# CWLPROV["hostname"]: hostname,
# },
# )

# define relationships
prov_obj.document.actedOnBehalfOf(weaver_agent, usr_agent)
prov_obj.document.specializationOf(weaver_agent, cwl_agent)
prov_obj.document.attribution(crim_entity, weaver_agent)
prov_obj.document.wasDerivedFrom(cwl_agent, weaver_agent)
# prov_obj.document.wasStartedBy(job_agent, weaver_agent)
prov_obj.document.wasStartedBy(wf_agent, job_entity, time=self.job.created)
# prov_obj.document.specializationOf(wf_agent, job_entity)
# prov_obj.document.alternateOf(wf_agent, job_entity)
except:
pass

# sign-off and persist completed PROV
prov_dir = self.job.prov_path(self.settings)
close_ro(ro, prov_dir)

def update_requirements(self):
# type: () -> None
"""
@@ -2113,13 +2247,10 @@ def _handler(self, request, response):
elif config == WeaverConfiguration.HYBRID:
self.remote_execution = problem_needs_remote is not None

loading_context = LoadingContext()
if self.remote_execution:
# EMS/Hybrid dispatch the execution to ADES or remote WPS
loading_context = LoadingContext()
loading_context.construct_tool_object = self.make_tool
else:
# ADES/Hybrid execute the CWL/AppPackage locally
loading_context = None

self.update_effective_user()
self.update_requirements()
@@ -2132,6 +2263,7 @@ def _handler(self, request, response):
)
runtime_context = RuntimeContext(kwargs=runtime_params)
runtime_context.secret_store = SecretStore() # pre-allocate to reuse the same references as needed
self.setup_provenance(loading_context, runtime_context)
try:
self.step_launched = []
package_inst, _, self.step_packages = _load_package_content(self.package,
@@ -2203,6 +2335,15 @@ def _handler(self, request, response):
self.update_status("Generate package outputs done.", PACKAGE_PROGRESS_PREP_OUT, Status.RUNNING)
except Exception as exc:
raise self.exception_message(PackageExecutionError, exc, "Failed to save package outputs.")
try:
self.finalize_provenance(runtime_context)
except Exception as exc:
self.exception_message(
PackageExecutionError,
exc,
"Failed to save package PROV metadata. Ignoring error to avoid failing execution.",
level=logging.WARN,
)
except Exception:
# return log file location by status message since outputs are not obtained by WPS failed process
log_url = f"{get_wps_output_url(self.settings)}/{self.uuid}.log"
Loading
Oops, something went wrong.

0 comments on commit 2a5a3ee

Please sign in to comment.








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/crim-ca/weaver/commit/2a5a3eeeb167a176171b666d6576bc19ebe112cd

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy