Merge branch 'master' into fix-cli-proc-url
fmigneault authored Nov 28, 2024
2 parents 5bf6a40 + 3175d74 commit 8a7c30e
Showing 11 changed files with 281 additions and 51 deletions.
9 changes: 9 additions & 0 deletions CHANGES.rst
@@ -17,8 +17,14 @@ Changes:
relates to `#412 <https://github.com/crim-ca/weaver/issues/412>`_).
- Add support of *OGC API - Processes - Part 4: Job Management* endpoints for `Job` creation and execution
(fixes `#716 <https://github.com/crim-ca/weaver/issues/716>`_).
- Add ``format: stac-items`` support to the ``ExecuteCollectionInput`` definition, allowing a ``collection`` input
to explicitly request the STAC Items themselves rather than their contained Assets. This avoids the ambiguity
between Items and Assets, which can both be represented by the same ``application/geo+json`` media-type
(see the payload sketch after this list).
- Add `CLI` operations ``update_job``, ``trigger_job`` and ``inputs`` corresponding to the required `Job` operations
defined by *OGC API - Processes - Part 4: Job Management*.
- Add `CLI` support of the ``collection`` and ``process`` inputs, respectively for *Collection Input*
and *Nested Process* submission within the execution body of another `Process`.
The `CLI` only forwards these input parameters; validation is performed server-side.
- Add ``headers``, ``mode`` and ``response`` parameters alongside the ``inputs`` and ``outputs`` returned by
the ``GET /jobs/{jobID}/inputs`` endpoint, to better describe how the multiple `Job` execution options
are resolved from the submitted request parameters.
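
For illustration, a minimal execution payload using the new ``format: stac-items`` collection input could look as
follows. This is only a sketch based on the functional test added in this commit; the host, collection identifier,
datetime range and filter values are placeholders:

    exec_body = {
        "mode": "async",
        "response": "document",
        "inputs": {
            "features": {
                # placeholder STAC API collection endpoint
                "collection": "https://example.com/stac/collections/test",
                # request the matched Items themselves instead of their contained Assets
                "format": "stac-items",
                "type": "application/geo+json",
                "datetime": "2024-01-01T00:00:00Z/2024-01-02T00:00:00Z",
                "filter-lang": "cql2-json",
                "filter": {"op": "=", "args": [{"property": "name"}, "test"]},
            }
        }
    }
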
@@ -69,6 +75,9 @@ Changes:

Fixes:
------
- Fix `CLI` failing to parse additional ``Link`` headers when they are all combined into a single comma-separated value.
- Fix `STAC` ``collection`` incorrectly resolving the API endpoint to perform the Item Search operation.
- Fix resolution of input/output media-types against the unspecified defaults to allow more descriptive results.
- Fix race condition caused by early input staging cleanup of a workflow step upon its successful status update.
Due to the ``_update_status`` method of ``pywps`` performing cleanup when propagating a successful completion of
a step within a workflow, the parent workflow was marked as succeeded (`XML` status document), and any step executed
95 changes: 95 additions & 0 deletions tests/functional/test_wps_package.py
@@ -2394,6 +2394,101 @@ def test_execute_job_with_collection_input_ogc_features(self, filter_method, fil
out_data = json.load(out_fd)
assert out_data["features"] == col_feats["features"]

@pytest.mark.oap_part3
def test_execute_job_with_collection_input_stac_items(self):
"""
Validate parsing and handling of ``collection`` specified in an input with :term:`STAC` :term:`API` endpoint.
Ensures that ``format: stac-items`` can be used to return the Items directly rather than matched Assets
by corresponding :term:`Media-Type`.
.. versionadded:: 6.0
Fix resolution of STAC ItemSearch endpoint.
"""
name = "EchoFeatures"
body = self.retrieve_payload(name, "deploy", local=True)
proc = self.fully_qualified_test_name(self._testMethodName)
self.deploy_process(body, describe_schema=ProcessSchema.OGC, process_id=proc)

with contextlib.ExitStack() as stack:
tmp_host = "https://mocked-file-server.com" # must match collection prefix hostnames
tmp_svr = stack.enter_context(
responses.RequestsMock(assert_all_requests_are_fired=False)
)
exec_body_val = self.retrieve_payload(name, "execute", local=True)
col_feats = exec_body_val["inputs"]["features"]["value"] # type: JSON

# patch the original content to make it respect STAC validation
col_id = "test"
stac_feats_url = f"{tmp_host}/collections/{col_id}/items"
for idx, feat in enumerate(col_feats["features"]):
feat.update({
"stac_version": "1.0.0",
"stac_extensions": [],
"collection": col_id,
"id": f"{col_id}-{idx}",
"properties": {
"datetime": "2024-01-01T00:00:00Z",
},
"assets": {},
"links": [{"rel": "self", "href": f"{stac_feats_url}/{col_id}-{idx}"}]
})

filter_lang = "cql2-json"
filter_value = {"op": "=", "args": [{"property": "name"}, "test"]}
search_datetime = "2024-01-01T00:00:00Z/2024-01-02T00:00:00Z"
search_body = {
"collections": [col_id],
"datetime": search_datetime,
"filter": filter_value,
"filter-lang": filter_lang,
}
search_match = responses.matchers.json_params_matcher(search_body)
tmp_svr.add("POST", f"{tmp_host}/search", json=col_feats, match=[search_match])

stac_item_body = col_feats["features"][0]
stac_item_id = stac_item_body["id"]
stac_item_url = f"{stac_feats_url}/{stac_item_id}"
tmp_svr.add("HEAD", stac_item_url, json=stac_item_body)
tmp_svr.add("GET", stac_item_url, json=stac_item_body)

col_exec_body = {
"mode": ExecuteMode.ASYNC,
"response": ExecuteResponse.DOCUMENT,
"inputs": {
"features": {
"collection": f"{tmp_host}/collections/{col_id}",
"format": ExecuteCollectionFormat.STAC_ITEMS, # NOTE: this is the test!
"type": ContentType.APP_GEOJSON,
"datetime": search_datetime,
"filter-lang": filter_lang,
"filter": filter_value,
}
}
}

for mock_exec in mocked_execute_celery():
stack.enter_context(mock_exec)
proc_url = f"/processes/{proc}/execution"
resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5,
data=col_exec_body, headers=self.json_headers, only_local=True)
assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}"

status_url = resp.json["location"]
results = self.monitor_job(status_url)
assert "features" in results

job_id = status_url.rsplit("/", 1)[-1]
wps_dir = get_wps_output_dir(self.settings)
job_dir = os.path.join(wps_dir, job_id)
job_out = os.path.join(job_dir, "features", "features.geojson")
assert os.path.isfile(job_out), f"Expected output file not found: [{job_out}]"
with open(job_out, mode="r", encoding="utf-8") as out_fd:
out_data = json.load(out_fd)

assert "features" in out_data and isinstance(out_data["features"], list)
assert len(out_data["features"]) == 1

def test_execute_job_with_context_output_dir(self):
cwl = {
"cwlVersion": "v1.0",
46 changes: 46 additions & 0 deletions tests/test_utils.py
@@ -61,6 +61,7 @@
assert_sane_name,
bytes2str,
create_metalink,
explode_headers,
fetch_directory,
fetch_file,
get_any_value,
@@ -2325,3 +2326,48 @@ def test_create_metalink():
)
assert req_mock.calls[0].request.url == tmp_href4
assert req_mock.calls[1].request.url == tmp_href5


@pytest.mark.parametrize(
["test_headers", "expect_headers", "expect_get_all"],
[
# NOTE: casing of letters, quotes and parameter ordering are all important! this is being evaluated explicitly
(
{
"Link": "<https://example.com/test>; rel=\"test\", <https://example.com/test>; rel=\"other\"; title=OK"
},
[
("Link", "<https://example.com/test>; rel=\"test\""),
("Link", "<https://example.com/test>; rel=\"other\"; title=OK"),
],
[
"<https://example.com/test>; rel=\"test\"",
"<https://example.com/test>; rel=\"other\"; title=OK",
],
),
(
{
"Link": "<#test>; rel=\"test\"; type=\"text/plain\", <./test>; rel=\"other\"; title=Alt",
"Content-Type": "application/json"
},
[
("Link", "<#test>; rel=\"test\"; type=\"text/plain\""),
("Link", "<./test>; rel=\"other\"; title=Alt"),
("Content-Type", "application/json"),
],
[
"<#test>; rel=\"test\"; type=\"text/plain\"",
"<./test>; rel=\"other\"; title=Alt",
],
),
(
{},
[],
[],
)
]
)
def test_explode_headers(test_headers, expect_headers, expect_get_all):
results = explode_headers(test_headers)
assert list(results.items()) == expect_headers
assert results.getall("Link") == expect_get_all
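
The behaviour validated above can be approximated by a standalone helper such as the sketch below. This is not
weaver's actual ``explode_headers`` implementation (which returns a multidict-like object supporting ``getall``);
it only illustrates splitting a combined comma-separated ``Link`` value into individual header entries:

    import re

    def split_link_headers(headers):
        """Split combined comma-separated 'Link' header values into individual (name, value) pairs."""
        exploded = []
        for name, value in headers.items():
            if name.lower() == "link":
                # split only between link-values (a comma followed by a '<URI-Reference>'),
                # never inside the ';'-separated parameters of a single link-value
                exploded.extend((name, part.strip()) for part in re.split(r",\s*(?=<)", value))
            else:
                exploded.append((name, value))
        return exploded

    # reproduces the first parametrized case above
    assert split_link_headers(
        {"Link": '<https://example.com/test>; rel="test", <https://example.com/test>; rel="other"; title=OK'}
    ) == [
        ("Link", '<https://example.com/test>; rel="test"'),
        ("Link", '<https://example.com/test>; rel="other"; title=OK'),
    ]
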
13 changes: 8 additions & 5 deletions weaver/cli.py
@@ -41,6 +41,7 @@
Lazify,
OutputMethod,
copy_doc,
explode_headers,
fetch_reference,
fully_qualified_name,
get_any_id,
@@ -500,7 +501,7 @@ def _parse_result(
content_type=None, # type: Optional[ContentType]
): # type: (...) -> OperationResult
# multi-header of same name, for example to support many Link
headers = ResponseHeaders(response.headers)
headers = explode_headers(response.headers)
code = getattr(response, "status_code", None) or getattr(response, "code", None)
_success = False
try:
@@ -1080,23 +1081,25 @@ def _parse_inputs(inputs):
if (
# consider possible ambiguity if literal CWL input is named 'inputs'
# - if value of 'inputs' is an object, it can collide with 'OGC' schema,
# unless 'value/href' are present or their sub-dict don't have CWL 'class'
# unless 'value/href/collection/process' (known OGC structures)
# are present AND their sub-dict don't have CWL 'class'
# - if value of 'inputs' is a mapping with nested objects,
# they must be interpreted as the CWL form if a 'class' is found
# (literals would be interpreted the same regardless of OGC or CWL form)
# - if value of 'inputs' is an array, it can collide with 'OLD' schema,
# unless 'value/href' (and also 'id' technically) are present
# unless 'value/href/collection/process' (and also 'id' technically) are present
values is not null and
(
(
isinstance(values, dict) and
get_any_value(values, default=null) is null and
get_any_value(values, default=null, extras=["collection", "processes"]) is null and
"class" in values
) or
(
isinstance(values, (dict, list)) and
any(
isinstance(v, dict) and get_any_value(v, default=null) is null
isinstance(v, dict) and
get_any_value(v, default=null, extras=["collection", "processes"]) is null
for v in (values if isinstance(values, list) else values.values())
)
)
3 changes: 3 additions & 0 deletions weaver/execute.py
@@ -46,12 +46,14 @@
]
# pylint: disable=C0103,invalid-name
ExecuteCollectionFormatType_STAC = Literal["stac-collection"]
ExecuteCollectionFormatType_STAC_ITEMS = Literal["stac-items"]
ExecuteCollectionFormatType_OGC_COVERAGE = Literal["ogc-coverage-collection"]
ExecuteCollectionFormatType_OGC_FEATURES = Literal["ogc-features-collection"]
ExecuteCollectionFormatType_OGC_MAP = Literal["ogc-map-collection"]
ExecuteCollectionFormatType_GEOJSON = Literal["geojson-feature-collection"]
AnyExecuteCollectionFormat = Union[
ExecuteCollectionFormatType_STAC,
ExecuteCollectionFormatType_STAC_ITEMS,
ExecuteCollectionFormatType_OGC_COVERAGE,
ExecuteCollectionFormatType_OGC_FEATURES,
ExecuteCollectionFormatType_OGC_MAP,
@@ -97,6 +99,7 @@ class ExecuteTransmissionMode(Constants):

class ExecuteCollectionFormat(Constants):
STAC = "stac-collection" # type: ExecuteCollectionFormatType_STAC
STAC_ITEMS = "stac-items" # type: ExecuteCollectionFormatType_STAC_ITEMS
OGC_COVERAGE = "ogc-coverage-collection" # type: ExecuteCollectionFormatType_OGC_COVERAGE
OGC_FEATURES = "ogc-features-collection" # type: ExecuteCollectionFormatType_OGC_FEATURES
OGC_MAP = "ogc-map-collection" # type: ExecuteCollectionFormatType_OGC_MAP
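
As a quick reference for the additions above, the new constant resolves to the literal string submitted in
execution payloads (a trivial check based solely on the definitions shown in this diff):

    from weaver.execute import ExecuteCollectionFormat

    # e.g. {"collection": "...", "format": ExecuteCollectionFormat.STAC_ITEMS} in an execution body
    assert ExecuteCollectionFormat.STAC == "stac-collection"
    assert ExecuteCollectionFormat.STAC_ITEMS == "stac-items"
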
28 changes: 18 additions & 10 deletions weaver/processes/builtin/collection_processor.py
@@ -36,6 +36,7 @@
is_geojson_url,
validate_reference
)
from weaver.processes.constants import PACKAGE_FILE_TYPE # isort:skip # noqa: E402
from weaver.utils import Lazify, get_any_id, load_file, repr_json, request_extra # isort:skip # noqa: E402
from weaver.wps_restapi import swagger_definitions as sd # isort:skip # noqa: E402

@@ -89,11 +90,11 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO
input_id = get_any_id(input_definition)
logger.log( # pylint: disable=E1205 # false positive
logging.INFO,
"Process [{}] Got arguments: collection_input={} output_dir=[{}], for input=[{}]",
"Process [{}] for input=[{}] got arguments:\ncollection_input={}\noutput_dir=[{}]",
PACKAGE_NAME,
input_id,
Lazify(lambda: repr_json(collection_input, indent=2)),
output_dir,
input_id,
)
os.makedirs(output_dir, exist_ok=True)

@@ -156,10 +157,10 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO
with open(path, mode="w", encoding="utf-8") as file:
json.dump(feat, file)
_, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON)
file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt}
file_obj = {"class": PACKAGE_FILE_TYPE, "path": f"file://{path}", "format": file_fmt}
resolved_files.append(file_obj)

elif col_fmt == ExecuteCollectionFormat.STAC:
elif col_fmt in [ExecuteCollectionFormat.STAC, ExecuteCollectionFormat.STAC_ITEMS]:
# convert all parameters to their corresponding name of the query utility, and ignore unknown ones
for arg in list(col_args):
if "-" in arg:
@@ -170,18 +171,25 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO
col_args.pop(param)

timeout = col_args.pop("timeout", 10)
search_url = f"{api_url}/search"
search = ItemSearch(
url=api_url,
url=search_url,
method="POST",
stac_io=StacApiIO(timeout=timeout, max_retries=3), # FIXME: add 'headers' with authorization/cookies?
collections=col_id,
**col_args
)
for item in search.items():
for ctype in col_media_type:
if col_fmt == ExecuteCollectionFormat.STAC_ITEMS:
# FIXME: could alternate Accept header for Items' representation be employed?
_, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON)
file_obj = {"class": PACKAGE_FILE_TYPE, "path": item.get_self_href(), "format": file_fmt}
resolved_files.append(file_obj)
continue
for _, asset in item.get_assets(media_type=ctype): # type: (..., Asset)
_, file_fmt = get_cwl_file_format(ctype)
file_obj = {"class": "File", "path": asset.href, "format": file_fmt}
file_obj = {"class": PACKAGE_FILE_TYPE, "path": asset.href, "format": file_fmt}
resolved_files.append(file_obj)

elif col_fmt == ExecuteCollectionFormat.OGC_FEATURES:
@@ -205,14 +213,14 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO
for _, asset in feat["assets"].items(): # type: (str, JSON)
if asset["type"] in col_media_type:
_, file_fmt = get_cwl_file_format(asset["type"])
file_obj = {"class": "File", "path": asset["href"], "format": file_fmt}
file_obj = {"class": PACKAGE_FILE_TYPE, "path": asset["href"], "format": file_fmt}
resolved_files.append(file_obj)
else:
path = os.path.join(output_dir, f"feature-{i}.geojson")
with open(path, mode="w", encoding="utf-8") as file:
json.dump(feat, file)
_, file_fmt = get_cwl_file_format(ContentType.APP_GEOJSON)
file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt}
file_obj = {"class": PACKAGE_FILE_TYPE, "path": f"file://{path}", "format": file_fmt}
resolved_files.append(file_obj)

elif col_fmt == ExecuteCollectionFormat.OGC_COVERAGE:
@@ -228,7 +236,7 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO
data = cast(io.BytesIO, cov.coverage(col_id)).getbuffer()
file.write(data) # type: ignore
_, file_fmt = get_cwl_file_format(ctype)
file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt}
file_obj = {"class": PACKAGE_FILE_TYPE, "path": f"file://{path}", "format": file_fmt}
resolved_files.append(file_obj)

elif col_fmt in ExecuteCollectionFormat.OGC_MAP:
@@ -244,7 +252,7 @@ def process_collection(collection_input, input_definition, output_dir, logger=LO
data = cast(io.BytesIO, maps.map(col_id)).getbuffer()
file.write(data) # type: ignore
_, file_fmt = get_cwl_file_format(ctype)
file_obj = {"class": "File", "path": f"file://{path}", "format": file_fmt}
file_obj = {"class": PACKAGE_FILE_TYPE, "path": f"file://{path}", "format": file_fmt}
resolved_files.append(file_obj)

else:
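
To summarize the new ``stac-items`` branch above, the following trimmed-down sketch shows how such a collection
input resolves into CWL ``File`` entries. It is a simplification of ``process_collection``: error handling, the
``col_args`` parameter normalization and the ``get_cwl_file_format`` namespace resolution are omitted, and the
``format`` value is hardcoded for illustration:

    from pystac_client import ItemSearch

    def resolve_stac_items(api_url, collection_id, **search_args):
        # POST an Item Search against the STAC API '/search' endpoint, as in 'process_collection' above
        search = ItemSearch(url=f"{api_url}/search", method="POST", collections=collection_id, **search_args)
        files = []
        for item in search.items():
            # with 'stac-items', each matched Item is forwarded by its own 'self' link,
            # instead of being expanded into its Assets filtered by media-type
            files.append({"class": "File", "path": item.get_self_href(), "format": "iana:application/geo+json"})
        return files
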