diff --git a/aws_asdi_pipelines/cdk/app.py b/aws_asdi_pipelines/cdk/app.py index 224fe98..27264c6 100644 --- a/aws_asdi_pipelines/cdk/app.py +++ b/aws_asdi_pipelines/cdk/app.py @@ -7,9 +7,10 @@ from aws_asdi_pipelines.models.pipeline import Pipeline # Required environment variables -stack_name = os.environ["PIPELINE"].replace("_", "-") +pipeline = os.environ["PIPELINE"] +stack_name = pipeline.replace("_", "-") -with open(f"./aws_asdi_pipelines/pipelines/{stack_name}/config.yaml") as f: +with open(f"./aws_asdi_pipelines/pipelines/{pipeline}/config.yaml") as f: config = yaml.safe_load(f) pipeline = Pipeline(**config) diff --git a/aws_asdi_pipelines/cdk/lambda_stack.py b/aws_asdi_pipelines/cdk/lambda_stack.py index 28e0457..5af9c0f 100644 --- a/aws_asdi_pipelines/cdk/lambda_stack.py +++ b/aws_asdi_pipelines/cdk/lambda_stack.py @@ -27,42 +27,14 @@ def __init__( **kwargs, ) -> None: super().__init__(scope, stack_name) - self.granule_dlq = sqs.Queue( - self, - f"{stack_name}_GranuleDLQ", - retention_period=cdk.Duration.days(14), - ) - - self.granule_queue = sqs.Queue( - self, - f"{stack_name}_GranuleQueue", - visibility_timeout=cdk.Duration.minutes(15), - retention_period=cdk.Duration.days(14), - dead_letter_queue=sqs.DeadLetterQueue( - max_receive_count=3, - queue=self.granule_dlq, - ), - ) - if pipeline.sns: - self.granule_topic = sns.Topic.from_topic_arn( - self, - f"{stack_name}_GranuleSNS", - topic_arn=pipeline.sns, - ) - self.sns_subscription = sns_subscriptions.SqsSubscription( - queue=self.granule_queue, - ) - self.granule_topic.add_subscription(self.sns_subscription) - self.secret = secretsmanager.Secret.from_secret_complete_arn( - self, f"{stack_name}_secret_new", secret_complete_arn=pipeline.secret_arn + self, f"{pipeline.id}_secret_new", secret_complete_arn=pipeline.secret_arn ) self.repo = ecr.Repository.from_repository_name( self, f"{stack_name}_Repository", - repository_name=stack_name, + repository_name=pipeline.id, ) - self.granule_function = 
aws_lambda.DockerImageFunction( self, f"{stack_name}-granule_function", @@ -86,7 +58,6 @@ def __init__( "INGESTOR_URL": pipeline.ingestor_url, }, ) - self.open_buckets_statement = iam.PolicyStatement( resources=[ "arn:aws:s3:::*", @@ -97,15 +68,69 @@ def __init__( "s3:ListBucket", ], ) - self.granule_function.role.add_to_principal_policy(self.open_buckets_statement) - self.granule_queue.grant_consume_messages(self.granule_function.role) - self.event_source = lambda_event_sources.SqsEventSource( - queue=self.granule_queue, - batch_size=1, - ) - self.granule_function.add_event_source(self.event_source) + if pipeline.queue: + self.granule_dlq = sqs.Queue( + self, + f"{stack_name}_GranuleDLQ", + retention_period=cdk.Duration.days(14), + ) + self.granule_queue = sqs.Queue( + self, + f"{stack_name}_GranuleQueue", + visibility_timeout=cdk.Duration.minutes(15), + retention_period=cdk.Duration.days(14), + dead_letter_queue=sqs.DeadLetterQueue( + max_receive_count=3, + queue=self.granule_dlq, + ), + ) + self.granule_queue.grant_consume_messages(self.granule_function.role) + self.event_source = lambda_event_sources.SqsEventSource( + queue=self.granule_queue, + batch_size=1, + ) + self.granule_function.add_event_source(self.event_source) + else: + custom_resources.AwsCustomResource( + scope=self, + id="invoke_lambda", + policy=( + custom_resources.AwsCustomResourcePolicy.from_statements( + statements=[ + iam.PolicyStatement( + actions=["lambda:InvokeFunction"], + effect=iam.Effect.ALLOW, + resources=[self.granule_function.function_arn], + ) + ] + ) + ), + timeout=cdk.Duration.minutes(15), + on_create=custom_resources.AwsSdkCall( + service="Lambda", + action="invoke", + parameters={ + "FunctionName": self.granule_function.function_name, + "InvocationType": "Event", + }, + physical_resource_id=custom_resources.PhysicalResourceId.of( + "JobSenderTriggerPhysicalId" + ), + ), + ) + + if pipeline.sns: + self.granule_topic = sns.Topic.from_topic_arn( + self, + 
f"{stack_name}_GranuleSNS", + topic_arn=pipeline.sns, + ) + self.sns_subscription = sns_subscriptions.SqsSubscription( + queue=self.granule_queue, + ) + self.granule_topic.add_subscription(self.sns_subscription) if pipeline.inventory_location: self.athena_results_bucket = s3.Bucket( diff --git a/aws_asdi_pipelines/models/pipeline.py b/aws_asdi_pipelines/models/pipeline.py index 160c98e..f8bb422 100644 --- a/aws_asdi_pipelines/models/pipeline.py +++ b/aws_asdi_pipelines/models/pipeline.py @@ -11,11 +11,10 @@ class ComputeEnum(str, Enum): class Pipeline(BaseModel): id: str - arco: bool - collection: str compute: ComputeEnum secret_arn: str ingestor_url: str + queue: bool sns: Optional[str] inventory_location: Optional[str] initial_chunk: Optional[str] diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/__init__.py b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/app.py b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/app.py new file mode 100644 index 0000000..bb1dd13 --- /dev/null +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/app.py @@ -0,0 +1,69 @@ +import json +import os +from urllib.parse import urljoin + +import fsspec +import pystac +import requests +import xarray as xr +import xstac + +from aws_asdi_pipelines.cognito.utils import get_token + + +def create_stac(collection: bool) -> pystac.STACObject: + if collection: + template_type = "collection-template.json" + else: + template_type = "item-template.json" + template_file = os.path.join(os.path.dirname(__file__), template_type) + template = json.load(open(template_file)) + + url = "https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/aws-noaa-oisst-feedstock/aws-noaa-oisst-avhrr-only.zarr" + fs = fsspec.filesystem( + "reference", + fo=f"{url}/reference.json", + remote_protocol="s3", + remote_options={"anon": True}, + ) + m = 
fs.get_mapper("") + ds = xr.open_dataset(m, engine="zarr", backend_kwargs={"consolidated": False}) + + stac = xstac.xarray_to_stac( + ds, + template, + temporal_dimension="time", + x_dimension="lon", + y_dimension="lat", + reference_system="4326", + validate=True, + ) + stac.remove_links(pystac.RelType.SELF) + stac.remove_links(pystac.RelType.ROOT) + return stac + + +def post_ingestor(stac: pystac.STACObject, url: str, headers): + response = requests.post(url=url, data=json.dumps(stac.to_dict()), headers=headers) + try: + response.raise_for_status() + except Exception: + print(response.text) + raise + + +def handler(event, context): + domain = os.environ["DOMAIN"] + client_secret = os.environ["CLIENT_SECRET"] + client_id = os.environ["CLIENT_ID"] + scope = os.environ["SCOPE"] + ingestor_url = os.environ["INGESTOR_URL"] + token = get_token( + domain=domain, client_secret=client_secret, client_id=client_id, scope=scope + ) + headers = {"Authorization": f"bearer {token}"} + collection = create_stac(collection=True) + post_ingestor(collection, urljoin(ingestor_url, "collections"), headers) + item = create_stac(collection=False) + item.collection_id = collection.id + post_ingestor(item, urljoin(ingestor_url, "ingestions"), headers) diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/collection-template.json b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/collection-template.json new file mode 100644 index 0000000..ea0f4e7 --- /dev/null +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/collection-template.json @@ -0,0 +1,92 @@ +{ + "type": "Collection", + "id": "aws-noaa-oisst-avhrr-only", + "stac_version": "1.0.0", + "description": "{{ collection.description }}", + "extent": { + "spatial": { + "bbox": [ + [-180, -90, 180, 90] + ] + }, + "temporal": { + "interval": [[null, null]] + } + }, + "links": [ + { + "rel": "license", + "href": 
"https://www.ncei.noaa.gov/pub/data/sds/cdr/CDRs/Sea_Surface_Temperature_Optimum_Interpolation/UseAgreement_01B-09.pdf", + "title": "Public Domain" + } + ], + "stac_extensions": [ + "https://stac-extensions.github.io/item-assets/v1.0.0/schema.json", + "https://stac-extensions.github.io/xarray-assets/v1.0.0/schema.json", + "https://stac-extensions.github.io/scientific/v1.0.0/schema.json" + ], + "title": "aws-noaa-oisst-avhrr-only", + "providers": [ + { + "name": "Development Seed", + "roles": [ + "processor" + ], + "url": "https://developmentseed.org/" + }, + { + "name": "AWS", + "roles": [ + "host", + "processor" + ], + "url": "https://registry.opendata.aws/noaa-cdr-oceanic/" + }, + { + "name": "NOAA", + "roles": [ + "producer" + ], + "url": "https://www.ncei.noaa.gov/products/climate-data-records/sea-surface-temperature-optimum-interpolation" + } + ], + "item_assets": { + "zarr-https": { + "title": "aws-noaa-oisst-avhrr-only HTTPS Zarr root", + "description": "HTTPS URI of the aws-noaa-oisst-avhrr-only Zarr Group on OSN.", + "roles": [ + "data", + "zarr", + "https" + ], + "type": "application/vnd+zarr", + "xarray:open_kwargs": { + "consolidated": true + } + } + }, + "assets": { + "zarr-https": { + "href": "https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/aws-noaa-oisst-feedstock/aws-noaa-oisst-avhrr-only.zarr/reference.json", + "title": "aws-noaa-oisst-avhrr-only HTTPS Zarr root", + "description": "HTTPS URI of the aws-noaa-oisst-avhrr-only Zarr Group on OSN.", + "roles": [ + "data", + "zarr", + "https" + ], + "type": "application/vnd+zarr", + "xarray:open_kwargs": { + "consolidated": true + } + } + }, + "keywords": [ + "SST", + "Temperature", + "Climate" + ], + "sci:doi": "10.25921/RE9P-PT57", + "sci:citation": "Huang, Boyin; Liu, Chunying; Banzon, Viva F.; Freeman, Eric; Graham, Garrett; Hankins, Bill; Smith, Thomas M.; Zhang, Huai-Min. (2020): NOAA 0.25-degree Daily Optimum Interpolation Sea Surface Temperature (OISST), Version 2.1. 
[indicate subset used]. NOAA National Centers for Environmental Information.", + "license": "proprietary" +} diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/config.yaml b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/config.yaml new file mode 100644 index 0000000..2ab0fce --- /dev/null +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/config.yaml @@ -0,0 +1,6 @@ +--- + id: "aws_noaa_oisst_avhrr_only" + compute: "awslambda" + secret_arn: "arn:aws:secretsmanager:eu-central-1:751953039297:secret:asdi-auth-stack-alukach/asdi-workflows-6awSos" + ingestor_url: "https://crtx3178aa.execute-api.us-west-2.amazonaws.com/dev" + queue: false diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/item-template.json b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/item-template.json new file mode 100644 index 0000000..c002eb7 --- /dev/null +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/item-template.json @@ -0,0 +1,102 @@ +{ + "type": "Feature", + "id": "aws-noaa-oisst-avhrr-only", + "description": "The NOAA 1/4° daily Optimum Interpolation Sea Surface Temperature (or daily OISST) Climate Data Record (CDR) provides complete ocean temperature fields constructed by combining bias-adjusted observations from different platforms (satellites, ships, buoys) on a regular global grid, with gaps filled in by interpolation.", + "stac_version": "1.0.0", + "bbox": [ + -180, + -90, + 180, + 90 + ], + "geometry": { + "type": "Polygon", + "coordinates": [ + [ + [ + -180, + -90 + ], + [ + 180, + -90 + ], + [ + 180, + 90 + ], + [ + -180, + 90 + ], + [ + -180, + -90 + ] + ] + ] + }, + "properties": { + "datetime": "1981-09-01T12:00:00Z", + "sci:doi": "10.25921/RE9P-PT57", + "sci:citation": "Huang, Boyin; Liu, Chunying; Banzon, Viva F.; Freeman, Eric; Graham, Garrett; Hankins, Bill; Smith, Thomas M.; Zhang, Huai-Min. (2020): NOAA 0.25-degree Daily Optimum Interpolation Sea Surface Temperature (OISST), Version 2.1. 
[indicate subset used]. NOAA National Centers for Environmental Information." + }, + "links": [ + { + "rel": "license", + "href": "https://www.ncei.noaa.gov/pub/data/sds/cdr/CDRs/Sea_Surface_Temperature_Optimum_Interpolation/UseAgreement_01B-09.pdf", + "title": "Public Domain" + } + ], + "stac_extensions": [ + "https://stac-extensions.github.io/xarray-assets/v1.0.0/schema.json", + "https://stac-extensions.github.io/scientific/v1.0.0/schema.json" + ], + "title": "aws-noaa-oisst-avhrr-only", + "providers": [ + { + "name": "Development Seed", + "roles": [ + "processor" + ], + "url": "https://developmentseed.org/" + }, + { + "name": "AWS", + "roles": [ + "host", + "processor" + ], + "url": "https://registry.opendata.aws/noaa-cdr-oceanic/" + }, + { + "name": "NOAA", + "roles": [ + "producer" + ], + "url": "https://www.ncei.noaa.gov/products/climate-data-records/sea-surface-temperature-optimum-interpolation" + } + ], + "assets": { + "zarr-https": { + "href": "https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/aws-noaa-oisst-feedstock/aws-noaa-oisst-avhrr-only.zarr/reference.json", + "title": "aws-noaa-oisst-avhrr-only HTTPS Zarr root", + "description": "HTTPS URI of the aws-noaa-oisst-avhrr-only Zarr Group on OSN.", + "roles": [ + "data", + "zarr", + "https" + ], + "type": "application/vnd+zarr", + "xarray:open_kwargs": { + "consolidated": true + } + } + }, + "keywords": [ + "SST", + "Temperature", + "Climate" + ], + "license": "proprietary" +} diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/requirements.txt b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/requirements.txt new file mode 100644 index 0000000..b4fdf5d --- /dev/null +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/requirements.txt @@ -0,0 +1,3 @@ +s3fs +xstac +requests diff --git a/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/test_app.py b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/test_app.py new file mode 100644 index 0000000..efd5a86 
--- /dev/null +++ b/aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/test_app.py @@ -0,0 +1,63 @@ +import json +import os +from unittest.mock import MagicMock, call, patch + +import pystac + +from aws_asdi_pipelines.pipelines.aws_noaa_oisst_avhrr_only.app import ( + create_stac, + handler, +) + +domain = "domain" +client_secret = "client_secret" +client_id = "client_id" +scope = "scope" +ingestor_url = "ingestor_url" +item = {"id": "id"} +stac_collection = {"id": "collection"} + + +def create_stac_return(collection: bool): + mock = MagicMock() + if collection: + mock.to_dict.return_value = stac_collection + else: + mock.to_dict.return_value = item + return mock + + +def test_create_stac(): + item = create_stac(collection=False) + assert type(item) == pystac.item.Item + + +@patch.dict(os.environ, {"DOMAIN": domain}) +@patch.dict(os.environ, {"CLIENT_SECRET": client_secret}) +@patch.dict(os.environ, {"CLIENT_ID": client_id}) +@patch.dict(os.environ, {"SCOPE": scope}) +@patch.dict(os.environ, {"INGESTOR_URL": ingestor_url}) +@patch( + "aws_asdi_pipelines.pipelines.aws_noaa_oisst_avhrr_only.app.create_stac", + side_effect=create_stac_return, +) +@patch("aws_asdi_pipelines.pipelines.aws_noaa_oisst_avhrr_only.app.requests") +@patch("aws_asdi_pipelines.pipelines.aws_noaa_oisst_avhrr_only.app.get_token") +def test_handler(get_token, requests, create_stac): + token = "token" + get_token.return_value = token + handler({}, {}) + get_token.assert_called_once_with( + domain=domain, client_secret=client_secret, client_id=client_id, scope=scope + ) + collections_call = call( + url=f"{ingestor_url}/collections", + data=json.dumps(stac_collection), + headers={"Authorization": f"bearer {token}"}, + ) + item_call = call( + url=f"{ingestor_url}/ingestions", + data=json.dumps(item), + headers={"Authorization": f"bearer {token}"}, + ) + requests.post.assert_has_calls([collections_call, item_call], any_order=True) diff --git a/aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml 
b/aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml index 1f5ed76..1c42184 100644 --- a/aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml +++ b/aws_asdi_pipelines/pipelines/cop_dem_30/config.yaml @@ -1,9 +1,8 @@ --- id: "cop_dem_30" - arco: False - collection: "cop-dem-glo-30" compute: "awslambda" secret_arn: "arn:aws:secretsmanager:eu-central-1:751953039297:secret:asdi-auth-stack-alukach/asdi-workflows-6awSos" ingestor_url: "https://crtx3178aa.execute-api.us-west-2.amazonaws.com/dev/ingestions" + queue: true inventory_location: "s3://asdi-athena-testing-eu-central-1/inventories/copernicus-dem-30m.csv" historic_frequency: 0 diff --git a/aws_asdi_pipelines/pipelines/sentinel1/config.yaml b/aws_asdi_pipelines/pipelines/sentinel1/config.yaml index 650ab46..a5e3f7a 100644 --- a/aws_asdi_pipelines/pipelines/sentinel1/config.yaml +++ b/aws_asdi_pipelines/pipelines/sentinel1/config.yaml @@ -1,11 +1,10 @@ --- id: "sentinel1" - arco: False - collection: "sentinel1" compute: "awslambda" - sns: "arn:aws:sns:eu-central-1:214830741341:SentinelS1L1C" secret_arn: "arn:aws:secretsmanager:eu-central-1:751953039297:secret:asdi-auth-stack-alukach/asdi-workflows-6awSos" ingestor_url: "https://crtx3178aa.execute-api.us-west-2.amazonaws.com/dev/ingestions" + queue: true + sns: "arn:aws:sns:eu-central-1:214830741341:SentinelS1L1C" inventory_location: "s3://sentinel-inventory/sentinel-s1-l1c/sentinel-s1-l1c-inventory/hive/dt=2022-11-16-01-00/" initial_chunk: "2022-4-20" historic_frequency: 1 diff --git a/lambda.Dockerfile b/lambda.Dockerfile index 26f82d4..72ed127 100644 --- a/lambda.Dockerfile +++ b/lambda.Dockerfile @@ -9,6 +9,6 @@ COPY "./aws_asdi_pipelines/cognito/*" "./aws_asdi_pipelines/cognito/" COPY setup.py . RUN pip install . COPY --from=builder /tmp/site-packages ${LAMBDA_TASK_ROOT} -COPY "./aws_asdi_pipelines/pipelines/${pipeline}/app.py" ${LAMBDA_TASK_ROOT} +COPY "./aws_asdi_pipelines/pipelines/${pipeline}/*" "${LAMBDA_TASK_ROOT}/" CMD [ "app.handler" ]