From ab91dbd39437a78280a643b0d6115172c527b856 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=83=E5=AE=88?= Date: Tue, 21 Nov 2017 12:48:08 +0800 Subject: [PATCH 1/7] consume message filter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 元守 --- .../com/youzan/nsq/client/ConsumerImplV2.java | 23 ++++++++ .../com/youzan/nsq/client/ProducerImplV2.java | 3 +- .../nsq/client/core/command/PubExt.java | 3 + .../client/entity/ConsumeMessageFilter.java | 33 +++++++++++ .../entity/ConsumeMessageFilterMode.java | 22 +++++++ .../youzan/nsq/client/entity/NSQConfig.java | 58 ++++++++++++++++++- .../util/ProducerWorkerThreadFactory.java | 56 ++++++++++++++++++ .../com/youzan/nsq/client/ConsumerTest.java | 50 ++++++++++++++++ .../it/youzan/nsq/client/ITTagConsumer.java | 32 ++++++++++ .../it/youzan/nsq/client/ITTagProducer.java | 26 +++++++++ src/test/resources/testng-base-tag-suite.xml | 15 +++++ 11 files changed, 319 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilter.java create mode 100644 src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilterMode.java create mode 100644 src/main/java/com/youzan/util/ProducerWorkerThreadFactory.java diff --git a/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java b/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java index a172ad7..9ffe575 100644 --- a/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java +++ b/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java @@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static com.youzan.nsq.client.core.command.PubExt.FILTER_EXT_KEY; + /** * TODO: a description *
@@ -579,6 +581,11 @@ public void incoming(final NSQFrame frame, final NSQConnection conn) throws NSQE
                     return;
                 }
 
+                if(!checkExtFilter(message, conn)) {
+                    _finish(message, conn);
+                    return;
+                }
+
                 if (TraceLogger.isTraceLoggerEnabled() && conn.getAddress().isHA())
                     TraceLogger.trace(this, conn, message);
                 if (this.config.isOrdered()
@@ -594,6 +601,18 @@ public void incoming(final NSQFrame frame, final NSQConnection conn) throws NSQE
         }
     }
 
+    protected boolean checkExtFilter(final NSQMessage message, final NSQConnection conn) {
+        String filterKey = this.config.getConsumeMessageFilterKey();
+        if (conn.isExtend() && StringUtils.isNotBlank(filterKey)) {
+            String filterDataInHeader = (String) message.getExtByName(filterKey);
+            String filterDataInConf = this.config.getConsumeMessageFilterValue();
+            if(!this.config.getConsumeMessageFilterMode().getFilter().apply(filterDataInConf, filterDataInHeader)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     private void processMessage(final NSQMessage message, final NSQConnection connection) {
         if (logger.isDebugEnabled()) {
             logger.debug(message.toString());
@@ -957,6 +976,10 @@ public void finish(final NSQMessage message) throws NSQException {
             return;
         }
         final NSQConnection conn = address_2_conn.get(message.getAddress());
+        _finish(message, conn);
+    }
+
+    private void _finish(final NSQMessage message, final NSQConnection conn) throws NSQNoConnectionException {
         if (conn != null) {
             if (conn.getId() == message.getConnectionID().intValue()) {
                 if (conn.isConnected()) {
diff --git a/src/main/java/com/youzan/nsq/client/ProducerImplV2.java b/src/main/java/com/youzan/nsq/client/ProducerImplV2.java
index 0b25900..7c197fa 100644
--- a/src/main/java/com/youzan/nsq/client/ProducerImplV2.java
+++ b/src/main/java/com/youzan/nsq/client/ProducerImplV2.java
@@ -14,6 +14,7 @@
 import com.youzan.util.HostUtil;
 import com.youzan.util.IOUtil;
 import com.youzan.util.NamedThreadFactory;
+import com.youzan.util.ProducerWorkerThreadFactory;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
 import org.slf4j.Logger;
@@ -50,7 +51,7 @@ public class ProducerImplV2 implements Producer {
     private final KeyedPooledConnectionFactory factory;
     private GenericKeyedObjectPool bigPool = null;
     private final ScheduledExecutorService scheduler = Executors
-            .newSingleThreadScheduledExecutor(new NamedThreadFactory(this.getClass().getName(), Thread.NORM_PRIORITY));
+            .newSingleThreadScheduledExecutor(new ProducerWorkerThreadFactory(this.getClass().getName(), Thread.NORM_PRIORITY));
 
 
     private final ExecutorService pubExec;
diff --git a/src/main/java/com/youzan/nsq/client/core/command/PubExt.java b/src/main/java/com/youzan/nsq/client/core/command/PubExt.java
index 99d83b8..1a252a5 100644
--- a/src/main/java/com/youzan/nsq/client/core/command/PubExt.java
+++ b/src/main/java/com/youzan/nsq/client/core/command/PubExt.java
@@ -18,6 +18,9 @@ public class PubExt extends Pub {
     private byte[] jsonHeaderBytes;
     public static final String CLIENT_TAG_KEY = "##client_dispatch_tag";
     public static final String TRACE_ID_KEY = "##trace_id";
+    public static final String FILTER_EXT_KEY = "filter_ext_key";
+    public static final String FILTER_DATA = "filter_data";
+
 
     /**
      * @param msg message object
diff --git a/src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilter.java b/src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilter.java
new file mode 100644
index 0000000..24e1fcd
--- /dev/null
+++ b/src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilter.java
@@ -0,0 +1,33 @@
+package com.youzan.nsq.client.entity;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Created by lin on 17/11/20.
+ */
+public interface ConsumeMessageFilter {
+    ConsumeMessageFilter EXACT_MATCH_FILTER = new ConsumeMessageFilter() {
+
+        @Override
+        public boolean apply(String filter, String filterVal) {
+            if(StringUtils.isBlank(filter)) {
+                return true;
+            } else if (filter.equals(filterVal)) {
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public int getType() {
+            return 1;
+        }
+
+
+    };
+
+    boolean apply(String filter, String filterVal);
+
+    int getType();
+}
diff --git a/src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilterMode.java b/src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilterMode.java
new file mode 100644
index 0000000..1557aaa
--- /dev/null
+++ b/src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilterMode.java
@@ -0,0 +1,22 @@
+package com.youzan.nsq.client.entity;
+
+import static com.youzan.nsq.client.entity.ConsumeMessageFilter.EXACT_MATCH_FILTER;
+
+/**
+ * Created by lin on 17/11/20.
+ */
+public enum ConsumeMessageFilterMode {
+    EXACT_MATCH(EXACT_MATCH_FILTER);
+//    REGEXP_MATCH,
+//    GLOB_MATCH
+
+    private ConsumeMessageFilter currentMessageFilter;
+
+    ConsumeMessageFilterMode(ConsumeMessageFilter filter){
+        currentMessageFilter = filter;
+    }
+
+    public ConsumeMessageFilter getFilter() {
+        return this.currentMessageFilter;
+    }
+}
diff --git a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
index a396028..c364c48 100644
--- a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
+++ b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
@@ -8,6 +8,8 @@
 import com.youzan.util.NotThreadSafe;
 import com.youzan.util.SystemUtil;
 import io.netty.handler.ssl.SslContext;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -16,6 +18,8 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.commons.lang3.tuple.Pair.of;
+
 /**
  * It is used for Producer or Consumer, and not both two.
  *
@@ -796,7 +800,11 @@ public String identify(boolean topicExt) {
             buffer.append("\"desired_tag\":\"" + tag.getTagName() + "\",");
         }
         buffer.append("\"extend_support\":" + topicExt + ",");
-        buffer.append("\"user_agent\": \"" + userAgent + "\"}");
+        buffer.append("\"user_agent\": \"" + userAgent + "\"");
+        //ext header feature
+        if(StringUtils.isNotBlank(this.getConsumeMessageFilterKey()))
+            buffer.append(", \"ext_filter\":{\"type\":" + this.getConsumeMessageFilterMode().getFilter().getType() + ", \"filter_ext_key\":\"" + this.getConsumeMessageFilterKey() + "\", \"filter_data\":\"" + this.getConsumeMessageFilterValue() + "\"}");
+        buffer.append("}");
         return buffer.toString();
     }
 
@@ -995,6 +1003,54 @@ public int getPublishWorkerPoolSize() {
         return this.producerPoolSize;
     }
 
+    //consume message filter value default value is null, which means no filter applied
+    private Pair consumeMsgFilterKV = null;
+
+    /**
+     * set consume message filter value, default value is null, which means there is NO message filter applied for
+     * message consumer which has current {@link NSQConfig} applied.
+     * @param filterVal
+     * @return NSQConfig
+     */
+    public NSQConfig setConsumeMessageFilter(String key, String filterVal) {
+        this.consumeMsgFilterKV = of(key, filterVal);
+        return this;
+    }
+
+    public String getConsumeMessageFilterKey() {
+        if(null != this.consumeMsgFilterKV)
+            return this.consumeMsgFilterKV.getKey();
+        else
+            return "";
+    }
+
+    public String getConsumeMessageFilterValue() {
+        if(null != this.consumeMsgFilterKV)
+            return this.consumeMsgFilterKV.getValue();
+        else
+            return "";
+    }
+
+    private ConsumeMessageFilterMode consumeMessageFilterMode = ConsumeMessageFilterMode.EXACT_MATCH;
+
+    /**
+     * specify message consume filter, default value is {@link ConsumeMessageFilterMode#EXACT_MATCH}
+     * @param filterMode
+     * @return {@link NSQConfig}
+     */
+    public NSQConfig setConsumeMessageFilterMode(ConsumeMessageFilterMode filterMode) {
+        if(null == filterMode) {
+            logger.error("null filter mode is not allowed ");
+        } else {
+            this.consumeMessageFilterMode = filterMode;
+        }
+        return this;
+    }
+
+    public ConsumeMessageFilterMode getConsumeMessageFilterMode() {
+        return this.consumeMessageFilterMode;
+    }
+
     @Override
     public Object clone() {
         NSQConfig newCfg = null;
diff --git a/src/main/java/com/youzan/util/ProducerWorkerThreadFactory.java b/src/main/java/com/youzan/util/ProducerWorkerThreadFactory.java
new file mode 100644
index 0000000..7b6937f
--- /dev/null
+++ b/src/main/java/com/youzan/util/ProducerWorkerThreadFactory.java
@@ -0,0 +1,56 @@
+package com.youzan.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Producer worker thread factory
+ * Created by lin on 17/11/20.
+ */
+public class ProducerWorkerThreadFactory implements ThreadFactory {
+    private static final Logger logger = LoggerFactory.getLogger(ProducerWorkerThreadFactory.class);
+
+    private final ThreadGroup group;
+    private final String namePrefix;
+    private int priority = Integer.MIN_VALUE;
+
+    final static Thread.UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error(t.getName(), e);
+        }
+    };
+
+    public ProducerWorkerThreadFactory(String poolName, int priority) {
+        if (null == poolName || poolName.isEmpty()) {
+            throw new IllegalArgumentException();
+        }
+        group = Thread.currentThread().getThreadGroup();
+        namePrefix = poolName + "-Producer-Worker-Pool-Thread";
+        this.priority = priority;
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+
+        final Thread t = new Thread(group, r, namePrefix, 0);
+        if (t.isDaemon()) {
+            t.setDaemon(false);
+        }
+        switch(priority){
+            case Thread.MAX_PRIORITY :
+            case Thread.MIN_PRIORITY:
+            case Thread.NORM_PRIORITY:
+                t.setPriority(priority);
+                break;
+            default: {
+                t.setPriority(Thread.NORM_PRIORITY);
+            }
+
+        }
+        Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
+        return t;
+    }
+}
diff --git a/src/test/java/com/youzan/nsq/client/ConsumerTest.java b/src/test/java/com/youzan/nsq/client/ConsumerTest.java
index f603a5c..123023c 100644
--- a/src/test/java/com/youzan/nsq/client/ConsumerTest.java
+++ b/src/test/java/com/youzan/nsq/client/ConsumerTest.java
@@ -6,6 +6,7 @@
 import com.youzan.nsq.client.exception.ExplicitRequeueException;
 import com.youzan.nsq.client.exception.NSQException;
 import com.youzan.nsq.client.utils.TopicUtil;
+import org.apache.commons.collections.map.HashedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -546,6 +547,55 @@ public void testSkipMessage() {
         Assert.assertFalse(skipped);
     }
 
+    @Test
+    public void testMessageHeaderFilter() {
+        NSQConfig config = new NSQConfig("BaseConsumer");
+        config.setConsumeMessageFilter("filter_key1", "filter_val1");
+        ConsumerImplV2 consumer = new ConsumerImplV2(config, new MessageHandler() {
+            @Override
+            public void process(NSQMessage message) {
+                //nothing happen
+            }
+        });
+        //mock a message frame, and NSQConnection
+        Map jsonExt = new HashedMap();
+        jsonExt.put("filter_key1", "filter_val1");
+        jsonExt.put("filter_key2", "filter_val2");
+
+        NSQMessage message = new NSQMessage();
+        message.setJsonExtHeader(jsonExt);
+
+        MockedNSQConnectionImpl conn = new MockedNSQConnectionImpl(0, new Address("127.0.0.1", 4150, "ha", "fakeTopic", 1, true), null, config);
+
+        Assert.assertTrue(consumer.checkExtFilter(message, conn));
+
+        Map jsonExtMissing = new HashedMap();
+        jsonExtMissing.put("filter_key3", "filter_val3");
+        jsonExtMissing.put("filter_key2", "filter_val2");
+
+        message.setJsonExtHeader(jsonExtMissing);
+        Assert.assertFalse(consumer.checkExtFilter(message, conn));
+
+        jsonExtMissing = new HashedMap();
+        jsonExtMissing.put("filter_key1", "filter_val1_missing");
+        jsonExtMissing.put("filter_key2", "filter_val2");
+
+        message.setJsonExtHeader(jsonExtMissing);
+        Assert.assertFalse(consumer.checkExtFilter(message, conn));
+
+        //test default
+        config = new NSQConfig("BaseConsumer");
+        consumer = new ConsumerImplV2(config, new MessageHandler() {
+            @Override
+            public void process(NSQMessage message) {
+                //nothing happen
+            }
+        });
+
+        message.setJsonExtHeader(jsonExtMissing);
+        Assert.assertTrue(consumer.checkExtFilter(message, conn));
+    }
+
     private ScheduledExecutorService keepMessagePublish(final Producer producer, final String topic, long interval) throws NSQException {
         ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
         exec.scheduleWithFixedDelay(new Runnable() {
diff --git a/src/test/java/it/youzan/nsq/client/ITTagConsumer.java b/src/test/java/it/youzan/nsq/client/ITTagConsumer.java
index 239d63b..cc3c558 100644
--- a/src/test/java/it/youzan/nsq/client/ITTagConsumer.java
+++ b/src/test/java/it/youzan/nsq/client/ITTagConsumer.java
@@ -6,6 +6,7 @@
 import com.youzan.nsq.client.entity.DesiredTag;
 import com.youzan.nsq.client.entity.NSQConfig;
 import com.youzan.nsq.client.entity.NSQMessage;
+import com.youzan.nsq.client.exception.NSQException;
 import com.youzan.nsq.client.utils.TopicUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +65,7 @@ public void process(NSQMessage message) {
                 public void process(NSQMessage message) {
                     logger.error("Message should not received: " + message.getReadableContent());
                     logger.error("message tag: " + message.getTag().toString());
+
                 }
             });
             consumer.setAutoFinish(true);
@@ -205,4 +207,34 @@ public void process(NSQMessage message) {
             TopicUtil.emptyQueue("http://" + props.getProperty("admin-address"), topic, "BaseConsumer");
         }
     }
+
+    public void testConsumerWithHeaderFilter() throws Exception {
+        String topic = "JavaTesting-Ext";
+        Consumer consumer = null;
+        try{
+            logger.info("[testConsumerWithHeaderFilter] starts");
+            NSQConfig config = new NSQConfig("BaseConsumer");
+            config.setLookupAddresses(props.getProperty("lookup-addresses"));
+            config.setConsumeMessageFilter("filter_key1", "filter_val1");
+            final AtomicBoolean fail = new AtomicBoolean(false);
+            final CountDownLatch latch = new CountDownLatch(5);
+            consumer = new ConsumerImplV2(config, new MessageHandler() {
+                @Override
+                public void process(NSQMessage message) {
+                    if (null == message.getExtByName("filter_key1") || !message.getExtByName("filter_key1").equals("filter_val1"))
+                        fail.set(true);
+                    latch.countDown();
+                }
+            });
+            consumer.setAutoFinish(true);
+            consumer.subscribe(topic);
+            consumer.start();
+            latch.await(1, TimeUnit.MINUTES);
+            Assert.assertFalse(fail.get());
+        }finally {
+            consumer.close();
+            TopicUtil.emptyQueue("http://" + props.getProperty("admin-address"), topic, "BaseConsumer");
+            logger.info("[testConsumerWithHeaderFilter] ends");
+        }
+    }
 }
diff --git a/src/test/java/it/youzan/nsq/client/ITTagProducer.java b/src/test/java/it/youzan/nsq/client/ITTagProducer.java
index ceb21a6..917cc9b 100644
--- a/src/test/java/it/youzan/nsq/client/ITTagProducer.java
+++ b/src/test/java/it/youzan/nsq/client/ITTagProducer.java
@@ -121,6 +121,32 @@ public void publishWTagMixWHeader() throws Exception {
         }
     }
 
+    public void publishWFilterHeader() throws Exception {
+        TopicUtil.emptyQueue("http://" + props.getProperty("admin-address"), "testExt2Par2Rep", "BaseConsumer");
+        Topic topic = new Topic("JavaTesting-Ext");
+
+        Map ext = new HashMap<>();
+        ext.put("filter_key1" ,"filter_val1");
+        ext.put("key1" ,"this is value 1");
+
+        Map extMissing = new HashMap<>();
+        extMissing.put("filter_key1", "filter_val2");
+        extMissing.put("filter_key2" ,"filter_val2");
+
+
+        for (int i = 0; i < 5; i++) {
+            Message msg = Message.create(topic, "message");
+            msg.setJsonHeaderExt(ext);
+            producer.publish(msg);
+        }
+
+        for (int i = 0; i < 5; i++) {
+            Message msg = Message.create(topic, "message");
+            msg.setJsonHeaderExt(extMissing);
+            producer.publish(msg);
+        }
+    }
+
     @AfterClass
     public void close() {
         IOUtil.closeQuietly(producer);
diff --git a/src/test/resources/testng-base-tag-suite.xml b/src/test/resources/testng-base-tag-suite.xml
index 79f1f9e..bd98892 100644
--- a/src/test/resources/testng-base-tag-suite.xml
+++ b/src/test/resources/testng-base-tag-suite.xml
@@ -60,4 +60,19 @@
             
         
     
+
+    
+        
+            
+                
+                    
+                
+            
+            
+                
+                    
+                
+            
+        
+    
 

From 9bcdfbd2689d61a43c8f813a97417fa05ce0d272 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=85=83=E5=AE=88?= 
Date: Tue, 21 Nov 2017 18:03:40 +0800
Subject: [PATCH 2/7] fix typo
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: 元守 
---
 src/main/java/com/youzan/nsq/client/entity/NSQConfig.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
index c364c48..cbb5fc4 100644
--- a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
+++ b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
@@ -929,7 +929,7 @@ private enum ConsumePolicy {
         SKIP
     }
 
-    private Map> consumePolcyMap = new ConcurrentHashMap<>();
+    private Map> consumePolicyMap = new ConcurrentHashMap<>();
 
     /**
      * Set extension key/value map for consumer to skip, when json extension header in one message
@@ -942,7 +942,7 @@ private enum ConsumePolicy {
     NSQConfig setMessageSkipExtensionKVMap(final Map extensionKV) {
         if(null == extensionKV || extensionKV.size() == 0)
             return this;
-        this.consumePolcyMap.put(ConsumePolicy.SKIP, Collections.unmodifiableMap(extensionKV));
+        this.consumePolicyMap.put(ConsumePolicy.SKIP, Collections.unmodifiableMap(extensionKV));
         return this;
     }
 
@@ -983,7 +983,7 @@ public boolean getBlockWhenBorrowConn4Producer() {
      * @return extension key/value map for consumer to skip, which is {@link Collections.UnmodifiableMap}.
      */
     public Map getMessageSkipExtensionKVMap() {
-        return this.consumePolcyMap.get(ConsumePolicy.SKIP);
+        return this.consumePolicyMap.get(ConsumePolicy.SKIP);
     }
 
     private int producerPoolSize = 4;

From 98df094758299694c5e24715a99cce82ee8052a6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=85=83=E5=AE=88?= 
Date: Tue, 21 Nov 2017 18:03:40 +0800
Subject: [PATCH 3/7] fix typo
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: 元守 
---
 .../com/youzan/nsq/client/ConsumerImplV2.java | 19 -------------------
 .../youzan/nsq/client/entity/NSQConfig.java   |  6 +++---
 2 files changed, 3 insertions(+), 22 deletions(-)

diff --git a/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java b/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
index 9ffe575..19dc8b6 100644
--- a/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
+++ b/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
@@ -27,8 +27,6 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static com.youzan.nsq.client.core.command.PubExt.FILTER_EXT_KEY;
-
 /**
  * TODO: a description
  * 
@@ -56,7 +54,6 @@ public class ConsumerImplV2 implements Consumer, IConsumeInfo {
 
     private volatile long lastConnecting = 0L;
     private volatile long lastSuccess = 0L;
-    private volatile float consumptionRate = 0f;
 
     private final AtomicLong received = new AtomicLong(0);
     private final AtomicLong success = new AtomicLong(0);
@@ -254,14 +251,6 @@ public void start() throws NSQException {
         }
     }
 
-    /**
-     * update consumption rate according to message consumption last _INTERVAL_IN_SECOND
-     */
-    private void updateConsumptionRate() {
-        consumptionRate = ((long)success.get() - lastSuccess) / _INTERVAL_IN_SECOND;
-        lastSuccess = success.get();
-    }
-
     public boolean isConsumptionEstimateElapseTimeout() {
 //        return consumptionRate * queue4Consume.get() *1000 >= this.config.getMsgTimeoutInMillisecond();
         return false;
@@ -1074,14 +1063,6 @@ public String toString() {
 
     @Override
     public float getLoadFactor() {
-//        ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
-//        int active = pool.getActiveCount();
-//        long queueSize = queue4Consume.get();
-//        if(active > 0)
-//            return (float)queueSize/active;
-//        else {
-//            return 0f;
-//        }
         return 0f;
     }
 
diff --git a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
index c364c48..cbb5fc4 100644
--- a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
+++ b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
@@ -929,7 +929,7 @@ private enum ConsumePolicy {
         SKIP
     }
 
-    private Map> consumePolcyMap = new ConcurrentHashMap<>();
+    private Map> consumePolicyMap = new ConcurrentHashMap<>();
 
     /**
      * Set extension key/value map for consumer to skip, when json extension header in one message
@@ -942,7 +942,7 @@ private enum ConsumePolicy {
     NSQConfig setMessageSkipExtensionKVMap(final Map extensionKV) {
         if(null == extensionKV || extensionKV.size() == 0)
             return this;
-        this.consumePolcyMap.put(ConsumePolicy.SKIP, Collections.unmodifiableMap(extensionKV));
+        this.consumePolicyMap.put(ConsumePolicy.SKIP, Collections.unmodifiableMap(extensionKV));
         return this;
     }
 
@@ -983,7 +983,7 @@ public boolean getBlockWhenBorrowConn4Producer() {
      * @return extension key/value map for consumer to skip, which is {@link Collections.UnmodifiableMap}.
      */
     public Map getMessageSkipExtensionKVMap() {
-        return this.consumePolcyMap.get(ConsumePolicy.SKIP);
+        return this.consumePolicyMap.get(ConsumePolicy.SKIP);
     }
 
     private int producerPoolSize = 4;

From 81dc6dd027470535ea4554b2f19cd3f5b52f76fc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=85=83=E5=AE=88?= 
Date: Fri, 24 Nov 2017 18:10:23 +0800
Subject: [PATCH 4/7] message consume warn&error threshold
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: 元守 
---
 .../com/youzan/nsq/client/ConsumerImplV2.java | 15 +++
 .../com/youzan/nsq/client/MessageReceipt.java | 96 +++++++++++++++++++
 .../java/com/youzan/nsq/client/Producer.java  |  9 ++
 .../com/youzan/nsq/client/ProducerImplV2.java | 41 +++++---
 .../client/entity/DefaultRdyUpdatePolicy.java | 20 ----
 .../nsq/client/entity/IRdyUpdatePolicy.java   | 40 --------
 .../youzan/nsq/client/entity/NSQConfig.java   | 25 ++++-
 .../client/network/frame/ResponseFrame.java   | 53 +++++++---
 .../java/it/youzan/nsq/client/ITProducer.java | 33 ++++++-
 src/test/resources/testng-base-suite.xml      | 12 +++
 10 files changed, 253 insertions(+), 91 deletions(-)
 create mode 100644 src/main/java/com/youzan/nsq/client/MessageReceipt.java
 delete mode 100644 src/main/java/com/youzan/nsq/client/entity/DefaultRdyUpdatePolicy.java
 delete mode 100644 src/main/java/com/youzan/nsq/client/entity/IRdyUpdatePolicy.java

diff --git a/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java b/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
index 19dc8b6..c485d69 100644
--- a/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
+++ b/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
@@ -779,6 +779,21 @@ public void operationComplete(ChannelFuture future) throws Exception {
         // Post
         //log warn
         if (!ok) {
+            int attempt = message.getReadableAttempts();
+            int warningThreshold = this.config.getAttemptWarningThresdhold();
+            int errorThreshold = this.config.getAttemptErrorThresdhold();
+
+            if(errorThreshold > 0 && attempt > errorThreshold){
+                if(0==attempt%errorThreshold) {
+                    logger.error("Message attempts number has been {}, consider logging message content and finish. {}", attempt, message.toString());
+                }
+            } else if (warningThreshold > 0 && attempt > warningThreshold){
+                if(attempt > warningThreshold) {
+                    if(0==attempt%warningThreshold) {
+                        logger.warn("Message attempts number has been {}, consider logging message content and finish. {}", attempt, message.toString());
+                    }
+                }
+            }
             //TODO: connection.setMessageConsumptionFailed(start);
 //            logger.warn("Exception occurs in message handler. Please check it right now {} , Original message: {}.", message, message.getReadableContent());
         } else if (!this.config.isOrdered()){
diff --git a/src/main/java/com/youzan/nsq/client/MessageReceipt.java b/src/main/java/com/youzan/nsq/client/MessageReceipt.java
new file mode 100644
index 0000000..360f1d7
--- /dev/null
+++ b/src/main/java/com/youzan/nsq/client/MessageReceipt.java
@@ -0,0 +1,96 @@
+package com.youzan.nsq.client;
+
+import com.youzan.nsq.client.entity.Message;
+
+/**
+ * Message receipt for publish response, for normal publish(PUB),
+ * it contains: topic name, partition number, nsqd address(host:port);
+ * if current receipt is for trace publish(PUB_TARCE), when {@link Message#traced()},
+ * following info contains, besides topic and nsqd info mentioned above:
+ *
+ * [internalID]
+ * [traceID]
+ * [diskQueueOffset]
+ * [diskQueueDataSize]
+ */
+public class MessageReceipt implements MessageMetadata {
+    private long internalID;
+    private long traceID;
+    private long diskQueueOffset=-1l;
+    private int diskQueueSize=-1;
+    private String topicName;
+    private int partition;
+    private String nsqdAddr;
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public void setPartition(int partition) {
+        this.partition = partition;
+    }
+
+    public String getNsqdAddr() {
+        return nsqdAddr;
+    }
+
+    public void setNsqdAddr(String nsqdAddr) {
+        this.nsqdAddr = nsqdAddr;
+    }
+
+    public MessageReceipt() {
+
+    }
+
+    public void setInternalID(long internalID) {
+        this.internalID = internalID;
+    }
+
+    public long getInternalID() {
+        return internalID;
+    }
+
+    public long getTraceID() {
+        return traceID;
+    }
+
+    public void setTraceID(long traceID) {
+        this.traceID = traceID;
+    }
+
+    public long getDiskQueueOffset() {
+        return diskQueueOffset;
+    }
+
+    public void setDiskQueueOffset(long diskQueueOffset) {
+        this.diskQueueOffset = diskQueueOffset;
+    }
+
+    public int getDiskQueueSize() {
+        return diskQueueSize;
+    }
+
+    public void setDiskQueueSize(int diskQueueSize) {
+        this.diskQueueSize = diskQueueSize;
+    }
+
+    @Override
+    public String toMetadataStr() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(this.getClass().toString() + " meta-data:").append("\n");
+        sb.append("\t[internalID]:\t").append(internalID).append("\n");
+        sb.append("\t[traceID]:\t").append(traceID).append("\n");
+        sb.append("\t[diskQueueOffset]:\t").append(diskQueueOffset).append("\n");
+        sb.append("\t[diskQueueDataSize]:\t").append(diskQueueSize).append("\n");
+        sb.append(this.getClass().toString() + " end.");
+        return sb.toString();
+    }
+}
diff --git a/src/main/java/com/youzan/nsq/client/Producer.java b/src/main/java/com/youzan/nsq/client/Producer.java
index 5795eb7..dc71ad1 100644
--- a/src/main/java/com/youzan/nsq/client/Producer.java
+++ b/src/main/java/com/youzan/nsq/client/Producer.java
@@ -47,6 +47,15 @@ public interface Producer extends Client, Closeable {
 
     void publish(Message message) throws NSQException;
 
+    /**
+     * publish message and return receipt contains target nsqd&topic info for normal publish,
+     * and message meta info(internal id, disk offset, disk size, trace id) if message is traced.
+     * @param message
+     * @return receipt
+     * @throws NSQException
+     */
+    MessageReceipt publishAndGetReceipt(Message message) throws NSQException;
+
     /**
      * Use it to produce only one 'message' sending to MQ.
      * partition info is not specified in this function,
diff --git a/src/main/java/com/youzan/nsq/client/ProducerImplV2.java b/src/main/java/com/youzan/nsq/client/ProducerImplV2.java
index 7c197fa..fb4d576 100644
--- a/src/main/java/com/youzan/nsq/client/ProducerImplV2.java
+++ b/src/main/java/com/youzan/nsq/client/ProducerImplV2.java
@@ -296,6 +296,11 @@ public void publish(String message, final Topic topic, Object shardingID) throws
 
     @Override
     public void publish(final Message message) throws NSQException {
+        this.publishAndGetReceipt(message);
+    }
+
+    @Override
+    public MessageReceipt publishAndGetReceipt(final Message message) throws NSQException {
         final Context cxt = new Context();
         if(PERF_LOG.isDebugEnabled()) {
             cxt.setTraceID(pubTraceIdGen.getAndIncrement());
@@ -314,7 +319,7 @@ public void publish(final Message message) throws NSQException {
 
         try{
             //TODO: poll before timeout
-            sendPUB(message, cxt);
+            return sendPUB(message, cxt);
         }catch (NSQPubException pubE){
             logger.error(pubE.getLocalizedMessage());
             pubE.punchExceptions(logger);
@@ -441,7 +446,7 @@ public void publishMulti(List messages, String topic) throws NSQExceptio
         this.publishMulti(messages, new Topic(topic));
     }
 
-    private void sendPUB(final Message msg, Context cxt) throws NSQException {
+    private MessageReceipt sendPUB(final Message msg, Context cxt) throws NSQException {
         int c = 0; // be continuous
         boolean returnCon;
         NSQConnection conn = null;
@@ -488,11 +493,17 @@ private void sendPUB(final Message msg, Context cxt) throws NSQException {
             }
             //create PUB command
             try {
+                String host = conn.getAddress().getHost();
+                int port = conn.getAddress().getPort();
+                String topic = conn.getAddress().getTopic();
+                int partition = -1;
+
                 final Pub pub = createPubCmd(msg);
 
                 //check if address has partition info, if it does, update pub's partition
                 if(conn.getAddress().hasPartition()) {
-                    pub.overrideDefaultPartition(conn.getAddress().getPartition());
+                    partition = conn.getAddress().getPartition();
+                    pub.overrideDefaultPartition(partition);
                 }
 
                 long pubAndWaitStart = System.currentTimeMillis();
@@ -506,10 +517,21 @@ private void sendPUB(final Message msg, Context cxt) throws NSQException {
                 }
 
                 handleResponse(msg.getTopic(), frame, conn);
+                //when hit this line what we have are response frame
                 success.addAndGet(msg.getMessageCount());
-                if(msg.isTraced() && frame instanceof MessageMetadata && TraceLogger.isTraceLoggerEnabled() && conn.getAddress().isHA())
-                    TraceLogger.trace(this, conn, (MessageMetadata) frame);
-                break;
+                if(msg.isTraced() && frame instanceof ResponseFrame && conn.getAddress().isHA()) {
+                    if (TraceLogger.isTraceLoggerEnabled())
+                        TraceLogger.trace(this, conn, (MessageMetadata) frame);
+                }
+                ResponseFrame response = (ResponseFrame) frame;
+                MessageReceipt receipt = response.getReceipt();
+                receipt.setNsqdAddr(host + ":" + port);
+                receipt.setTopicName(topic);
+                receipt.setPartition(partition);
+                if(PERF_LOG.isDebugEnabled()){
+                    PERF_LOG.debug("{}: Producer took {} milliSec to send message to {}", cxt.getTraceID(), System.currentTimeMillis() - start, conn.getAddress());
+                }
+                return receipt;
             }
             catch(NSQPubFactoryInitializeException | NSQTagException | NSQTopicNotExtendableException | NSQExtNotSupportedException expShouldFail) {
                 throw expShouldFail;
@@ -555,12 +577,7 @@ private void sendPUB(final Message msg, Context cxt) throws NSQException {
                 }
             }
         } // end loop
-        if (c >= retry) {
-            throw new NSQPubException(exceptions);
-        }
-        if(PERF_LOG.isDebugEnabled()){
-            PERF_LOG.debug("{}: Producer took {} milliSec to send message to {}", cxt.getTraceID(), System.currentTimeMillis() - start, conn.getAddress());
-        }
+        throw new NSQPubException(exceptions);
     }
 
     /**
diff --git a/src/main/java/com/youzan/nsq/client/entity/DefaultRdyUpdatePolicy.java b/src/main/java/com/youzan/nsq/client/entity/DefaultRdyUpdatePolicy.java
deleted file mode 100644
index 2171e89..0000000
--- a/src/main/java/com/youzan/nsq/client/entity/DefaultRdyUpdatePolicy.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.youzan.nsq.client.entity;
-
-/**
- * @deprecated
- * Created by lin on 17/7/31.
- */
-public class DefaultRdyUpdatePolicy implements IRdyUpdatePolicy {
-    public final static float THRESDHOLD = 1.5f;
-    public final static float WATER_HIGH = 1.75f;
-
-    @Override
-    public boolean rdyShouldIncrease(String topic, float scheduleLoad, boolean mayTimeout, int maxRdyPerCon, int extraRdy) {
-        return !mayTimeout && scheduleLoad <= THRESDHOLD;
-    }
-
-    @Override
-    public boolean rdyShouldDecline(String topic, float scheduleLoad, boolean mayTimeout, int maxRdyPerCon, int extraRdy) {
-        return scheduleLoad >= WATER_HIGH && mayTimeout;
-    }
-}
diff --git a/src/main/java/com/youzan/nsq/client/entity/IRdyUpdatePolicy.java b/src/main/java/com/youzan/nsq/client/entity/IRdyUpdatePolicy.java
deleted file mode 100644
index dc5bd99..0000000
--- a/src/main/java/com/youzan/nsq/client/entity/IRdyUpdatePolicy.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.youzan.nsq.client.entity;
-
-/**
- * @deprecated
- * Created by lin on 17/7/31.
- */
-public interface IRdyUpdatePolicy {
-
-    /**
-     * indicate whether rdy should increase.
-     * @param scheduleLoad  scheduleLoad for {@link com.youzan.nsq.client.Consumer} which has current policy applied.
-     *                      scheduleLoad = (float)(message for processing in queue) / (message being processing)
-     *
-     * @param mayTimeout    indicate whether message processing may timeout, given the message waiting for process
-     *                      in queue and the average process time elapse:
-     *                      (message consumption rate) * (message # in queue) >= NSQConfig.getMsgTiemoutInMilliSe
-     * @param maxRdyPerCon  max rdy per connection allowed
-     *
-     * @param extraRdy      extra Rdy could be allocated for current topic's connections
-     *
-     * @return true indicates rdy should increase, otherwise false
-     */
-    boolean rdyShouldIncrease(String topic, float scheduleLoad, boolean mayTimeout, int maxRdyPerCon, int extraRdy);
-
-    /**
-     * indicate whether rdy should increase.
-     * @param scheduleLoad  scheduleLoad for {@link com.youzan.nsq.client.Consumer} which has current policy applied.
-     *                      scheduleLoad = (float)(message for processing in queue) / (message being processing)
-     *
-     * @param mayTimeout    indicate whether message processing may timeout, given the message waiting for process
-     *                      in queue and the average process time elapse:
-     *                      (message consumption rate) * (message # in queue) >= NSQConfig#getMsgTiemoutInMilliSe
-     * @param maxRdyPerCon  max rdy per connection allowed
-     *
-     * @param extraRdy      extra Rdy could be allocated for current topic's connections
-     *
-     * @return true indicates rdy should decline, otherwise false
-     */
-    boolean rdyShouldDecline(String topic, float scheduleLoad, boolean mayTimeout, int maxRdyPerCon, int extraRdy);
-}
diff --git a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
index cbb5fc4..1bedc36 100644
--- a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
+++ b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
@@ -31,6 +31,7 @@ public class NSQConfig implements java.io.Serializable, Cloneable {
     private static final long serialVersionUID = 6624842850216901700L;
     private static final Logger logger = LoggerFactory.getLogger(NSQConfig.class);
     private static final ObjectMapper MAPPER_CONFIG = new ObjectMapper();
+
     static {
         MAPPER_CONFIG.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
     }
@@ -88,7 +89,7 @@ public void setExpectedRdyUpdatePolicy(String policyClass) {
                 clazz = Class.forName(policyClass);
             }
             Object policy = clazz.newInstance();
-            if (policy instanceof IRdyUpdatePolicy) {
+            if (policy instanceof IExpectedRdyUpdatePolicy) {
                 @SuppressWarnings("unchecked")
                         IExpectedRdyUpdatePolicy expRdyPolicy = (IExpectedRdyUpdatePolicy) policy;
                 this.expRdyUpdatePolicy = expRdyPolicy;
@@ -147,6 +148,28 @@ public enum Compression {
     private boolean featureNegotiation;
     private boolean userSpecifiedLookupd = false;
     private Map localTraceMap = new ConcurrentHashMap<>();
+
+    public int getAttemptWarningThresdhold() {
+        return attemptWarningThresdhold;
+    }
+
+    public NSQConfig setAttemptWarningThresdhold(int attemptWarningThresdhold) {
+        this.attemptWarningThresdhold = attemptWarningThresdhold;
+        return this;
+    }
+
+    public int getAttemptErrorThresdhold() {
+        return attemptErrorThresdhold;
+    }
+
+    public NSQConfig setAttemptErrorThresdhold(int attemptErrorThresdhold) {
+        this.attemptErrorThresdhold = attemptErrorThresdhold;
+        return this;
+    }
+
+    private int attemptWarningThresdhold = 10;
+    private int attemptErrorThresdhold = 50;
+
     /*-
      * =========================================================================
      *                             All of Timeout
diff --git a/src/main/java/com/youzan/nsq/client/network/frame/ResponseFrame.java b/src/main/java/com/youzan/nsq/client/network/frame/ResponseFrame.java
index a68b213..1d44297 100644
--- a/src/main/java/com/youzan/nsq/client/network/frame/ResponseFrame.java
+++ b/src/main/java/com/youzan/nsq/client/network/frame/ResponseFrame.java
@@ -1,6 +1,8 @@
 package com.youzan.nsq.client.network.frame;
 
 import com.youzan.nsq.client.MessageMetadata;
+import com.youzan.nsq.client.MessageReceipt;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -8,6 +10,7 @@
 
 public class ResponseFrame extends NSQFrame implements MessageMetadata{
     private static final Logger logger = LoggerFactory.getLogger(ResponseFrame.class);
+    private MessageReceipt receipt = new MessageReceipt();
 
     @Override
     public FrameType getType() {
@@ -19,6 +22,12 @@ public String getMessage() {
         return new String(getData(), DEFAULT_CHARSET).trim();
     }
 
+    @Override
+    public void setData(byte[] data) {
+        super.setData(data);
+        generateReceipt();
+    }
+
     @Override
     public String toString() {
         return "ResponseFrame: " + this.getMessage();
@@ -29,37 +38,55 @@ public String toMetadataStr() {
         String resMsg = getMessage();
         //check if has meta data
         if(resMsg.startsWith("OK") && resMsg.length() > 2){
+            StringBuilder sb = new StringBuilder();
+            sb.append(this.getClass().toString() + " meta-data:").append("\n");
+            sb.append("\t[internalID]:\t").append(receipt.getInternalID()).append("\n");
+            sb.append("\t[traceID]:\t").append(receipt.getTraceID()).append("\n");
+            sb.append("\t[diskQueueOffset]:\t").append(receipt.getDiskQueueOffset()).append("\n");
+            sb.append("\t[diskQueueDataSize]:\t").append(receipt.getDiskQueueSize()).append("\n");
+            sb.append(this.getClass().toString() + " end.");
+            return sb.toString();
+        }
+        return "No meta data";
+    }
+
+    /**
+     * Generate message receipt for publish response
+     * @return messageReceipt
+     */
+    private void generateReceipt() {
+        String resMsg = getMessage();
+        //check if has meta data
+        if (resMsg.startsWith("OK") && resMsg.length() > 2) {
             byte[] data = getData();
 
             //internal ID
             byte[] internalIDByte = new byte[8];
             System.arraycopy(data, 2, internalIDByte, 0, 8);
             long internalID = ByteBuffer.wrap(internalIDByte).getLong();
+            receipt.setInternalID(internalID);
 
             //traceID
             byte[] traceIDByte = new byte[8];
             System.arraycopy(data, 10, traceIDByte, 0, 8);
             long traceID = ByteBuffer.wrap(traceIDByte).getLong();
+            receipt.setTraceID(traceID);
 
             //disk queue offset
             byte[] diskqueueOffsetByte = new byte[8];
             System.arraycopy(data, 18, diskqueueOffsetByte, 0, 8);
             long diskQueueOffset = ByteBuffer.wrap(diskqueueOffsetByte).getLong();
+            receipt.setDiskQueueOffset(diskQueueOffset);
 
             //disk queue data size
-            byte[] diskQueueSizeByte = new byte[4];
-            System.arraycopy(data, 26, diskQueueSizeByte, 0, 4);
-            int diskQueueSize = ByteBuffer.wrap(diskqueueOffsetByte).getInt();
-
-            StringBuilder sb = new StringBuilder();
-            sb.append(this.getClass().toString() + " meta-data:").append("\n");
-            sb.append("\t[internalID]:\t").append(internalID).append("\n");
-            sb.append("\t[traceID]:\t").append(traceID).append("\n");
-            sb.append("\t[diskQueueOffset]:\t").append(diskQueueOffset).append("\n");
-            sb.append("\t[diskQueueDataSize]:\t").append(diskQueueSize).append("\n");
-            sb.append(this.getClass().toString() + " end.");
-            return sb.toString();
+            byte[] diskqueueSizeByte = new byte[4];
+            System.arraycopy(data, 26, diskqueueSizeByte, 0, 4);
+            int diskQueueSize = ByteBuffer.wrap(diskqueueSizeByte).getInt();
+            receipt.setDiskQueueSize(diskQueueSize);
         }
-        return "No meta data";
+    }
+
+    public MessageReceipt getReceipt() {
+        return this.receipt;
     }
 }
diff --git a/src/test/java/it/youzan/nsq/client/ITProducer.java b/src/test/java/it/youzan/nsq/client/ITProducer.java
index 90d4d1b..c190e5f 100644
--- a/src/test/java/it/youzan/nsq/client/ITProducer.java
+++ b/src/test/java/it/youzan/nsq/client/ITProducer.java
@@ -1,6 +1,7 @@
 package it.youzan.nsq.client;
 
 import com.youzan.nsq.client.*;
+import com.youzan.nsq.client.entity.Message;
 import com.youzan.nsq.client.entity.NSQConfig;
 import com.youzan.nsq.client.entity.NSQMessage;
 import com.youzan.nsq.client.entity.Topic;
@@ -49,9 +50,8 @@ public void init() throws Exception {
         config.setThreadPoolSize4IO(Integer.valueOf(threadPoolSize4IO));
     }
 
-    @Test
     public void multiPublishBatchError2() throws Exception {
-        logger.info("[ITProducer#multiPublishBatch] starts");
+        logger.info("[ITProducer#multiPublishBatchError2] starts");
         Producer producer = null;
         Consumer consumer = null;
         Topic topic = new Topic("JavaTesting-Producer-Base");
@@ -98,9 +98,8 @@ public void process(NSQMessage message) {
         }
     }
 
-    @Test
     public void multiPublishBatchError1() throws Exception {
-        logger.info("[ITProducer#multiPublishBatch] starts");
+        logger.info("[ITProducer#multiPublishBatchError1] starts");
         Producer producer = null;
         Consumer consumer = null;
         Topic topic = new Topic("JavaTesting-Producer-Base");
@@ -149,7 +148,6 @@ public void process(NSQMessage message) {
         }
     }
 
-    @Test
     public void multiPublishBatch() throws Exception {
         logger.info("[ITProducer#multiPublishBatch] starts");
         Producer producer = null;
@@ -266,4 +264,29 @@ public void publishDeflate() throws Exception {
             logger.info("[ITProducer#publishDeflate] ends");
         }
     }
+
+    public void publishTrace() throws Exception {
+        logger.info("[ITProducer#publishTrace] starts");
+        TopicUtil.emptyQueue(adminHttp, "JavaTesting-Producer-Base", "BaseConsumer");
+        Producer producer = new ProducerImplV2(config);
+        try {
+            Topic topic = new Topic("JavaTesting-Producer-Base");
+            producer.start();
+            String msgStr = "The quick brown fox jumps over the lazy dog, 那只迅捷的灰狐狸跳过了那条懒狗";
+            for (int i = 0; i < 10; i++) {
+                Message msg = Message.create(topic, msgStr);
+                msg.traced();
+                MessageReceipt receipt = producer.publishAndGetReceipt(msg);
+                Assert.assertNotNull(receipt.getTopicName());
+                Assert.assertNotNull(receipt.getNsqdAddr());
+                Assert.assertEquals(0, receipt.getTraceID());
+                Assert.assertNotEquals(-1, receipt.getDiskQueueOffset());
+                Assert.assertNotEquals(-1, receipt.getDiskQueueSize());
+            }
+        }finally {
+            producer.close();
+            TopicUtil.emptyQueue(adminHttp, "JavaTesting-Producer-Base", "BaseConsumer");
+            logger.info("[ITProducer#publishDeflate] ends");
+        }
+    }
 }
diff --git a/src/test/resources/testng-base-suite.xml b/src/test/resources/testng-base-suite.xml
index 8c20287..7649e47 100644
--- a/src/test/resources/testng-base-suite.xml
+++ b/src/test/resources/testng-base-suite.xml
@@ -57,4 +57,16 @@
             
         
     
+    
+        
+            
+                
+                    
+                    
+                    
+                    
+                
+            
+        
+    
 

From 8b2051830d4030e28b97dbedce26173346e2ef12 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=85=83=E5=AE=88?= 
Date: Mon, 27 Nov 2017 11:46:55 +0800
Subject: [PATCH 5/7] consuemer name info in client toString
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: 元守 
---
 .../com/youzan/nsq/client/ConsumerImplV2.java | 34 +++++++++++--------
 .../com/youzan/nsq/client/ProducerImplV2.java |  5 ++-
 .../nsq/client/core/NSQSimpleClient.java      |  8 +++--
 .../youzan/nsq/client/entity/NSQConfig.java   |  3 +-
 .../com/youzan/nsq/client/ConsumerTest.java   |  8 +++++
 .../nsq/client/MockedNSQSimpleClient.java     |  2 +-
 .../client/core/ConnectionManagerTest.java    | 21 ------------
 .../it/youzan/nsq/client/ITTagConsumer.java   |  3 +-
 8 files changed, 39 insertions(+), 45 deletions(-)

diff --git a/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java b/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
index c485d69..170f60e 100644
--- a/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
+++ b/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
@@ -80,8 +80,7 @@ public class ConsumerImplV2 implements Consumer, IConsumeInfo {
     /*
      * schedule executor for updating nsqd connections in effect
      */
-    private final ScheduledExecutorService scheduler = Executors
-            .newSingleThreadScheduledExecutor(new NamedThreadFactory(this.getClass().getSimpleName(), Thread.NORM_PRIORITY));
+    private ScheduledExecutorService scheduler;
     /*
      * message handler
      */
@@ -112,7 +111,7 @@ public class ConsumerImplV2 implements Consumer, IConsumeInfo {
      */
     public ConsumerImplV2(NSQConfig config) {
         this.config = config;
-        this.simpleClient = new NSQSimpleClient(Role.Consumer, this.config.getUserSpecifiedLookupAddress());
+        this.simpleClient = new NSQSimpleClient(Role.Consumer, this.config.getUserSpecifiedLookupAddress(), this.config);
 
         //initialize netty component
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup(config.getNettyPoolSize());
@@ -124,7 +123,11 @@ public ConsumerImplV2(NSQConfig config) {
         bootstrap.handler(new NSQClientInitializer());
         //initialize consumer worker size
         executor = Executors.newFixedThreadPool(this.config.getConsumerWorkerPoolSize(),
-                new NamedThreadFactory(this.getClass().getSimpleName() + "-ClientBusiness", Thread.MAX_PRIORITY));
+                new NamedThreadFactory(this.getClass().getSimpleName() + "-ClientBusiness-" + this.config.getConsumerName(), Thread.MAX_PRIORITY));
+        String consumerName = (null == this.config) ? "-null" : "-" + this.config.getConsumerName();
+        //intialize consumer simple client thread
+        scheduler = Executors
+                .newSingleThreadScheduledExecutor(new NamedThreadFactory(this.getClass().getSimpleName() + consumerName, Thread.NORM_PRIORITY));
     }
 
     /**
@@ -230,7 +233,7 @@ public void start() throws NSQException {
         //start consumer
         if (this.started.compareAndSet(Boolean.FALSE, Boolean.TRUE) && cLock.writeLock().tryLock()) {
             String configJsonStr = NSQConfig.parseNSQConfig(this.config);
-            logger.info("Config for consumer {}: {}", this, configJsonStr);
+            logger.info("Consumer {} starts with config: {}", this, configJsonStr);
             try {
                 //validate that consumer have right lookup address source
                 if (!validateLookupdSource()) {
@@ -570,11 +573,6 @@ public void incoming(final NSQFrame frame, final NSQConnection conn) throws NSQE
                     return;
                 }
 
-                if(!checkExtFilter(message, conn)) {
-                    _finish(message, conn);
-                    return;
-                }
-
                 if (TraceLogger.isTraceLoggerEnabled() && conn.getAddress().isHA())
                     TraceLogger.trace(this, conn, message);
                 if (this.config.isOrdered()
@@ -666,6 +664,7 @@ private void consume(final NSQMessage message, final NSQConnection connection) {
         boolean retry;
         boolean explicitRequeue = false;
         boolean skip = needSkip(message);
+        skip = skip || !checkExtFilter(message, connection);
 
         long start = System.currentTimeMillis();
         try {
@@ -748,6 +747,9 @@ private void consume(final NSQMessage message, final NSQConnection connection) {
                     final byte[] id = message.getMessageID();
                     logger.info("Client does a re-queue explicitly. MessageID: {} , Hex: {}", id, message.newHexString(id));
                 }
+            } else if (skip) {
+                // Finish
+                cmd = new Finish(message.getMessageID());
             } else {
                 // ignore actions
                 cmd = null;
@@ -768,10 +770,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
                             }
                         }
                     });
-                    if (cmd instanceof Finish) {
-                        finished.incrementAndGet();
-                    } else {
-                        re.incrementAndGet();
+                    if(!skip) {
+                        if (cmd instanceof Finish) {
+                            finished.incrementAndGet();
+                        } else {
+                            re.incrementAndGet();
+                        }
                     }
                 }
             }
@@ -1073,7 +1077,7 @@ public String toString() {
         } catch (IOException e) {
             logger.warn(e.getLocalizedMessage());
         }
-        return "[Consumer] at " + ipStr;
+        return "[Consumer@" + this.config.getConsumerName() + "] " + super.toString() + " at " + ipStr;
     }
 
     @Override
diff --git a/src/main/java/com/youzan/nsq/client/ProducerImplV2.java b/src/main/java/com/youzan/nsq/client/ProducerImplV2.java
index fb4d576..1b73ed6 100644
--- a/src/main/java/com/youzan/nsq/client/ProducerImplV2.java
+++ b/src/main/java/com/youzan/nsq/client/ProducerImplV2.java
@@ -53,7 +53,6 @@ public class ProducerImplV2 implements Producer {
     private final ScheduledExecutorService scheduler = Executors
             .newSingleThreadScheduledExecutor(new ProducerWorkerThreadFactory(this.getClass().getName(), Thread.NORM_PRIORITY));
 
-
     private final ExecutorService pubExec;
 
     private final ConcurrentHashMap topic_2_lastActiveTime = new ConcurrentHashMap<>();
@@ -69,7 +68,7 @@ public class ProducerImplV2 implements Producer {
      */
     public ProducerImplV2(NSQConfig config) {
         this.config = config;
-        this.simpleClient = new NSQSimpleClient(Role.Producer, this.config.getUserSpecifiedLookupAddress());
+        this.simpleClient = new NSQSimpleClient(Role.Producer, this.config.getUserSpecifiedLookupAddress(), this.config);
 
         this.poolConfig = new GenericKeyedObjectPoolConfig();
         this.factory = new KeyedPooledConnectionFactory(this.config, this);
@@ -758,6 +757,6 @@ public String toString(){
         } catch (IOException e) {
             logger.warn(e.getLocalizedMessage());
         }
-        return "[Producer] at " + ipStr;
+        return "[Producer] " + super.toString() + " at " + ipStr;
     }
 }
diff --git a/src/main/java/com/youzan/nsq/client/core/NSQSimpleClient.java b/src/main/java/com/youzan/nsq/client/core/NSQSimpleClient.java
index 24bc079..b1089f5 100644
--- a/src/main/java/com/youzan/nsq/client/core/NSQSimpleClient.java
+++ b/src/main/java/com/youzan/nsq/client/core/NSQSimpleClient.java
@@ -57,8 +57,7 @@ public class NSQSimpleClient implements Client, Closeable {
     /*
      *single schedule executor for maintaining topic to partition map
      */
-    private final ScheduledExecutorService scheduler = Executors
-            .newSingleThreadScheduledExecutor(new NamedThreadFactory(this.getClass().getName(), Thread.MAX_PRIORITY));
+    private ScheduledExecutorService scheduler;
 
     /*
      * role of client current simple client nested
@@ -67,11 +66,14 @@ public class NSQSimpleClient implements Client, Closeable {
     private final LookupService lookup;
     private final boolean useLocalLookupd;
 
-    public NSQSimpleClient(Role role, boolean localLookupd) {
+    public NSQSimpleClient(Role role, boolean localLookupd, final NSQConfig config) {
         this.role = role;
         this.lookupLocalID = CLIENT_ID.incrementAndGet();
         this.lookup = new LookupServiceImpl(role, this.lookupLocalID);
         this.useLocalLookupd = localLookupd;
+        String consumerName = (null == config || role != Role.Consumer) ? "-null" : "-" + config.getConsumerName();
+        scheduler = Executors
+                .newSingleThreadScheduledExecutor(new NamedThreadFactory(this.getClass().getName() + "-" + role.getRoleTxt() + consumerName, Thread.MAX_PRIORITY));
     }
 
     /**
diff --git a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
index 1bedc36..e08defb 100644
--- a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
+++ b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
@@ -1032,7 +1032,8 @@ public int getPublishWorkerPoolSize() {
     /**
      * set consume message filter value, default value is null, which means there is NO message filter applied for
      * message consumer which has current {@link NSQConfig} applied.
-     * @param filterVal
+     * @param key       extension filter key to locate extension value
+     * @param filterVal expected extension value to match
      * @return NSQConfig
      */
     public NSQConfig setConsumeMessageFilter(String key, String filterVal) {
diff --git a/src/test/java/com/youzan/nsq/client/ConsumerTest.java b/src/test/java/com/youzan/nsq/client/ConsumerTest.java
index 123023c..0d2a592 100644
--- a/src/test/java/com/youzan/nsq/client/ConsumerTest.java
+++ b/src/test/java/com/youzan/nsq/client/ConsumerTest.java
@@ -596,6 +596,14 @@ public void process(NSQMessage message) {
         Assert.assertTrue(consumer.checkExtFilter(message, conn));
     }
 
+    @Test
+    public void testConsumerToString() {
+        NSQConfig config = new NSQConfig("BaseConsumer");
+        Consumer consumer = new ConsumerImplV2(config);
+        String consumerString = consumer.toString();
+        Assert.assertTrue(consumerString.contains("BaseConsumer"));
+    }
+
     private ScheduledExecutorService keepMessagePublish(final Producer producer, final String topic, long interval) throws NSQException {
         ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
         exec.scheduleWithFixedDelay(new Runnable() {
diff --git a/src/test/java/com/youzan/nsq/client/MockedNSQSimpleClient.java b/src/test/java/com/youzan/nsq/client/MockedNSQSimpleClient.java
index c753ea7..90d1ce3 100644
--- a/src/test/java/com/youzan/nsq/client/MockedNSQSimpleClient.java
+++ b/src/test/java/com/youzan/nsq/client/MockedNSQSimpleClient.java
@@ -21,7 +21,7 @@ public class MockedNSQSimpleClient extends NSQSimpleClient {
     private final Logger logger = LoggerFactory.getLogger(MockedNSQSimpleClient.class.getName());
 
     public MockedNSQSimpleClient(Role role, boolean localLookupd) {
-        super(role, localLookupd);
+        super(role, localLookupd, null);
     }
 
     @Override
diff --git a/src/test/java/com/youzan/nsq/client/core/ConnectionManagerTest.java b/src/test/java/com/youzan/nsq/client/core/ConnectionManagerTest.java
index c0625b6..f18cb2d 100644
--- a/src/test/java/com/youzan/nsq/client/core/ConnectionManagerTest.java
+++ b/src/test/java/com/youzan/nsq/client/core/ConnectionManagerTest.java
@@ -759,27 +759,6 @@ public boolean isConsumptionEstimateElapseTimeout() {
         }
     }
 
-    public static class BadRdyUpdatePolicy implements IRdyUpdatePolicy {
-
-        @Override
-        public boolean rdyShouldIncrease(String topic, float scheduleLoad, boolean mayTimeout, int maxRdyPerCon, int extraRdy) {
-            boolean flag = true;
-            if(flag) {
-                throw new RuntimeException("expected exp in rdy update policy");
-            }
-            return true;
-        }
-
-        @Override
-        public boolean rdyShouldDecline(String topic, float scheduleLoad, boolean mayTimeout, int maxRdyPerCon, int extraRdy) {
-            boolean flag = true;
-            if(flag) {
-                throw new RuntimeException("expected exp in rdy update policy");
-            }
-            return false;
-        }
-    }
-
     @Test
     public void testMakebadOfRdyRedistribute() throws Exception {
         logger.info("[testMakebadOfRdyRedistribute] starts.");
diff --git a/src/test/java/it/youzan/nsq/client/ITTagConsumer.java b/src/test/java/it/youzan/nsq/client/ITTagConsumer.java
index cc3c558..7ac8b6c 100644
--- a/src/test/java/it/youzan/nsq/client/ITTagConsumer.java
+++ b/src/test/java/it/youzan/nsq/client/ITTagConsumer.java
@@ -3,10 +3,10 @@
 import com.youzan.nsq.client.Consumer;
 import com.youzan.nsq.client.ConsumerImplV2;
 import com.youzan.nsq.client.MessageHandler;
+import com.youzan.nsq.client.entity.ConsumeMessageFilterMode;
 import com.youzan.nsq.client.entity.DesiredTag;
 import com.youzan.nsq.client.entity.NSQConfig;
 import com.youzan.nsq.client.entity.NSQMessage;
-import com.youzan.nsq.client.exception.NSQException;
 import com.youzan.nsq.client.utils.TopicUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -216,6 +216,7 @@ public void testConsumerWithHeaderFilter() throws Exception {
             NSQConfig config = new NSQConfig("BaseConsumer");
             config.setLookupAddresses(props.getProperty("lookup-addresses"));
             config.setConsumeMessageFilter("filter_key1", "filter_val1");
+            config.setConsumeMessageFilterMode(ConsumeMessageFilterMode.EXACT_MATCH);
             final AtomicBoolean fail = new AtomicBoolean(false);
             final CountDownLatch latch = new CountDownLatch(5);
             consumer = new ConsumerImplV2(config, new MessageHandler() {

From 47b4c989755f7d6fe5a9cb8dfd9ac6d9b1329067 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=85=83=E5=AE=88?= 
Date: Thu, 30 Nov 2017 18:08:02 +0800
Subject: [PATCH 6/7] remove limit in requeue timeout
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: 元守 
---
 src/main/java/com/youzan/nsq/client/entity/NSQConfig.java | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
index e08defb..76cd3eb 100644
--- a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
+++ b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
@@ -216,7 +216,7 @@ public NSQConfig setAttemptErrorThresdhold(int attemptErrorThresdhold) {
     // 1 seconds
     public static final int _MIN_NEXT_CONSUMING_IN_SECOND = 0;
     // one hour is the limit
-    public static final int _MAX_NEXT_CONSUMING_IN_SECOND = 3600;
+    public static final int _MAX_NEXT_CONSUMING_IN_SECOND = 24 * 3600;
     private int nextConsumingInSecond = 60;
     private long maxConnWait = 200L;
     private int minIdleConn = 2;
@@ -850,10 +850,8 @@ public NSQConfig setNextConsumingInSeconds(int timeout) {
         if (timeout < NSQConfig._MIN_NEXT_CONSUMING_IN_SECOND) {
             throw new IllegalArgumentException(
                     "Next consuming in second is illegal. It is too small. " + NSQConfig._MIN_NEXT_CONSUMING_IN_SECOND);
-        }
-        if (timeout > NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND) {
-            throw new IllegalArgumentException(
-                    "Next consuming in second is illegal. It is too big. " + NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND);
+        } else if (timeout > NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND) {
+            logger.warn("Next consuming in second is larger than {}. It may be limited to max value in server side.", NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND);
         }
         this.nextConsumingInSecond = timeout;
         return this;

From 6b3bfd9a21451cc3e6e899345c8a2c558a51c2e6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=85=83=E5=AE=88?= 
Date: Thu, 30 Nov 2017 18:15:56 +0800
Subject: [PATCH 7/7] remove requeue timeout limit in NSQMessage
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: 元守 
---
 src/main/java/com/youzan/nsq/client/entity/NSQMessage.java | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/src/main/java/com/youzan/nsq/client/entity/NSQMessage.java b/src/main/java/com/youzan/nsq/client/entity/NSQMessage.java
index fd0d90f..dcb97ab 100644
--- a/src/main/java/com/youzan/nsq/client/entity/NSQMessage.java
+++ b/src/main/java/com/youzan/nsq/client/entity/NSQMessage.java
@@ -324,10 +324,8 @@ public void setNextConsumingInSecond(Integer nextConsumingInSecond) throws NSQEx
             if (timeout < NSQConfig._MIN_NEXT_CONSUMING_IN_SECOND) {
                 throw new IllegalArgumentException(
                         "NextConsumingInSecond is illegal. It is too small. " + NSQConfig._MIN_NEXT_CONSUMING_IN_SECOND);
-            }
-            if (timeout > NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND) {
-                throw new IllegalArgumentException(
-                        "NextConsumingInSecond is illegal. It is too big. " + NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND);
+            } else if (timeout > NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND) {
+                logger.warn("Next consuming in second is larger than {}. It may be limited to max value in server side.", NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND);
             }
         }
         this.nextConsumingInSecond = nextConsumingInSecond;