use producer to publish message for metrics collected by pulsar #228
BewareMyPower merged 5 commits into streamnative:master
Conversation
The main issue is that the Pulsar consumer cannot consume messages produced from the Kafka producer. It looks like the original implementation does some extra work. The following is just an experiment. Let's change

```java
// 1. the old way
persistentTopic.publishMessage(
    headerAndPayload,
    MessagePublishContext.get(
        offsetFuture, persistentTopic, System.nanoTime()));
// 2. the new way
topicManager.registerProducerInPersistentTopic(topic.toString(), persistentTopic);
topicManager.getReferenceProducer(topic.toString()).publishMessage(0, 0,
    headerAndPayload, size, false);
offsetFuture.complete(Long.valueOf(size));
```

to produce the same message twice at once. And change the …. Then the unit test passed even if …. By the way, the NPE of … is discussed in a comment below.
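For context, the failing scenario can be expressed as a cross-client check. A minimal sketch, assuming hypothetical test fixtures `kafkaProducer` (a Kafka client) and `pulsarConsumer` (a Pulsar client subscribed to the same topic):

```java
@Test
public void testPulsarConsumerReadsKafkaMessage() throws Exception {
    // A message produced through the Kafka protocol handler should be
    // readable by a plain Pulsar consumer on the same topic.
    kafkaProducer.send(new ProducerRecord<>("test-topic", "hello")).get();
    Message<byte[]> msg = pulsarConsumer.receive(5, TimeUnit.SECONDS);
    assertNotNull("Pulsar consumer should receive the Kafka-produced message", msg);
}
```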
It looks like the original implementation plus a stats update could solve the problem:

```java
topicManager.getReferenceProducer(topic.toString())
    .getTopic().incrementPublishCount(size, headerAndPayload.readableBytes());
persistentTopic.publishMessage(
    headerAndPayload,
    MessagePublishContext.get(
        offsetFuture, persistentTopic, System.nanoTime()));
```

In addition, if the problem is solved, a unit test should be added to verify that the stats have been updated, for example as sketched below:
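A minimal sketch of such a test, assuming a hypothetical test base that provides `admin` (a `PulsarAdmin`) and `kafkaProducer` fixtures; `msgInCounter` is the cumulative publish counter exposed by Pulsar's `TopicStats` in 2.5+:

```java
import static org.junit.Assert.assertTrue;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.junit.Test;

public class PublishStatsTest extends KopTestBase { // hypothetical base class

    @Test
    public void testPublishUpdatesTopicStats() throws Exception {
        final String topic = "persistent://public/default/test-stats";
        // Publish through the Kafka protocol handler, then read the
        // topic stats back through the Pulsar admin API.
        kafkaProducer.send(new ProducerRecord<>("test-stats", "value")).get();
        TopicStats stats = admin.topics().getStats(topic);
        assertTrue("msgInCounter should be incremented", stats.msgInCounter > 0);
    }
}
```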
Also I found a possible reason. It still may be related to the NPE at `Producer$MessagePublishContext.completed(Producer.java:377)`. When a message is published, `PersistentTopic#publishMessage` is called:

```java
public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
    pendingWriteOps.incrementAndGet(); // [1] pendingWriteOps++ here; we need to decrement it later
    /* ... */
    switch (status) {
        case NotDup:
            // `this` is a PersistentTopic instance that implements the addComplete/addFailed callbacks
            ledger.asyncAddEntry(headersAndPayload, this, publishContext);
            break;
        /* other cases... */
    }
}
```

The only difference between the two ways is the `PublishContext` passed in, and Pulsar's `Producer.MessagePublishContext#completed` is:

```java
public void completed(Exception exception, long ledgerId, long entryId) {
    if (exception != null) {
        /* ... */
    } else {
        /* ... */
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        // NPE happens here, producer.cnx.ctx() == null
        producer.cnx.ctx().channel().eventLoop().execute(this);
    }
}
```

And see `PersistentTopic#addComplete`:

```java
public void addComplete(Position pos, Object ctx) {
    PublishContext publishContext = (PublishContext) ctx;
    PositionImpl position = (PositionImpl) pos;
    messageDeduplication.recordMessagePersisted(publishContext, position);
    // The NPE is thrown from Producer.MessagePublishContext#completed, so the
    // following pendingWriteOps-- would be skipped.
    publishContext.completed(null, position.getLedgerId(), position.getEntryId());
    // [2] pendingWriteOps-- here; it was incremented in publishMessage
    decrementPendingWriteOpsAndCheck();
}
```

I'm not sure if the reference count leak is the reason, but the NPE here is really an issue. See the example stack: …
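To make the leak concrete, here is a sketch (not KoP's or Pulsar's actual fix) of a defensive `addComplete` that guards the callback with try/finally so `pendingWriteOps` stays balanced even when `completed` throws:

```java
public void addComplete(Position pos, Object ctx) {
    PublishContext publishContext = (PublishContext) ctx;
    PositionImpl position = (PositionImpl) pos;
    messageDeduplication.recordMessagePersisted(publishContext, position);
    try {
        publishContext.completed(null, position.getLedgerId(), position.getEntryId());
    } finally {
        // Runs even if completed() throws the NPE above, so the counter
        // incremented at [1] in publishMessage is always decremented at [2].
        decrementPendingWriteOpsAndCheck();
    }
}
```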
force-pushed from 26b69af to 3425df8
Add an explanation for …

After discussing with @dockerzhang, the reason is that the original commit used … to solve #226, and this patch improves producing performance by about 5 times.