Skip to content

Apache Kafka Channel Example(docs/eventing/samples/kafka/channel/index.md) #3077

@xtreme-sameer-vohra

Description

@xtreme-sameer-vohra

Issue 1: Confusing Header

Expected Behavior

Creating a KafkaChannel channel instance

Actual Behavior

Creating a KafkaChannel channel CRD

Issue 2: Verify topic

Expected Behavior

kubectl -n kafka exec -it my-cluster-kafka-0 -- bin/kafka-topics.sh --zookeeper localhost:2181 --list
...
knative-messaging-kafka.default.testchannel-one
...

Actual Behavior

kubectl -n kafka exec -it my-cluster-kafka-0 -- bin/kafka-topics.sh --zookeeper localhost:2181 --list
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
	at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:262)
	at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:258)
	at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:119)
	at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1865)
	at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:360)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:55)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)
command terminated with exit code 1

Tried using the svc reference my-cluster-zookeeper-client, however, encountered the same problem.

$ k -n kafka get all
NAME                                              READY   STATUS    RESTARTS   AGE
pod/my-cluster-entity-operator-79877fb799-d5vzd   3/3     Running   0          23h
pod/my-cluster-kafka-0                            1/1     Running   0          23h
pod/my-cluster-zookeeper-0                        1/1     Running   0          23h
pod/my-cluster-zookeeper-1                        1/1     Running   0          23h
pod/my-cluster-zookeeper-2                        1/1     Running   0          23h
pod/strimzi-cluster-operator-79bdc6bf6b-tgtdj     1/1     Running   0          23h

NAME                                  TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)                      AGE
service/my-cluster-kafka-bootstrap    ClusterIP   10.4.33.114   <none>        9091/TCP,9092/TCP,9093/TCP   23h
service/my-cluster-kafka-brokers      ClusterIP   None          <none>        9091/TCP,9092/TCP,9093/TCP   23h
service/my-cluster-zookeeper-client   ClusterIP   10.4.45.188   <none>        2181/TCP                     23h
service/my-cluster-zookeeper-nodes    ClusterIP   None          <none>        2181/TCP,2888/TCP,3888/TCP   23h

NAME                                         READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/my-cluster-entity-operator   1/1     1            1           23h
deployment.apps/strimzi-cluster-operator     1/1     1            1           23h

NAME                                                    DESIRED   CURRENT   READY   AGE
replicaset.apps/my-cluster-entity-operator-79877fb799   1         1         1       23h
replicaset.apps/strimzi-cluster-operator-79bdc6bf6b     1         1         1       23h

NAME                                    READY   AGE
statefulset.apps/my-cluster-kafka       1/1     23h
statefulset.apps/my-cluster-zookeeper   3/3     23h

Issue 3: Creating a broker that uses the defaults

Expected Behavior

NAME                                          URL                                                                                         AGE    READY   REASON
broker.eventing.knative.dev/my-kafka-broker   http://broker-ingress.knative-eventing.svc.cluster.local/kafka-channel-ex/my-kafka-broker   113s   True

Had to update config-br-default-channel to produce Kafka brokers by default. The docs only mention updating default-ch-webhook, which I assume doesn't apply to brokers and only applies to channels.

After creating a Kafka broker, I didn't see any pods being created in the same namespace as the broker as mentioned in the docs. However, I did see pods in the knative-eventing namespace.

Actual Behavior

kubectl create -f - <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
 name: default
EOF

Produced an in memory broker

NAME                                                                URL                                                                                AGE   READY   REASON
inmemorychannel.messaging.knative.dev/my-kafka-broker-kne-trigger   http://my-kafka-broker-kne-trigger-kn-channel.kafka-channel-ex.svc.cluster.local   7s    True

Steps to Reproduce the Problem

  1. Setup a cluster on GKE using In Memory Channels and Broker
  2. Followed guide here

Additional Info

My Knative was using in memory as the default and I added kafka subsequently and modified the config maps to use Kafka

Install information:

  • Platform (GKE, IKS, AKS, etc.): GKE
  • Knative Version: 0.19

Metadata

Metadata

Assignees

Labels

kind/bugCategorizes issue or PR as related to a bug.kind/eventinglifecycle/staleDenotes an issue or PR has remained open with no activity and has become stale.priority/mediumtriage/needs-eng-inputEngineering input is requested

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions