diff --git a/api/v1alpha1/source_types.go b/api/v1alpha1/source_types.go index ed14cb5f5..daba57645 100644 --- a/api/v1alpha1/source_types.go +++ b/api/v1alpha1/source_types.go @@ -40,14 +40,14 @@ type SourceSpec struct { // +kubebuilder:validation:Optional // +kubebuilder:pruning:PreserveUnknownFields - SourceConfig *Config `json:"sourceConfig,omitempty"` - Resources corev1.ResourceRequirements `json:"resources,omitempty"` - SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` - ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` - RuntimeFlags string `json:"runtimeFlags,omitempty"` - VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` - - Pod PodPolicy `json:"pod,omitempty"` + SourceConfig *Config `json:"sourceConfig,omitempty"` + Resources corev1.ResourceRequirements `json:"resources,omitempty"` + SecretsMap map[string]SecretRef `json:"secretsMap,omitempty"` + ProcessingGuarantee ProcessGuarantee `json:"processingGuarantee,omitempty"` + RuntimeFlags string `json:"runtimeFlags,omitempty"` + VolumeMounts []corev1.VolumeMount `json:"volumeMounts,omitempty"` + ForwardSourceMessageProperty *bool `json:"forwardSourceMessageProperty,omitempty"` + Pod PodPolicy `json:"pod,omitempty"` Messaging `json:",inline"` Runtime `json:",inline"` diff --git a/api/v1alpha1/source_webhook.go b/api/v1alpha1/source_webhook.go index e86c21342..ee6c34db9 100644 --- a/api/v1alpha1/source_webhook.go +++ b/api/v1alpha1/source_webhook.go @@ -82,6 +82,11 @@ func (r *Source) Default() { } } + if r.Spec.ForwardSourceMessageProperty == nil { + trueVal := true + r.Spec.ForwardSourceMessageProperty = &trueVal + } + if r.Spec.Output.ProducerConf == nil { producerConf := &ProducerConfig{ MaxPendingMessages: 1000, diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index cfb070faf..20a45b24a 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -930,6 +930,11 @@ func (in *SourceSpec) DeepCopyInto(out *SourceSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.ForwardSourceMessageProperty != nil { + in, out := &in.ForwardSourceMessageProperty, &out.ForwardSourceMessageProperty + *out = new(bool) + **out = **in + } in.Pod.DeepCopyInto(&out.Pod) in.Messaging.DeepCopyInto(&out.Messaging) in.Runtime.DeepCopyInto(&out.Runtime) diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index 8952a3331..dfdec7497 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -4450,6 +4450,8 @@ spec: type: string clusterName: type: string + forwardSourceMessageProperty: + type: boolean golang: properties: go: diff --git a/config/crd/bases/compute.functionmesh.io_sources.yaml b/config/crd/bases/compute.functionmesh.io_sources.yaml index 5b336b952..006d7cf83 100644 --- a/config/crd/bases/compute.functionmesh.io_sources.yaml +++ b/config/crd/bases/compute.functionmesh.io_sources.yaml @@ -36,6 +36,8 @@ spec: type: string clusterName: type: string + forwardSourceMessageProperty: + type: boolean golang: properties: go: diff --git a/config/samples/compute_v1alpha1_source.yaml b/config/samples/compute_v1alpha1_source.yaml index 71c2ec564..e735bd23c 100644 --- a/config/samples/compute_v1alpha1_source.yaml +++ b/config/samples/compute_v1alpha1_source.yaml @@ -13,6 +13,7 @@ spec: useThreadLocalProducers: true topic: persistent://public/default/destination typeClassName: org.apache.pulsar.common.schema.KeyValue + forwardSourceMessageProperty: true resources: limits: cpu: "0.2" diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index c518e2222..d82f15f2e 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -245,12 +245,17 @@ func generateSourceOutputSpec(source *v1alpha1.Source) *proto.SinkSpec { BatchBuilder: source.Spec.Output.ProducerConf.BatchBuilder, } } + var forward = false + if source.Spec.ForwardSourceMessageProperty != nil { + forward = *source.Spec.ForwardSourceMessageProperty + } return &proto.SinkSpec{ - TypeClassName: source.Spec.Output.TypeClassName, - Topic: source.Spec.Output.Topic, - ProducerSpec: &producerSpec, - SerDeClassName: source.Spec.Output.SinkSerdeClassName, - SchemaType: source.Spec.Output.SinkSchemaType, + TypeClassName: source.Spec.Output.TypeClassName, + Topic: source.Spec.Output.Topic, + ProducerSpec: &producerSpec, + SerDeClassName: source.Spec.Output.SinkSerdeClassName, + SchemaType: source.Spec.Output.SinkSchemaType, + ForwardSourceMessageProperty: forward, } } diff --git a/manifests/crd.yaml b/manifests/crd.yaml index 88aac7718..ab34957c6 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -4448,6 +4448,8 @@ spec: type: string clusterName: type: string + forwardSourceMessageProperty: + type: boolean golang: properties: go: @@ -11212,6 +11214,8 @@ spec: type: string clusterName: type: string + forwardSourceMessageProperty: + type: boolean golang: properties: go: diff --git a/mesh-worker-service/integration-tests/docker/Dockerfile b/mesh-worker-service/integration-tests/docker/Dockerfile index 1fa788300..1e1103c4e 100644 --- a/mesh-worker-service/integration-tests/docker/Dockerfile +++ b/mesh-worker-service/integration-tests/docker/Dockerfile @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + FROM streamnative/pulsar-all:2.8.0.7 COPY ./target/mesh-worker-service*.nar /pulsar/mesh-worker-service.nar COPY ./integration-tests/docker/connectors.yaml /pulsar/conf/connectors.yaml diff --git a/mesh-worker-service/integration-tests/docker/connectors.yaml b/mesh-worker-service/integration-tests/docker/connectors.yaml index e94f416ba..8d10b4e9c 100644 --- a/mesh-worker-service/integration-tests/docker/connectors.yaml +++ b/mesh-worker-service/integration-tests/docker/connectors.yaml @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. # + - id: pulsar-io-data-generator name: data-generator description: Test data generator connector diff --git a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java index fb29ed09e..9904519bb 100644 --- a/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java +++ b/mesh-worker-service/src/main/java/io/functionmesh/compute/util/SourcesUtil.java @@ -131,6 +131,7 @@ public static V1alpha1Source createV1alpha1SourceFromSourceConfig(String kind, S if (Strings.isNotEmpty(functionDetails.getSink().getSchemaType())) { v1alpha1SourceSpecOutput.setSinkSchemaType(functionDetails.getSink().getSchemaType()); } + v1alpha1SourceSpec.setForwardSourceMessageProperty(functionDetails.getSink().getForwardSourceMessageProperty()); // process ProducerConf V1alpha1SourceSpecOutputProducerConf v1alpha1SourceSpecOutputProducerConf = new V1alpha1SourceSpecOutputProducerConf();