From 2297287783779c2d0961f78e7ca3b52374381f63 Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Mon, 9 Aug 2021 08:58:26 -0700 Subject: [PATCH 1/3] enable source connector to set ForwardMessageProperty --- api/v1alpha1/source_types.go | 16 ++++++++-------- api/v1alpha1/source_webhook.go | 5 +++++ api/v1alpha1/zz_generated.deepcopy.go | 5 +++++ .../compute.functionmesh.io_functionmeshes.yaml | 2 ++ .../bases/compute.functionmesh.io_sources.yaml | 2 ++ config/samples/compute_v1alpha1_source.yaml | 1 + controllers/spec/utils.go | 11 ++++++----- manifests/crd.yaml | 4 ++++ 8 files changed, 33 insertions(+), 13 deletions(-) diff --git a/api/v1alpha1/source_types.go b/api/v1alpha1/source_types.go index ed14cb5f5..daba57645 100644 --- a/api/v1alpha1/source_types.go +++ b/api/v1alpha1/source_types.go @@ -40,14 +40,14 @@ type SourceSpec struct { // +kubebuilder:validation:Optional // +kubebuilder:pruning:PreserveUnknownFields - SourceConfig *Config `json:"sourceConfig,omitempty"` - Resources corev1.ResourceRequirements `json:"resources,omitempty"` - SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` - ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` - RuntimeFlags string `json:"runtimeFlags,omitempty"` - VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` - - Pod PodPolicy `json:"pod,omitempty"` + SourceConfig *Config `json:"sourceConfig,omitempty"` + Resources corev1.ResourceRequirements `json:"resources,omitempty"` + SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` + ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` + RuntimeFlags string `json:"runtimeFlags,omitempty"` + VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` + ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty,omitempty"` + Pod PodPolicy `json:"pod,omitempty"` Messaging `json:",inline"` Runtime `json:",inline"` diff --git a/api/v1alpha1/source_webhook.go b/api/v1alpha1/source_webhook.go index e86c21342..ee6c34db9 100644 --- a/api/v1alpha1/source_webhook.go +++ b/api/v1alpha1/source_webhook.go @@ -82,6 +82,11 @@ func (r *Source) Default() { } } + if r.Spec.ForwardSourceMessageProperty == nil { + trueVal := true + r.Spec.ForwardSourceMessageProperty = &trueVal + } + if r.Spec.Output.ProducerConf == nil { producerConf := &ProducerConfig{ MaxPendingMessages: 1000, diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index cfb070faf..20a45b24a 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -930,6 +930,11 @@ func (in *SourceSpec) DeepCopyInto(out *SourceSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.ForwardSourceMessageProperty != nil { + in, out := &in.ForwardSourceMessageProperty, &out.ForwardSourceMessageProperty + *out = new(bool) + **out = **in + } in.Pod.DeepCopyInto(&out.Pod) in.Messaging.DeepCopyInto(&out.Messaging) in.Runtime.DeepCopyInto(&out.Runtime) diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index 8952a3331..dfdec7497 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -4450,6 +4450,8 @@ spec: type: string clusterName: type: string + forwardSourceMessageProperty: + type: boolean golang: properties: go: diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index 5b336b952..006d7cf83 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -36,6 +36,8 @@ spec: type: string clusterName: type: string + forwardSourceMessageProperty: + type: boolean golang: properties: go: diff --git a/config/samples/compute_v1alpha1_source.yaml b/config/samples/compute_v1alpha1_source.yaml index 71c2ec564..e735bd23c 100644 --- a/config/samples/compute_v1alpha1_source.yaml +++ b/config/samples/compute_v1alpha1_source.yaml @@ -13,6 +13,7 @@ spec: useThreadLocalProducers: true topic: persistent://public/default/destination typeClassName: org.apache.pulsar.common.schema.KeyValue + forwardSourceMessageProperty: true resources: limits: cpu: "0.2" diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index c518e2222..fb474bfe9 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -246,11 +246,12 @@ func generateSourceOutputSpec(source *v1alpha1.Source) *proto.SinkSpec { } } return &proto.SinkSpec{ - TypeClassName: source.Spec.Output.TypeClassName, - Topic: source.Spec.Output.Topic, - ProducerSpec: &producerSpec, - SerDeClassName: source.Spec.Output.SinkSerdeClassName, - SchemaType: source.Spec.Output.SinkSchemaType, + TypeClassName: source.Spec.Output.TypeClassName, + Topic: source.Spec.Output.Topic, + ProducerSpec: &producerSpec, + SerDeClassName: source.Spec.Output.SinkSerdeClassName, + SchemaType: source.Spec.Output.SinkSchemaType, + ForwardSourceMessageProperty: *source.Spec.ForwardSourceMessageProperty, } } diff --git a/manifests/crd.yaml b/manifests/crd.yaml index 88aac7718..ab34957c6 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -4448,6 +4448,8 @@ spec: type: string clusterName: type: string + forwardSourceMessageProperty: + type: boolean golang: properties: go: @@ -11212,6 +11214,8 @@ spec: type: string clusterName: type: string + forwardSourceMessageProperty: + type: boolean golang: properties: go: From ee90ccd6e7aedb51daa014b1f969fa7c7403b753 Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Mon, 9 Aug 2021 09:10:50 -0700 Subject: [PATCH 2/3] update mesh worker service --- .../integration-tests/docker/Dockerfile | 19 +++++++++++++++++++ .../integration-tests/docker/connectors.yaml | 1 + .../compute/util/SourcesUtil.java | 1 + 3 files changed, 21 insertions(+) diff --git a/mesh-worker-service/integration-tests/docker/Dockerfile b/mesh-worker-service/integration-tests/docker/Dockerfile index 1fa788300..1e1103c4e 100644 --- a/mesh-worker-service/integration-tests/docker/Dockerfile +++ b/mesh-worker-service/integration-tests/docker/Dockerfile @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + FROM streamnative/pulsar-all:2.8.0.7 COPY ./target/mesh-worker-service*.nar /pulsar/mesh-worker-service.nar COPY ./integration-tests/docker/connectors.yaml /pulsar/conf/connectors.yaml diff --git a/mesh-worker-service/integration-tests/docker/connectors.yaml b/mesh-worker-service/integration-tests/docker/connectors.yaml index e94f416ba..8d10b4e9c 100644 --- a/mesh-worker-service/integration-tests/docker/connectors.yaml +++ b/mesh-worker-service/integration-tests/docker/connectors.yaml @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. # + - id: pulsar-io-data-generator name: data-generator description: Test data generator connector diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java index fb29ed09e..9904519bb 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java @@ -131,6 +131,7 @@ public static V1alpha1Source createV1alpha1SourceFromSourceConfig(String kind, S if (Strings.isNotEmpty(functionDetails.getSink().getSchemaType())) { v1alpha1SourceSpecOutput.setSinkSchemaType(functionDetails.getSink().getSchemaType()); } + v1alpha1SourceSpec.setForwardSourceMessageProperty(functionDetails.getSink().getForwardSourceMessageProperty()); // process ProducerConf V1alpha1SourceSpecOutputProducerConf v1alpha1SourceSpecOutputProducerConf = new V1alpha1SourceSpecOutputProducerConf(); From f580cea9ed963d835b8294de2c594ff53476ebeb Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Mon, 9 Aug 2021 19:22:31 -0700 Subject: [PATCH 3/3] handle nil --- controllers/spec/utils.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index fb474bfe9..d82f15f2e 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -245,13 +245,17 @@ func generateSourceOutputSpec(source *v1alpha1.Source) *proto.SinkSpec { BatchBuilder: source.Spec.Output.ProducerConf.BatchBuilder, } } + var forward = false + if source.Spec.ForwardSourceMessageProperty != nil { + forward = *source.Spec.ForwardSourceMessageProperty + } return &proto.SinkSpec{ TypeClassName: source.Spec.Output.TypeClassName, Topic: source.Spec.Output.Topic, ProducerSpec: &producerSpec, SerDeClassName: source.Spec.Output.SinkSerdeClassName, SchemaType: source.Spec.Output.SinkSchemaType, - ForwardSourceMessageProperty: *source.Spec.ForwardSourceMessageProperty, + ForwardSourceMessageProperty: forward, } }