diff --git a/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java b/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
index a172ad7..170f60e 100644
--- a/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
+++ b/src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
@@ -54,7 +54,6 @@ public class ConsumerImplV2 implements Consumer, IConsumeInfo {
private volatile long lastConnecting = 0L;
private volatile long lastSuccess = 0L;
- private volatile float consumptionRate = 0f;
private final AtomicLong received = new AtomicLong(0);
private final AtomicLong success = new AtomicLong(0);
@@ -81,8 +80,7 @@ public class ConsumerImplV2 implements Consumer, IConsumeInfo {
/*
* schedule executor for updating nsqd connections in effect
*/
- private final ScheduledExecutorService scheduler = Executors
- .newSingleThreadScheduledExecutor(new NamedThreadFactory(this.getClass().getSimpleName(), Thread.NORM_PRIORITY));
+ private ScheduledExecutorService scheduler;
/*
* message handler
*/
@@ -113,7 +111,7 @@ public class ConsumerImplV2 implements Consumer, IConsumeInfo {
*/
public ConsumerImplV2(NSQConfig config) {
this.config = config;
- this.simpleClient = new NSQSimpleClient(Role.Consumer, this.config.getUserSpecifiedLookupAddress());
+ this.simpleClient = new NSQSimpleClient(Role.Consumer, this.config.getUserSpecifiedLookupAddress(), this.config);
//initialize netty component
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(config.getNettyPoolSize());
@@ -125,7 +123,11 @@ public ConsumerImplV2(NSQConfig config) {
bootstrap.handler(new NSQClientInitializer());
//initialize consumer worker size
executor = Executors.newFixedThreadPool(this.config.getConsumerWorkerPoolSize(),
- new NamedThreadFactory(this.getClass().getSimpleName() + "-ClientBusiness", Thread.MAX_PRIORITY));
+ new NamedThreadFactory(this.getClass().getSimpleName() + "-ClientBusiness-" + this.config.getConsumerName(), Thread.MAX_PRIORITY));
+ String consumerName = (null == this.config) ? "-null" : "-" + this.config.getConsumerName();
+ //intialize consumer simple client thread
+ scheduler = Executors
+ .newSingleThreadScheduledExecutor(new NamedThreadFactory(this.getClass().getSimpleName() + consumerName, Thread.NORM_PRIORITY));
}
/**
@@ -231,7 +233,7 @@ public void start() throws NSQException {
//start consumer
if (this.started.compareAndSet(Boolean.FALSE, Boolean.TRUE) && cLock.writeLock().tryLock()) {
String configJsonStr = NSQConfig.parseNSQConfig(this.config);
- logger.info("Config for consumer {}: {}", this, configJsonStr);
+ logger.info("Consumer {} starts with config: {}", this, configJsonStr);
try {
//validate that consumer have right lookup address source
if (!validateLookupdSource()) {
@@ -252,14 +254,6 @@ public void start() throws NSQException {
}
}
- /**
- * update consumption rate according to message consumption last _INTERVAL_IN_SECOND
- */
- private void updateConsumptionRate() {
- consumptionRate = ((long)success.get() - lastSuccess) / _INTERVAL_IN_SECOND;
- lastSuccess = success.get();
- }
-
public boolean isConsumptionEstimateElapseTimeout() {
// return consumptionRate * queue4Consume.get() *1000 >= this.config.getMsgTimeoutInMillisecond();
return false;
@@ -594,6 +588,18 @@ public void incoming(final NSQFrame frame, final NSQConnection conn) throws NSQE
}
}
+ protected boolean checkExtFilter(final NSQMessage message, final NSQConnection conn) {
+ String filterKey = this.config.getConsumeMessageFilterKey();
+ if (conn.isExtend() && StringUtils.isNotBlank(filterKey)) {
+ String filterDataInHeader = (String) message.getExtByName(filterKey);
+ String filterDataInConf = this.config.getConsumeMessageFilterValue();
+ if(!this.config.getConsumeMessageFilterMode().getFilter().apply(filterDataInConf, filterDataInHeader)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private void processMessage(final NSQMessage message, final NSQConnection connection) {
if (logger.isDebugEnabled()) {
logger.debug(message.toString());
@@ -658,6 +664,7 @@ private void consume(final NSQMessage message, final NSQConnection connection) {
boolean retry;
boolean explicitRequeue = false;
boolean skip = needSkip(message);
+ skip = skip || !checkExtFilter(message, connection);
long start = System.currentTimeMillis();
try {
@@ -740,6 +747,9 @@ private void consume(final NSQMessage message, final NSQConnection connection) {
final byte[] id = message.getMessageID();
logger.info("Client does a re-queue explicitly. MessageID: {} , Hex: {}", id, message.newHexString(id));
}
+ } else if (skip) {
+ // Finish
+ cmd = new Finish(message.getMessageID());
} else {
// ignore actions
cmd = null;
@@ -760,10 +770,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}
});
- if (cmd instanceof Finish) {
- finished.incrementAndGet();
- } else {
- re.incrementAndGet();
+ if(!skip) {
+ if (cmd instanceof Finish) {
+ finished.incrementAndGet();
+ } else {
+ re.incrementAndGet();
+ }
}
}
}
@@ -771,6 +783,21 @@ public void operationComplete(ChannelFuture future) throws Exception {
// Post
//log warn
if (!ok) {
+ int attempt = message.getReadableAttempts();
+ int warningThreshold = this.config.getAttemptWarningThresdhold();
+ int errorThreshold = this.config.getAttemptErrorThresdhold();
+
+ if(errorThreshold > 0 && attempt > errorThreshold){
+ if(0==attempt%errorThreshold) {
+ logger.error("Message attempts number has been {}, consider logging message content and finish. {}", attempt, message.toString());
+ }
+ } else if (warningThreshold > 0 && attempt > warningThreshold){
+ if(attempt > warningThreshold) {
+ if(0==attempt%warningThreshold) {
+ logger.warn("Message attempts number has been {}, consider logging message content and finish. {}", attempt, message.toString());
+ }
+ }
+ }
//TODO: connection.setMessageConsumptionFailed(start);
// logger.warn("Exception occurs in message handler. Please check it right now {} , Original message: {}.", message, message.getReadableContent());
} else if (!this.config.isOrdered()){
@@ -957,6 +984,10 @@ public void finish(final NSQMessage message) throws NSQException {
return;
}
final NSQConnection conn = address_2_conn.get(message.getAddress());
+ _finish(message, conn);
+ }
+
+ private void _finish(final NSQMessage message, final NSQConnection conn) throws NSQNoConnectionException {
if (conn != null) {
if (conn.getId() == message.getConnectionID().intValue()) {
if (conn.isConnected()) {
@@ -1046,19 +1077,11 @@ public String toString() {
} catch (IOException e) {
logger.warn(e.getLocalizedMessage());
}
- return "[Consumer] at " + ipStr;
+ return "[Consumer@" + this.config.getConsumerName() + "] " + super.toString() + " at " + ipStr;
}
@Override
public float getLoadFactor() {
-// ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
-// int active = pool.getActiveCount();
-// long queueSize = queue4Consume.get();
-// if(active > 0)
-// return (float)queueSize/active;
-// else {
-// return 0f;
-// }
return 0f;
}
diff --git a/src/main/java/com/youzan/nsq/client/MessageReceipt.java b/src/main/java/com/youzan/nsq/client/MessageReceipt.java
new file mode 100644
index 0000000..360f1d7
--- /dev/null
+++ b/src/main/java/com/youzan/nsq/client/MessageReceipt.java
@@ -0,0 +1,96 @@
+package com.youzan.nsq.client;
+
+import com.youzan.nsq.client.entity.Message;
+
+/**
+ * Message receipt for publish response, for normal publish(PUB),
+ * it contains: topic name, partition number, nsqd address(host:port);
+ * if current receipt is for trace publish(PUB_TARCE), when {@link Message#traced()},
+ * following info contains, besides topic and nsqd info mentioned above:
+ *
+ * [internalID]
+ * [traceID]
+ * [diskQueueOffset]
+ * [diskQueueDataSize]
+ */
+public class MessageReceipt implements MessageMetadata {
+ private long internalID;
+ private long traceID;
+ private long diskQueueOffset=-1l;
+ private int diskQueueSize=-1;
+ private String topicName;
+ private int partition;
+ private String nsqdAddr;
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public void setPartition(int partition) {
+ this.partition = partition;
+ }
+
+ public String getNsqdAddr() {
+ return nsqdAddr;
+ }
+
+ public void setNsqdAddr(String nsqdAddr) {
+ this.nsqdAddr = nsqdAddr;
+ }
+
+ public MessageReceipt() {
+
+ }
+
+ public void setInternalID(long internalID) {
+ this.internalID = internalID;
+ }
+
+ public long getInternalID() {
+ return internalID;
+ }
+
+ public long getTraceID() {
+ return traceID;
+ }
+
+ public void setTraceID(long traceID) {
+ this.traceID = traceID;
+ }
+
+ public long getDiskQueueOffset() {
+ return diskQueueOffset;
+ }
+
+ public void setDiskQueueOffset(long diskQueueOffset) {
+ this.diskQueueOffset = diskQueueOffset;
+ }
+
+ public int getDiskQueueSize() {
+ return diskQueueSize;
+ }
+
+ public void setDiskQueueSize(int diskQueueSize) {
+ this.diskQueueSize = diskQueueSize;
+ }
+
+ @Override
+ public String toMetadataStr() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(this.getClass().toString() + " meta-data:").append("\n");
+ sb.append("\t[internalID]:\t").append(internalID).append("\n");
+ sb.append("\t[traceID]:\t").append(traceID).append("\n");
+ sb.append("\t[diskQueueOffset]:\t").append(diskQueueOffset).append("\n");
+ sb.append("\t[diskQueueDataSize]:\t").append(diskQueueSize).append("\n");
+ sb.append(this.getClass().toString() + " end.");
+ return sb.toString();
+ }
+}
diff --git a/src/main/java/com/youzan/nsq/client/Producer.java b/src/main/java/com/youzan/nsq/client/Producer.java
index 5795eb7..dc71ad1 100644
--- a/src/main/java/com/youzan/nsq/client/Producer.java
+++ b/src/main/java/com/youzan/nsq/client/Producer.java
@@ -47,6 +47,15 @@ public interface Producer extends Client, Closeable {
void publish(Message message) throws NSQException;
+ /**
+ * publish message and return receipt contains target nsqd&topic info for normal publish,
+ * and message meta info(internal id, disk offset, disk size, trace id) if message is traced.
+ * @param message
+ * @return receipt
+ * @throws NSQException
+ */
+ MessageReceipt publishAndGetReceipt(Message message) throws NSQException;
+
/**
* Use it to produce only one 'message' sending to MQ.
* partition info is not specified in this function,
diff --git a/src/main/java/com/youzan/nsq/client/ProducerImplV2.java b/src/main/java/com/youzan/nsq/client/ProducerImplV2.java
index 0b25900..1b73ed6 100644
--- a/src/main/java/com/youzan/nsq/client/ProducerImplV2.java
+++ b/src/main/java/com/youzan/nsq/client/ProducerImplV2.java
@@ -14,6 +14,7 @@
import com.youzan.util.HostUtil;
import com.youzan.util.IOUtil;
import com.youzan.util.NamedThreadFactory;
+import com.youzan.util.ProducerWorkerThreadFactory;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.slf4j.Logger;
@@ -50,8 +51,7 @@ public class ProducerImplV2 implements Producer {
private final KeyedPooledConnectionFactory factory;
private GenericKeyedObjectPool
bigPool = null;
private final ScheduledExecutorService scheduler = Executors
- .newSingleThreadScheduledExecutor(new NamedThreadFactory(this.getClass().getName(), Thread.NORM_PRIORITY));
-
+ .newSingleThreadScheduledExecutor(new ProducerWorkerThreadFactory(this.getClass().getName(), Thread.NORM_PRIORITY));
private final ExecutorService pubExec;
@@ -68,7 +68,7 @@ public class ProducerImplV2 implements Producer {
*/
public ProducerImplV2(NSQConfig config) {
this.config = config;
- this.simpleClient = new NSQSimpleClient(Role.Producer, this.config.getUserSpecifiedLookupAddress());
+ this.simpleClient = new NSQSimpleClient(Role.Producer, this.config.getUserSpecifiedLookupAddress(), this.config);
this.poolConfig = new GenericKeyedObjectPoolConfig();
this.factory = new KeyedPooledConnectionFactory(this.config, this);
@@ -295,6 +295,11 @@ public void publish(String message, final Topic topic, Object shardingID) throws
@Override
public void publish(final Message message) throws NSQException {
+ this.publishAndGetReceipt(message);
+ }
+
+ @Override
+ public MessageReceipt publishAndGetReceipt(final Message message) throws NSQException {
final Context cxt = new Context();
if(PERF_LOG.isDebugEnabled()) {
cxt.setTraceID(pubTraceIdGen.getAndIncrement());
@@ -313,7 +318,7 @@ public void publish(final Message message) throws NSQException {
try{
//TODO: poll before timeout
- sendPUB(message, cxt);
+ return sendPUB(message, cxt);
}catch (NSQPubException pubE){
logger.error(pubE.getLocalizedMessage());
pubE.punchExceptions(logger);
@@ -440,7 +445,7 @@ public void publishMulti(List messages, String topic) throws NSQExceptio
this.publishMulti(messages, new Topic(topic));
}
- private void sendPUB(final Message msg, Context cxt) throws NSQException {
+ private MessageReceipt sendPUB(final Message msg, Context cxt) throws NSQException {
int c = 0; // be continuous
boolean returnCon;
NSQConnection conn = null;
@@ -487,11 +492,17 @@ private void sendPUB(final Message msg, Context cxt) throws NSQException {
}
//create PUB command
try {
+ String host = conn.getAddress().getHost();
+ int port = conn.getAddress().getPort();
+ String topic = conn.getAddress().getTopic();
+ int partition = -1;
+
final Pub pub = createPubCmd(msg);
//check if address has partition info, if it does, update pub's partition
if(conn.getAddress().hasPartition()) {
- pub.overrideDefaultPartition(conn.getAddress().getPartition());
+ partition = conn.getAddress().getPartition();
+ pub.overrideDefaultPartition(partition);
}
long pubAndWaitStart = System.currentTimeMillis();
@@ -505,10 +516,21 @@ private void sendPUB(final Message msg, Context cxt) throws NSQException {
}
handleResponse(msg.getTopic(), frame, conn);
+ //when hit this line what we have are response frame
success.addAndGet(msg.getMessageCount());
- if(msg.isTraced() && frame instanceof MessageMetadata && TraceLogger.isTraceLoggerEnabled() && conn.getAddress().isHA())
- TraceLogger.trace(this, conn, (MessageMetadata) frame);
- break;
+ if(msg.isTraced() && frame instanceof ResponseFrame && conn.getAddress().isHA()) {
+ if (TraceLogger.isTraceLoggerEnabled())
+ TraceLogger.trace(this, conn, (MessageMetadata) frame);
+ }
+ ResponseFrame response = (ResponseFrame) frame;
+ MessageReceipt receipt = response.getReceipt();
+ receipt.setNsqdAddr(host + ":" + port);
+ receipt.setTopicName(topic);
+ receipt.setPartition(partition);
+ if(PERF_LOG.isDebugEnabled()){
+ PERF_LOG.debug("{}: Producer took {} milliSec to send message to {}", cxt.getTraceID(), System.currentTimeMillis() - start, conn.getAddress());
+ }
+ return receipt;
}
catch(NSQPubFactoryInitializeException | NSQTagException | NSQTopicNotExtendableException | NSQExtNotSupportedException expShouldFail) {
throw expShouldFail;
@@ -554,12 +576,7 @@ private void sendPUB(final Message msg, Context cxt) throws NSQException {
}
}
} // end loop
- if (c >= retry) {
- throw new NSQPubException(exceptions);
- }
- if(PERF_LOG.isDebugEnabled()){
- PERF_LOG.debug("{}: Producer took {} milliSec to send message to {}", cxt.getTraceID(), System.currentTimeMillis() - start, conn.getAddress());
- }
+ throw new NSQPubException(exceptions);
}
/**
@@ -740,6 +757,6 @@ public String toString(){
} catch (IOException e) {
logger.warn(e.getLocalizedMessage());
}
- return "[Producer] at " + ipStr;
+ return "[Producer] " + super.toString() + " at " + ipStr;
}
}
diff --git a/src/main/java/com/youzan/nsq/client/core/NSQSimpleClient.java b/src/main/java/com/youzan/nsq/client/core/NSQSimpleClient.java
index 24bc079..b1089f5 100644
--- a/src/main/java/com/youzan/nsq/client/core/NSQSimpleClient.java
+++ b/src/main/java/com/youzan/nsq/client/core/NSQSimpleClient.java
@@ -57,8 +57,7 @@ public class NSQSimpleClient implements Client, Closeable {
/*
*single schedule executor for maintaining topic to partition map
*/
- private final ScheduledExecutorService scheduler = Executors
- .newSingleThreadScheduledExecutor(new NamedThreadFactory(this.getClass().getName(), Thread.MAX_PRIORITY));
+ private ScheduledExecutorService scheduler;
/*
* role of client current simple client nested
@@ -67,11 +66,14 @@ public class NSQSimpleClient implements Client, Closeable {
private final LookupService lookup;
private final boolean useLocalLookupd;
- public NSQSimpleClient(Role role, boolean localLookupd) {
+ public NSQSimpleClient(Role role, boolean localLookupd, final NSQConfig config) {
this.role = role;
this.lookupLocalID = CLIENT_ID.incrementAndGet();
this.lookup = new LookupServiceImpl(role, this.lookupLocalID);
this.useLocalLookupd = localLookupd;
+ String consumerName = (null == config || role != Role.Consumer) ? "-null" : "-" + config.getConsumerName();
+ scheduler = Executors
+ .newSingleThreadScheduledExecutor(new NamedThreadFactory(this.getClass().getName() + "-" + role.getRoleTxt() + consumerName, Thread.MAX_PRIORITY));
}
/**
diff --git a/src/main/java/com/youzan/nsq/client/core/command/PubExt.java b/src/main/java/com/youzan/nsq/client/core/command/PubExt.java
index 99d83b8..1a252a5 100644
--- a/src/main/java/com/youzan/nsq/client/core/command/PubExt.java
+++ b/src/main/java/com/youzan/nsq/client/core/command/PubExt.java
@@ -18,6 +18,9 @@ public class PubExt extends Pub {
private byte[] jsonHeaderBytes;
public static final String CLIENT_TAG_KEY = "##client_dispatch_tag";
public static final String TRACE_ID_KEY = "##trace_id";
+ public static final String FILTER_EXT_KEY = "filter_ext_key";
+ public static final String FILTER_DATA = "filter_data";
+
/**
* @param msg message object
diff --git a/src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilter.java b/src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilter.java
new file mode 100644
index 0000000..24e1fcd
--- /dev/null
+++ b/src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilter.java
@@ -0,0 +1,33 @@
+package com.youzan.nsq.client.entity;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Created by lin on 17/11/20.
+ */
+public interface ConsumeMessageFilter {
+ ConsumeMessageFilter EXACT_MATCH_FILTER = new ConsumeMessageFilter() {
+
+ @Override
+ public boolean apply(String filter, String filterVal) {
+ if(StringUtils.isBlank(filter)) {
+ return true;
+ } else if (filter.equals(filterVal)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int getType() {
+ return 1;
+ }
+
+
+ };
+
+ boolean apply(String filter, String filterVal);
+
+ int getType();
+}
diff --git a/src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilterMode.java b/src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilterMode.java
new file mode 100644
index 0000000..1557aaa
--- /dev/null
+++ b/src/main/java/com/youzan/nsq/client/entity/ConsumeMessageFilterMode.java
@@ -0,0 +1,22 @@
+package com.youzan.nsq.client.entity;
+
+import static com.youzan.nsq.client.entity.ConsumeMessageFilter.EXACT_MATCH_FILTER;
+
+/**
+ * Created by lin on 17/11/20.
+ */
+public enum ConsumeMessageFilterMode {
+ EXACT_MATCH(EXACT_MATCH_FILTER);
+// REGEXP_MATCH,
+// GLOB_MATCH
+
+ private ConsumeMessageFilter currentMessageFilter;
+
+ ConsumeMessageFilterMode(ConsumeMessageFilter filter){
+ currentMessageFilter = filter;
+ }
+
+ public ConsumeMessageFilter getFilter() {
+ return this.currentMessageFilter;
+ }
+}
diff --git a/src/main/java/com/youzan/nsq/client/entity/DefaultRdyUpdatePolicy.java b/src/main/java/com/youzan/nsq/client/entity/DefaultRdyUpdatePolicy.java
deleted file mode 100644
index 2171e89..0000000
--- a/src/main/java/com/youzan/nsq/client/entity/DefaultRdyUpdatePolicy.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.youzan.nsq.client.entity;
-
-/**
- * @deprecated
- * Created by lin on 17/7/31.
- */
-public class DefaultRdyUpdatePolicy implements IRdyUpdatePolicy {
- public final static float THRESDHOLD = 1.5f;
- public final static float WATER_HIGH = 1.75f;
-
- @Override
- public boolean rdyShouldIncrease(String topic, float scheduleLoad, boolean mayTimeout, int maxRdyPerCon, int extraRdy) {
- return !mayTimeout && scheduleLoad <= THRESDHOLD;
- }
-
- @Override
- public boolean rdyShouldDecline(String topic, float scheduleLoad, boolean mayTimeout, int maxRdyPerCon, int extraRdy) {
- return scheduleLoad >= WATER_HIGH && mayTimeout;
- }
-}
diff --git a/src/main/java/com/youzan/nsq/client/entity/IRdyUpdatePolicy.java b/src/main/java/com/youzan/nsq/client/entity/IRdyUpdatePolicy.java
deleted file mode 100644
index dc5bd99..0000000
--- a/src/main/java/com/youzan/nsq/client/entity/IRdyUpdatePolicy.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.youzan.nsq.client.entity;
-
-/**
- * @deprecated
- * Created by lin on 17/7/31.
- */
-public interface IRdyUpdatePolicy {
-
- /**
- * indicate whether rdy should increase.
- * @param scheduleLoad scheduleLoad for {@link com.youzan.nsq.client.Consumer} which has current policy applied.
- * scheduleLoad = (float)(message for processing in queue) / (message being processing)
- *
- * @param mayTimeout indicate whether message processing may timeout, given the message waiting for process
- * in queue and the average process time elapse:
- * (message consumption rate) * (message # in queue) >= NSQConfig.getMsgTiemoutInMilliSe
- * @param maxRdyPerCon max rdy per connection allowed
- *
- * @param extraRdy extra Rdy could be allocated for current topic's connections
- *
- * @return true indicates rdy should increase, otherwise false
- */
- boolean rdyShouldIncrease(String topic, float scheduleLoad, boolean mayTimeout, int maxRdyPerCon, int extraRdy);
-
- /**
- * indicate whether rdy should increase.
- * @param scheduleLoad scheduleLoad for {@link com.youzan.nsq.client.Consumer} which has current policy applied.
- * scheduleLoad = (float)(message for processing in queue) / (message being processing)
- *
- * @param mayTimeout indicate whether message processing may timeout, given the message waiting for process
- * in queue and the average process time elapse:
- * (message consumption rate) * (message # in queue) >= NSQConfig#getMsgTiemoutInMilliSe
- * @param maxRdyPerCon max rdy per connection allowed
- *
- * @param extraRdy extra Rdy could be allocated for current topic's connections
- *
- * @return true indicates rdy should decline, otherwise false
- */
- boolean rdyShouldDecline(String topic, float scheduleLoad, boolean mayTimeout, int maxRdyPerCon, int extraRdy);
-}
diff --git a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
index a396028..76cd3eb 100644
--- a/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
+++ b/src/main/java/com/youzan/nsq/client/entity/NSQConfig.java
@@ -8,6 +8,8 @@
import com.youzan.util.NotThreadSafe;
import com.youzan.util.SystemUtil;
import io.netty.handler.ssl.SslContext;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -16,6 +18,8 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import static org.apache.commons.lang3.tuple.Pair.of;
+
/**
* It is used for Producer or Consumer, and not both two.
*
@@ -27,6 +31,7 @@ public class NSQConfig implements java.io.Serializable, Cloneable {
private static final long serialVersionUID = 6624842850216901700L;
private static final Logger logger = LoggerFactory.getLogger(NSQConfig.class);
private static final ObjectMapper MAPPER_CONFIG = new ObjectMapper();
+
static {
MAPPER_CONFIG.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
}
@@ -84,7 +89,7 @@ public void setExpectedRdyUpdatePolicy(String policyClass) {
clazz = Class.forName(policyClass);
}
Object policy = clazz.newInstance();
- if (policy instanceof IRdyUpdatePolicy) {
+ if (policy instanceof IExpectedRdyUpdatePolicy) {
@SuppressWarnings("unchecked")
IExpectedRdyUpdatePolicy expRdyPolicy = (IExpectedRdyUpdatePolicy) policy;
this.expRdyUpdatePolicy = expRdyPolicy;
@@ -143,6 +148,28 @@ public enum Compression {
private boolean featureNegotiation;
private boolean userSpecifiedLookupd = false;
private Map localTraceMap = new ConcurrentHashMap<>();
+
+ public int getAttemptWarningThresdhold() {
+ return attemptWarningThresdhold;
+ }
+
+ public NSQConfig setAttemptWarningThresdhold(int attemptWarningThresdhold) {
+ this.attemptWarningThresdhold = attemptWarningThresdhold;
+ return this;
+ }
+
+ public int getAttemptErrorThresdhold() {
+ return attemptErrorThresdhold;
+ }
+
+ public NSQConfig setAttemptErrorThresdhold(int attemptErrorThresdhold) {
+ this.attemptErrorThresdhold = attemptErrorThresdhold;
+ return this;
+ }
+
+ private int attemptWarningThresdhold = 10;
+ private int attemptErrorThresdhold = 50;
+
/*-
* =========================================================================
* All of Timeout
@@ -189,7 +216,7 @@ public enum Compression {
// 1 seconds
public static final int _MIN_NEXT_CONSUMING_IN_SECOND = 0;
// one hour is the limit
- public static final int _MAX_NEXT_CONSUMING_IN_SECOND = 3600;
+ public static final int _MAX_NEXT_CONSUMING_IN_SECOND = 24 * 3600;
private int nextConsumingInSecond = 60;
private long maxConnWait = 200L;
private int minIdleConn = 2;
@@ -796,7 +823,11 @@ public String identify(boolean topicExt) {
buffer.append("\"desired_tag\":\"" + tag.getTagName() + "\",");
}
buffer.append("\"extend_support\":" + topicExt + ",");
- buffer.append("\"user_agent\": \"" + userAgent + "\"}");
+ buffer.append("\"user_agent\": \"" + userAgent + "\"");
+ //ext header feature
+ if(StringUtils.isNotBlank(this.getConsumeMessageFilterKey()))
+ buffer.append(", \"ext_filter\":{\"type\":" + this.getConsumeMessageFilterMode().getFilter().getType() + ", \"filter_ext_key\":\"" + this.getConsumeMessageFilterKey() + "\", \"filter_data\":\"" + this.getConsumeMessageFilterValue() + "\"}");
+ buffer.append("}");
return buffer.toString();
}
@@ -819,10 +850,8 @@ public NSQConfig setNextConsumingInSeconds(int timeout) {
if (timeout < NSQConfig._MIN_NEXT_CONSUMING_IN_SECOND) {
throw new IllegalArgumentException(
"Next consuming in second is illegal. It is too small. " + NSQConfig._MIN_NEXT_CONSUMING_IN_SECOND);
- }
- if (timeout > NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND) {
- throw new IllegalArgumentException(
- "Next consuming in second is illegal. It is too big. " + NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND);
+ } else if (timeout > NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND) {
+ logger.warn("Next consuming in second is larger than {}. It may be limited to max value in server side.", NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND);
}
this.nextConsumingInSecond = timeout;
return this;
@@ -921,7 +950,7 @@ private enum ConsumePolicy {
SKIP
}
- private Map> consumePolcyMap = new ConcurrentHashMap<>();
+ private Map> consumePolicyMap = new ConcurrentHashMap<>();
/**
* Set extension key/value map for consumer to skip, when json extension header in one message
@@ -934,7 +963,7 @@ private enum ConsumePolicy {
NSQConfig setMessageSkipExtensionKVMap(final Map extensionKV) {
if(null == extensionKV || extensionKV.size() == 0)
return this;
- this.consumePolcyMap.put(ConsumePolicy.SKIP, Collections.unmodifiableMap(extensionKV));
+ this.consumePolicyMap.put(ConsumePolicy.SKIP, Collections.unmodifiableMap(extensionKV));
return this;
}
@@ -975,7 +1004,7 @@ public boolean getBlockWhenBorrowConn4Producer() {
* @return extension key/value map for consumer to skip, which is {@link Collections.UnmodifiableMap}.
*/
public Map getMessageSkipExtensionKVMap() {
- return this.consumePolcyMap.get(ConsumePolicy.SKIP);
+ return this.consumePolicyMap.get(ConsumePolicy.SKIP);
}
private int producerPoolSize = 4;
@@ -995,6 +1024,55 @@ public int getPublishWorkerPoolSize() {
return this.producerPoolSize;
}
+ //consume message filter value default value is null, which means no filter applied
+ private Pair consumeMsgFilterKV = null;
+
+ /**
+ * set consume message filter value, default value is null, which means there is NO message filter applied for
+ * message consumer which has current {@link NSQConfig} applied.
+ * @param key extension filter key to locate extension value
+ * @param filterVal expected extension value to match
+ * @return NSQConfig
+ */
+ public NSQConfig setConsumeMessageFilter(String key, String filterVal) {
+ this.consumeMsgFilterKV = of(key, filterVal);
+ return this;
+ }
+
+ public String getConsumeMessageFilterKey() {
+ if(null != this.consumeMsgFilterKV)
+ return this.consumeMsgFilterKV.getKey();
+ else
+ return "";
+ }
+
+ public String getConsumeMessageFilterValue() {
+ if(null != this.consumeMsgFilterKV)
+ return this.consumeMsgFilterKV.getValue();
+ else
+ return "";
+ }
+
+ private ConsumeMessageFilterMode consumeMessageFilterMode = ConsumeMessageFilterMode.EXACT_MATCH;
+
+ /**
+ * specify message consume filter, default value is {@link ConsumeMessageFilterMode#EXACT_MATCH}
+ * @param filterMode
+ * @return {@link NSQConfig}
+ */
+ public NSQConfig setConsumeMessageFilterMode(ConsumeMessageFilterMode filterMode) {
+ if(null == filterMode) {
+ logger.error("null filter mode is not allowed ");
+ } else {
+ this.consumeMessageFilterMode = filterMode;
+ }
+ return this;
+ }
+
+ public ConsumeMessageFilterMode getConsumeMessageFilterMode() {
+ return this.consumeMessageFilterMode;
+ }
+
@Override
public Object clone() {
NSQConfig newCfg = null;
diff --git a/src/main/java/com/youzan/nsq/client/entity/NSQMessage.java b/src/main/java/com/youzan/nsq/client/entity/NSQMessage.java
index fd0d90f..dcb97ab 100644
--- a/src/main/java/com/youzan/nsq/client/entity/NSQMessage.java
+++ b/src/main/java/com/youzan/nsq/client/entity/NSQMessage.java
@@ -324,10 +324,8 @@ public void setNextConsumingInSecond(Integer nextConsumingInSecond) throws NSQEx
if (timeout < NSQConfig._MIN_NEXT_CONSUMING_IN_SECOND) {
throw new IllegalArgumentException(
"NextConsumingInSecond is illegal. It is too small. " + NSQConfig._MIN_NEXT_CONSUMING_IN_SECOND);
- }
- if (timeout > NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND) {
- throw new IllegalArgumentException(
- "NextConsumingInSecond is illegal. It is too big. " + NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND);
+ } else if (timeout > NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND) {
+ logger.warn("Next consuming in second is larger than {}. It may be limited to max value in server side.", NSQConfig._MAX_NEXT_CONSUMING_IN_SECOND);
}
}
this.nextConsumingInSecond = nextConsumingInSecond;
diff --git a/src/main/java/com/youzan/nsq/client/network/frame/ResponseFrame.java b/src/main/java/com/youzan/nsq/client/network/frame/ResponseFrame.java
index a68b213..1d44297 100644
--- a/src/main/java/com/youzan/nsq/client/network/frame/ResponseFrame.java
+++ b/src/main/java/com/youzan/nsq/client/network/frame/ResponseFrame.java
@@ -1,6 +1,8 @@
package com.youzan.nsq.client.network.frame;
import com.youzan.nsq.client.MessageMetadata;
+import com.youzan.nsq.client.MessageReceipt;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -8,6 +10,7 @@
public class ResponseFrame extends NSQFrame implements MessageMetadata{
private static final Logger logger = LoggerFactory.getLogger(ResponseFrame.class);
+ private MessageReceipt receipt = new MessageReceipt();
@Override
public FrameType getType() {
@@ -19,6 +22,12 @@ public String getMessage() {
return new String(getData(), DEFAULT_CHARSET).trim();
}
+ @Override
+ public void setData(byte[] data) {
+ super.setData(data);
+ generateReceipt();
+ }
+
@Override
public String toString() {
return "ResponseFrame: " + this.getMessage();
@@ -29,37 +38,55 @@ public String toMetadataStr() {
String resMsg = getMessage();
//check if has meta data
if(resMsg.startsWith("OK") && resMsg.length() > 2){
+ StringBuilder sb = new StringBuilder();
+ sb.append(this.getClass().toString() + " meta-data:").append("\n");
+ sb.append("\t[internalID]:\t").append(receipt.getInternalID()).append("\n");
+ sb.append("\t[traceID]:\t").append(receipt.getTraceID()).append("\n");
+ sb.append("\t[diskQueueOffset]:\t").append(receipt.getDiskQueueOffset()).append("\n");
+ sb.append("\t[diskQueueDataSize]:\t").append(receipt.getDiskQueueSize()).append("\n");
+ sb.append(this.getClass().toString() + " end.");
+ return sb.toString();
+ }
+ return "No meta data";
+ }
+
+ /**
+ * Generate message receipt for publish response
+ * @return messageReceipt
+ */
+ private void generateReceipt() {
+ String resMsg = getMessage();
+ //check if has meta data
+ if (resMsg.startsWith("OK") && resMsg.length() > 2) {
byte[] data = getData();
//internal ID
byte[] internalIDByte = new byte[8];
System.arraycopy(data, 2, internalIDByte, 0, 8);
long internalID = ByteBuffer.wrap(internalIDByte).getLong();
+ receipt.setInternalID(internalID);
//traceID
byte[] traceIDByte = new byte[8];
System.arraycopy(data, 10, traceIDByte, 0, 8);
long traceID = ByteBuffer.wrap(traceIDByte).getLong();
+ receipt.setTraceID(traceID);
//disk queue offset
byte[] diskqueueOffsetByte = new byte[8];
System.arraycopy(data, 18, diskqueueOffsetByte, 0, 8);
long diskQueueOffset = ByteBuffer.wrap(diskqueueOffsetByte).getLong();
+ receipt.setDiskQueueOffset(diskQueueOffset);
//disk queue data size
- byte[] diskQueueSizeByte = new byte[4];
- System.arraycopy(data, 26, diskQueueSizeByte, 0, 4);
- int diskQueueSize = ByteBuffer.wrap(diskqueueOffsetByte).getInt();
-
- StringBuilder sb = new StringBuilder();
- sb.append(this.getClass().toString() + " meta-data:").append("\n");
- sb.append("\t[internalID]:\t").append(internalID).append("\n");
- sb.append("\t[traceID]:\t").append(traceID).append("\n");
- sb.append("\t[diskQueueOffset]:\t").append(diskQueueOffset).append("\n");
- sb.append("\t[diskQueueDataSize]:\t").append(diskQueueSize).append("\n");
- sb.append(this.getClass().toString() + " end.");
- return sb.toString();
+ byte[] diskqueueSizeByte = new byte[4];
+ System.arraycopy(data, 26, diskqueueSizeByte, 0, 4);
+ int diskQueueSize = ByteBuffer.wrap(diskqueueSizeByte).getInt();
+ receipt.setDiskQueueSize(diskQueueSize);
}
- return "No meta data";
+ }
+
+ public MessageReceipt getReceipt() {
+ return this.receipt;
}
}
diff --git a/src/main/java/com/youzan/util/ProducerWorkerThreadFactory.java b/src/main/java/com/youzan/util/ProducerWorkerThreadFactory.java
new file mode 100644
index 0000000..7b6937f
--- /dev/null
+++ b/src/main/java/com/youzan/util/ProducerWorkerThreadFactory.java
@@ -0,0 +1,56 @@
+package com.youzan.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Producer worker thread factory
+ * Created by lin on 17/11/20.
+ */
+public class ProducerWorkerThreadFactory implements ThreadFactory {
+ private static final Logger logger = LoggerFactory.getLogger(ProducerWorkerThreadFactory.class);
+
+ private final ThreadGroup group;
+ private final String namePrefix;
+ private int priority = Integer.MIN_VALUE;
+
+ final static Thread.UncaughtExceptionHandler uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ logger.error(t.getName(), e);
+ }
+ };
+
+ public ProducerWorkerThreadFactory(String poolName, int priority) {
+ if (null == poolName || poolName.isEmpty()) {
+ throw new IllegalArgumentException();
+ }
+ group = Thread.currentThread().getThreadGroup();
+ namePrefix = poolName + "-Producer-Worker-Pool-Thread";
+ this.priority = priority;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+
+ final Thread t = new Thread(group, r, namePrefix, 0);
+ if (t.isDaemon()) {
+ t.setDaemon(false);
+ }
+ switch(priority){
+ case Thread.MAX_PRIORITY :
+ case Thread.MIN_PRIORITY:
+ case Thread.NORM_PRIORITY:
+ t.setPriority(priority);
+ break;
+ default: {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+
+ }
+ Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
+ return t;
+ }
+}
diff --git a/src/test/java/com/youzan/nsq/client/ConsumerTest.java b/src/test/java/com/youzan/nsq/client/ConsumerTest.java
index f603a5c..0d2a592 100644
--- a/src/test/java/com/youzan/nsq/client/ConsumerTest.java
+++ b/src/test/java/com/youzan/nsq/client/ConsumerTest.java
@@ -6,6 +6,7 @@
import com.youzan.nsq.client.exception.ExplicitRequeueException;
import com.youzan.nsq.client.exception.NSQException;
import com.youzan.nsq.client.utils.TopicUtil;
+import org.apache.commons.collections.map.HashedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -546,6 +547,63 @@ public void testSkipMessage() {
Assert.assertFalse(skipped);
}
+ @Test
+ public void testMessageHeaderFilter() {
+ NSQConfig config = new NSQConfig("BaseConsumer");
+ config.setConsumeMessageFilter("filter_key1", "filter_val1");
+ ConsumerImplV2 consumer = new ConsumerImplV2(config, new MessageHandler() {
+ @Override
+ public void process(NSQMessage message) {
+ //nothing happen
+ }
+ });
+ //mock a message frame, and NSQConnection
+ Map jsonExt = new HashedMap();
+ jsonExt.put("filter_key1", "filter_val1");
+ jsonExt.put("filter_key2", "filter_val2");
+
+ NSQMessage message = new NSQMessage();
+ message.setJsonExtHeader(jsonExt);
+
+ MockedNSQConnectionImpl conn = new MockedNSQConnectionImpl(0, new Address("127.0.0.1", 4150, "ha", "fakeTopic", 1, true), null, config);
+
+ Assert.assertTrue(consumer.checkExtFilter(message, conn));
+
+ Map jsonExtMissing = new HashedMap();
+ jsonExtMissing.put("filter_key3", "filter_val3");
+ jsonExtMissing.put("filter_key2", "filter_val2");
+
+ message.setJsonExtHeader(jsonExtMissing);
+ Assert.assertFalse(consumer.checkExtFilter(message, conn));
+
+ jsonExtMissing = new HashedMap();
+ jsonExtMissing.put("filter_key1", "filter_val1_missing");
+ jsonExtMissing.put("filter_key2", "filter_val2");
+
+ message.setJsonExtHeader(jsonExtMissing);
+ Assert.assertFalse(consumer.checkExtFilter(message, conn));
+
+ //test default
+ config = new NSQConfig("BaseConsumer");
+ consumer = new ConsumerImplV2(config, new MessageHandler() {
+ @Override
+ public void process(NSQMessage message) {
+ //nothing happen
+ }
+ });
+
+ message.setJsonExtHeader(jsonExtMissing);
+ Assert.assertTrue(consumer.checkExtFilter(message, conn));
+ }
+
+ @Test
+ public void testConsumerToString() {
+ NSQConfig config = new NSQConfig("BaseConsumer");
+ Consumer consumer = new ConsumerImplV2(config);
+ String consumerString = consumer.toString();
+ Assert.assertTrue(consumerString.contains("BaseConsumer"));
+ }
+
private ScheduledExecutorService keepMessagePublish(final Producer producer, final String topic, long interval) throws NSQException {
ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
exec.scheduleWithFixedDelay(new Runnable() {
diff --git a/src/test/java/com/youzan/nsq/client/MockedNSQSimpleClient.java b/src/test/java/com/youzan/nsq/client/MockedNSQSimpleClient.java
index c753ea7..90d1ce3 100644
--- a/src/test/java/com/youzan/nsq/client/MockedNSQSimpleClient.java
+++ b/src/test/java/com/youzan/nsq/client/MockedNSQSimpleClient.java
@@ -21,7 +21,7 @@ public class MockedNSQSimpleClient extends NSQSimpleClient {
private final Logger logger = LoggerFactory.getLogger(MockedNSQSimpleClient.class.getName());
public MockedNSQSimpleClient(Role role, boolean localLookupd) {
- super(role, localLookupd);
+ super(role, localLookupd, null);
}
@Override
diff --git a/src/test/java/com/youzan/nsq/client/core/ConnectionManagerTest.java b/src/test/java/com/youzan/nsq/client/core/ConnectionManagerTest.java
index c0625b6..f18cb2d 100644
--- a/src/test/java/com/youzan/nsq/client/core/ConnectionManagerTest.java
+++ b/src/test/java/com/youzan/nsq/client/core/ConnectionManagerTest.java
@@ -759,27 +759,6 @@ public boolean isConsumptionEstimateElapseTimeout() {
}
}
- public static class BadRdyUpdatePolicy implements IRdyUpdatePolicy {
-
- @Override
- public boolean rdyShouldIncrease(String topic, float scheduleLoad, boolean mayTimeout, int maxRdyPerCon, int extraRdy) {
- boolean flag = true;
- if(flag) {
- throw new RuntimeException("expected exp in rdy update policy");
- }
- return true;
- }
-
- @Override
- public boolean rdyShouldDecline(String topic, float scheduleLoad, boolean mayTimeout, int maxRdyPerCon, int extraRdy) {
- boolean flag = true;
- if(flag) {
- throw new RuntimeException("expected exp in rdy update policy");
- }
- return false;
- }
- }
-
@Test
public void testMakebadOfRdyRedistribute() throws Exception {
logger.info("[testMakebadOfRdyRedistribute] starts.");
diff --git a/src/test/java/it/youzan/nsq/client/ITProducer.java b/src/test/java/it/youzan/nsq/client/ITProducer.java
index 90d4d1b..c190e5f 100644
--- a/src/test/java/it/youzan/nsq/client/ITProducer.java
+++ b/src/test/java/it/youzan/nsq/client/ITProducer.java
@@ -1,6 +1,7 @@
package it.youzan.nsq.client;
import com.youzan.nsq.client.*;
+import com.youzan.nsq.client.entity.Message;
import com.youzan.nsq.client.entity.NSQConfig;
import com.youzan.nsq.client.entity.NSQMessage;
import com.youzan.nsq.client.entity.Topic;
@@ -49,9 +50,8 @@ public void init() throws Exception {
config.setThreadPoolSize4IO(Integer.valueOf(threadPoolSize4IO));
}
- @Test
public void multiPublishBatchError2() throws Exception {
- logger.info("[ITProducer#multiPublishBatch] starts");
+ logger.info("[ITProducer#multiPublishBatchError2] starts");
Producer producer = null;
Consumer consumer = null;
Topic topic = new Topic("JavaTesting-Producer-Base");
@@ -98,9 +98,8 @@ public void process(NSQMessage message) {
}
}
- @Test
public void multiPublishBatchError1() throws Exception {
- logger.info("[ITProducer#multiPublishBatch] starts");
+ logger.info("[ITProducer#multiPublishBatchError1] starts");
Producer producer = null;
Consumer consumer = null;
Topic topic = new Topic("JavaTesting-Producer-Base");
@@ -149,7 +148,6 @@ public void process(NSQMessage message) {
}
}
- @Test
public void multiPublishBatch() throws Exception {
logger.info("[ITProducer#multiPublishBatch] starts");
Producer producer = null;
@@ -266,4 +264,29 @@ public void publishDeflate() throws Exception {
logger.info("[ITProducer#publishDeflate] ends");
}
}
+
+ public void publishTrace() throws Exception {
+ logger.info("[ITProducer#publishTrace] starts");
+ TopicUtil.emptyQueue(adminHttp, "JavaTesting-Producer-Base", "BaseConsumer");
+ Producer producer = new ProducerImplV2(config);
+ try {
+ Topic topic = new Topic("JavaTesting-Producer-Base");
+ producer.start();
+ String msgStr = "The quick brown fox jumps over the lazy dog, 那只迅捷的灰狐狸跳过了那条懒狗";
+ for (int i = 0; i < 10; i++) {
+ Message msg = Message.create(topic, msgStr);
+ msg.traced();
+ MessageReceipt receipt = producer.publishAndGetReceipt(msg);
+ Assert.assertNotNull(receipt.getTopicName());
+ Assert.assertNotNull(receipt.getNsqdAddr());
+ Assert.assertEquals(0, receipt.getTraceID());
+ Assert.assertNotEquals(-1, receipt.getDiskQueueOffset());
+ Assert.assertNotEquals(-1, receipt.getDiskQueueSize());
+ }
+ }finally {
+ producer.close();
+ TopicUtil.emptyQueue(adminHttp, "JavaTesting-Producer-Base", "BaseConsumer");
+ logger.info("[ITProducer#publishDeflate] ends");
+ }
+ }
}
diff --git a/src/test/java/it/youzan/nsq/client/ITTagConsumer.java b/src/test/java/it/youzan/nsq/client/ITTagConsumer.java
index 239d63b..7ac8b6c 100644
--- a/src/test/java/it/youzan/nsq/client/ITTagConsumer.java
+++ b/src/test/java/it/youzan/nsq/client/ITTagConsumer.java
@@ -3,6 +3,7 @@
import com.youzan.nsq.client.Consumer;
import com.youzan.nsq.client.ConsumerImplV2;
import com.youzan.nsq.client.MessageHandler;
+import com.youzan.nsq.client.entity.ConsumeMessageFilterMode;
import com.youzan.nsq.client.entity.DesiredTag;
import com.youzan.nsq.client.entity.NSQConfig;
import com.youzan.nsq.client.entity.NSQMessage;
@@ -64,6 +65,7 @@ public void process(NSQMessage message) {
public void process(NSQMessage message) {
logger.error("Message should not received: " + message.getReadableContent());
logger.error("message tag: " + message.getTag().toString());
+
}
});
consumer.setAutoFinish(true);
@@ -205,4 +207,35 @@ public void process(NSQMessage message) {
TopicUtil.emptyQueue("http://" + props.getProperty("admin-address"), topic, "BaseConsumer");
}
}
+
+ public void testConsumerWithHeaderFilter() throws Exception {
+ String topic = "JavaTesting-Ext";
+ Consumer consumer = null;
+ try{
+ logger.info("[testConsumerWithHeaderFilter] starts");
+ NSQConfig config = new NSQConfig("BaseConsumer");
+ config.setLookupAddresses(props.getProperty("lookup-addresses"));
+ config.setConsumeMessageFilter("filter_key1", "filter_val1");
+ config.setConsumeMessageFilterMode(ConsumeMessageFilterMode.EXACT_MATCH);
+ final AtomicBoolean fail = new AtomicBoolean(false);
+ final CountDownLatch latch = new CountDownLatch(5);
+ consumer = new ConsumerImplV2(config, new MessageHandler() {
+ @Override
+ public void process(NSQMessage message) {
+ if (null == message.getExtByName("filter_key1") || !message.getExtByName("filter_key1").equals("filter_val1"))
+ fail.set(true);
+ latch.countDown();
+ }
+ });
+ consumer.setAutoFinish(true);
+ consumer.subscribe(topic);
+ consumer.start();
+ latch.await(1, TimeUnit.MINUTES);
+ Assert.assertFalse(fail.get());
+ }finally {
+ consumer.close();
+ TopicUtil.emptyQueue("http://" + props.getProperty("admin-address"), topic, "BaseConsumer");
+ logger.info("[testConsumerWithHeaderFilter] ends");
+ }
+ }
}
diff --git a/src/test/java/it/youzan/nsq/client/ITTagProducer.java b/src/test/java/it/youzan/nsq/client/ITTagProducer.java
index ceb21a6..917cc9b 100644
--- a/src/test/java/it/youzan/nsq/client/ITTagProducer.java
+++ b/src/test/java/it/youzan/nsq/client/ITTagProducer.java
@@ -121,6 +121,32 @@ public void publishWTagMixWHeader() throws Exception {
}
}
+ public void publishWFilterHeader() throws Exception {
+ TopicUtil.emptyQueue("http://" + props.getProperty("admin-address"), "testExt2Par2Rep", "BaseConsumer");
+ Topic topic = new Topic("JavaTesting-Ext");
+
+ Map ext = new HashMap<>();
+ ext.put("filter_key1" ,"filter_val1");
+ ext.put("key1" ,"this is value 1");
+
+ Map extMissing = new HashMap<>();
+ extMissing.put("filter_key1", "filter_val2");
+ extMissing.put("filter_key2" ,"filter_val2");
+
+
+ for (int i = 0; i < 5; i++) {
+ Message msg = Message.create(topic, "message");
+ msg.setJsonHeaderExt(ext);
+ producer.publish(msg);
+ }
+
+ for (int i = 0; i < 5; i++) {
+ Message msg = Message.create(topic, "message");
+ msg.setJsonHeaderExt(extMissing);
+ producer.publish(msg);
+ }
+ }
+
@AfterClass
public void close() {
IOUtil.closeQuietly(producer);
diff --git a/src/test/resources/testng-base-suite.xml b/src/test/resources/testng-base-suite.xml
index 8c20287..7649e47 100644
--- a/src/test/resources/testng-base-suite.xml
+++ b/src/test/resources/testng-base-suite.xml
@@ -57,4 +57,16 @@
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/test/resources/testng-base-tag-suite.xml b/src/test/resources/testng-base-tag-suite.xml
index 79f1f9e..bd98892 100644
--- a/src/test/resources/testng-base-tag-suite.xml
+++ b/src/test/resources/testng-base-tag-suite.xml
@@ -60,4 +60,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+