From c1374af63a8c440c848db482ee7febbc0def7265 Mon Sep 17 00:00:00 2001 From: Matthias Wessendorf Date: Fri, 8 Mar 2019 12:06:25 +0100 Subject: [PATCH] Add replica config --- contrib/kafka/pkg/controller/channel/reconcile.go | 13 +++++++++++-- .../kafka/pkg/controller/channel/reconcile_test.go | 11 +++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/contrib/kafka/pkg/controller/channel/reconcile.go b/contrib/kafka/pkg/controller/channel/reconcile.go index 1af5da72042..3935c534f88 100644 --- a/contrib/kafka/pkg/controller/channel/reconcile.go +++ b/contrib/kafka/pkg/controller/channel/reconcile.go @@ -43,11 +43,16 @@ import ( const ( finalizerName = controllerAgentName + // DefaultNumPartitions defines the default number of partitions DefaultNumPartitions = 1 + + // DefaultReplicationFactor defines the default number of replications + DefaultReplicationFactor = 1 ) type channelArgs struct { - NumPartitions int32 + NumPartitions int32 + ReplicationFactor int16 } // Reconcile compares the actual state with the desired, and attempts to @@ -206,8 +211,12 @@ func (r *reconciler) provisionChannel(channel *eventingv1alpha1.Channel, kafkaCl arguments.NumPartitions = DefaultNumPartitions } + if arguments.ReplicationFactor == 0 { + arguments.ReplicationFactor = DefaultReplicationFactor + } + err := kafkaClusterAdmin.CreateTopic(topicName, &sarama.TopicDetail{ - ReplicationFactor: 1, + ReplicationFactor: arguments.ReplicationFactor, NumPartitions: arguments.NumPartitions, }, false) if err == sarama.ErrTopicAlreadyExists { diff --git a/contrib/kafka/pkg/controller/channel/reconcile_test.go b/contrib/kafka/pkg/controller/channel/reconcile_test.go index 62dec79e6e3..3e7325e6209 100644 --- a/contrib/kafka/pkg/controller/channel/reconcile_test.go +++ b/contrib/kafka/pkg/controller/channel/reconcile_test.go @@ -49,6 +49,7 @@ const ( topicPrefix = "knative-eventing-channel" testUID = "test-uid" argumentNumPartitions = "NumPartitions" + argumentReplicationFactor = "ReplicationFactor" ) var ( @@ -276,11 +277,21 @@ func TestProvisionChannel(t *testing.T) { c: getNewChannelWithArgs(channelName, map[string]interface{}{argumentNumPartitions: "invalid"}), wantError: fmt.Sprintf("error unmarshalling arguments: json: cannot unmarshal string into Go struct field channelArgs.%s of type int32", argumentNumPartitions), }, + { + name: "provision with invalid channel arguments - errors", + c: getNewChannelWithArgs(channelName, map[string]interface{}{argumentReplicationFactor: "invalid"}), + wantError: fmt.Sprintf("error unmarshalling arguments: json: cannot unmarshal string into Go struct field channelArgs.%s of type int16", argumentReplicationFactor), + }, { name: "provision with nil channel arguments - errors", c: getNewChannelWithArgs(channelName, map[string]interface{}{argumentNumPartitions: "nil"}), wantError: fmt.Sprintf("error unmarshalling arguments: json: cannot unmarshal string into Go struct field channelArgs.%s of type int32", argumentNumPartitions), }, + { + name: "provision with nil channel arguments - errors", + c: getNewChannelWithArgs(channelName, map[string]interface{}{argumentReplicationFactor: "nil"}), + wantError: fmt.Sprintf("error unmarshalling arguments: json: cannot unmarshal string into Go struct field channelArgs.%s of type int16", argumentReplicationFactor), + }, { name: "provision with unmarshallable channel arguments - errors", c: func() *eventingv1alpha1.Channel {