diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java new file mode 100644 index 0000000000000..c3d2db9916c25 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.logger; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class VerboseLoggerThrottle { + + /** + * How many permits per filling. + */ + private final int permitsPerFilling; + + /** + * How often should the basket be filled. + */ + private final long periodInMilliSecond; + + /** + * The last time of refreshing the bucket of permits. + * No thread-safe support because a particularly precise count is not required. + */ + private long lastFillTimestamp; + + /** + * How many permits are there now. + * No thread-safe support because a particularly precise count is not required. + */ + private int permits; + + public VerboseLoggerThrottle(long periodInSecond, int permits) { + if (permits <= 0) { + throw new IllegalArgumentException("permitsPerPeriod should be larger than 0"); + } + this.periodInMilliSecond = periodInSecond * 1000; + this.permitsPerFilling = permits; + this.permits = permitsPerFilling; + this.lastFillTimestamp = System.currentTimeMillis(); + } + + public boolean tryAcquire() { + if (System.currentTimeMillis() - lastFillTimestamp >= periodInMilliSecond) { + permits = permitsPerFilling; + lastFillTimestamp = System.currentTimeMillis(); + } + return --permits >= 0; + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/package-info.java new file mode 100644 index 0000000000000..574ed560e1a27 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.logger; diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java new file mode 100644 index 0000000000000..764e0183ec952 --- /dev/null +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.logger; + +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertFalse; +import org.testng.annotations.Test; + +public class VerboseLoggerThrottleTest { + + @Test + public void testAcquire() throws Exception { + long periodInSecond = 5; + int permits = 10; + VerboseLoggerThrottle throttle = new VerboseLoggerThrottle(periodInSecond, permits); + for (int i = 0; i < permits; i++) { + assertTrue(throttle.tryAcquire()); + } + assertFalse(throttle.tryAcquire()); + Thread.sleep(periodInSecond * 1000); + for (int i = 0; i < permits; i++) { + assertTrue(throttle.tryAcquire()); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 5b6a723a250f7..47373e1636a98 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import java.util.Collections; @@ -59,6 +60,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; +import org.apache.pulsar.logger.VerboseLoggerThrottle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -238,6 +240,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he if (!verifyChecksum(headersAndPayload)) { cnx.execute(() -> { + cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker"); cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes()); @@ -301,7 +304,16 @@ private boolean verifyChecksum(ByteBuf headersAndPayload) { if (checksum == computedChecksum) { return true; } else { - log.error("[{}] [{}] Failed to verify checksum", topic, producerName); + if (verboseLoggerThrottle.tryAcquire()) { + headersAndPayload.readerIndex(readerIndex); + String hexHeadersAndPayload = ByteBufUtil.prettyHexDump(headersAndPayload); + log.error("[{}] [{}] Failed to verify checksum, client-side checksum: {}," + + " broker-side checksum: {}, hexHeadersAndPayload: {}", + topic, producerName, checksum, computedChecksum, hexHeadersAndPayload); + } else { + log.error("[{}] [{}] Failed to verify checksum, client checksum: {}, broker checksum: {}", + topic, producerName, checksum, computedChecksum); + } return false; } } finally { @@ -852,4 +864,6 @@ public boolean isDisconnecting() { private static final Logger log = LoggerFactory.getLogger(Producer.class); + private static final VerboseLoggerThrottle verboseLoggerThrottle = new VerboseLoggerThrottle(1, 3600); + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index a17d4a06f02a6..5c99a80865455 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -32,6 +32,7 @@ import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.util.AbstractReferenceCounted; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; @@ -1341,6 +1342,17 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) { // msg.readerIndex is already at header-payload index, Recompute checksum for headers-payload int metadataChecksum = computeChecksum(headerFrame); long computedChecksum = resumeChecksum(metadataChecksum, msg.getSecond()); + if (log.isTraceEnabled()) { + int payloadReaderIndex = msg.getSecond().readerIndex(); + try { + log.trace("[{}] [{}] verify checksum message with id {}, metadata checksum: {}," + + " computed checksum: {}, headerFrame: {}, payload: {}", + topic, producerName, op.sequenceId, metadataChecksum, computedChecksum, + ByteBufUtil.prettyHexDump(headerFrame), ByteBufUtil.prettyHexDump(msg.getSecond())); + } finally { + msg.getSecond().readerIndex(payloadReaderIndex); + } + } return checksum == computedChecksum; } else { log.warn("[{}] [{}] checksum is not present into message with id {}", topic, producerName,