-
Notifications
You must be signed in to change notification settings - Fork 14
feat(rds): support PostgreSQL aws_lambda extension (#800) #802
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
ce26914
feat(rds): support PostgreSQL aws_lambda extension (#800)
vieiralucas 9f4df1b
fix(rds): address Cubic findings on aws_lambda extension
vieiralucas 17ddfe5
fix(rds): bind postgres json params via SQL literals in aws_lambda e2e
vieiralucas 52c80d7
fix(rds): switch composite-typed aws_lambda.invoke wrapper to LANGUAG…
vieiralucas 26779e7
fix(sdk-typescript): raise rds e2e timeout for postgres image build
vieiralucas 447b5de
fix(sdk-typescript): prettier format e2e test
vieiralucas File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,174 @@ | ||
| //! End-to-end tests for the RDS PostgreSQL `aws_lambda` extension. | ||
| //! | ||
| //! Drives a full happy path: create a Lambda, create a Postgres DB | ||
| //! instance (which triggers the lazy build of `fakecloud-postgres`), | ||
| //! connect via tokio_postgres, run `CREATE EXTENSION aws_lambda CASCADE`, | ||
| //! and call `aws_lambda.invoke()` with both a name and an | ||
| //! `aws_commons.create_lambda_function_arn` composite. Async (`Event`) | ||
| //! invocation path is exercised too. | ||
|
|
||
| mod helpers; | ||
|
|
||
| use std::io::Write; | ||
|
|
||
| use aws_sdk_lambda::primitives::Blob; | ||
| use helpers::TestServer; | ||
| use tokio_postgres::NoTls; | ||
|
|
||
| fn make_echo_zip() -> Vec<u8> { | ||
| // Returns the raw event back to the caller so we can verify the | ||
| // payload round-trips through plpython3u + the bridge endpoint. | ||
| let buf = Vec::new(); | ||
| let cursor = std::io::Cursor::new(buf); | ||
| let mut writer = zip::ZipWriter::new(cursor); | ||
| let options = zip::write::SimpleFileOptions::default(); | ||
| writer.start_file("index.py", options).unwrap(); | ||
| writer | ||
| .write_all(b"def handler(event, context):\n return event\n") | ||
| .unwrap(); | ||
| let cursor = writer.finish().unwrap(); | ||
| cursor.into_inner() | ||
| } | ||
|
|
||
| async fn connect_with_retry( | ||
| host: &str, | ||
| port: i32, | ||
| user: &str, | ||
| password: &str, | ||
| dbname: &str, | ||
| ) -> tokio_postgres::Client { | ||
| let connection_string = | ||
| format!("host={host} port={port} user={user} password={password} dbname={dbname}"); | ||
| let mut last_error = None; | ||
| for _ in 0..30 { | ||
| match tokio_postgres::connect(&connection_string, NoTls).await { | ||
| Ok((client, connection)) => { | ||
| tokio::spawn(async move { | ||
| let _ = connection.await; | ||
| }); | ||
| return client; | ||
| } | ||
| Err(error) => { | ||
| last_error = Some(error); | ||
| tokio::time::sleep(std::time::Duration::from_millis(500)).await; | ||
| } | ||
| } | ||
| } | ||
| panic!( | ||
| "could not connect to postgres at {}:{}: {:?}", | ||
| host, port, last_error | ||
| ); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn aws_lambda_extension_invoke_round_trip() { | ||
| let server = TestServer::start().await; | ||
| let lambda = server.lambda_client().await; | ||
| let rds = server.rds_client().await; | ||
|
|
||
| // 1. Create the echo Lambda. | ||
| lambda | ||
| .create_function() | ||
| .function_name("echo") | ||
| .runtime(aws_sdk_lambda::types::Runtime::Python312) | ||
| .role("arn:aws:iam::000000000000:role/test-role") | ||
| .handler("index.handler") | ||
| .code( | ||
| aws_sdk_lambda::types::FunctionCode::builder() | ||
| .zip_file(Blob::new(make_echo_zip())) | ||
| .build(), | ||
| ) | ||
| .send() | ||
| .await | ||
| .expect("create echo lambda"); | ||
|
|
||
| // 2. Create the Postgres DB instance — triggers lazy fakecloud-postgres | ||
| // image build on first run, so this can take a while. | ||
| let create = rds | ||
| .create_db_instance() | ||
| .db_instance_identifier("aws-lambda-ext-db") | ||
| .allocated_storage(20) | ||
| .db_instance_class("db.t3.micro") | ||
| .engine("postgres") | ||
| .engine_version("16.3") | ||
| .master_username("admin") | ||
| .master_user_password("secret123") | ||
| .db_name("appdb") | ||
| .send() | ||
| .await | ||
| .expect("create postgres instance"); | ||
|
|
||
| let endpoint = create | ||
| .db_instance() | ||
| .and_then(|i| i.endpoint()) | ||
| .expect("endpoint"); | ||
| let host = endpoint.address().expect("address").to_string(); | ||
| let port = endpoint.port().expect("port"); | ||
|
|
||
| // 3. Connect and load the extension. | ||
| let client = connect_with_retry(&host, port, "admin", "secret123", "appdb").await; | ||
| client | ||
| .simple_query("CREATE EXTENSION IF NOT EXISTS aws_lambda CASCADE") | ||
| .await | ||
| .expect("load aws_lambda extension"); | ||
|
|
||
| // tokio-postgres in this workspace doesn't ship the with-serde_json-1 | ||
| // feature, so we can't bind a Rust `&str` to a postgres `json` | ||
| // parameter. Embed payloads as SQL literals (test fixtures, no | ||
| // injection concern) and cast result `payload` to text on the wire. | ||
|
|
||
| // 4. Sync invoke by function name + json payload — payload round-trips. | ||
| let row = client | ||
| .query_one( | ||
| "SELECT status_code, payload::text \ | ||
| FROM aws_lambda.invoke('echo', '{\"hello\":\"world\"}'::json)", | ||
| &[], | ||
| ) | ||
| .await | ||
| .expect("invoke by name"); | ||
| let status_code: i32 = row.get(0); | ||
| let payload_text: String = row.get(1); | ||
| let payload: serde_json::Value = serde_json::from_str(&payload_text).unwrap(); | ||
| assert_eq!(status_code, 200); | ||
| assert_eq!(payload, serde_json::json!({"hello": "world"})); | ||
|
|
||
| // 5. aws_commons.create_lambda_function_arn returns a composite type. | ||
| let arn_row = client | ||
| .query_one( | ||
| "SELECT (aws_commons.create_lambda_function_arn('echo')).function_name", | ||
| &[], | ||
| ) | ||
| .await | ||
| .expect("create_lambda_function_arn"); | ||
| let function_name: String = arn_row.get(0); | ||
| assert_eq!(function_name, "echo"); | ||
|
|
||
| // 6. Sync invoke via the composite-typed overload. | ||
| let row = client | ||
| .query_one( | ||
| "SELECT status_code, payload::text FROM aws_lambda.invoke(\ | ||
| aws_commons.create_lambda_function_arn('echo'), '{\"k\":1}'::json)", | ||
| &[], | ||
| ) | ||
| .await | ||
| .expect("invoke via composite arn"); | ||
| let status_code: i32 = row.get(0); | ||
| let payload_text: String = row.get(1); | ||
| let payload: serde_json::Value = serde_json::from_str(&payload_text).unwrap(); | ||
| assert_eq!(status_code, 200); | ||
| assert_eq!(payload, serde_json::json!({"k": 1})); | ||
|
|
||
| // 7. Async (Event) invocation returns 202 immediately. | ||
| let row = client | ||
| .query_one( | ||
| "SELECT status_code, payload::text FROM aws_lambda.invoke(\ | ||
| 'echo', '{\"async\":true}'::json, NULL, 'Event')", | ||
| &[], | ||
| ) | ||
| .await | ||
| .expect("invoke async"); | ||
| let status_code: i32 = row.get(0); | ||
| let payload_text: Option<String> = row.get(1); | ||
| assert_eq!(status_code, 202); | ||
| assert!(payload_text.is_none()); | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| ARG PG_VERSION=16 | ||
| FROM postgres:${PG_VERSION} | ||
| ARG PG_VERSION | ||
|
|
||
| RUN apt-get update \ | ||
| && apt-get install -y --no-install-recommends \ | ||
| postgresql-plpython3-${PG_VERSION} \ | ||
| ca-certificates \ | ||
| && rm -rf /var/lib/apt/lists/* | ||
|
|
||
| COPY aws_commons.control aws_commons--1.0.sql \ | ||
| aws_lambda.control aws_lambda--1.0.sql \ | ||
| /usr/share/postgresql/${PG_VERSION}/extension/ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| -- aws_commons extension v1.0 (fakecloud) | ||
| -- Provides composite types and helpers used by aws_lambda and aws_s3 RDS extensions. | ||
|
|
||
| \echo Use "CREATE EXTENSION aws_commons" to load this file. \quit | ||
|
|
||
| CREATE TYPE aws_commons._lambda_function_arn_1 AS ( | ||
| function_name text, | ||
| region text | ||
| ); | ||
|
|
||
| CREATE FUNCTION aws_commons.create_lambda_function_arn( | ||
| function_name text, | ||
| region text DEFAULT NULL | ||
| ) RETURNS aws_commons._lambda_function_arn_1 | ||
| LANGUAGE plpgsql IMMUTABLE | ||
| AS $$ | ||
| DECLARE | ||
| result aws_commons._lambda_function_arn_1; | ||
| BEGIN | ||
| result.function_name := function_name; | ||
| result.region := region; | ||
| RETURN result; | ||
| END; | ||
| $$; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| comment = 'AWS commons types and helpers (fakecloud)' | ||
| default_version = '1.0' | ||
| relocatable = false | ||
| schema = 'aws_commons' |
101 changes: 101 additions & 0 deletions
101
crates/fakecloud-rds/assets/postgres/aws_lambda--1.0.sql
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| -- aws_lambda extension v1.0 (fakecloud) | ||
| -- Calls fakecloud Lambda invocations through a host bridge endpoint. | ||
|
|
||
| \echo Use "CREATE EXTENSION aws_lambda CASCADE" to load this file. \quit | ||
|
|
||
| CREATE FUNCTION aws_lambda.invoke( | ||
| function_name text, | ||
| payload json, | ||
| region text DEFAULT NULL, | ||
| invocation_type text DEFAULT 'RequestResponse' | ||
| ) RETURNS TABLE( | ||
| status_code integer, | ||
| payload json, | ||
| executed_version text, | ||
| log_result text | ||
| ) | ||
| LANGUAGE plpython3u | ||
| AS $$ | ||
| import json | ||
| import os | ||
| import urllib.request | ||
| import urllib.error | ||
|
|
||
| endpoint = os.environ.get('FAKECLOUD_ENDPOINT') | ||
| if not endpoint: | ||
| plpy.error('aws_lambda: FAKECLOUD_ENDPOINT not set on the database container') | ||
|
|
||
| account_id = os.environ.get('FAKECLOUD_ACCOUNT_ID', '000000000000') | ||
| default_region = os.environ.get('FAKECLOUD_REGION', 'us-east-1') | ||
|
|
||
| body = { | ||
| 'function_name': function_name, | ||
| 'payload': json.loads(payload) if payload is not None else None, | ||
| 'invocation_type': invocation_type, | ||
| 'region': region or default_region, | ||
| } | ||
|
|
||
| req = urllib.request.Request( | ||
| endpoint.rstrip('/') + '/_fakecloud/rds/lambda-invoke', | ||
| data=json.dumps(body).encode('utf-8'), | ||
| headers={ | ||
| 'Content-Type': 'application/json', | ||
| 'X-Fakecloud-Account-Id': account_id, | ||
| }, | ||
| method='POST', | ||
| ) | ||
|
|
||
| http_status = None | ||
| try: | ||
| with urllib.request.urlopen(req, timeout=300) as resp: | ||
| raw = resp.read() | ||
| http_status = resp.status | ||
| except urllib.error.HTTPError as e: | ||
| raw = e.read() | ||
| http_status = e.code | ||
| except Exception as e: | ||
| plpy.error('aws_lambda: bridge call failed: {}'.format(e)) | ||
|
|
||
| try: | ||
| parsed = json.loads(raw) | ||
| except ValueError: | ||
| parsed = { | ||
| 'status_code': http_status, | ||
| 'payload': {'errorMessage': raw.decode('utf-8', errors='replace')}, | ||
| } | ||
|
|
||
| status_code = parsed.get('status_code') | ||
| if status_code is None: | ||
| status_code = http_status if http_status is not None else 0 | ||
|
|
||
| return [( | ||
| int(status_code), | ||
| json.dumps(parsed.get('payload')) if parsed.get('payload') is not None else None, | ||
| parsed.get('executed_version'), | ||
| parsed.get('log_result'), | ||
| )] | ||
| $$; | ||
|
|
||
| -- LANGUAGE SQL keeps the user-facing arg name `payload` even though it | ||
| -- also names a RETURNS TABLE column. PL/pgSQL would reject that as a | ||
| -- duplicate identifier in the function namespace; SQL doesn't. | ||
| CREATE FUNCTION aws_lambda.invoke( | ||
| function_name aws_commons._lambda_function_arn_1, | ||
| payload json, | ||
| region text DEFAULT NULL, | ||
| invocation_type text DEFAULT 'RequestResponse' | ||
| ) RETURNS TABLE( | ||
| status_code integer, | ||
| payload json, | ||
| executed_version text, | ||
| log_result text | ||
| ) | ||
| LANGUAGE SQL | ||
| AS $$ | ||
| SELECT * FROM aws_lambda.invoke( | ||
| (function_name).function_name, | ||
| payload, | ||
| COALESCE(region, (function_name).region), | ||
| invocation_type | ||
| ); | ||
| $$; | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| comment = 'AWS Lambda invocation from PostgreSQL (fakecloud)' | ||
| default_version = '1.0' | ||
| relocatable = false | ||
| schema = 'aws_lambda' | ||
| requires = 'plpython3u, aws_commons' |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.