From 8dd6c548e872e97dc7d8da83d102a4c61c7ed55a Mon Sep 17 00:00:00 2001 From: sharkinsspatial Date: Wed, 25 Jan 2023 15:29:47 -0600 Subject: [PATCH 1/5] Remove arco flag from configuration model. --- aws_asdi_pipelines/models/pipeline.py | 1 - aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml | 1 - aws_asdi_pipelines/pipelines/sentinel1/config.yaml | 1 - 3 files changed, 3 deletions(-) diff --git a/aws_asdi_pipelines/models/pipeline.py b/aws_asdi_pipelines/models/pipeline.py index 160c98e..9041566 100644 --- a/aws_asdi_pipelines/models/pipeline.py +++ b/aws_asdi_pipelines/models/pipeline.py @@ -11,7 +11,6 @@ class ComputeEnum(str, Enum): class Pipeline(BaseModel): id: str - arco: bool collection: str compute: ComputeEnum secret_arn: str diff --git a/aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml b/aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml index 1f5ed76..e30b97e 100644 --- a/aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml +++ b/aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml @@ -1,6 +1,5 @@ --- id: "cop_dem_30" - arco: False collection: "cop-dem-glo-30" compute: "awslambda" secret_arn: "arn:aws:secretsmanager:eu-central-1:751953039297:secret:asdi-auth-stack-alukach/asdi-workflows-6awSos" diff --git a/aws_asdi_pipelines/pipelines/sentinel1/config.yaml b/aws_asdi_pipelines/pipelines/sentinel1/config.yaml index 650ab46..73d7932 100644 --- a/aws_asdi_pipelines/pipelines/sentinel1/config.yaml +++ b/aws_asdi_pipelines/pipelines/sentinel1/config.yaml @@ -1,6 +1,5 @@ --- id: "sentinel1" - arco: False collection: "sentinel1" compute: "awslambda" sns: "arn:aws:sns:eu-central-1:214830741341:SentinelS1L1C" From 4347a90654f8bc0a035dc3aa5b74b28140a6f0fb Mon Sep 17 00:00:00 2001 From: sharkinsspatial Date: Wed, 1 Feb 2023 17:01:20 -0600 Subject: [PATCH 2/5] Update configuration and pipelines with single run Lambda for xstac. --- aws_asdi_pipelines/cdk/lambda_stack.py | 84 +++++++++++++++---- aws_asdi_pipelines/models/pipeline.py | 2 +- .../pipelines/cop_dem_30/config.yaml | 2 +- .../pipelines/sentinel1/config.yaml | 4 +- lambda.Dockerfile | 2 +- 5 files changed, 74 insertions(+), 20 deletions(-) diff --git a/aws_asdi_pipelines/cdk/lambda_stack.py b/aws_asdi_pipelines/cdk/lambda_stack.py index 28e0457..c22dca5 100644 --- a/aws_asdi_pipelines/cdk/lambda_stack.py +++ b/aws_asdi_pipelines/cdk/lambda_stack.py @@ -27,22 +27,76 @@ def __init__( **kwargs, ) -> None: super().__init__(scope, stack_name) - self.granule_dlq = sqs.Queue( - self, - f"{stack_name}_GranuleDLQ", - retention_period=cdk.Duration.days(14), - ) + if pipeline.queue: + self.granule_dlq = sqs.Queue( + self, + f"{stack_name}_GranuleDLQ", + retention_period=cdk.Duration.days(14), + ) + + self.granule_queue = sqs.Queue( + self, + f"{stack_name}_GranuleQueue", + visibility_timeout=cdk.Duration.minutes(15), + retention_period=cdk.Duration.days(14), + dead_letter_queue=sqs.DeadLetterQueue( + max_receive_count=3, + queue=self.granule_dlq, + ), + ) + else: + self.stac_function = aws_lambda.DockerImageFunction( + self, + f"{stack_name}-stac_function", + code=aws_lambda.DockerImageCode.from_ecr( + repository=self.repo, tag="latest" + ), + memory_size=1000, + timeout=cdk.Duration.minutes(14), + log_retention=logs.RetentionDays.ONE_WEEK, + environment={ + "CLIENT_SECRET": self.secret.secret_value_from_json( + "client_secret" + ).to_string(), + "CLIENT_ID": self.secret.secret_value_from_json( + "client_id" + ).to_string(), + "DOMAIN": self.secret.secret_value_from_json( + "cognito_domain" + ).to_string(), + "SCOPE": self.secret.secret_value_from_json("scope").to_string(), + "INGESTOR_URL": pipeline.ingestor_url, + }, + ) + + custom_resources.AwsCustomResource( + scope=self, + id="invoke_lambda", + policy=( + custom_resources.AwsCustomResourcePolicy.from_statements( + statements=[ + iam.PolicyStatement( + actions=["lambda:InvokeFunction"], + effect=iam.Effect.ALLOW, + resources=[self.stac_function.function_arn], + ) + ] + ) + ), + timeout=cdk.Duration.minutes(15), + on_create=custom_resources.AwsSdkCall( + service="Lambda", + action="invoke", + parameters={ + "FunctionName": self.stac_function.function_name, + "InvocationType": "Event", + }, + physical_resource_id=custom_resources.PhysicalResourceId.of( + "JobSenderTriggerPhysicalId" + ), + ), + ) - self.granule_queue = sqs.Queue( - self, - f"{stack_name}_GranuleQueue", - visibility_timeout=cdk.Duration.minutes(15), - retention_period=cdk.Duration.days(14), - dead_letter_queue=sqs.DeadLetterQueue( - max_receive_count=3, - queue=self.granule_dlq, - ), - ) if pipeline.sns: self.granule_topic = sns.Topic.from_topic_arn( self, diff --git a/aws_asdi_pipelines/models/pipeline.py b/aws_asdi_pipelines/models/pipeline.py index 9041566..f8bb422 100644 --- a/aws_asdi_pipelines/models/pipeline.py +++ b/aws_asdi_pipelines/models/pipeline.py @@ -11,10 +11,10 @@ class ComputeEnum(str, Enum): class Pipeline(BaseModel): id: str - collection: str compute: ComputeEnum secret_arn: str ingestor_url: str + queue: bool sns: Optional[str] inventory_location: Optional[str] initial_chunk: Optional[str] diff --git a/aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml b/aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml index e30b97e..1c42184 100644 --- a/aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml +++ b/aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml @@ -1,8 +1,8 @@ --- id: "cop_dem_30" - collection: "cop-dem-glo-30" compute: "awslambda" secret_arn: "arn:aws:secretsmanager:eu-central-1:751953039297:secret:asdi-auth-stack-alukach/asdi-workflows-6awSos" ingestor_url: "https://crtx3178aa.execute-api.us-west-2.amazonaws.com/dev/ingestions" + queue: true inventory_location: "s3://asdi-athena-testing-eu-central-1/inventories/copernicus-dem-30m.csv" historic_frequency: 0 diff --git a/aws_asdi_pipelines/pipelines/sentinel1/config.yaml b/aws_asdi_pipelines/pipelines/sentinel1/config.yaml index 73d7932..a5e3f7a 100644 --- a/aws_asdi_pipelines/pipelines/sentinel1/config.yaml +++ b/aws_asdi_pipelines/pipelines/sentinel1/config.yaml @@ -1,10 +1,10 @@ --- id: "sentinel1" - collection: "sentinel1" compute: "awslambda" - sns: "arn:aws:sns:eu-central-1:214830741341:SentinelS1L1C" secret_arn: "arn:aws:secretsmanager:eu-central-1:751953039297:secret:asdi-auth-stack-alukach/asdi-workflows-6awSos" ingestor_url: "https://crtx3178aa.execute-api.us-west-2.amazonaws.com/dev/ingestions" + queue: true + sns: "arn:aws:sns:eu-central-1:214830741341:SentinelS1L1C" inventory_location: "s3://sentinel-inventory/sentinel-s1-l1c/sentinel-s1-l1c-inventory/hive/dt=2022-11-16-01-00/" initial_chunk: "2022-4-20" historic_frequency: 1 diff --git a/lambda.Dockerfile b/lambda.Dockerfile index 26f82d4..d7b52f2 100644 --- a/lambda.Dockerfile +++ b/lambda.Dockerfile @@ -9,6 +9,6 @@ COPY "./aws_asdi_pipelines/cognito/*" "./aws_asdi_pipelines/cognito/" COPY setup.py . RUN pip install . COPY --from=builder /tmp/site-packages ${LAMBDA_TASK_ROOT} -COPY "./aws_asdi_pipelines/pipelines/${pipeline}/app.py" ${LAMBDA_TASK_ROOT} +COPY "./aws_asdi_pipelines/pipelines/${pipeline}/*" ${LAMBDA_TASK_ROOT} CMD [ "app.handler" ] From 10199e937584f0c3077bf7f187ceda8ecd33165c Mon Sep 17 00:00:00 2001 From: sharkinsspatial Date: Wed, 1 Feb 2023 17:03:59 -0600 Subject: [PATCH 3/5] Add xstac pipeline for aws_noaa_oisst_avhrr_only. --- .../aws_noaa_oisst_avhrr_only/__init__.py | 0 .../aws_noaa_oisst_avhrr_only/app.py | 64 +++++++++++ .../collection-template.json | 76 +++++++++++++ .../aws_noaa_oisst_avhrr_only/config.yaml | 6 ++ .../item-template.json | 102 ++++++++++++++++++ .../requirements.txt | 3 + .../aws_noaa_oisst_avhrr_only/test_app.py | 45 ++++++++ 7 files changed, 296 insertions(+) create mode 100644 aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/__init__.py create mode 100644 aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/app.py create mode 100644 aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/collection-template.json create mode 100644 aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/config.yaml create mode 100644 aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/item-template.json create mode 100644 aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/requirements.txt create mode 100644 aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/test_app.py diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/__init__.py b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/app.py b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/app.py new file mode 100644 index 0000000..f16c64b --- /dev/null +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/app.py @@ -0,0 +1,64 @@ +import json +import os + +import fsspec +import pystac +import requests +import xarray as xr +import xstac + +from aws_asdi_pipelines.cognito.utils import get_token + + +def create_stac(collection: bool): + if collection: + template_type = "collection-template.json" + else: + template_type = "item-template.json" + template_file = os.path.join(os.path.dirname(__file__), template_type) + template = json.load(open(template_file)) + + url = "https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/aws-noaa-oisst-feedstock/aws-noaa-oisst-avhrr-only.zarr" + fs = fsspec.filesystem( + "reference", + fo=f"{url}/reference.json", + remote_protocol="s3", + remote_options={"anon": True}, + ) + m = fs.get_mapper("") + ds = xr.open_dataset(m, engine="zarr", backend_kwargs={"consolidated": False}) + + stac = xstac.xarray_to_stac( + ds, + template, + temporal_dimension="time", + x_dimension="lon", + y_dimension="lat", + reference_system="4326", + validate=True, + ) + stac.remove_links(pystac.RelType.SELF) + stac.remove_links(pystac.RelType.ROOT) + return stac + + +def handler(event, context): + domain = os.environ["DOMAIN"] + client_secret = os.environ["CLIENT_SECRET"] + client_id = os.environ["CLIENT_ID"] + scope = os.environ["SCOPE"] + ingestor_url = os.environ["INGESTOR_URL"] + token = get_token( + domain=domain, client_secret=client_secret, client_id=client_id, scope=scope + ) + headers = {"Authorization": f"bearer {token}"} + item = create_stac(collection=False) + item.collection_id = "aws-noaa-oisst-avhrr-only" + response = requests.post( + url=ingestor_url, data=json.dumps(item.to_dict()), headers=headers + ) + try: + response.raise_for_status() + except Exception: + print(response.text) + raise diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/collection-template.json b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/collection-template.json new file mode 100644 index 0000000..63b6111 --- /dev/null +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/collection-template.json @@ -0,0 +1,76 @@ +{ + "type": "Collection", + "id": "aws-noaa-oisst-avhrr-only", + "stac_version": "1.0.0", + "description": "{{ collection.description }}", + "extent": { + "spatial": { + "bbox": [ + [-180, -90, 180, 90] + ] + }, + "temporal": { + "interval": [[null, null]] + } + }, + "links": [ + { + "rel": "license", + "href": "https://www.ncei.noaa.gov/pub/data/sds/cdr/CDRs/Sea_Surface_Temperature_Optimum_Interpolation/UseAgreement_01B-09.pdf", + "title": "Public Domain" + } + ], + "stac_extensions": [ + "https://stac-extensions.github.io/xarray-assets/v1.0.0/schema.json", + "https://stac-extensions.github.io/scientific/v1.0.0/schema.json" + ], + "title": "aws-noaa-oisst-avhrr-only", + "providers": [ + { + "name": "Development Seed", + "roles": [ + "processor" + ], + "url": "https://developmentseed.org/" + }, + { + "name": "AWS", + "roles": [ + "host", + "processor" + ], + "url": "https://registry.opendata.aws/noaa-cdr-oceanic/" + }, + { + "name": "NOAA", + "roles": [ + "producer" + ], + "url": "https://www.ncei.noaa.gov/products/climate-data-records/sea-surface-temperature-optimum-interpolation" + } + ], + "assets": { + "zarr-https": { + "href": "https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/aws-noaa-oisst-feedstock/aws-noaa-oisst-avhrr-only.zarr/reference.json", + "title": "aws-noaa-oisst-avhrr-only HTTPS Zarr root", + "description": "HTTPS URI of the aws-noaa-oisst-avhrr-only Zarr Group on OSN.", + "roles": [ + "data", + "zarr", + "https" + ], + "type": "application/vnd+zarr", + "xarray:open_kwargs": { + "consolidated": true + } + } + }, + "keywords": [ + "SST", + "Temperature", + "Climate" + ], + "sci:doi": "10.25921/RE9P-PT57", + "sci:citation": "Huang, Boyin; Liu, Chunying; Banzon, Viva F.; Freeman, Eric; Graham, Garrett; Hankins, Bill; Smith, Thomas M.; Zhang, Huai-Min. (2020): NOAA 0.25-degree Daily Optimum Interpolation Sea Surface Temperature (OISST), Version 2.1. [indicate subset used]. NOAA National Centers for Environmental Information.", + "license": "proprietary" +} diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/config.yaml b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/config.yaml new file mode 100644 index 0000000..db40a79 --- /dev/null +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/config.yaml @@ -0,0 +1,6 @@ +--- + id: "aws-noaa-oisst" + compute: "awslambda" + secret_arn: "arn:aws:secretsmanager:eu-central-1:751953039297:secret:asdi-auth-stack-alukach/asdi-workflows-6awSos" + ingestor_url: "https://crtx3178aa.execute-api.us-west-2.amazonaws.com/dev/ingestions" + queue: false diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/item-template.json b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/item-template.json new file mode 100644 index 0000000..c002eb7 --- /dev/null +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/item-template.json @@ -0,0 +1,102 @@ +{ + "type": "Feature", + "id": "aws-noaa-oisst-avhrr-only", + "description": "The NOAA 1/4° daily Optimum Interpolation Sea Surface Temperature (or daily OISST) Climate Data Record (CDR) provides complete ocean temperature fields constructed by combining bias-adjusted observations from different platforms (satellites, ships, buoys) on a regular global grid, with gaps filled in by interpolation.", + "stac_version": "1.0.0", + "bbox": [ + -180, + -90, + 180, + 90 + ], + "geometry": { + "type": "Polygon", + "coordinates": [ + [ + [ + -180, + -90 + ], + [ + 180, + -90 + ], + [ + 180, + 90 + ], + [ + -180, + 90 + ], + [ + -180, + -90 + ] + ] + ] + }, + "properties": { + "datetime": "1981-09-01T12:00:00Z", + "sci:doi": "10.25921/RE9P-PT57", + "sci:citation": "Huang, Boyin; Liu, Chunying; Banzon, Viva F.; Freeman, Eric; Graham, Garrett; Hankins, Bill; Smith, Thomas M.; Zhang, Huai-Min. (2020): NOAA 0.25-degree Daily Optimum Interpolation Sea Surface Temperature (OISST), Version 2.1. [indicate subset used]. NOAA National Centers for Environmental Information." + }, + "links": [ + { + "rel": "license", + "href": "https://www.ncei.noaa.gov/pub/data/sds/cdr/CDRs/Sea_Surface_Temperature_Optimum_Interpolation/UseAgreement_01B-09.pdf", + "title": "Public Domain" + } + ], + "stac_extensions": [ + "https://stac-extensions.github.io/xarray-assets/v1.0.0/schema.json", + "https://stac-extensions.github.io/scientific/v1.0.0/schema.json" + ], + "title": "aws-noaa-oisst-avhrr-only", + "providers": [ + { + "name": "Development Seed", + "roles": [ + "processor" + ], + "url": "https://developmentseed.org/" + }, + { + "name": "AWS", + "roles": [ + "host", + "processor" + ], + "url": "https://registry.opendata.aws/noaa-cdr-oceanic/" + }, + { + "name": "NOAA", + "roles": [ + "producer" + ], + "url": "https://www.ncei.noaa.gov/products/climate-data-records/sea-surface-temperature-optimum-interpolation" + } + ], + "assets": { + "zarr-https": { + "href": "https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/aws-noaa-oisst-feedstock/aws-noaa-oisst-avhrr-only.zarr/reference.json", + "title": "aws-noaa-oisst-avhrr-only HTTPS Zarr root", + "description": "HTTPS URI of the aws-noaa-oisst-avhrr-only Zarr Group on OSN.", + "roles": [ + "data", + "zarr", + "https" + ], + "type": "application/vnd+zarr", + "xarray:open_kwargs": { + "consolidated": true + } + } + }, + "keywords": [ + "SST", + "Temperature", + "Climate" + ], + "license": "proprietary" +} diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/requirements.txt b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/requirements.txt new file mode 100644 index 0000000..b4fdf5d --- /dev/null +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/requirements.txt @@ -0,0 +1,3 @@ +s3fs +xstac +requests diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/test_app.py b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/test_app.py new file mode 100644 index 0000000..f5b9a2e --- /dev/null +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/test_app.py @@ -0,0 +1,45 @@ +import json +import os +from unittest.mock import patch + +import pystac + +from aws_asdi_pipelines.pipelines.aws_noaa_oisst_avhrr_only.app import ( + create_stac, + handler, +) + +domain = "domain" +client_secret = "client_secret" +client_id = "client_id" +scope = "scope" +ingestor_url = "ingestor_url" + + +def test_create_stac(): + item = create_stac(collection=False) + assert type(item) == pystac.item.Item + + +@patch.dict(os.environ, {"DOMAIN": domain}) +@patch.dict(os.environ, {"CLIENT_SECRET": client_secret}) +@patch.dict(os.environ, {"CLIENT_ID": client_id}) +@patch.dict(os.environ, {"SCOPE": scope}) +@patch.dict(os.environ, {"INGESTOR_URL": ingestor_url}) +@patch("aws_asdi_pipelines.pipelines.aws_noaa_oisst_avhrr_only.app.create_stac") +@patch("aws_asdi_pipelines.pipelines.aws_noaa_oisst_avhrr_only.app.requests") +@patch("aws_asdi_pipelines.pipelines.aws_noaa_oisst_avhrr_only.app.get_token") +def test_handler(get_token, requests, create_stac): + token = "token" + item = {"id": "id"} + create_stac.return_value.to_dict.return_value = item + get_token.return_value = token + handler({}, {}) + get_token.assert_called_once_with( + domain=domain, client_secret=client_secret, client_id=client_id, scope=scope + ) + requests.post.assert_called_once_with( + url=ingestor_url, + data=json.dumps(item), + headers={"Authorization": f"bearer {token}"}, + ) From 8f1c4e7b47ad996e5fa3850d5229a1c0c79eb3d6 Mon Sep 17 00:00:00 2001 From: sharkinsspatial Date: Mon, 13 Mar 2023 19:11:41 -0600 Subject: [PATCH 4/5] Refactor lambda_stack to use shared granule function for xstac. --- aws_asdi_pipelines/cdk/app.py | 5 +- aws_asdi_pipelines/cdk/lambda_stack.py | 131 ++++++++++--------------- lambda.Dockerfile | 2 +- 3 files changed, 55 insertions(+), 83 deletions(-) diff --git a/aws_asdi_pipelines/cdk/app.py b/aws_asdi_pipelines/cdk/app.py index 224fe98..27264c6 100644 --- a/aws_asdi_pipelines/cdk/app.py +++ b/aws_asdi_pipelines/cdk/app.py @@ -7,9 +7,10 @@ from aws_asdi_pipelines.models.pipeline import Pipeline # Required environment variables -stack_name = os.environ["PIPELINE"].replace("_", "-") +pipeline = os.environ["PIPELINE"] +stack_name = pipeline.replace("_", "-") -with open(f"./aws_asdi_pipelines/pipelines/{stack_name}/config.yaml") as f: +with open(f"./aws_asdi_pipelines/pipelines/{pipeline}/config.yaml") as f: config = yaml.safe_load(f) pipeline = Pipeline(**config) diff --git a/aws_asdi_pipelines/cdk/lambda_stack.py b/aws_asdi_pipelines/cdk/lambda_stack.py index c22dca5..5af9c0f 100644 --- a/aws_asdi_pipelines/cdk/lambda_stack.py +++ b/aws_asdi_pipelines/cdk/lambda_stack.py @@ -27,13 +27,55 @@ def __init__( **kwargs, ) -> None: super().__init__(scope, stack_name) + self.secret = secretsmanager.Secret.from_secret_complete_arn( + self, f"{pipeline.id}_secret_new", secret_complete_arn=pipeline.secret_arn + ) + self.repo = ecr.Repository.from_repository_name( + self, + f"{stack_name}_Repository", + repository_name=pipeline.id, + ) + self.granule_function = aws_lambda.DockerImageFunction( + self, + f"{stack_name}-granule_function", + code=aws_lambda.DockerImageCode.from_ecr( + repository=self.repo, tag="latest" + ), + memory_size=1000, + timeout=cdk.Duration.minutes(14), + log_retention=logs.RetentionDays.ONE_WEEK, + environment={ + "CLIENT_SECRET": self.secret.secret_value_from_json( + "client_secret" + ).to_string(), + "CLIENT_ID": self.secret.secret_value_from_json( + "client_id" + ).to_string(), + "DOMAIN": self.secret.secret_value_from_json( + "cognito_domain" + ).to_string(), + "SCOPE": self.secret.secret_value_from_json("scope").to_string(), + "INGESTOR_URL": pipeline.ingestor_url, + }, + ) + self.open_buckets_statement = iam.PolicyStatement( + resources=[ + "arn:aws:s3:::*", + ], + actions=[ + "s3:Get*", + "s3:List*", + "s3:ListBucket", + ], + ) + self.granule_function.role.add_to_principal_policy(self.open_buckets_statement) + if pipeline.queue: self.granule_dlq = sqs.Queue( self, f"{stack_name}_GranuleDLQ", retention_period=cdk.Duration.days(14), ) - self.granule_queue = sqs.Queue( self, f"{stack_name}_GranuleQueue", @@ -44,31 +86,13 @@ def __init__( queue=self.granule_dlq, ), ) - else: - self.stac_function = aws_lambda.DockerImageFunction( - self, - f"{stack_name}-stac_function", - code=aws_lambda.DockerImageCode.from_ecr( - repository=self.repo, tag="latest" - ), - memory_size=1000, - timeout=cdk.Duration.minutes(14), - log_retention=logs.RetentionDays.ONE_WEEK, - environment={ - "CLIENT_SECRET": self.secret.secret_value_from_json( - "client_secret" - ).to_string(), - "CLIENT_ID": self.secret.secret_value_from_json( - "client_id" - ).to_string(), - "DOMAIN": self.secret.secret_value_from_json( - "cognito_domain" - ).to_string(), - "SCOPE": self.secret.secret_value_from_json("scope").to_string(), - "INGESTOR_URL": pipeline.ingestor_url, - }, + self.granule_queue.grant_consume_messages(self.granule_function.role) + self.event_source = lambda_event_sources.SqsEventSource( + queue=self.granule_queue, + batch_size=1, ) - + self.granule_function.add_event_source(self.event_source) + else: custom_resources.AwsCustomResource( scope=self, id="invoke_lambda", @@ -78,7 +102,7 @@ def __init__( iam.PolicyStatement( actions=["lambda:InvokeFunction"], effect=iam.Effect.ALLOW, - resources=[self.stac_function.function_arn], + resources=[self.granule_function.function_arn], ) ] ) @@ -88,7 +112,7 @@ def __init__( service="Lambda", action="invoke", parameters={ - "FunctionName": self.stac_function.function_name, + "FunctionName": self.granule_function.function_name, "InvocationType": "Event", }, physical_resource_id=custom_resources.PhysicalResourceId.of( @@ -108,59 +132,6 @@ def __init__( ) self.granule_topic.add_subscription(self.sns_subscription) - self.secret = secretsmanager.Secret.from_secret_complete_arn( - self, f"{stack_name}_secret_new", secret_complete_arn=pipeline.secret_arn - ) - self.repo = ecr.Repository.from_repository_name( - self, - f"{stack_name}_Repository", - repository_name=stack_name, - ) - - self.granule_function = aws_lambda.DockerImageFunction( - self, - f"{stack_name}-granule_function", - code=aws_lambda.DockerImageCode.from_ecr( - repository=self.repo, tag="latest" - ), - memory_size=1000, - timeout=cdk.Duration.minutes(14), - log_retention=logs.RetentionDays.ONE_WEEK, - environment={ - "CLIENT_SECRET": self.secret.secret_value_from_json( - "client_secret" - ).to_string(), - "CLIENT_ID": self.secret.secret_value_from_json( - "client_id" - ).to_string(), - "DOMAIN": self.secret.secret_value_from_json( - "cognito_domain" - ).to_string(), - "SCOPE": self.secret.secret_value_from_json("scope").to_string(), - "INGESTOR_URL": pipeline.ingestor_url, - }, - ) - - self.open_buckets_statement = iam.PolicyStatement( - resources=[ - "arn:aws:s3:::*", - ], - actions=[ - "s3:Get*", - "s3:List*", - "s3:ListBucket", - ], - ) - - self.granule_function.role.add_to_principal_policy(self.open_buckets_statement) - - self.granule_queue.grant_consume_messages(self.granule_function.role) - self.event_source = lambda_event_sources.SqsEventSource( - queue=self.granule_queue, - batch_size=1, - ) - self.granule_function.add_event_source(self.event_source) - if pipeline.inventory_location: self.athena_results_bucket = s3.Bucket( self, diff --git a/lambda.Dockerfile b/lambda.Dockerfile index d7b52f2..72ed127 100644 --- a/lambda.Dockerfile +++ b/lambda.Dockerfile @@ -9,6 +9,6 @@ COPY "./aws_asdi_pipelines/cognito/*" "./aws_asdi_pipelines/cognito/" COPY setup.py . RUN pip install . COPY --from=builder /tmp/site-packages ${LAMBDA_TASK_ROOT} -COPY "./aws_asdi_pipelines/pipelines/${pipeline}/*" ${LAMBDA_TASK_ROOT} +COPY "./aws_asdi_pipelines/pipelines/${pipeline}/*" "${LAMBDA_TASK_ROOT}/" CMD [ "app.handler" ] From 277358fc2e31dfe3451ebadcd6b831e5675108fb Mon Sep 17 00:00:00 2001 From: sharkinsspatial Date: Mon, 13 Mar 2023 19:13:21 -0600 Subject: [PATCH 5/5] Add collection creation to aws_noaa_oisst pipeline as xstac example. --- .../aws_noaa_oisst_avhrr_only/app.py | 25 +++++++++------- .../collection-template.json | 16 ++++++++++ .../aws_noaa_oisst_avhrr_only/config.yaml | 4 +-- .../aws_noaa_oisst_avhrr_only/test_app.py | 30 +++++++++++++++---- 4 files changed, 57 insertions(+), 18 deletions(-) diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/app.py b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/app.py index f16c64b..bb1dd13 100644 --- a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/app.py +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/app.py @@ -1,5 +1,6 @@ import json import os +from urllib.parse import urljoin import fsspec import pystac @@ -10,7 +11,7 @@ from aws_asdi_pipelines.cognito.utils import get_token -def create_stac(collection: bool): +def create_stac(collection: bool) -> pystac.STACObject: if collection: template_type = "collection-template.json" else: @@ -42,6 +43,15 @@ def create_stac(collection: bool): return stac +def post_ingestor(stac: pystac.STACObject, url: str, headers): + response = requests.post(url=url, data=json.dumps(stac.to_dict()), headers=headers) + try: + response.raise_for_status() + except Exception: + print(response.text) + raise + + def handler(event, context): domain = os.environ["DOMAIN"] client_secret = os.environ["CLIENT_SECRET"] @@ -52,13 +62,8 @@ def handler(event, context): domain=domain, client_secret=client_secret, client_id=client_id, scope=scope ) headers = {"Authorization": f"bearer {token}"} + collection = create_stac(collection=True) + post_ingestor(collection, urljoin(ingestor_url, "collections"), headers) item = create_stac(collection=False) - item.collection_id = "aws-noaa-oisst-avhrr-only" - response = requests.post( - url=ingestor_url, data=json.dumps(item.to_dict()), headers=headers - ) - try: - response.raise_for_status() - except Exception: - print(response.text) - raise + item.collection_id = collection.id + post_ingestor(item, urljoin(ingestor_url, "ingestions"), headers) diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/collection-template.json b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/collection-template.json index 63b6111..ea0f4e7 100644 --- a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/collection-template.json +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/collection-template.json @@ -21,6 +21,7 @@ } ], "stac_extensions": [ + "https://stac-extensions.github.io/item-assets/v1.0.0/schema.json", "https://stac-extensions.github.io/xarray-assets/v1.0.0/schema.json", "https://stac-extensions.github.io/scientific/v1.0.0/schema.json" ], @@ -49,6 +50,21 @@ "url": "https://www.ncei.noaa.gov/products/climate-data-records/sea-surface-temperature-optimum-interpolation" } ], + "item_assets": { + "zarr-https": { + "title": "aws-noaa-oisst-avhrr-only HTTPS Zarr root", + "description": "HTTPS URI of the aws-noaa-oisst-avhrr-only Zarr Group on OSN.", + "roles": [ + "data", + "zarr", + "https" + ], + "type": "application/vnd+zarr", + "xarray:open_kwargs": { + "consolidated": true + } + } + }, "assets": { "zarr-https": { "href": "https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/aws-noaa-oisst-feedstock/aws-noaa-oisst-avhrr-only.zarr/reference.json", diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/config.yaml b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/config.yaml index db40a79..2ab0fce 100644 --- a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/config.yaml +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/config.yaml @@ -1,6 +1,6 @@ --- - id: "aws-noaa-oisst" + id: "aws_noaa_oisst_avhrr_only" compute: "awslambda" secret_arn: "arn:aws:secretsmanager:eu-central-1:751953039297:secret:asdi-auth-stack-alukach/asdi-workflows-6awSos" - ingestor_url: "https://crtx3178aa.execute-api.us-west-2.amazonaws.com/dev/ingestions" + ingestor_url: "https://crtx3178aa.execute-api.us-west-2.amazonaws.com/dev" queue: false diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/test_app.py b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/test_app.py index f5b9a2e..efd5a86 100644 --- a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/test_app.py +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/test_app.py @@ -1,6 +1,6 @@ import json import os -from unittest.mock import patch +from unittest.mock import MagicMock, call, patch import pystac @@ -14,6 +14,17 @@ client_id = "client_id" scope = "scope" ingestor_url = "ingestor_url" +item = {"id": "id"} +stac_collection = {"id": "collection"} + + +def create_stac_return(collection: bool): + mock = MagicMock() + if collection: + mock.to_dict.return_value = stac_collection + else: + mock.to_dict.return_value = item + return mock def test_create_stac(): @@ -26,20 +37,27 @@ def test_create_stac(): @patch.dict(os.environ, {"CLIENT_ID": client_id}) @patch.dict(os.environ, {"SCOPE": scope}) @patch.dict(os.environ, {"INGESTOR_URL": ingestor_url}) -@patch("aws_asdi_pipelines.pipelines.aws_noaa_oisst_avhrr_only.app.create_stac") +@patch( + "aws_asdi_pipelines.pipelines.aws_noaa_oisst_avhrr_only.app.create_stac", + side_effect=create_stac_return, +) @patch("aws_asdi_pipelines.pipelines.aws_noaa_oisst_avhrr_only.app.requests") @patch("aws_asdi_pipelines.pipelines.aws_noaa_oisst_avhrr_only.app.get_token") def test_handler(get_token, requests, create_stac): token = "token" - item = {"id": "id"} - create_stac.return_value.to_dict.return_value = item get_token.return_value = token handler({}, {}) get_token.assert_called_once_with( domain=domain, client_secret=client_secret, client_id=client_id, scope=scope ) - requests.post.assert_called_once_with( - url=ingestor_url, + collections_call = call( + url=f"{ingestor_url}/collections", + data=json.dumps(stac_collection), + headers={"Authorization": f"bearer {token}"}, + ) + item_call = call( + url=f"{ingestor_url}/ingestor", data=json.dumps(item), headers={"Authorization": f"bearer {token}"}, ) + requests.post.assert_has_calls([collections_call, item_call], any_order=True)