Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import java.lang.reflect.Type;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

/**
* Represents an event that should be handled by a background function. This is an internal format
Expand Down Expand Up @@ -53,6 +55,34 @@ public Event deserialize(
context =
jsonDeserializationContext.deserialize(
adjustContextResource(contextCopy), CloudFunctionsContext.class);
} else if (isPubSubEmulatorPayload(root)) {
JsonObject message = root.getAsJsonObject("message");

String timestampString =
message.has("publishTime")
? message.get("publishTime").getAsString()
: DateTimeFormatter.ISO_INSTANT.format(OffsetDateTime.now());

context =
CloudFunctionsContext.builder()
.setEventType("google.pubsub.topic.publish")
.setTimestamp(timestampString)
.setEventId(message.get("messageId").getAsString())
.setResource(
"{"
+ "\"name\":null,"
+ "\"service\":\"pubsub.googleapis.com\","
+ "\"type\":\"type.googleapis.com/google.pubsub.v1.PubsubMessage\""
+ "}")
.build();

JsonObject marshalledData = new JsonObject();
marshalledData.addProperty("@type", "type.googleapis.com/google.pubsub.v1.PubsubMessage");
marshalledData.add("data", message.get("data"));
if (message.has("attributes")) {
marshalledData.add("attributes", message.get("attributes"));
}
data = marshalledData;
} else {
JsonObject rootCopy = root.deepCopy();
rootCopy.remove("data");
Expand All @@ -63,6 +93,14 @@ public Event deserialize(
return Event.of(data, context);
}

private boolean isPubSubEmulatorPayload(JsonObject root) {
if (root.has("subscription") && root.has("message") && root.get("message").isJsonObject()) {
JsonObject message = root.getAsJsonObject("message");
return message.has("data") && message.has("messageId");
}
return false;
}

/**
* Replaces 'resource' member from context JSON with its string equivalent. The original
* 'resource' member can be a JSON object itself while {@link CloudFunctionsContext} requires it
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package com.google.cloud.functions.invoker;

import static com.google.cloud.functions.invoker.BackgroundFunctionExecutor.backgroundFunctionTypeArgument;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth8.assertThat;

import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -20,7 +25,8 @@ private static class PubSubMessage {
}

private static class PubSubFunction implements BackgroundFunction<PubSubMessage> {
@Override public void accept(PubSubMessage payload, Context context) {}
@Override
public void accept(PubSubMessage payload, Context context) {}
}

@Test
Expand All @@ -31,7 +37,8 @@ public void backgroundFunctionTypeArgument_simple() {
private abstract static class Parent implements BackgroundFunction<PubSubMessage> {}

private static class Child extends Parent {
@Override public void accept(PubSubMessage payload, Context context) {}
@Override
public void accept(PubSubMessage payload, Context context) {}
}

@Test
Expand All @@ -42,7 +49,8 @@ public void backgroundFunctionTypeArgument_superclass() {
private interface GenericParent<T> extends BackgroundFunction<T> {}

private static class GenericChild implements GenericParent<PubSubMessage> {
@Override public void accept(PubSubMessage payload, Context context) {}
@Override
public void accept(PubSubMessage payload, Context context) {}
}

@Test
Expand All @@ -52,7 +60,8 @@ public void backgroundFunctionTypeArgument_genericInterface() {

@SuppressWarnings("rawtypes")
private static class ForgotTypeParameter implements BackgroundFunction {
@Override public void accept(Object payload, Context context) {}
@Override
public void accept(Object payload, Context context) {}
}

@Test
Expand All @@ -62,4 +71,41 @@ public void backgroundFunctionTypeArgument_raw() {
(Class<? extends BackgroundFunction<?>>) (Class<?>) ForgotTypeParameter.class;
assertThat(backgroundFunctionTypeArgument(c)).isEmpty();
}

@Test
public void parseLegacyEventPubSub() throws IOException {
try (Reader reader =
new InputStreamReader(getClass().getResourceAsStream("/pubsub_background.json"))) {
Event event = BackgroundFunctionExecutor.parseLegacyEvent(reader);

Context context = event.getContext();
assertThat(context.eventType()).isEqualTo("google.pubsub.topic.publish");
assertThat(context.eventId()).isEqualTo("1");
assertThat(context.timestamp()).isEqualTo("2021-06-28T05:46:32.390Z");

JsonObject data = event.getData().getAsJsonObject();
assertThat(data.get("data").getAsString()).isEqualTo("eyJmb28iOiJiYXIifQ==");
String attr = data.get("attributes").getAsJsonObject().get("test").getAsString();
assertThat(attr).isEqualTo("123");
}
}

@Test
public void parseLegacyEventPubSubEmulator() throws IOException {
try (Reader reader =
new InputStreamReader(getClass().getResourceAsStream("/pubsub_emulator.json"))) {
Event event = BackgroundFunctionExecutor.parseLegacyEvent(reader);

Context context = event.getContext();
assertThat(context.eventType()).isEqualTo("google.pubsub.topic.publish");
assertThat(context.eventId()).isEqualTo("1");
assertThat(context.timestamp()).isNotNull();
;

JsonObject data = event.getData().getAsJsonObject();
assertThat(data.get("data").getAsString()).isEqualTo("eyJmb28iOiJiYXIifQ==");
String attr = data.get("attributes").getAsJsonObject().get("test").getAsString();
assertThat(attr).isEqualTo("123");
}
}
}
19 changes: 19 additions & 0 deletions invoker/core/src/test/resources/pubsub_background.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"data": {
"@type": "type.googleapis.com/google.pubsub.v1.PubsubMessage",
"data": "eyJmb28iOiJiYXIifQ==",
"attributes": {
"test": "123"
}
},
"context": {
"eventId": "1",
"eventType": "google.pubsub.topic.publish",
"resource": {
"name": "projects/FOO/topics/BAR_TOPIC",
"service": "pubsub.googleapis.com",
"type": "type.googleapis.com/google.pubsub.v1.PubsubMessage"
},
"timestamp": "2021-06-28T05:46:32.390Z"
}
}
10 changes: 10 additions & 0 deletions invoker/core/src/test/resources/pubsub_emulator.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"subscription": "projects/FOO/subscriptions/BAR_SUB",
"message": {
"data": "eyJmb28iOiJiYXIifQ==",
"messageId": "1",
"attributes": {
"test": "123"
}
}
}