Skip to content

BrokerServiceException$NamingException: Producer with name 'xxxx' is already connected to topic #151

@TimXu0713

Description

@TimXu0713

1. POM dependencies

<flink.version>1.15.0</flink.version>
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
   <dependency>
        <groupId>io.streamnative.connectors</groupId>
        <artifactId>flink-connector-pulsar</artifactId>
        <version>1.15.0.1</version>
    </dependency>

2. Code

     PulsarSource<String> pulsarSource = PulsarSource.builder()
            .setServiceUrl(configs.pulsar().serviceUrl())
            .setAdminUrl(configs.pulsar().adminUrl())
            .setConfig(Configuration.fromMap(configs.sourceConfigs()))
            .setStartCursor(StartCursor.earliest())
            .setTopicPattern(configs.pulsar().topicPatternSource())
            .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
            .setSubscriptionName(configs.pulsar().subscriptionName())
            .setSubscriptionType(SubscriptionType.Exclusive)
            .build();

    DataStream<JSONObject> jsonObjectStream = env.fromSource(pulsarSource,
                    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(configs.flink().autoWatermarkInterval().toMillis())),
                    "pulsar source"
            )
            .uid("pulsar source").name("pulsar source")
            .flatMap(new OggDataProcess())
            .uid("fill organizationId").name("fill organizationId");

    PulsarSink<JSONObject> pulsarSink = PulsarSink.builder()
            .setServiceUrl(configs.pulsar().serviceUrl())
            .setAdminUrl(configs.pulsar().adminUrl())
            .setConfig(Configuration.fromMap(configs.sinkConfigs()))
            .setProducerName(OggImesDataProcess.class.getSimpleName())
            .setSerializationSchema(new ImesSearializationSchema())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .setTopics(new TopicMergeExtractor())
            .setTopicRoutingMode(TopicRoutingMode.MESSAGE_KEY_HASH)
            .setConfig(PULSAR_MESSAGE_KEY_HASH, MessageKeyHash.MURMUR3_32_HASH)
            .build();
    jsonObjectStream.sinkTo(pulsarSink);

3. The program initially runs normally, and some data is dynamically written to the pre-created topics. After about 20 seconds, the program fails with the following error:

org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'OggImesDataProcess' is already connected to topic","reqId":1751986192750247684, "remote":"sn-platform-broker-1-broker.scc.com.cn/10.10.94.119:6651", "local":"/10.20.18.53:53355"}
	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1036) ~[pulsar-client-api-2.10.0.jar:2.10.0]
	at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:88) ~[pulsar-client-all-2.10.0.jar:2.10.0]
	at org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneaky(PulsarExceptionUtils.java:69) ~[flink-connector-pulsar-1.15.0.1.jar:1.15.0.1]
	at org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient(PulsarExceptionUtils.java:46) ~[flink-connector-pulsar-1.15.0.1.jar:1.15.0.1]
	at org.apache.flink.connector.pulsar.sink.writer.topic.ProducerRegister.getOrCreateProducer(ProducerRegister.java:165) ~[flink-connector-pulsar-1.15.0.1.jar:1.15.0.1]
	at org.apache.flink.connector.pulsar.sink.writer.topic.ProducerRegister.createMessageBuilder(ProducerRegister.java:86) ~[flink-connector-pulsar-1.15.0.1.jar:1.15.0.1]
	at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.createMessageBuilder(PulsarWriter.java:210) ~[flink-connector-pulsar-1.15.0.1.jar:1.15.0.1]
	at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.write(PulsarWriter.java:154) ~[flink-connector-pulsar-1.15.0.1.jar:1.15.0.1]
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at cn.com.scc.flink.ods.OggImesDataProcess$OggDataProcess.flatMap(OggImesDataProcess.java:179) ~[classes/:?]
	at cn.com.scc.flink.ods.OggImesDataProcess$OggDataProcess.flatMap(OggImesDataProcess.java:128) ~[classes/:?]
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter$SourceOutputWrapper.collect(PulsarRecordEmitter.java:65) ~[flink-connector-pulsar-1.15.0.1.jar:1.15.0.1]
	at org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaWrapper.deserialize(PulsarDeserializationSchemaWrapper.java:58) ~[flink-connector-pulsar-1.15.0.1.jar:1.15.0.1]
	at org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:54) ~[flink-connector-pulsar-1.15.0.1.jar:1.15.0.1]
	at org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter.emitRecord(PulsarRecordEmitter.java:36) ~[flink-connector-pulsar-1.15.0.1.jar:1.15.0.1]
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) ~[flink-connector-base-1.15.0.jar:1.15.0]
	at org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader.pollNext(PulsarOrderedSourceReader.java:109) ~[flink-connector-pulsar-1.15.0.1.jar:1.15.0.1]
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_333]
14:12:20.492 [flink-akka.actor.default-dispatcher-20] INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - Calculating tasks to restart to recover the failed task cfeffec7652c49d4d76661211b4a9203_1.
14:12:20.492 [flink-akka.actor.default-dispatcher-19] INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager - Received resource requirements from job f31d211c3b592fcc09ca7307646b4970: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=7}]
14:12:20.492 [SourceCoordinator-Source: pulsar source] INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator - Removing registered reader after failure for subtask 1 of source Source: pulsar source.
14:12:20.493 [flink-akka.actor.default-dispatcher-20] INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy - 1 tasks should be restarted to recover the failed task cfeffec7652c49d4d76661211b4a9203_1. 
14:12:20.493 [flink-akka.actor.default-dispatcher-20] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job OggImesDataProcess (f31d211c3b592fcc09ca7307646b4970) switched from state RUNNING to RESTARTING.
14:12:20.494 [flink-akka.actor.default-dispatcher-20] WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager - Failed to trigger or complete checkpoint 6 for job f31d211c3b592fcc09ca7307646b4970. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:1900) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:46) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1505) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1109) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1081) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:590) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:369) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:345) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:328) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:303) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-runtime-1.15.0.jar:1.15.0]
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443) ~[flink-runtime-1.15.0.jar:1.15.0]
	at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) ~[?:?]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_333]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_333]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304) ~[?:?]
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[?:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302) ~[?:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[?:?]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[?:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[?:?]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[?:?]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[?:?]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[?:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[?:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[?:?]
	at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?]
	at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?]
	at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?]
	at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_333]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) ~[?:1.8.0_333]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) ~[?:1.8.0_333]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) ~[?:1.8.0_333]

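4. This looks like a bug: judging from the error message, the producer name set via `setProducerName(...)` appears to be used for more than one producer on the same topic, and the broker rejects the duplicate.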

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions