Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions aws_asdi_pipelines/cdk/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from aws_asdi_pipelines.models.pipeline import Pipeline

# Required environment variables
stack_name = os.environ["PIPELINE"].replace("_", "-")
pipeline = os.environ["PIPELINE"]
stack_name = pipeline.replace("_", "-")

with open(f"./aws_asdi_pipelines/pipelines/{stack_name}/config.yaml") as f:
with open(f"./aws_asdi_pipelines/pipelines/{pipeline}/config.yaml") as f:
config = yaml.safe_load(f)
pipeline = Pipeline(**config)

Expand Down
101 changes: 63 additions & 38 deletions aws_asdi_pipelines/cdk/lambda_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,42 +27,14 @@ def __init__(
**kwargs,
) -> None:
super().__init__(scope, stack_name)
self.granule_dlq = sqs.Queue(
self,
f"{stack_name}_GranuleDLQ",
retention_period=cdk.Duration.days(14),
)

self.granule_queue = sqs.Queue(
self,
f"{stack_name}_GranuleQueue",
visibility_timeout=cdk.Duration.minutes(15),
retention_period=cdk.Duration.days(14),
dead_letter_queue=sqs.DeadLetterQueue(
max_receive_count=3,
queue=self.granule_dlq,
),
)
if pipeline.sns:
self.granule_topic = sns.Topic.from_topic_arn(
self,
f"{stack_name}_GranuleSNS",
topic_arn=pipeline.sns,
)
self.sns_subscription = sns_subscriptions.SqsSubscription(
queue=self.granule_queue,
)
self.granule_topic.add_subscription(self.sns_subscription)

self.secret = secretsmanager.Secret.from_secret_complete_arn(
self, f"{stack_name}_secret_new", secret_complete_arn=pipeline.secret_arn
self, f"{pipeline.id}_secret_new", secret_complete_arn=pipeline.secret_arn
)
self.repo = ecr.Repository.from_repository_name(
self,
f"{stack_name}_Repository",
repository_name=stack_name,
repository_name=pipeline.id,
)

self.granule_function = aws_lambda.DockerImageFunction(
self,
f"{stack_name}-granule_function",
Expand All @@ -86,7 +58,6 @@ def __init__(
"INGESTOR_URL": pipeline.ingestor_url,
},
)

self.open_buckets_statement = iam.PolicyStatement(
resources=[
"arn:aws:s3:::*",
Expand All @@ -97,15 +68,69 @@ def __init__(
"s3:ListBucket",
],
)

self.granule_function.role.add_to_principal_policy(self.open_buckets_statement)

self.granule_queue.grant_consume_messages(self.granule_function.role)
self.event_source = lambda_event_sources.SqsEventSource(
queue=self.granule_queue,
batch_size=1,
)
self.granule_function.add_event_source(self.event_source)
if pipeline.queue:
self.granule_dlq = sqs.Queue(
self,
f"{stack_name}_GranuleDLQ",
retention_period=cdk.Duration.days(14),
)
self.granule_queue = sqs.Queue(
self,
f"{stack_name}_GranuleQueue",
visibility_timeout=cdk.Duration.minutes(15),
retention_period=cdk.Duration.days(14),
dead_letter_queue=sqs.DeadLetterQueue(
max_receive_count=3,
queue=self.granule_dlq,
),
)
self.granule_queue.grant_consume_messages(self.granule_function.role)
self.event_source = lambda_event_sources.SqsEventSource(
queue=self.granule_queue,
batch_size=1,
)
self.granule_function.add_event_source(self.event_source)
else:
custom_resources.AwsCustomResource(
scope=self,
id="invoke_lambda",
policy=(
custom_resources.AwsCustomResourcePolicy.from_statements(
statements=[
iam.PolicyStatement(
actions=["lambda:InvokeFunction"],
effect=iam.Effect.ALLOW,
resources=[self.granule_function.function_arn],
)
]
)
),
timeout=cdk.Duration.minutes(15),
on_create=custom_resources.AwsSdkCall(
service="Lambda",
action="invoke",
parameters={
"FunctionName": self.granule_function.function_name,
"InvocationType": "Event",
},
physical_resource_id=custom_resources.PhysicalResourceId.of(
"JobSenderTriggerPhysicalId"
),
),
)

if pipeline.sns:
self.granule_topic = sns.Topic.from_topic_arn(
self,
f"{stack_name}_GranuleSNS",
topic_arn=pipeline.sns,
)
self.sns_subscription = sns_subscriptions.SqsSubscription(
queue=self.granule_queue,
)
self.granule_topic.add_subscription(self.sns_subscription)

if pipeline.inventory_location:
self.athena_results_bucket = s3.Bucket(
Expand Down
3 changes: 1 addition & 2 deletions aws_asdi_pipelines/models/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ class ComputeEnum(str, Enum):

class Pipeline(BaseModel):
id: str
arco: bool
collection: str
compute: ComputeEnum
secret_arn: str
ingestor_url: str
queue: bool
sns: Optional[str]
inventory_location: Optional[str]
initial_chunk: Optional[str]
Expand Down
Empty file.
69 changes: 69 additions & 0 deletions aws_asdi_pipelines/pipelines/aws_noaa_oisst_avhrr_only/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import json
import os
from urllib.parse import urljoin

import fsspec
import pystac
import requests
import xarray as xr
import xstac

from aws_asdi_pipelines.cognito.utils import get_token


def create_stac(collection: bool) -> pystac.STACObject:
    """Build a STAC object for the aws-noaa-oisst-avhrr-only Zarr store.

    Loads a local JSON template (collection or item, selected by *collection*),
    opens the remote kerchunk-referenced Zarr dataset anonymously, and lets
    xstac fill the template's datacube metadata from the xarray dataset.

    Parameters
    ----------
    collection : bool
        True to render the collection template, False for the item template.

    Returns
    -------
    pystac.STACObject
        The validated STAC object with self/root links stripped.
    """
    template_type = "collection-template.json" if collection else "item-template.json"
    template_file = os.path.join(os.path.dirname(__file__), template_type)
    # Use a context manager so the template file handle is closed promptly
    # (previously json.load(open(...)) leaked the handle).
    with open(template_file) as f:
        template = json.load(f)

    url = "https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/aws-noaa-oisst-feedstock/aws-noaa-oisst-avhrr-only.zarr"
    # Kerchunk reference filesystem: reference.json maps chunk keys onto the
    # underlying (anonymous-access) S3 objects.
    fs = fsspec.filesystem(
        "reference",
        fo=f"{url}/reference.json",
        remote_protocol="s3",
        remote_options={"anon": True},
    )
    m = fs.get_mapper("")
    # consolidated=False: the reference filesystem exposes un-consolidated metadata.
    ds = xr.open_dataset(m, engine="zarr", backend_kwargs={"consolidated": False})

    stac = xstac.xarray_to_stac(
        ds,
        template,
        temporal_dimension="time",
        x_dimension="lon",
        y_dimension="lat",
        reference_system="4326",
        validate=True,
    )
    # Self/root links would point at the local build context; the ingestor
    # assigns canonical links, so drop them here.
    stac.remove_links(pystac.RelType.SELF)
    stac.remove_links(pystac.RelType.ROOT)
    return stac


def post_ingestor(stac: pystac.STACObject, url: str, headers):
    """POST a serialized STAC object to *url*, echoing the response body on failure.

    Raises whatever ``raise_for_status`` raises for non-2xx responses, after
    printing the server's response text to aid debugging in Lambda logs.
    """
    payload = json.dumps(stac.to_dict())
    resp = requests.post(url=url, data=payload, headers=headers)
    try:
        resp.raise_for_status()
    except Exception:
        # Surface the server's error body before propagating the HTTP error.
        print(resp.text)
        raise


def handler(event, context):
    """Lambda entry point: publish the OISST collection and item to the ingestor.

    Reads Cognito/ingestor settings from the environment, obtains a bearer
    token, then POSTs the collection followed by the item (linked to the
    collection via ``collection_id``).
    """
    env = os.environ
    domain = env["DOMAIN"]
    client_secret = env["CLIENT_SECRET"]
    client_id = env["CLIENT_ID"]
    scope = env["SCOPE"]
    ingestor_url = env["INGESTOR_URL"]

    token = get_token(
        domain=domain, client_secret=client_secret, client_id=client_id, scope=scope
    )
    headers = {"Authorization": f"bearer {token}"}

    collection = create_stac(collection=True)
    post_ingestor(collection, urljoin(ingestor_url, "collections"), headers)

    item = create_stac(collection=False)
    # Attach the item to the collection just published.
    item.collection_id = collection.id
    post_ingestor(item, urljoin(ingestor_url, "ingestions"), headers)
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
{
"type": "Collection",
"id": "aws-noaa-oisst-avhrr-only",
"stac_version": "1.0.0",
"description": "{{ collection.description }}",
"extent": {
"spatial": {
"bbox": [
[-180, -90, 180, 90]
]
},
"temporal": {
"interval": [[null, null]]
}
},
"links": [
{
"rel": "license",
"href": "https://www.ncei.noaa.gov/pub/data/sds/cdr/CDRs/Sea_Surface_Temperature_Optimum_Interpolation/UseAgreement_01B-09.pdf",
"title": "Public Domain"
}
],
"stac_extensions": [
"https://stac-extensions.github.io/item-assets/v1.0.0/schema.json",
"https://stac-extensions.github.io/xarray-assets/v1.0.0/schema.json",
"https://stac-extensions.github.io/scientific/v1.0.0/schema.json"
],
"title": "aws-noaa-oisst-avhrr-only",
"providers": [
{
"name": "Development Seed",
"roles": [
"processor"
],
"url": "https://developmentseed.org/"
},
{
"name": "AWS",
"roles": [
"host",
"processor"
],
"url": "https://registry.opendata.aws/noaa-cdr-oceanic/"
},
{
"name": "NOAA",
"roles": [
"producer"
],
"url": "https://www.ncei.noaa.gov/products/climate-data-records/sea-surface-temperature-optimum-interpolation"
}
],
"item_assets": {
"zarr-https": {
"title": "aws-noaa-oisst-avhrr-only HTTPS Zarr root",
"description": "HTTPS URI of the aws-noaa-oisst-avhrr-only Zarr Group on OSN.",
"roles": [
"data",
"zarr",
"https"
],
"type": "application/vnd+zarr",
"xarray:open_kwargs": {
"consolidated": true
}
}
},
"assets": {
"zarr-https": {
"href": "https://ncsa.osn.xsede.org/Pangeo/pangeo-forge/pangeo-forge/aws-noaa-oisst-feedstock/aws-noaa-oisst-avhrr-only.zarr/reference.json",
"title": "aws-noaa-oisst-avhrr-only HTTPS Zarr root",
"description": "HTTPS URI of the aws-noaa-oisst-avhrr-only Zarr Group on OSN.",
"roles": [
"data",
"zarr",
"https"
],
"type": "application/vnd+zarr",
"xarray:open_kwargs": {
"consolidated": true
}
}
},
"keywords": [
"SST",
"Temperature",
"Climate"
],
"sci:doi": "10.25921/RE9P-PT57",
"sci:citation": "Huang, Boyin; Liu, Chunying; Banzon, Viva F.; Freeman, Eric; Graham, Garrett; Hankins, Bill; Smith, Thomas M.; Zhang, Huai-Min. (2020): NOAA 0.25-degree Daily Optimum Interpolation Sea Surface Temperature (OISST), Version 2.1. [indicate subset used]. NOAA National Centers for Environmental Information.",
"license": "proprietary"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
id: "aws_noaa_oisst_avhrr_only"
compute: "awslambda"
secret_arn: "arn:aws:secretsmanager:eu-central-1:751953039297:secret:asdi-auth-stack-alukach/asdi-workflows-6awSos"
ingestor_url: "https://crtx3178aa.execute-api.us-west-2.amazonaws.com/dev"
queue: false
Loading