/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.pulsar; import com.carrotsearch.hppc.ObjectSet; import java.lang.reflect.Field; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; @Slf4j public class SubscribeProcessController { private static final AtomicInteger state = new AtomicInteger(0); public static void request1_afterFirstThreadLock(long requestId){ if (requestId == 1001){ log.info("after first thread lock"); state.set(1); } else { log.info("request " + requestId + " executed"); } } public static void waitThreadLockStateChange(long requestId){ if (requestId == 1002){ log.info("request " + requestId + " waiting first request get lock"); while (!state.compareAndSet(1, 2)){ log.info("request " + requestId + " waiting first request get lock"); delay(); } } else if (requestId == 1003){ while (state.get() != 8){ log.info("request " + requestId + " waiting second request release lock"); delay(); } log.info("request " + requestId + " process finished"); } } public static void request1_afterAddConsumer(long requestId){ if (requestId == 1001){ while (!state.compareAndSet(2, 3)){ log.info("request " + requestId + " request1_afterAddConsumer"); delay(); } } } public static void trigFirstRequestFutureDone(long requestId){ if (requestId == 1002){ while (!state.compareAndSet(3, 4)){ log.info("request " + requestId + " trigFirstRequestFutureDone"); delay(); } } else if (requestId == 1003){ // 等待 unlock } } public static void request1_FutureDone_waitTrig(long requestId){ if (requestId == 1001){ while (!state.compareAndSet(4, 5)){ log.info("request " + requestId + " request1_FutureDone_waitTrig"); delay(); } } } public static void consumerAddFinished(long requestId, CompletableFuture> getTopicFuture){ if (requestId == 1001){ while (!state.compareAndSet(5, 6)){ log.info("request " + requestId + " trig_requestDeleteLock"); delay(); } } else if (requestId == 1003){ getTopicFuture.thenApply(optionalTopic -> { Topic topic = optionalTopic.get(); Subscription subscription = topic.getSubscriptions().values().iterator().next(); Dispatcher dispatcher = subscription.getDispatcher(); List consumerList = dispatcher.getConsumers(); log.error("Problem reproduce: "); log.error("consumerList: " + String.valueOf(consumerList)); try { Field consumerSetField = AbstractDispatcherMultipleConsumers.class.getDeclaredField("consumerSet"); consumerSetField.setAccessible(true); ObjectSet consumerSet = (ObjectSet) consumerSetField.get(dispatcher); log.error("consumerSet: " + String.valueOf(consumerSet)); } catch (Exception e){ } return null; }); } } public static void waitFirstRequestFutureDone(long requestId){ if (requestId == 1002){ while (!state.compareAndSet(6, 7)){ log.info("request " + requestId + " waitFirstRequestFutureDone"); delay(); } } } public static void request2_afterDeleteLock(long requestId){ if (requestId == 1002){ log.info("request " + requestId + " request2_afterDeleteLock"); while (!state.compareAndSet(7, 8)){ delay(); } } } public static void delay(){ try { Thread.sleep(200); } catch (InterruptedException e) { log.error("some thing error", e); } } }