From 8eb332c1c4c521118a185e2825d15aa075dc0c4b Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Thu, 9 Jul 2015 19:10:36 +0800 Subject: [PATCH 1/7] Add druid-rocketmq module --- extensions/druid-rocketmq/pom.xml | 50 ++ .../rocketmq/RocketMQDruidModule.java | 46 ++ .../rocketmq/RocketMQFirehoseFactory.java | 537 ++++++++++++++++++ pom.xml | 1 + 4 files changed, 634 insertions(+) create mode 100644 extensions/druid-rocketmq/pom.xml create mode 100644 extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java create mode 100644 extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java diff --git a/extensions/druid-rocketmq/pom.xml b/extensions/druid-rocketmq/pom.xml new file mode 100644 index 000000000000..a02bd1914679 --- /dev/null +++ b/extensions/druid-rocketmq/pom.xml @@ -0,0 +1,50 @@ + + + + 4.0.0 + + druid + io.druid + 0.9.0-SNAPSHOT + ../../pom.xml + + + + 3.2.6 + + + druid-rocketmq + + + + + com.alibaba.rocketmq + rocketmq-client + ${rocketmq.version} + + + + io.druid + druid-api + + + + + \ No newline at end of file diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java new file mode 100644 index 000000000000..159928912a91 --- /dev/null +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java @@ -0,0 +1,46 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.firehose.rocketmq; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.List; + +public class RocketMQDruidModule implements DruidModule { + + @Override + public List getJacksonModules() { + return ImmutableList.of( + new SimpleModule("RocketMQFirehoseModule") + .registerSubtypes( + new NamedType(RocketMQFirehoseFactory.class, "RocketMQ-3.2.6") + ) + ); + } + + @Override + public void configure(Binder binder) { + + } +} diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java new file mode 100644 index 000000000000..7d930e5c85b3 --- /dev/null +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -0,0 +1,537 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.firehose.rocketmq; + +import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; +import com.alibaba.rocketmq.client.consumer.MessageQueueListener; +import com.alibaba.rocketmq.client.consumer.PullResult; +import com.alibaba.rocketmq.client.consumer.store.OffsetStore; +import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType; +import com.alibaba.rocketmq.client.exception.MQBrokerException; +import com.alibaba.rocketmq.client.exception.MQClientException; +import com.alibaba.rocketmq.common.ServiceThread; +import com.alibaba.rocketmq.common.message.MessageExt; +import com.alibaba.rocketmq.common.message.MessageQueue; +import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; +import com.alibaba.rocketmq.remoting.exception.RemotingException; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Sets; +import com.metamx.common.parsers.ParseException; +import io.druid.data.input.ByteBufferInputRowParser; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CountDownLatch; + +public class RocketMQFirehoseFactory implements FirehoseFactory { + + private static final Logger LOGGER = 
LoggerFactory.getLogger(RocketMQFirehoseFactory.class); + + /** + * Passed in configuration for consumer client. + */ + @JsonProperty + private final Properties consumerProps; + + /** + * Consumer group. + */ + @JsonProperty + private final String consumerGroup; + + /** + * Topics to consume. + * Multiple topics are separated by comma ",". + */ + @JsonProperty + private final String feed; + + + /** + * Store messages that are fetched from brokers but not yet delivered to druid via fire hose. + */ + private ConcurrentHashMap> messageQueueTreeSetMap = new ConcurrentHashMap<>(); + + /** + * Store message consuming status. + */ + private ConcurrentHashMap> windows = new ConcurrentHashMap<>(); + + /** + * Default pull batch size. + */ + private static final int PULL_BATCH_SIZE = 32; + + @JsonCreator + public RocketMQFirehoseFactory(@JsonProperty("consumerProps") Properties consumerProps, + @JsonProperty("consumerGroup") String consumerGroup, + @JsonProperty("feed") String feed) { + this.consumerProps = consumerProps; + for (Map.Entry configItem : this.consumerProps.entrySet()) { + System.setProperty(configItem.getKey().toString(), configItem.getValue().toString()); + } + this.consumerGroup = consumerGroup; + this.feed = feed; + } + + /** + * Check if there are locally pending messages to consume. + * @return true if there are some; false otherwise. 
+ */ + private boolean hasMessagesPending() { + + for (ConcurrentHashMap.Entry> entry : messageQueueTreeSetMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + return true; + } + } + + return false; + } + + @Override + public Firehose connect(ByteBufferInputRowParser byteBufferInputRowParser) throws IOException, ParseException { + + Set newDimExclus = Sets.union( + byteBufferInputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), + Sets.newHashSet("feed") + ); + + final ByteBufferInputRowParser theParser = byteBufferInputRowParser.withParseSpec( + byteBufferInputRowParser.getParseSpec() + .withDimensionsSpec( + byteBufferInputRowParser.getParseSpec() + .getDimensionsSpec() + .withDimensionExclusions( + newDimExclus + ) + ) + ); + + /** + * Topic-Queue mapping. + */ + final ConcurrentHashMap> topicQueueMap; + + /** + * Default Pull-style client for RocketMQ. + */ + final DefaultMQPullConsumer defaultMQPullConsumer; + final DruidPullMessageService pullMessageService; + + messageQueueTreeSetMap.clear(); + windows.clear(); + + try { + defaultMQPullConsumer = new DefaultMQPullConsumer(this.consumerGroup); + defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); + topicQueueMap = new ConcurrentHashMap<>(); + + pullMessageService = new DruidPullMessageService(defaultMQPullConsumer); + String[] topics = feed.split(","); + for (String topic : topics) { + topic = topic.trim(); + if (topic.isEmpty()) { + continue; + } + defaultMQPullConsumer.fetchSubscribeMessageQueues(topic); + topicQueueMap.put(topic, defaultMQPullConsumer.fetchMessageQueuesInBalance(topic)); + } + DruidMessageQueueListener druidMessageQueueListener = + new DruidMessageQueueListener(Sets.newHashSet(topics), topicQueueMap, defaultMQPullConsumer); + defaultMQPullConsumer.setMessageQueueListener(druidMessageQueueListener); + defaultMQPullConsumer.start(); + pullMessageService.start(); + } catch (MQClientException e) { + LOGGER.error("Failed to start DefaultMQPullConsumer", e); 
+ throw new IOException("Failed to start RocketMQ client", e); + } + + return new Firehose() { + + @Override + public boolean hasMore() { + boolean hasMore = false; + DruidPullRequest earliestPullRequest = null; + + for (Map.Entry> entry : topicQueueMap.entrySet()) { + for (MessageQueue messageQueue : entry.getValue()) { + if (messageQueueTreeSetMap.keySet().contains(messageQueue) + && !messageQueueTreeSetMap.get(messageQueue).isEmpty()) { + hasMore = true; + } else { + DruidPullRequest newPullRequest = new DruidPullRequest(); + newPullRequest.setMessageQueue(messageQueue); + try { + long offset = defaultMQPullConsumer.fetchConsumeOffset(messageQueue, false); + newPullRequest.setNextBeginOffset(offset); + } catch (MQClientException e) { + LOGGER.error("Failed to fetch consume offset for queue: {}", entry.getKey()); + continue; + } + newPullRequest.setLongPull(!hasMessagesPending()); + + // notify pull message service to pull messages from brokers. + pullMessageService.putRequest(newPullRequest); + + // set the earliest pull in case we need to block. + if (null == earliestPullRequest) { + earliestPullRequest = newPullRequest; + } + } + } + } + + // Block only when there is no locally pending messages. + if (!hasMore && null != earliestPullRequest) { + try { + earliestPullRequest.getCountDownLatch().await(); + hasMore = true; + } catch (InterruptedException e) { + LOGGER.error("CountDownLatch await got interrupted", e); + } + } + return hasMore; + } + + @Override + public InputRow nextRow() { + for (Map.Entry> entry : messageQueueTreeSetMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + MessageExt message = entry.getValue().pollFirst(); + InputRow inputRow = theParser.parse(ByteBuffer.wrap(message.getBody())); + + if (!windows.keySet().contains(entry.getKey())) { + windows.put(entry.getKey(), new ConcurrentSkipListSet()); + } + windows.get(entry.getKey()).add(message.getQueueOffset()); + return inputRow; + } + } + + // should never happen. 
+ return null; + } + + @Override + public Runnable commit() { + return new Runnable() { + @Override + public void run() { + OffsetStore offsetStore = defaultMQPullConsumer.getOffsetStore(); + Set updated = new HashSet<>(); + // calculate offsets according to consuming windows. + for (ConcurrentHashMap.Entry> entry : windows.entrySet()) { + while (!entry.getValue().isEmpty()) { + + long offset = offsetStore.readOffset(entry.getKey(), ReadOffsetType.MEMORY_FIRST_THEN_STORE); + if (offset + 1 > entry.getValue().first()) { + entry.getValue().pollFirst(); + } else if (offset + 1 == entry.getValue().first()) { + entry.getValue().pollFirst(); + offsetStore.updateOffset(entry.getKey(), offset + 1, true); + updated.add(entry.getKey()); + } else { + break; + } + + } + } + offsetStore.persistAll(updated); + } + }; + } + + @Override + public void close() throws IOException { + defaultMQPullConsumer.shutdown(); + pullMessageService.shutdown(false); + } + }; + } + + + /** + * Pull request. + */ + class DruidPullRequest { + private MessageQueue messageQueue; + private String tag; + private long nextBeginOffset; + private int pullBatchSize; + private boolean longPull; + private CountDownLatch countDownLatch; + private PullResult pullResult; + private boolean successful; + + public DruidPullRequest() { + countDownLatch = new CountDownLatch(1); + tag = "*"; + pullBatchSize = PULL_BATCH_SIZE; + successful = false; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public void setMessageQueue(MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + public long getNextBeginOffset() { + return nextBeginOffset; + } + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public void setNextBeginOffset(long nextBeginOffset) { + this.nextBeginOffset = nextBeginOffset; + } + + public int getPullBatchSize() { + return pullBatchSize; + } + + public void setPullBatchSize(int pullBatchSize) { + 
this.pullBatchSize = pullBatchSize; + } + + public boolean isLongPull() { + return longPull; + } + + public void setLongPull(boolean longPull) { + this.longPull = longPull; + } + + public CountDownLatch getCountDownLatch() { + return countDownLatch; + } + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + public PullResult getPullResult() { + return pullResult; + } + + public void setPullResult(PullResult pullResult) { + this.pullResult = pullResult; + } + + public boolean isSuccessful() { + return successful; + } + + public void setSuccessful(boolean successful) { + this.successful = successful; + } + } + + + /** + * Pull message service for druid. + * + * Note: this is a single thread service. + */ + class DruidPullMessageService extends ServiceThread { + + private volatile List requestsWrite = new ArrayList(); + private volatile List requestsRead = new ArrayList(); + + private final DefaultMQPullConsumer defaultMQPullConsumer; + + public DruidPullMessageService(final DefaultMQPullConsumer defaultMQPullConsumer) { + this.defaultMQPullConsumer = defaultMQPullConsumer; + } + + public void putRequest(final DruidPullRequest request) { + synchronized (this) { + this.requestsWrite.add(request); + if (!hasNotified) { + hasNotified = true; + notify(); + } + } + } + + private void swapRequests() { + List tmp = requestsWrite; + requestsWrite = requestsRead; + requestsRead = tmp; + } + + @Override + public String getServiceName() { + return getClass().getSimpleName(); + } + + /** + * Core message pulling logic code goes here. 
+ */ + private void doPull() { + for (DruidPullRequest pullRequest : requestsRead) { + PullResult pullResult = null; + try { + if (!pullRequest.isLongPull()) { + pullResult = defaultMQPullConsumer.pull( + pullRequest.getMessageQueue(), + pullRequest.getTag(), + pullRequest.getNextBeginOffset(), + pullRequest.getPullBatchSize()); + } else { + pullResult = defaultMQPullConsumer.pullBlockIfNotFound( + pullRequest.getMessageQueue(), + pullRequest.getTag(), + pullRequest.getNextBeginOffset(), + pullRequest.getPullBatchSize() + ); + } + pullRequest.setPullResult(pullResult); + pullRequest.setSuccessful(true); + + if (!messageQueueTreeSetMap.keySet().contains(pullRequest.getMessageQueue())) { + messageQueueTreeSetMap.putIfAbsent(pullRequest.getMessageQueue(), + new ConcurrentSkipListSet<>(new MessageComparator())); + } + messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList()); + + } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + LOGGER.error("Failed to pull message from broker.", e); + } finally { + pullRequest.getCountDownLatch().countDown(); + } + + } + requestsRead.clear(); + } + + /** + * Thread looping entry. + */ + @Override + public void run() { + LOGGER.info(getServiceName() + " starts."); + while (!isStoped()) { + waitForRunning(0); + doPull(); + } + + // in case this service is shutdown gracefully without interruption. + try { + Thread.sleep(10); + } catch (InterruptedException e) { + LOGGER.error("", e); + } + + synchronized (this) { + swapRequests(); + } + + doPull(); + LOGGER.info(getServiceName() + " terminated."); + } + + @Override + protected void onWaitEnd() { + swapRequests(); + } + } + + + /** + * Compare messages pulled from same message queue according to queue offset. + */ + class MessageComparator implements Comparator { + @Override + public int compare(MessageExt lhs, MessageExt rhs) { + return lhs.getQueueOffset() < rhs.getQueueOffset() ? 
-1 : (lhs.getQueueOffset() == rhs.getQueueOffset() ? 0 : 1); + } + } + + + /** + * Handle message queues re-balance operations. + */ + class DruidMessageQueueListener implements MessageQueueListener { + + private Set topics; + + private final ConcurrentHashMap> topicQueueMap; + + private final DefaultMQPullConsumer defaultMQPullConsumer; + + public DruidMessageQueueListener(final Set topics, + final ConcurrentHashMap> topicQueueMap, + final DefaultMQPullConsumer defaultMQPullConsumer) { + this.topics = topics; + this.topicQueueMap = topicQueueMap; + this.defaultMQPullConsumer = defaultMQPullConsumer; + } + + @Override + public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) { + if (topics.contains(topic)) { + topicQueueMap.put(topic, mqDivided); + + // Remove message queues that are re-assigned to other clients. + Iterator>> it = + messageQueueTreeSetMap.entrySet().iterator(); + while (it.hasNext()) { + if (!mqDivided.contains(it.next().getKey())) { + it.remove(); + } + } + + StringBuilder stringBuilder = new StringBuilder(); + for (MessageQueue messageQueue : mqDivided) { + stringBuilder.append(messageQueue.getBrokerName()) + .append("#") + .append(messageQueue.getQueueId()) + .append(", "); + } + + if (LOGGER.isDebugEnabled() && stringBuilder.length() > 2) { + LOGGER.debug(String.format("%s@%s is consuming the following message queues: %s", + defaultMQPullConsumer.getClientIP(), + defaultMQPullConsumer.getInstanceName(), + stringBuilder.substring(0, stringBuilder.length() - 2) /*Remove the trailing comma*/)); + } + } + + } + } +} diff --git a/pom.xml b/pom.xml index 7bc13e8e68ac..f2c8c06fa480 100644 --- a/pom.xml +++ b/pom.xml @@ -94,6 +94,7 @@ extensions/s3-extensions extensions/kafka-eight extensions/kafka-eight-simpleConsumer + extensions/druid-rocketmq extensions/rabbitmq extensions/histogram extensions/mysql-metadata-storage From 3d46356dc9763aebf09ab1ad9ba8ba27b70c291d Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Mon, 11 Jan 2016 
11:38:32 +0800 Subject: [PATCH 2/7] Update code on PR comments --- .../rocketmq/RocketMQDruidModule.java | 2 +- .../rocketmq/RocketMQFirehoseFactory.java | 179 ++++++++---------- 2 files changed, 81 insertions(+), 100 deletions(-) diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java index 159928912a91..f902ea739be1 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java @@ -34,7 +34,7 @@ public List getJacksonModules() { return ImmutableList.of( new SimpleModule("RocketMQFirehoseModule") .registerSubtypes( - new NamedType(RocketMQFirehoseFactory.class, "RocketMQ-3.2.6") + new NamedType(RocketMQFirehoseFactory.class, "RocketMQ") ) ); } diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index 7d930e5c85b3..a8ec73d37177 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -18,6 +18,7 @@ */ package io.druid.firehose.rocketmq; +import com.alibaba.rocketmq.client.Validators; import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; import com.alibaba.rocketmq.client.consumer.MessageQueueListener; import com.alibaba.rocketmq.client.consumer.PullResult; @@ -54,23 +55,28 @@ public class RocketMQFirehoseFactory implements FirehoseFactory feed; + /** + * Pull batch size. It's optional. + */ + @JsonProperty + private final String pullBatchSize; /** * Store messages that are fetched from brokers but not yet delivered to druid via fire hose. 
@@ -85,13 +91,15 @@ public class RocketMQFirehoseFactory implements FirehoseFactory feed, + @JsonProperty("pullBatchSize") String pullBatchSize) { this.consumerProps = consumerProps; + this.pullBatchSize = pullBatchSize; for (Map.Entry configItem : this.consumerProps.entrySet()) { System.setProperty(configItem.getKey().toString(), configItem.getValue().toString()); } @@ -153,17 +161,12 @@ public Firehose connect(ByteBufferInputRowParser byteBufferInputRowParser) throw topicQueueMap = new ConcurrentHashMap<>(); pullMessageService = new DruidPullMessageService(defaultMQPullConsumer); - String[] topics = feed.split(","); - for (String topic : topics) { - topic = topic.trim(); - if (topic.isEmpty()) { - continue; - } - defaultMQPullConsumer.fetchSubscribeMessageQueues(topic); - topicQueueMap.put(topic, defaultMQPullConsumer.fetchMessageQueuesInBalance(topic)); + for (String topic : feed) { + Validators.checkTopic(topic); + topicQueueMap.put(topic, defaultMQPullConsumer.fetchSubscribeMessageQueues(topic)); } DruidMessageQueueListener druidMessageQueueListener = - new DruidMessageQueueListener(Sets.newHashSet(topics), topicQueueMap, defaultMQPullConsumer); + new DruidMessageQueueListener(Sets.newHashSet(feed), topicQueueMap, defaultMQPullConsumer); defaultMQPullConsumer.setMessageQueueListener(druidMessageQueueListener); defaultMQPullConsumer.start(); pullMessageService.start(); @@ -185,23 +188,23 @@ public boolean hasMore() { && !messageQueueTreeSetMap.get(messageQueue).isEmpty()) { hasMore = true; } else { - DruidPullRequest newPullRequest = new DruidPullRequest(); - newPullRequest.setMessageQueue(messageQueue); try { long offset = defaultMQPullConsumer.fetchConsumeOffset(messageQueue, false); - newPullRequest.setNextBeginOffset(offset); - } catch (MQClientException e) { - LOGGER.error("Failed to fetch consume offset for queue: {}", entry.getKey()); - continue; - } - newPullRequest.setLongPull(!hasMessagesPending()); + int batchSize = (null == pullBatchSize || 
pullBatchSize.isEmpty()) ? + DEFAULT_PULL_BATCH_SIZE:Integer.parseInt(pullBatchSize); - // notify pull message service to pull messages from brokers. - pullMessageService.putRequest(newPullRequest); + DruidPullRequest newPullRequest = new DruidPullRequest(messageQueue, null, offset, + batchSize, !hasMessagesPending()); - // set the earliest pull in case we need to block. - if (null == earliestPullRequest) { - earliestPullRequest = newPullRequest; + // notify pull message service to pull messages from brokers. + pullMessageService.putRequest(newPullRequest); + + // set the earliest pull in case we need to block. + if (null == earliestPullRequest) { + earliestPullRequest = newPullRequest; + } + } catch (MQClientException e) { + LOGGER.error("Failed to fetch consume offset for queue: {}", entry.getKey()); } } } @@ -235,7 +238,7 @@ public InputRow nextRow() { } // should never happen. - return null; + throw new RuntimeException("Unexpected Fatal Error! There should have been one row available."); } @Override @@ -279,31 +282,31 @@ public void close() throws IOException { /** * Pull request. */ - class DruidPullRequest { - private MessageQueue messageQueue; - private String tag; - private long nextBeginOffset; - private int pullBatchSize; - private boolean longPull; - private CountDownLatch countDownLatch; - private PullResult pullResult; - private boolean successful; - - public DruidPullRequest() { + final class DruidPullRequest { + private final MessageQueue messageQueue; + private final String tag; + private final long nextBeginOffset; + private final int pullBatchSize; + private final boolean longPull; + private final CountDownLatch countDownLatch; + + public DruidPullRequest(final MessageQueue messageQueue, + final String tag, + final long nextBeginOffset, + final int pullBatchSize, + final boolean useLongPull) { + this.messageQueue = messageQueue; + this.tag = (null == tag ? 
"*" : tag); + this.nextBeginOffset = nextBeginOffset; + this.pullBatchSize = pullBatchSize; + this.longPull = useLongPull; countDownLatch = new CountDownLatch(1); - tag = "*"; - pullBatchSize = PULL_BATCH_SIZE; - successful = false; } public MessageQueue getMessageQueue() { return messageQueue; } - public void setMessageQueue(MessageQueue messageQueue) { - this.messageQueue = messageQueue; - } - public long getNextBeginOffset() { return nextBeginOffset; } @@ -312,53 +315,17 @@ public String getTag() { return tag; } - public void setTag(String tag) { - this.tag = tag; - } - - public void setNextBeginOffset(long nextBeginOffset) { - this.nextBeginOffset = nextBeginOffset; - } - public int getPullBatchSize() { return pullBatchSize; } - public void setPullBatchSize(int pullBatchSize) { - this.pullBatchSize = pullBatchSize; - } - public boolean isLongPull() { return longPull; } - public void setLongPull(boolean longPull) { - this.longPull = longPull; - } - public CountDownLatch getCountDownLatch() { return countDownLatch; } - - public void setCountDownLatch(CountDownLatch countDownLatch) { - this.countDownLatch = countDownLatch; - } - - public PullResult getPullResult() { - return pullResult; - } - - public void setPullResult(PullResult pullResult) { - this.pullResult = pullResult; - } - - public boolean isSuccessful() { - return successful; - } - - public void setSuccessful(boolean successful) { - this.successful = successful; - } } @@ -367,10 +334,10 @@ public void setSuccessful(boolean successful) { * * Note: this is a single thread service. 
*/ - class DruidPullMessageService extends ServiceThread { + final class DruidPullMessageService extends ServiceThread { - private volatile List requestsWrite = new ArrayList(); - private volatile List requestsRead = new ArrayList(); + private volatile List requestsWrite = new ArrayList<>(); + private volatile List requestsRead = new ArrayList<>(); private final DefaultMQPullConsumer defaultMQPullConsumer; @@ -404,7 +371,7 @@ public String getServiceName() { */ private void doPull() { for (DruidPullRequest pullRequest : requestsRead) { - PullResult pullResult = null; + PullResult pullResult; try { if (!pullRequest.isLongPull()) { pullResult = defaultMQPullConsumer.pull( @@ -420,15 +387,29 @@ private void doPull() { pullRequest.getPullBatchSize() ); } - pullRequest.setPullResult(pullResult); - pullRequest.setSuccessful(true); - if (!messageQueueTreeSetMap.keySet().contains(pullRequest.getMessageQueue())) { - messageQueueTreeSetMap.putIfAbsent(pullRequest.getMessageQueue(), - new ConcurrentSkipListSet<>(new MessageComparator())); - } - messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList()); + switch(pullResult.getPullStatus()) { + case FOUND: + // Handle pull result. + if (!messageQueueTreeSetMap.keySet().contains(pullRequest.getMessageQueue())) { + messageQueueTreeSetMap.putIfAbsent(pullRequest.getMessageQueue(), + new ConcurrentSkipListSet<>(new MessageComparator())); + } + messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList()); + break; + + case NO_NEW_MSG: + case NO_MATCHED_MSG: + break; + case OFFSET_ILLEGAL: + LOGGER.error("Bad Pull Request: Offset is illegal. 
Offset used: {}", + pullRequest.getNextBeginOffset()); + break; + + default: + break; + } } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { LOGGER.error("Failed to pull message from broker.", e); } finally { @@ -475,7 +456,7 @@ protected void onWaitEnd() { /** * Compare messages pulled from same message queue according to queue offset. */ - class MessageComparator implements Comparator { + final class MessageComparator implements Comparator { @Override public int compare(MessageExt lhs, MessageExt rhs) { return lhs.getQueueOffset() < rhs.getQueueOffset() ? -1 : (lhs.getQueueOffset() == rhs.getQueueOffset() ? 0 : 1); @@ -486,9 +467,9 @@ public int compare(MessageExt lhs, MessageExt rhs) { /** * Handle message queues re-balance operations. */ - class DruidMessageQueueListener implements MessageQueueListener { + final class DruidMessageQueueListener implements MessageQueueListener { - private Set topics; + private final Set topics; private final ConcurrentHashMap> topicQueueMap; From abe134bef699c0f301dfbf862cca57eb8c7ba59a Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Mon, 11 Jan 2016 11:50:19 +0800 Subject: [PATCH 3/7] Fix typo --- .../io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index a8ec73d37177..c2bd0b67b9f5 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -55,7 +55,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory Date: Tue, 12 Jan 2016 18:03:16 +0800 Subject: [PATCH 4/7] 1. Add newline to pom.xml 2. Change RocketMQ to rocketMQ 3. 
Make swapRequests methods synchronized in all places. 4. Make comparator static and final and use Long.compare. --- extensions/druid-rocketmq/pom.xml | 2 +- .../rocketmq/RocketMQDruidModule.java | 2 +- .../rocketmq/RocketMQFirehoseFactory.java | 20 +++++++++---------- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/extensions/druid-rocketmq/pom.xml b/extensions/druid-rocketmq/pom.xml index a02bd1914679..4f4930eb67ca 100644 --- a/extensions/druid-rocketmq/pom.xml +++ b/extensions/druid-rocketmq/pom.xml @@ -47,4 +47,4 @@ - \ No newline at end of file + diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java index f902ea739be1..e45c1c254af2 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java @@ -34,7 +34,7 @@ public List getJacksonModules() { return ImmutableList.of( new SimpleModule("RocketMQFirehoseModule") .registerSubtypes( - new NamedType(RocketMQFirehoseFactory.class, "RocketMQ") + new NamedType(RocketMQFirehoseFactory.class, "rocketMQ") ) ); } diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index c2bd0b67b9f5..430b79e58416 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -39,8 +39,7 @@ import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.metamx.common.logger.Logger; import java.io.IOException; import 
java.nio.ByteBuffer; @@ -51,7 +50,7 @@ public class RocketMQFirehoseFactory implements FirehoseFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQFirehoseFactory.class); + private static final Logger LOGGER = new Logger(RocketMQFirehoseFactory.class); /** * Passed in configuration for consumer client. @@ -81,12 +80,13 @@ public class RocketMQFirehoseFactory implements FirehoseFactory> messageQueueTreeSetMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> messageQueueTreeSetMap = + new ConcurrentHashMap<>(); /** * Store message consuming status. */ - private ConcurrentHashMap> windows = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> windows = new ConcurrentHashMap<>(); /** * Default pull batch size. @@ -355,7 +355,7 @@ public void putRequest(final DruidPullRequest request) { } } - private void swapRequests() { + private synchronized void swapRequests() { List tmp = requestsWrite; requestsWrite = requestsRead; requestsRead = tmp; @@ -438,9 +438,7 @@ public void run() { LOGGER.error("", e); } - synchronized (this) { - swapRequests(); - } + swapRequests(); doPull(); LOGGER.info(getServiceName() + " terminated."); @@ -456,10 +454,10 @@ protected void onWaitEnd() { /** * Compare messages pulled from same message queue according to queue offset. */ - final class MessageComparator implements Comparator { + static final class MessageComparator implements Comparator { @Override public int compare(MessageExt lhs, MessageExt rhs) { - return lhs.getQueueOffset() < rhs.getQueueOffset() ? -1 : (lhs.getQueueOffset() == rhs.getQueueOffset() ? 
0 : 1); + return Long.compare(lhs.getQueueOffset(), rhs.getQueueOffset()); } } From 03cdf3a8fe02dcddce76debd8b1213fc138b1626 Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Tue, 12 Jan 2016 18:11:58 +0800 Subject: [PATCH 5/7] Update license text in pom.xml --- extensions/druid-rocketmq/pom.xml | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/extensions/druid-rocketmq/pom.xml b/extensions/druid-rocketmq/pom.xml index 4f4930eb67ca..0e17066dae78 100644 --- a/extensions/druid-rocketmq/pom.xml +++ b/extensions/druid-rocketmq/pom.xml @@ -1,19 +1,21 @@ Date: Tue, 12 Jan 2016 19:33:39 +0800 Subject: [PATCH 6/7] Revert adding synchronized to swapRequest method as the caller method onWaitEnd(), is only invoked in synchronized code blocks. There is no need to repeat sync it even if java's synchronized keyword holds a reentrant mutex semantic ^_^. --- .../io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index 430b79e58416..b1dffc01cdc4 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -355,7 +355,7 @@ public void putRequest(final DruidPullRequest request) { } } - private synchronized void swapRequests() { + private void swapRequests() { List tmp = requestsWrite; requestsWrite = requestsRead; requestsRead = tmp; @@ -438,7 +438,9 @@ public void run() { LOGGER.error("", e); } - swapRequests(); + synchronized (this) { + swapRequests(); + } doPull(); LOGGER.info(getServiceName() + " terminated."); From da5d7b93769124f6e65d9332dcce020163449614 Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Mon, 18 Jan
2016 16:37:03 +0800 Subject: [PATCH 7/7] Reformat code according to Druid Java and Scala style --- extensions/druid-rocketmq/pom.xml | 49 +- .../rocketmq/RocketMQDruidModule.java | 29 +- .../rocketmq/RocketMQFirehoseFactory.java | 871 ++++++++++-------- 3 files changed, 500 insertions(+), 449 deletions(-) diff --git a/extensions/druid-rocketmq/pom.xml b/extensions/druid-rocketmq/pom.xml index 0e17066dae78..adcc523744cd 100644 --- a/extensions/druid-rocketmq/pom.xml +++ b/extensions/druid-rocketmq/pom.xml @@ -20,33 +20,28 @@ - 4.0.0 - - druid - io.druid - 0.9.0-SNAPSHOT - ../../pom.xml - + 4.0.0 + + druid + io.druid + 0.9.0-SNAPSHOT + ../../pom.xml + + druid-rocketmq - - 3.2.6 - - - druid-rocketmq - - - - - com.alibaba.rocketmq - rocketmq-client - ${rocketmq.version} - - - - io.druid - druid-api - - - + + 3.2.6 + + + + com.alibaba.rocketmq + rocketmq-client + ${rocketmq.version} + + + io.druid + druid-api + + diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java index e45c1c254af2..64414bbcd9f2 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQDruidModule.java @@ -27,20 +27,23 @@ import java.util.List; -public class RocketMQDruidModule implements DruidModule { +public class RocketMQDruidModule implements DruidModule +{ - @Override - public List getJacksonModules() { - return ImmutableList.of( - new SimpleModule("RocketMQFirehoseModule") - .registerSubtypes( - new NamedType(RocketMQFirehoseFactory.class, "rocketMQ") - ) - ); - } + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("RocketMQFirehoseModule") + .registerSubtypes( + new NamedType(RocketMQFirehoseFactory.class, "rocketMQ") + ) + ); + } - @Override - public void configure(Binder binder) { 
+ @Override + public void configure(Binder binder) + { - } + } } diff --git a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java index b1dffc01cdc4..f3dcce46dd0b 100644 --- a/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java +++ b/extensions/druid-rocketmq/src/main/java/io/druid/firehose/rocketmq/RocketMQFirehoseFactory.java @@ -48,471 +48,524 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; -public class RocketMQFirehoseFactory implements FirehoseFactory { - - private static final Logger LOGGER = new Logger(RocketMQFirehoseFactory.class); - - /** - * Passed in configuration for consumer client. - * This provides an approach to override default values defined in {@link com.alibaba.rocketmq.common.MixAll}. - */ - @JsonProperty - private final Properties consumerProps; - - /** - * Consumer group. It's required. - */ - @JsonProperty(required = true) - private final String consumerGroup; - - /** - * Topics to consume. It's required. - */ - @JsonProperty(required = true) - private final List feed; - - /** - * Pull batch size. It's optional. - */ - @JsonProperty - private final String pullBatchSize; +public class RocketMQFirehoseFactory implements FirehoseFactory +{ + + private static final Logger LOGGER = new Logger(RocketMQFirehoseFactory.class); + + /** + * Passed in configuration for consumer client. + * This provides an approach to override default values defined in {@link com.alibaba.rocketmq.common.MixAll}. + */ + @JsonProperty + private final Properties consumerProps; + + /** + * Consumer group. It's required. + */ + @JsonProperty(required = true) + private final String consumerGroup; + + /** + * Topics to consume. It's required. + */ + @JsonProperty(required = true) + private final List feed; + + /** + * Pull batch size. 
It's optional. + */ + @JsonProperty + private final String pullBatchSize; + + /** + * Store messages that are fetched from brokers but not yet delivered to druid via fire hose. + */ + private final ConcurrentHashMap> messageQueueTreeSetMap = + new ConcurrentHashMap<>(); + + /** + * Store message consuming status. + */ + private final ConcurrentHashMap> windows = new ConcurrentHashMap<>(); + + /** + * Default pull batch size. + */ + private static final int DEFAULT_PULL_BATCH_SIZE = 32; + + @JsonCreator + public RocketMQFirehoseFactory( + @JsonProperty("consumerProps") Properties consumerProps, + @JsonProperty("consumerGroup") String consumerGroup, + @JsonProperty("feed") List feed, + @JsonProperty("pullBatchSize") String pullBatchSize + ) + { + this.consumerProps = consumerProps; + this.pullBatchSize = pullBatchSize; + for (Map.Entry configItem : this.consumerProps.entrySet()) { + System.setProperty(configItem.getKey().toString(), configItem.getValue().toString()); + } + this.consumerGroup = consumerGroup; + this.feed = feed; + } + + /** + * Check if there are locally pending messages to consume. + * + * @return true if there are some; false otherwise. + */ + private boolean hasMessagesPending() + { + + for (ConcurrentHashMap.Entry> entry : messageQueueTreeSetMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + return true; + } + } - /** - * Store messages that are fetched from brokers but not yet delivered to druid via fire hose. 
- */ - private final ConcurrentHashMap> messageQueueTreeSetMap = - new ConcurrentHashMap<>(); + return false; + } + + @Override + public Firehose connect(ByteBufferInputRowParser byteBufferInputRowParser) throws IOException, ParseException + { + + Set newDimExclus = Sets.union( + byteBufferInputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), + Sets.newHashSet("feed") + ); + + final ByteBufferInputRowParser theParser = byteBufferInputRowParser.withParseSpec( + byteBufferInputRowParser.getParseSpec() + .withDimensionsSpec( + byteBufferInputRowParser.getParseSpec() + .getDimensionsSpec() + .withDimensionExclusions( + newDimExclus + ) + ) + ); /** - * Store message consuming status. + * Topic-Queue mapping. */ - private final ConcurrentHashMap> windows = new ConcurrentHashMap<>(); + final ConcurrentHashMap> topicQueueMap; /** - * Default pull batch size. + * Default Pull-style client for RocketMQ. */ - private static final int DEFAULT_PULL_BATCH_SIZE = 32; - - @JsonCreator - public RocketMQFirehoseFactory(@JsonProperty("consumerProps") Properties consumerProps, - @JsonProperty("consumerGroup") String consumerGroup, - @JsonProperty("feed") List feed, - @JsonProperty("pullBatchSize") String pullBatchSize) { - this.consumerProps = consumerProps; - this.pullBatchSize = pullBatchSize; - for (Map.Entry configItem : this.consumerProps.entrySet()) { - System.setProperty(configItem.getKey().toString(), configItem.getValue().toString()); - } - this.consumerGroup = consumerGroup; - this.feed = feed; + final DefaultMQPullConsumer defaultMQPullConsumer; + final DruidPullMessageService pullMessageService; + + messageQueueTreeSetMap.clear(); + windows.clear(); + + try { + defaultMQPullConsumer = new DefaultMQPullConsumer(this.consumerGroup); + defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); + topicQueueMap = new ConcurrentHashMap<>(); + + pullMessageService = new DruidPullMessageService(defaultMQPullConsumer); + for (String topic : feed) { + 
Validators.checkTopic(topic); + topicQueueMap.put(topic, defaultMQPullConsumer.fetchSubscribeMessageQueues(topic)); + } + DruidMessageQueueListener druidMessageQueueListener = + new DruidMessageQueueListener(Sets.newHashSet(feed), topicQueueMap, defaultMQPullConsumer); + defaultMQPullConsumer.setMessageQueueListener(druidMessageQueueListener); + defaultMQPullConsumer.start(); + pullMessageService.start(); + } + catch (MQClientException e) { + LOGGER.error("Failed to start DefaultMQPullConsumer", e); + throw new IOException("Failed to start RocketMQ client", e); } - /** - * Check if there are locally pending messages to consume. - * @return true if there are some; false otherwise. - */ - private boolean hasMessagesPending() { - - for (ConcurrentHashMap.Entry> entry : messageQueueTreeSetMap.entrySet()) { - if (!entry.getValue().isEmpty()) { - return true; + return new Firehose() + { + + @Override + public boolean hasMore() + { + boolean hasMore = false; + DruidPullRequest earliestPullRequest = null; + + for (Map.Entry> entry : topicQueueMap.entrySet()) { + for (MessageQueue messageQueue : entry.getValue()) { + if (messageQueueTreeSetMap.keySet().contains(messageQueue) + && !messageQueueTreeSetMap.get(messageQueue).isEmpty()) { + hasMore = true; + } else { + try { + long offset = defaultMQPullConsumer.fetchConsumeOffset(messageQueue, false); + int batchSize = (null == pullBatchSize || pullBatchSize.isEmpty()) ? + DEFAULT_PULL_BATCH_SIZE : Integer.parseInt(pullBatchSize); + + DruidPullRequest newPullRequest = new DruidPullRequest(messageQueue, null, offset, + batchSize, !hasMessagesPending() + ); + + // notify pull message service to pull messages from brokers. + pullMessageService.putRequest(newPullRequest); + + // set the earliest pull in case we need to block. 
+ if (null == earliestPullRequest) { + earliestPullRequest = newPullRequest; + } + } + catch (MQClientException e) { + LOGGER.error("Failed to fetch consume offset for queue: {}", entry.getKey()); + } } + } } - return false; - } - - @Override - public Firehose connect(ByteBufferInputRowParser byteBufferInputRowParser) throws IOException, ParseException { - - Set newDimExclus = Sets.union( - byteBufferInputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), - Sets.newHashSet("feed") - ); - - final ByteBufferInputRowParser theParser = byteBufferInputRowParser.withParseSpec( - byteBufferInputRowParser.getParseSpec() - .withDimensionsSpec( - byteBufferInputRowParser.getParseSpec() - .getDimensionsSpec() - .withDimensionExclusions( - newDimExclus - ) - ) - ); - - /** - * Topic-Queue mapping. - */ - final ConcurrentHashMap> topicQueueMap; - - /** - * Default Pull-style client for RocketMQ. - */ - final DefaultMQPullConsumer defaultMQPullConsumer; - final DruidPullMessageService pullMessageService; - - messageQueueTreeSetMap.clear(); - windows.clear(); - - try { - defaultMQPullConsumer = new DefaultMQPullConsumer(this.consumerGroup); - defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING); - topicQueueMap = new ConcurrentHashMap<>(); - - pullMessageService = new DruidPullMessageService(defaultMQPullConsumer); - for (String topic : feed) { - Validators.checkTopic(topic); - topicQueueMap.put(topic, defaultMQPullConsumer.fetchSubscribeMessageQueues(topic)); + // Block only when there is no locally pending messages. 
+ if (!hasMore && null != earliestPullRequest) { + try { + earliestPullRequest.getCountDownLatch().await(); + hasMore = true; + } + catch (InterruptedException e) { + LOGGER.error("CountDownLatch await got interrupted", e); + } + } + return hasMore; + } + + @Override + public InputRow nextRow() + { + for (Map.Entry> entry : messageQueueTreeSetMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + MessageExt message = entry.getValue().pollFirst(); + InputRow inputRow = theParser.parse(ByteBuffer.wrap(message.getBody())); + + if (!windows.keySet().contains(entry.getKey())) { + windows.put(entry.getKey(), new ConcurrentSkipListSet()); } - DruidMessageQueueListener druidMessageQueueListener = - new DruidMessageQueueListener(Sets.newHashSet(feed), topicQueueMap, defaultMQPullConsumer); - defaultMQPullConsumer.setMessageQueueListener(druidMessageQueueListener); - defaultMQPullConsumer.start(); - pullMessageService.start(); - } catch (MQClientException e) { - LOGGER.error("Failed to start DefaultMQPullConsumer", e); - throw new IOException("Failed to start RocketMQ client", e); + windows.get(entry.getKey()).add(message.getQueueOffset()); + return inputRow; + } } - return new Firehose() { - - @Override - public boolean hasMore() { - boolean hasMore = false; - DruidPullRequest earliestPullRequest = null; - - for (Map.Entry> entry : topicQueueMap.entrySet()) { - for (MessageQueue messageQueue : entry.getValue()) { - if (messageQueueTreeSetMap.keySet().contains(messageQueue) - && !messageQueueTreeSetMap.get(messageQueue).isEmpty()) { - hasMore = true; - } else { - try { - long offset = defaultMQPullConsumer.fetchConsumeOffset(messageQueue, false); - int batchSize = (null == pullBatchSize || pullBatchSize.isEmpty()) ? - DEFAULT_PULL_BATCH_SIZE:Integer.parseInt(pullBatchSize); - - DruidPullRequest newPullRequest = new DruidPullRequest(messageQueue, null, offset, - batchSize, !hasMessagesPending()); - - // notify pull message service to pull messages from brokers. 
- pullMessageService.putRequest(newPullRequest); - - // set the earliest pull in case we need to block. - if (null == earliestPullRequest) { - earliestPullRequest = newPullRequest; - } - } catch (MQClientException e) { - LOGGER.error("Failed to fetch consume offset for queue: {}", entry.getKey()); - } - } - } + // should never happen. + throw new RuntimeException("Unexpected Fatal Error! There should have been one row available."); + } + + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + OffsetStore offsetStore = defaultMQPullConsumer.getOffsetStore(); + Set updated = new HashSet<>(); + // calculate offsets according to consuming windows. + for (ConcurrentHashMap.Entry> entry : windows.entrySet()) { + while (!entry.getValue().isEmpty()) { + + long offset = offsetStore.readOffset(entry.getKey(), ReadOffsetType.MEMORY_FIRST_THEN_STORE); + if (offset + 1 > entry.getValue().first()) { + entry.getValue().pollFirst(); + } else if (offset + 1 == entry.getValue().first()) { + entry.getValue().pollFirst(); + offsetStore.updateOffset(entry.getKey(), offset + 1, true); + updated.add(entry.getKey()); + } else { + break; } - // Block only when there is no locally pending messages. - if (!hasMore && null != earliestPullRequest) { - try { - earliestPullRequest.getCountDownLatch().await(); - hasMore = true; - } catch (InterruptedException e) { - LOGGER.error("CountDownLatch await got interrupted", e); - } - } - return hasMore; + } } + offsetStore.persistAll(updated); + } + }; + } + + @Override + public void close() throws IOException + { + defaultMQPullConsumer.shutdown(); + pullMessageService.shutdown(false); + } + }; + } + + + /** + * Pull request. 
+ */ + final class DruidPullRequest + { + private final MessageQueue messageQueue; + private final String tag; + private final long nextBeginOffset; + private final int pullBatchSize; + private final boolean longPull; + private final CountDownLatch countDownLatch; + + public DruidPullRequest( + final MessageQueue messageQueue, + final String tag, + final long nextBeginOffset, + final int pullBatchSize, + final boolean useLongPull + ) + { + this.messageQueue = messageQueue; + this.tag = (null == tag ? "*" : tag); + this.nextBeginOffset = nextBeginOffset; + this.pullBatchSize = pullBatchSize; + this.longPull = useLongPull; + countDownLatch = new CountDownLatch(1); + } - @Override - public InputRow nextRow() { - for (Map.Entry> entry : messageQueueTreeSetMap.entrySet()) { - if (!entry.getValue().isEmpty()) { - MessageExt message = entry.getValue().pollFirst(); - InputRow inputRow = theParser.parse(ByteBuffer.wrap(message.getBody())); - - if (!windows.keySet().contains(entry.getKey())) { - windows.put(entry.getKey(), new ConcurrentSkipListSet()); - } - windows.get(entry.getKey()).add(message.getQueueOffset()); - return inputRow; - } - } + public MessageQueue getMessageQueue() + { + return messageQueue; + } - // should never happen. - throw new RuntimeException("Unexpected Fatal Error! There should have been one row available."); - } + public long getNextBeginOffset() + { + return nextBeginOffset; + } - @Override - public Runnable commit() { - return new Runnable() { - @Override - public void run() { - OffsetStore offsetStore = defaultMQPullConsumer.getOffsetStore(); - Set updated = new HashSet<>(); - // calculate offsets according to consuming windows. 
- for (ConcurrentHashMap.Entry> entry : windows.entrySet()) { - while (!entry.getValue().isEmpty()) { - - long offset = offsetStore.readOffset(entry.getKey(), ReadOffsetType.MEMORY_FIRST_THEN_STORE); - if (offset + 1 > entry.getValue().first()) { - entry.getValue().pollFirst(); - } else if (offset + 1 == entry.getValue().first()) { - entry.getValue().pollFirst(); - offsetStore.updateOffset(entry.getKey(), offset + 1, true); - updated.add(entry.getKey()); - } else { - break; - } - - } - } - offsetStore.persistAll(updated); - } - }; - } + public String getTag() + { + return tag; + } - @Override - public void close() throws IOException { - defaultMQPullConsumer.shutdown(); - pullMessageService.shutdown(false); - } - }; + public int getPullBatchSize() + { + return pullBatchSize; } + public boolean isLongPull() + { + return longPull; + } - /** - * Pull request. - */ - final class DruidPullRequest { - private final MessageQueue messageQueue; - private final String tag; - private final long nextBeginOffset; - private final int pullBatchSize; - private final boolean longPull; - private final CountDownLatch countDownLatch; - - public DruidPullRequest(final MessageQueue messageQueue, - final String tag, - final long nextBeginOffset, - final int pullBatchSize, - final boolean useLongPull) { - this.messageQueue = messageQueue; - this.tag = (null == tag ? "*" : tag); - this.nextBeginOffset = nextBeginOffset; - this.pullBatchSize = pullBatchSize; - this.longPull = useLongPull; - countDownLatch = new CountDownLatch(1); - } + public CountDownLatch getCountDownLatch() + { + return countDownLatch; + } + } - public MessageQueue getMessageQueue() { - return messageQueue; - } - public long getNextBeginOffset() { - return nextBeginOffset; - } + /** + * Pull message service for druid. + *

+ * Note: this is a single thread service. + */ + final class DruidPullMessageService extends ServiceThread + { - public String getTag() { - return tag; - } + private volatile List requestsWrite = new ArrayList<>(); + private volatile List requestsRead = new ArrayList<>(); - public int getPullBatchSize() { - return pullBatchSize; - } + private final DefaultMQPullConsumer defaultMQPullConsumer; - public boolean isLongPull() { - return longPull; - } + public DruidPullMessageService(final DefaultMQPullConsumer defaultMQPullConsumer) + { + this.defaultMQPullConsumer = defaultMQPullConsumer; + } - public CountDownLatch getCountDownLatch() { - return countDownLatch; + public void putRequest(final DruidPullRequest request) + { + synchronized (this) { + this.requestsWrite.add(request); + if (!hasNotified) { + hasNotified = true; + notify(); } + } } + private void swapRequests() + { + List tmp = requestsWrite; + requestsWrite = requestsRead; + requestsRead = tmp; + } + + @Override + public String getServiceName() + { + return getClass().getSimpleName(); + } /** - * Pull message service for druid. - * - * Note: this is a single thread service. + * Core message pulling logic code goes here. 
*/ - final class DruidPullMessageService extends ServiceThread { - - private volatile List requestsWrite = new ArrayList<>(); - private volatile List requestsRead = new ArrayList<>(); - - private final DefaultMQPullConsumer defaultMQPullConsumer; - - public DruidPullMessageService(final DefaultMQPullConsumer defaultMQPullConsumer) { - this.defaultMQPullConsumer = defaultMQPullConsumer; - } - - public void putRequest(final DruidPullRequest request) { - synchronized (this) { - this.requestsWrite.add(request); - if (!hasNotified) { - hasNotified = true; - notify(); - } - } - } - - private void swapRequests() { - List tmp = requestsWrite; - requestsWrite = requestsRead; - requestsRead = tmp; - } - - @Override - public String getServiceName() { - return getClass().getSimpleName(); + private void doPull() + { + for (DruidPullRequest pullRequest : requestsRead) { + PullResult pullResult; + try { + if (!pullRequest.isLongPull()) { + pullResult = defaultMQPullConsumer.pull( + pullRequest.getMessageQueue(), + pullRequest.getTag(), + pullRequest.getNextBeginOffset(), + pullRequest.getPullBatchSize() + ); + } else { + pullResult = defaultMQPullConsumer.pullBlockIfNotFound( + pullRequest.getMessageQueue(), + pullRequest.getTag(), + pullRequest.getNextBeginOffset(), + pullRequest.getPullBatchSize() + ); + } + + switch (pullResult.getPullStatus()) { + case FOUND: + // Handle pull result. + if (!messageQueueTreeSetMap.keySet().contains(pullRequest.getMessageQueue())) { + messageQueueTreeSetMap.putIfAbsent( + pullRequest.getMessageQueue(), + new ConcurrentSkipListSet<>(new MessageComparator()) + ); + } + messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList()); + break; + + case NO_NEW_MSG: + case NO_MATCHED_MSG: + break; + + case OFFSET_ILLEGAL: + LOGGER.error( + "Bad Pull Request: Offset is illegal. 
Offset used: {}", + pullRequest.getNextBeginOffset() + ); + break; + + default: + break; + } } - - /** - * Core message pulling logic code goes here. - */ - private void doPull() { - for (DruidPullRequest pullRequest : requestsRead) { - PullResult pullResult; - try { - if (!pullRequest.isLongPull()) { - pullResult = defaultMQPullConsumer.pull( - pullRequest.getMessageQueue(), - pullRequest.getTag(), - pullRequest.getNextBeginOffset(), - pullRequest.getPullBatchSize()); - } else { - pullResult = defaultMQPullConsumer.pullBlockIfNotFound( - pullRequest.getMessageQueue(), - pullRequest.getTag(), - pullRequest.getNextBeginOffset(), - pullRequest.getPullBatchSize() - ); - } - - switch(pullResult.getPullStatus()) { - case FOUND: - // Handle pull result. - if (!messageQueueTreeSetMap.keySet().contains(pullRequest.getMessageQueue())) { - messageQueueTreeSetMap.putIfAbsent(pullRequest.getMessageQueue(), - new ConcurrentSkipListSet<>(new MessageComparator())); - } - messageQueueTreeSetMap.get(pullRequest.getMessageQueue()).addAll(pullResult.getMsgFoundList()); - break; - - case NO_NEW_MSG: - case NO_MATCHED_MSG: - break; - - case OFFSET_ILLEGAL: - LOGGER.error("Bad Pull Request: Offset is illegal. Offset used: {}", - pullRequest.getNextBeginOffset()); - break; - - default: - break; - } - } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { - LOGGER.error("Failed to pull message from broker.", e); - } finally { - pullRequest.getCountDownLatch().countDown(); - } - - } - requestsRead.clear(); + catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + LOGGER.error("Failed to pull message from broker.", e); } - - /** - * Thread looping entry. - */ - @Override - public void run() { - LOGGER.info(getServiceName() + " starts."); - while (!isStoped()) { - waitForRunning(0); - doPull(); - } - - // in case this service is shutdown gracefully without interruption. 
- try { - Thread.sleep(10); - } catch (InterruptedException e) { - LOGGER.error("", e); - } - - synchronized (this) { - swapRequests(); - } - - doPull(); - LOGGER.info(getServiceName() + " terminated."); + finally { + pullRequest.getCountDownLatch().countDown(); } - @Override - protected void onWaitEnd() { - swapRequests(); - } + } + requestsRead.clear(); } - /** - * Compare messages pulled from same message queue according to queue offset. + * Thread looping entry. */ - static final class MessageComparator implements Comparator { - @Override - public int compare(MessageExt lhs, MessageExt rhs) { - return Long.compare(lhs.getQueueOffset(), rhs.getQueueOffset()); - } + @Override + public void run() + { + LOGGER.info(getServiceName() + " starts."); + while (!isStoped()) { + waitForRunning(0); + doPull(); + } + + // in case this service is shutdown gracefully without interruption. + try { + Thread.sleep(10); + } + catch (InterruptedException e) { + LOGGER.error("", e); + } + + synchronized (this) { + swapRequests(); + } + + doPull(); + LOGGER.info(getServiceName() + " terminated."); } + @Override + protected void onWaitEnd() + { + swapRequests(); + } + } - /** - * Handle message queues re-balance operations. - */ - final class DruidMessageQueueListener implements MessageQueueListener { - private final Set topics; + /** + * Compare messages pulled from same message queue according to queue offset. + */ + static final class MessageComparator implements Comparator + { + @Override + public int compare(MessageExt lhs, MessageExt rhs) + { + return Long.compare(lhs.getQueueOffset(), rhs.getQueueOffset()); + } + } - private final ConcurrentHashMap> topicQueueMap; - private final DefaultMQPullConsumer defaultMQPullConsumer; + /** + * Handle message queues re-balance operations.
+ */ + final class DruidMessageQueueListener implements MessageQueueListener + { - public DruidMessageQueueListener(final Set topics, - final ConcurrentHashMap> topicQueueMap, - final DefaultMQPullConsumer defaultMQPullConsumer) { - this.topics = topics; - this.topicQueueMap = topicQueueMap; - this.defaultMQPullConsumer = defaultMQPullConsumer; - } + private final Set topics; - @Override - public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) { - if (topics.contains(topic)) { - topicQueueMap.put(topic, mqDivided); - - // Remove message queues that are re-assigned to other clients. - Iterator>> it = - messageQueueTreeSetMap.entrySet().iterator(); - while (it.hasNext()) { - if (!mqDivided.contains(it.next().getKey())) { - it.remove(); - } - } + private final ConcurrentHashMap> topicQueueMap; - StringBuilder stringBuilder = new StringBuilder(); - for (MessageQueue messageQueue : mqDivided) { - stringBuilder.append(messageQueue.getBrokerName()) - .append("#") - .append(messageQueue.getQueueId()) - .append(", "); - } + private final DefaultMQPullConsumer defaultMQPullConsumer; - if (LOGGER.isDebugEnabled() && stringBuilder.length() > 2) { - LOGGER.debug(String.format("%s@%s is consuming the following message queues: %s", - defaultMQPullConsumer.getClientIP(), - defaultMQPullConsumer.getInstanceName(), - stringBuilder.substring(0, stringBuilder.length() - 2) /*Remove the trailing comma*/)); - } - } + public DruidMessageQueueListener( + final Set topics, + final ConcurrentHashMap> topicQueueMap, + final DefaultMQPullConsumer defaultMQPullConsumer + ) + { + this.topics = topics; + this.topicQueueMap = topicQueueMap; + this.defaultMQPullConsumer = defaultMQPullConsumer; + } + @Override + public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) + { + if (topics.contains(topic)) { + topicQueueMap.put(topic, mqDivided); + + // Remove message queues that are re-assigned to other clients. 
+ Iterator>> it = + messageQueueTreeSetMap.entrySet().iterator(); + while (it.hasNext()) { + if (!mqDivided.contains(it.next().getKey())) { + it.remove(); + } + } + + StringBuilder stringBuilder = new StringBuilder(); + for (MessageQueue messageQueue : mqDivided) { + stringBuilder.append(messageQueue.getBrokerName()) + .append("#") + .append(messageQueue.getQueueId()) + .append(", "); } + + if (LOGGER.isDebugEnabled() && stringBuilder.length() > 2) { + LOGGER.debug(String.format( + "%s@%s is consuming the following message queues: %s", + defaultMQPullConsumer.getClientIP(), + defaultMQPullConsumer.getInstanceName(), + stringBuilder.substring(0, stringBuilder.length() - 2) /*Remove the trailing comma*/ + )); + } + } + } + } }