Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 50 additions & 27 deletions src/main/java/com/youzan/nsq/client/ConsumerImplV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public class ConsumerImplV2 implements Consumer, IConsumeInfo {

private volatile long lastConnecting = 0L;
private volatile long lastSuccess = 0L;
private volatile float consumptionRate = 0f;

private final AtomicLong received = new AtomicLong(0);
private final AtomicLong success = new AtomicLong(0);
Expand All @@ -81,8 +80,7 @@ public class ConsumerImplV2 implements Consumer, IConsumeInfo {
/*
* schedule executor for updating nsqd connections in effect
*/
private final ScheduledExecutorService scheduler = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory(this.getClass().getSimpleName(), Thread.NORM_PRIORITY));
private ScheduledExecutorService scheduler;
/*
* message handler
*/
Expand Down Expand Up @@ -113,7 +111,7 @@ public class ConsumerImplV2 implements Consumer, IConsumeInfo {
*/
public ConsumerImplV2(NSQConfig config) {
this.config = config;
this.simpleClient = new NSQSimpleClient(Role.Consumer, this.config.getUserSpecifiedLookupAddress());
this.simpleClient = new NSQSimpleClient(Role.Consumer, this.config.getUserSpecifiedLookupAddress(), this.config);

//initialize netty component
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(config.getNettyPoolSize());
Expand All @@ -125,7 +123,11 @@ public ConsumerImplV2(NSQConfig config) {
bootstrap.handler(new NSQClientInitializer());
//initialize consumer worker size
executor = Executors.newFixedThreadPool(this.config.getConsumerWorkerPoolSize(),
new NamedThreadFactory(this.getClass().getSimpleName() + "-ClientBusiness", Thread.MAX_PRIORITY));
new NamedThreadFactory(this.getClass().getSimpleName() + "-ClientBusiness-" + this.config.getConsumerName(), Thread.MAX_PRIORITY));
String consumerName = (null == this.config) ? "-null" : "-" + this.config.getConsumerName();
//intialize consumer simple client thread
scheduler = Executors
.newSingleThreadScheduledExecutor(new NamedThreadFactory(this.getClass().getSimpleName() + consumerName, Thread.NORM_PRIORITY));
}

/**
Expand Down Expand Up @@ -231,7 +233,7 @@ public void start() throws NSQException {
//start consumer
if (this.started.compareAndSet(Boolean.FALSE, Boolean.TRUE) && cLock.writeLock().tryLock()) {
String configJsonStr = NSQConfig.parseNSQConfig(this.config);
logger.info("Config for consumer {}: {}", this, configJsonStr);
logger.info("Consumer {} starts with config: {}", this, configJsonStr);
try {
//validate that consumer have right lookup address source
if (!validateLookupdSource()) {
Expand All @@ -252,14 +254,6 @@ public void start() throws NSQException {
}
}

/**
* update consumption rate according to message consumption last _INTERVAL_IN_SECOND
*/
private void updateConsumptionRate() {
consumptionRate = ((long)success.get() - lastSuccess) / _INTERVAL_IN_SECOND;
lastSuccess = success.get();
}

public boolean isConsumptionEstimateElapseTimeout() {
// return consumptionRate * queue4Consume.get() *1000 >= this.config.getMsgTimeoutInMillisecond();
return false;
Expand Down Expand Up @@ -594,6 +588,18 @@ public void incoming(final NSQFrame frame, final NSQConnection conn) throws NSQE
}
}

protected boolean checkExtFilter(final NSQMessage message, final NSQConnection conn) {
String filterKey = this.config.getConsumeMessageFilterKey();
if (conn.isExtend() && StringUtils.isNotBlank(filterKey)) {
String filterDataInHeader = (String) message.getExtByName(filterKey);
String filterDataInConf = this.config.getConsumeMessageFilterValue();
if(!this.config.getConsumeMessageFilterMode().getFilter().apply(filterDataInConf, filterDataInHeader)) {
return false;
}
}
return true;
}

private void processMessage(final NSQMessage message, final NSQConnection connection) {
if (logger.isDebugEnabled()) {
logger.debug(message.toString());
Expand Down Expand Up @@ -658,6 +664,7 @@ private void consume(final NSQMessage message, final NSQConnection connection) {
boolean retry;
boolean explicitRequeue = false;
boolean skip = needSkip(message);
skip = skip || !checkExtFilter(message, connection);

long start = System.currentTimeMillis();
try {
Expand Down Expand Up @@ -740,6 +747,9 @@ private void consume(final NSQMessage message, final NSQConnection connection) {
final byte[] id = message.getMessageID();
logger.info("Client does a re-queue explicitly. MessageID: {} , Hex: {}", id, message.newHexString(id));
}
} else if (skip) {
// Finish
cmd = new Finish(message.getMessageID());
} else {
// ignore actions
cmd = null;
Expand All @@ -760,17 +770,34 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
}
});
if (cmd instanceof Finish) {
finished.incrementAndGet();
} else {
re.incrementAndGet();
if(!skip) {
if (cmd instanceof Finish) {
finished.incrementAndGet();
} else {
re.incrementAndGet();
}
}
}
}
}
// Post
//log warn
if (!ok) {
int attempt = message.getReadableAttempts();
int warningThreshold = this.config.getAttemptWarningThresdhold();
int errorThreshold = this.config.getAttemptErrorThresdhold();

if(errorThreshold > 0 && attempt > errorThreshold){
if(0==attempt%errorThreshold) {
logger.error("Message attempts number has been {}, consider logging message content and finish. {}", attempt, message.toString());
}
} else if (warningThreshold > 0 && attempt > warningThreshold){
if(attempt > warningThreshold) {
if(0==attempt%warningThreshold) {
logger.warn("Message attempts number has been {}, consider logging message content and finish. {}", attempt, message.toString());
}
}
}
//TODO: connection.setMessageConsumptionFailed(start);
// logger.warn("Exception occurs in message handler. Please check it right now {} , Original message: {}.", message, message.getReadableContent());
} else if (!this.config.isOrdered()){
Expand Down Expand Up @@ -957,6 +984,10 @@ public void finish(final NSQMessage message) throws NSQException {
return;
}
final NSQConnection conn = address_2_conn.get(message.getAddress());
_finish(message, conn);
}

private void _finish(final NSQMessage message, final NSQConnection conn) throws NSQNoConnectionException {
if (conn != null) {
if (conn.getId() == message.getConnectionID().intValue()) {
if (conn.isConnected()) {
Expand Down Expand Up @@ -1046,19 +1077,11 @@ public String toString() {
} catch (IOException e) {
logger.warn(e.getLocalizedMessage());
}
return "[Consumer] at " + ipStr;
return "[Consumer@" + this.config.getConsumerName() + "] " + super.toString() + " at " + ipStr;
}

@Override
public float getLoadFactor() {
// ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
// int active = pool.getActiveCount();
// long queueSize = queue4Consume.get();
// if(active > 0)
// return (float)queueSize/active;
// else {
// return 0f;
// }
return 0f;
}

Expand Down
96 changes: 96 additions & 0 deletions src/main/java/com/youzan/nsq/client/MessageReceipt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package com.youzan.nsq.client;

import com.youzan.nsq.client.entity.Message;

/**
* Message receipt for publish response, for normal publish(PUB),
* it contains: topic name, partition number, nsqd address(host:port);
* if current receipt is for trace publish(PUB_TARCE), when {@link Message#traced()},
* following info contains, besides topic and nsqd info mentioned above:
*
* [internalID]
* [traceID]
* [diskQueueOffset]
* [diskQueueDataSize]
*/
public class MessageReceipt implements MessageMetadata {
private long internalID;
private long traceID;
private long diskQueueOffset=-1l;
private int diskQueueSize=-1;
private String topicName;
private int partition;
private String nsqdAddr;

public String getTopicName() {
return topicName;
}

public void setTopicName(String topicName) {
this.topicName = topicName;
}

public int getPartition() {
return partition;
}

public void setPartition(int partition) {
this.partition = partition;
}

public String getNsqdAddr() {
return nsqdAddr;
}

public void setNsqdAddr(String nsqdAddr) {
this.nsqdAddr = nsqdAddr;
}

public MessageReceipt() {

}

public void setInternalID(long internalID) {
this.internalID = internalID;
}

public long getInternalID() {
return internalID;
}

public long getTraceID() {
return traceID;
}

public void setTraceID(long traceID) {
this.traceID = traceID;
}

public long getDiskQueueOffset() {
return diskQueueOffset;
}

public void setDiskQueueOffset(long diskQueueOffset) {
this.diskQueueOffset = diskQueueOffset;
}

public int getDiskQueueSize() {
return diskQueueSize;
}

public void setDiskQueueSize(int diskQueueSize) {
this.diskQueueSize = diskQueueSize;
}

@Override
public String toMetadataStr() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClass().toString() + " meta-data:").append("\n");
sb.append("\t[internalID]:\t").append(internalID).append("\n");
sb.append("\t[traceID]:\t").append(traceID).append("\n");
sb.append("\t[diskQueueOffset]:\t").append(diskQueueOffset).append("\n");
sb.append("\t[diskQueueDataSize]:\t").append(diskQueueSize).append("\n");
sb.append(this.getClass().toString() + " end.");
return sb.toString();
}
}
9 changes: 9 additions & 0 deletions src/main/java/com/youzan/nsq/client/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ public interface Producer extends Client, Closeable {

void publish(Message message) throws NSQException;

/**
* publish message and return receipt contains target nsqd&topic info for normal publish,
* and message meta info(internal id, disk offset, disk size, trace id) if message is traced.
* @param message
* @return receipt
* @throws NSQException
*/
MessageReceipt publishAndGetReceipt(Message message) throws NSQException;

/**
* Use it to produce only one 'message' sending to MQ.
* partition info is not specified in this function,
Expand Down
Loading