From b1a161d7506e5b3dc8967edb5e37292f174b9c99 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 6 Dec 2023 15:36:56 +0800 Subject: [PATCH 1/5] - --- .../apache/pulsar/logger/ThrottledLog.java | 967 ++++++++++++++++++ .../apache/pulsar/logger/package-info.java | 19 + .../pulsar/broker/service/Producer.java | 15 +- 3 files changed, 1000 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/logger/ThrottledLog.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/logger/package-info.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/ThrottledLog.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/ThrottledLog.java new file mode 100644 index 0000000000000..f838a96d0dda2 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/ThrottledLog.java @@ -0,0 +1,967 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.logger; + +import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.Marker; + +public class ThrottledLog implements Logger { + + private final Logger delegate; + + /** + * How many permits per filling. + */ + private final int permitsPerFilling; + + /** + * How often should the basket be filled. + */ + private final long periodInMilliSecond; + + /** + * The last time of refreshing the bucket of permits. + * No thread-safe processing is done because a particularly precise count is not required. + */ + private long lastFillTimestamp; + + /** + * How many permits are there now. + * No thread-safe processing is done because a particularly precise count is not required. + */ + private int permits = 0; + + public ThrottledLog(Logger log, long periodInSecond, int permitsPerPeriod) { + if (permitsPerPeriod <= 0) { + throw new IllegalArgumentException("permitsPerPeriod should be larger than 0"); + } + this.delegate = log; + this.periodInMilliSecond = periodInSecond * 1000; + this.permitsPerFilling = permitsPerPeriod; + this.permits = permitsPerFilling; + this.lastFillTimestamp = System.currentTimeMillis(); + } + + private boolean acquire() { + if (System.currentTimeMillis() - lastFillTimestamp > periodInMilliSecond) { + permits = permitsPerFilling; + lastFillTimestamp = System.currentTimeMillis(); + } + return --permits >= 0; + } + + private Object[] calculateGeneratedArgs(Object... arguments) { + Object[] args = new Object[arguments.length]; + for (int i = 0; i < arguments.length; i++) { + if (arguments[i] instanceof Supplier) { + Supplier supplier = (Supplier) arguments[i]; + args[0] = supplier.get(); + } else { + args[0] = arguments[i]; + } + } + return args; + } + + @Override + public String getName() { + return delegate.getName(); + } + + @Override + public boolean isTraceEnabled() { + return delegate.isTraceEnabled(); + } + + @Override + public boolean isTraceEnabled(Marker marker) { + return delegate.isTraceEnabled(marker); + } + + @Override + public boolean isDebugEnabled() { + return delegate.isDebugEnabled(); + } + + @Override + public boolean isDebugEnabled(Marker marker) { + return delegate.isDebugEnabled(marker); + } + + @Override + public boolean isInfoEnabled() { + return delegate.isInfoEnabled(); + } + + @Override + public boolean isInfoEnabled(Marker marker) { + return delegate.isInfoEnabled(marker); + } + + @Override + public boolean isWarnEnabled() { + return delegate.isWarnEnabled(); + } + + @Override + public boolean isWarnEnabled(Marker marker) { + return delegate.isWarnEnabled(marker); + } + + @Override + public boolean isErrorEnabled() { + return delegate.isErrorEnabled(); + } + + @Override + public boolean isErrorEnabled(Marker marker) { + return delegate.isErrorEnabled(marker); + } + + @Override + public void trace(String msg) { + if (acquire()) { + delegate.trace(msg); + } + } + + public void traceWithGeneratedArgs(Supplier msg) { + if (acquire()) { + delegate.trace(msg.get()); + } + } + + @Override + public void trace(String format, Object arg) { + if (acquire()) { + delegate.trace(format, arg); + } + } + + @Override + public void trace(String format, Object arg1, Object arg2) { + if (acquire()) { + delegate.trace(format, arg1, arg2); + } + } + + public void traceWithGeneratedArgs(String format, Supplier arg1, Supplier arg2) { + if (acquire()) { + delegate.trace(format, arg1.get(), arg2.get()); + } + } + + public void traceWithGeneratedArgs(String format, Supplier arg1, Object arg2) { + if (acquire()) { + delegate.trace(format, arg1.get(), arg2); + } + } + + public void traceWithGeneratedArgs(String format, Object arg1, Supplier arg2) { + if (acquire()) { + delegate.trace(format, arg1, arg2.get()); + } + } + + @Override + public void trace(String format, Object... arguments) { + if (acquire()) { + delegate.trace(format, arguments); + } + } + + public void traceWithGeneratedArgs(String format, Object... args) { + if (acquire()) { + delegate.trace(format, calculateGeneratedArgs(args)); + } + } + + @Override + public void trace(String msg, Throwable t) { + if (acquire()) { + delegate.trace(msg, t); + } + } + + public void traceWithGeneratedArgs(Supplier msg, Throwable t) { + if (acquire()) { + delegate.trace(msg.get(), t); + } + } + + public void traceWithGeneratedArgs(Supplier msg, Supplier t) { + if (acquire()) { + delegate.trace(msg.get(), t.get()); + } + } + + public void traceWithGeneratedArgs(String msg, Supplier t) { + if (acquire()) { + delegate.trace(msg, t.get()); + } + } + + @Override + public void trace(Marker marker, String msg) { + if (acquire()) { + delegate.trace(marker, msg); + } + } + + public void traceWithGeneratedArgs(Marker marker, Supplier msg) { + if (acquire()) { + delegate.trace(marker, msg.get()); + } + } + + @Override + public void trace(Marker marker, String format, Object arg) { + if (acquire()) { + delegate.trace(marker, format, arg); + } + } + + @Override + public void trace(Marker marker, String format, Object arg1, Object arg2) { + if (acquire()) { + delegate.trace(marker, format, arg1, arg2); + } + } + + public void traceWithGeneratedArgs(Marker marker, String format, Supplier arg1, Supplier arg2) { + if (acquire()) { + delegate.trace(marker, format, arg1.get(), arg2.get()); + } + } + + public void traceWithGeneratedArgs(Marker marker, String format, Object arg1, Supplier arg2) { + if (acquire()) { + delegate.trace(marker, format, arg1, arg2.get()); + } + } + + public void traceWithGeneratedArgs(Marker marker, String format, Supplier arg1, Object arg2) { + if (acquire()) { + delegate.trace(marker, format, arg1.get(), arg2); + } + } + + @Override + public void trace(Marker marker, String format, Object... argArray) { + if (acquire()) { + delegate.trace(marker, format, argArray); + } + } + + public void traceWithGeneratedArgs(Marker marker, String format, Object... args) { + if (acquire()) { + delegate.trace(marker, format, calculateGeneratedArgs(args)); + } + } + + @Override + public void trace(Marker marker, String msg, Throwable t) { + if (acquire()) { + delegate.trace(marker, msg, t); + } + } + + public void traceWithGeneratedArgs(Marker marker, Supplier msg, Supplier t) { + if (acquire()) { + delegate.trace(marker, msg.get(), t.get()); + } + } + + public void traceWithGeneratedArgs(Marker marker, String msg, Supplier t) { + if (acquire()) { + delegate.trace(marker, msg, t.get()); + } + } + + public void traceWithGeneratedArgs(Marker marker, Supplier msg, Throwable t) { + if (acquire()) { + delegate.trace(marker, msg.get(), t); + } + } + + @Override + public void debug(String msg) { + if (acquire()) { + delegate.debug(msg); + } + } + + public void debugWithGeneratedArgs(Supplier msg) { + if (acquire()) { + delegate.debug(msg.get()); + } + } + + @Override + public void debug(String format, Object arg) { + if (acquire()) { + delegate.debug(format, arg); + } + } + + @Override + public void debug(String format, Object arg1, Object arg2) { + if (acquire()) { + delegate.debug(format, arg1, arg2); + } + } + + public void debugWithGeneratedArgs(String format, Supplier arg1, Supplier arg2) { + if (acquire()) { + delegate.debug(format, arg1.get(), arg2.get()); + } + } + + public void debugWithGeneratedArgs(String format, Supplier arg1, Object arg2) { + if (acquire()) { + delegate.debug(format, arg1.get(), arg2); + } + } + + public void debugWithGeneratedArgs(String format, Object arg1, Supplier arg2) { + if (acquire()) { + delegate.debug(format, arg1, arg2.get()); + } + } + + @Override + public void debug(String format, Object... arguments) { + if (acquire()) { + delegate.debug(format, arguments); + } + } + + public void debugWithGeneratedArgs(String format, Object... args) { + if (acquire()) { + delegate.debug(format, calculateGeneratedArgs(args)); + } + } + + @Override + public void debug(String msg, Throwable t) { + if (acquire()) { + delegate.debug(msg, t); + } + } + + public void debugWithGeneratedArgs(Supplier msg, Throwable t) { + if (acquire()) { + delegate.debug(msg.get(), t); + } + } + + public void debugWithGeneratedArgs(Supplier msg, Supplier t) { + if (acquire()) { + delegate.debug(msg.get(), t.get()); + } + } + + public void debugWithGeneratedArgs(String msg, Supplier t) { + if (acquire()) { + delegate.debug(msg, t.get()); + } + } + + @Override + public void debug(Marker marker, String msg) { + if (acquire()) { + delegate.debug(marker, msg); + } + } + + public void debugWithGeneratedArgs(Marker marker, Supplier msg) { + if (acquire()) { + delegate.debug(marker, msg.get()); + } + } + + @Override + public void debug(Marker marker, String format, Object arg) { + if (acquire()) { + delegate.debug(marker, format, arg); + } + } + + @Override + public void debug(Marker marker, String format, Object arg1, Object arg2) { + if (acquire()) { + delegate.debug(marker, format, arg1, arg2); + } + } + + public void debugWithGeneratedArgs(Marker marker, String format, Supplier arg1, Supplier arg2) { + if (acquire()) { + delegate.debug(marker, format, arg1.get(), arg2.get()); + } + } + + public void debugWithGeneratedArgs(Marker marker, String format, Object arg1, Supplier arg2) { + if (acquire()) { + delegate.debug(marker, format, arg1, arg2.get()); + } + } + + public void debugWithGeneratedArgs(Marker marker, String format, Supplier arg1, Object arg2) { + if (acquire()) { + delegate.debug(marker, format, arg1.get(), arg2); + } + } + + @Override + public void debug(Marker marker, String format, Object... argArray) { + if (acquire()) { + delegate.debug(marker, format, argArray); + } + } + + public void debugWithGeneratedArgs(Marker marker, String format, Object... args) { + if (acquire()) { + delegate.debug(marker, format, calculateGeneratedArgs(args)); + } + } + + @Override + public void debug(Marker marker, String msg, Throwable t) { + if (acquire()) { + delegate.debug(marker, msg, t); + } + } + + public void debugWithGeneratedArgs(Marker marker, Supplier msg, Supplier t) { + if (acquire()) { + delegate.debug(marker, msg.get(), t.get()); + } + } + + public void debugWithGeneratedArgs(Marker marker, String msg, Supplier t) { + if (acquire()) { + delegate.debug(marker, msg, t.get()); + } + } + + public void debugWithGeneratedArgs(Marker marker, Supplier msg, Throwable t) { + if (acquire()) { + delegate.debug(marker, msg.get(), t); + } + } + + @Override + public void info(String msg) { + if (acquire()) { + delegate.info(msg); + } + } + + public void infoWithGeneratedArgs(Supplier msg) { + if (acquire()) { + delegate.info(msg.get()); + } + } + + @Override + public void info(String format, Object arg) { + if (acquire()) { + delegate.info(format, arg); + } + } + + @Override + public void info(String format, Object arg1, Object arg2) { + if (acquire()) { + delegate.info(format, arg1, arg2); + } + } + + public void infoWithGeneratedArgs(String format, Supplier arg1, Supplier arg2) { + if (acquire()) { + delegate.info(format, arg1.get(), arg2.get()); + } + } + + public void infoWithGeneratedArgs(String format, Supplier arg1, Object arg2) { + if (acquire()) { + delegate.info(format, arg1.get(), arg2); + } + } + + public void infoWithGeneratedArgs(String format, Object arg1, Supplier arg2) { + if (acquire()) { + delegate.info(format, arg1, arg2.get()); + } + } + + @Override + public void info(String format, Object... arguments) { + if (acquire()) { + delegate.info(format, arguments); + } + } + + public void infoWithGeneratedArgs(String format, Object... args) { + if (acquire()) { + delegate.info(format, calculateGeneratedArgs(args)); + } + } + + @Override + public void info(String msg, Throwable t) { + if (acquire()) { + delegate.info(msg, t); + } + } + + public void infoWithGeneratedArgs(Supplier msg, Throwable t) { + if (acquire()) { + delegate.info(msg.get(), t); + } + } + + public void infoWithGeneratedArgs(Supplier msg, Supplier t) { + if (acquire()) { + delegate.info(msg.get(), t.get()); + } + } + + public void infoWithGeneratedArgs(String msg, Supplier t) { + if (acquire()) { + delegate.info(msg, t.get()); + } + } + + @Override + public void info(Marker marker, String msg) { + if (acquire()) { + delegate.info(marker, msg); + } + } + + public void infoWithGeneratedArgs(Marker marker, Supplier msg) { + if (acquire()) { + delegate.info(marker, msg.get()); + } + } + + @Override + public void info(Marker marker, String format, Object arg) { + if (acquire()) { + delegate.info(marker, format, arg); + } + } + + @Override + public void info(Marker marker, String format, Object arg1, Object arg2) { + if (acquire()) { + delegate.info(marker, format, arg1, arg2); + } + } + + public void infoWithGeneratedArgs(Marker marker, String format, Supplier arg1, Supplier arg2) { + if (acquire()) { + delegate.info(marker, format, arg1.get(), arg2.get()); + } + } + + public void infoWithGeneratedArgs(Marker marker, String format, Object arg1, Supplier arg2) { + if (acquire()) { + delegate.info(marker, format, arg1, arg2.get()); + } + } + + public void infoWithGeneratedArgs(Marker marker, String format, Supplier arg1, Object arg2) { + if (acquire()) { + delegate.info(marker, format, arg1.get(), arg2); + } + } + + @Override + public void info(Marker marker, String format, Object... argArray) { + if (acquire()) { + delegate.info(marker, format, argArray); + } + } + + public void infoWithGeneratedArgs(Marker marker, String format, Object... args) { + if (acquire()) { + delegate.info(marker, format, calculateGeneratedArgs(args)); + } + } + + @Override + public void info(Marker marker, String msg, Throwable t) { + if (acquire()) { + delegate.info(marker, msg, t); + } + } + + public void infoWithGeneratedArgs(Marker marker, Supplier msg, Supplier t) { + if (acquire()) { + delegate.info(marker, msg.get(), t.get()); + } + } + + public void infoWithGeneratedArgs(Marker marker, String msg, Supplier t) { + if (acquire()) { + delegate.info(marker, msg, t.get()); + } + } + + public void infoWithGeneratedArgs(Marker marker, Supplier msg, Throwable t) { + if (acquire()) { + delegate.info(marker, msg.get(), t); + } + } + + @Override + public void warn(String msg) { + if (acquire()) { + delegate.warn(msg); + } + } + + public void warnWithGeneratedArgs(Supplier msg) { + if (acquire()) { + delegate.warn(msg.get()); + } + } + + @Override + public void warn(String format, Object arg) { + if (acquire()) { + delegate.warn(format, arg); + } + } + + @Override + public void warn(String format, Object arg1, Object arg2) { + if (acquire()) { + delegate.warn(format, arg1, arg2); + } + } + + public void warnWithGeneratedArgs(String format, Supplier arg1, Supplier arg2) { + if (acquire()) { + delegate.warn(format, arg1.get(), arg2.get()); + } + } + + public void warnWithGeneratedArgs(String format, Supplier arg1, Object arg2) { + if (acquire()) { + delegate.warn(format, arg1.get(), arg2); + } + } + + public void warnWithGeneratedArgs(String format, Object arg1, Supplier arg2) { + if (acquire()) { + delegate.warn(format, arg1, arg2.get()); + } + } + + @Override + public void warn(String format, Object... arguments) { + if (acquire()) { + delegate.warn(format, arguments); + } + } + + public void warnWithGeneratedArgs(String format, Object... args) { + if (acquire()) { + delegate.warn(format, calculateGeneratedArgs(args)); + } + } + + @Override + public void warn(String msg, Throwable t) { + if (acquire()) { + delegate.warn(msg, t); + } + } + + public void warnWithGeneratedArgs(Supplier msg, Throwable t) { + if (acquire()) { + delegate.warn(msg.get(), t); + } + } + + public void warnWithGeneratedArgs(Supplier msg, Supplier t) { + if (acquire()) { + delegate.warn(msg.get(), t.get()); + } + } + + public void warnWithGeneratedArgs(String msg, Supplier t) { + if (acquire()) { + delegate.warn(msg, t.get()); + } + } + + @Override + public void warn(Marker marker, String msg) { + if (acquire()) { + delegate.warn(marker, msg); + } + } + + public void warnWithGeneratedArgs(Marker marker, Supplier msg) { + if (acquire()) { + delegate.warn(marker, msg.get()); + } + } + + @Override + public void warn(Marker marker, String format, Object arg) { + if (acquire()) { + delegate.warn(marker, format, arg); + } + } + + @Override + public void warn(Marker marker, String format, Object arg1, Object arg2) { + if (acquire()) { + delegate.warn(marker, format, arg1, arg2); + } + } + + public void warnWithGeneratedArgs(Marker marker, String format, Supplier arg1, Supplier arg2) { + if (acquire()) { + delegate.warn(marker, format, arg1.get(), arg2.get()); + } + } + + public void warnWithGeneratedArgs(Marker marker, String format, Object arg1, Supplier arg2) { + if (acquire()) { + delegate.warn(marker, format, arg1, arg2.get()); + } + } + + public void warnWithGeneratedArgs(Marker marker, String format, Supplier arg1, Object arg2) { + if (acquire()) { + delegate.warn(marker, format, arg1.get(), arg2); + } + } + + @Override + public void warn(Marker marker, String format, Object... argArray) { + if (acquire()) { + delegate.warn(marker, format, argArray); + } + } + + public void warnWithGeneratedArgs(Marker marker, String format, Object... args) { + if (acquire()) { + delegate.warn(marker, format, calculateGeneratedArgs(args)); + } + } + + @Override + public void warn(Marker marker, String msg, Throwable t) { + if (acquire()) { + delegate.warn(marker, msg, t); + } + } + + public void warnWithGeneratedArgs(Marker marker, Supplier msg, Supplier t) { + if (acquire()) { + delegate.warn(marker, msg.get(), t.get()); + } + } + + public void warnWithGeneratedArgs(Marker marker, String msg, Supplier t) { + if (acquire()) { + delegate.warn(marker, msg, t.get()); + } + } + + public void warnWithGeneratedArgs(Marker marker, Supplier msg, Throwable t) { + if (acquire()) { + delegate.warn(marker, msg.get(), t); + } + } + + @Override + public void error(String msg) { + if (acquire()) { + delegate.error(msg); + } + } + + public void errorWithGeneratedArgs(Supplier msg) { + if (acquire()) { + delegate.error(msg.get()); + } + } + + @Override + public void error(String format, Object arg) { + if (acquire()) { + delegate.error(format, arg); + } + } + + @Override + public void error(String format, Object arg1, Object arg2) { + if (acquire()) { + delegate.error(format, arg1, arg2); + } + } + + public void errorWithGeneratedArgs(String format, Supplier arg1, Supplier arg2) { + if (acquire()) { + delegate.error(format, arg1.get(), arg2.get()); + } + } + + public void errorWithGeneratedArgs(String format, Supplier arg1, Object arg2) { + if (acquire()) { + delegate.error(format, arg1.get(), arg2); + } + } + + public void errorWithGeneratedArgs(String format, Object arg1, Supplier arg2) { + if (acquire()) { + delegate.error(format, arg1, arg2.get()); + } + } + + @Override + public void error(String format, Object... arguments) { + if (acquire()) { + delegate.error(format, arguments); + } + } + + public void errorWithGeneratedArgs(String format, Object... args) { + if (acquire()) { + delegate.error(format, calculateGeneratedArgs(args)); + } + } + + @Override + public void error(String msg, Throwable t) { + if (acquire()) { + delegate.error(msg, t); + } + } + + public void errorWithGeneratedArgs(Supplier msg, Throwable t) { + if (acquire()) { + delegate.error(msg.get(), t); + } + } + + public void errorWithGeneratedArgs(Supplier msg, Supplier t) { + if (acquire()) { + delegate.error(msg.get(), t.get()); + } + } + + public void errorWithGeneratedArgs(String msg, Supplier t) { + if (acquire()) { + delegate.error(msg, t.get()); + } + } + + @Override + public void error(Marker marker, String msg) { + if (acquire()) { + delegate.error(marker, msg); + } + } + + public void errorWithGeneratedArgs(Marker marker, Supplier msg) { + if (acquire()) { + delegate.error(marker, msg.get()); + } + } + + @Override + public void error(Marker marker, String format, Object arg) { + if (acquire()) { + delegate.error(marker, format, arg); + } + } + + @Override + public void error(Marker marker, String format, Object arg1, Object arg2) { + if (acquire()) { + delegate.error(marker, format, arg1, arg2); + } + } + + public void errorWithGeneratedArgs(Marker marker, String format, Supplier arg1, Supplier arg2) { + if (acquire()) { + delegate.error(marker, format, arg1.get(), arg2.get()); + } + } + + public void errorWithGeneratedArgs(Marker marker, String format, Object arg1, Supplier arg2) { + if (acquire()) { + delegate.error(marker, format, arg1, arg2.get()); + } + } + + public void errorWithGeneratedArgs(Marker marker, String format, Supplier arg1, Object arg2) { + if (acquire()) { + delegate.error(marker, format, arg1.get(), arg2); + } + } + + @Override + public void error(Marker marker, String format, Object... argArray) { + if (acquire()) { + delegate.error(marker, format, argArray); + } + } + + public void errorWithGeneratedArgs(Marker marker, String format, Object... args) { + if (acquire()) { + delegate.error(marker, format, calculateGeneratedArgs(args)); + } + } + + @Override + public void error(Marker marker, String msg, Throwable t) { + if (acquire()) { + delegate.error(marker, msg, t); + } + } + + public void errorWithGeneratedArgs(Marker marker, Supplier msg, Supplier t) { + if (acquire()) { + delegate.error(marker, msg.get(), t.get()); + } + } + + public void errorWithGeneratedArgs(Marker marker, String msg, Supplier t) { + if (acquire()) { + delegate.error(marker, msg, t.get()); + } + } + + public void errorWithGeneratedArgs(Marker marker, Supplier msg, Throwable t) { + if (acquire()) { + delegate.error(marker, msg.get(), t); + } + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/package-info.java new file mode 100644 index 0000000000000..574ed560e1a27 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.logger; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 5b6a723a250f7..1ab481dbe79ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import java.util.Collections; @@ -59,6 +60,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; +import org.apache.pulsar.logger.ThrottledLog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -238,6 +240,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he if (!verifyChecksum(headersAndPayload)) { cnx.execute(() -> { + cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker"); cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes()); @@ -301,7 +304,15 @@ private boolean verifyChecksum(ByteBuf headersAndPayload) { if (checksum == computedChecksum) { return true; } else { - log.error("[{}] [{}] Failed to verify checksum", topic, producerName); + headersAndPayload.readerIndex(readerIndex); + String hexHeadersAndPayload = ByteBufUtil.prettyHexDump(headersAndPayload); + log.error("[{}] [{}] Failed to verify checksum, client checksum: {}, broker checksum: {}", + topic, producerName, checksum, computedChecksum); + throttledLog.errorWithGeneratedArgs("[{}] [{}] Failed to verify checksum, client checksum: {}," + + " broker checksum: {}", + topic, producerName, checksum, () -> { + return null; + }); return false; } } finally { @@ -852,4 +863,6 @@ public boolean isDisconnecting() { private static final Logger log = LoggerFactory.getLogger(Producer.class); + private static final ThrottledLog throttledLog = new ThrottledLog(log, 1, 3600); + } From a9e34a7f0d2c501b4524aa4f5dd89d1f989bf979 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 6 Dec 2023 16:12:19 +0800 Subject: [PATCH 2/5] add logs for checksum --- .../apache/pulsar/logger/ThrottledLog.java | 967 ------------------ .../pulsar/logger/VerboseLoggerThrottle.java | 65 ++ .../logger/VerboseLoggerThrottleTest.java | 23 + .../pulsar/broker/service/Producer.java | 23 +- .../pulsar/client/impl/ProducerImpl.java | 12 + 5 files changed, 112 insertions(+), 978 deletions(-) delete mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/logger/ThrottledLog.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java create mode 100644 pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/ThrottledLog.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/ThrottledLog.java deleted file mode 100644 index f838a96d0dda2..0000000000000 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/ThrottledLog.java +++ /dev/null @@ -1,967 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.logger; - -import java.util.function.Supplier; -import org.slf4j.Logger; -import org.slf4j.Marker; - -public class ThrottledLog implements Logger { - - private final Logger delegate; - - /** - * How many permits per filling. - */ - private final int permitsPerFilling; - - /** - * How often should the basket be filled. - */ - private final long periodInMilliSecond; - - /** - * The last time of refreshing the bucket of permits. - * No thread-safe processing is done because a particularly precise count is not required. - */ - private long lastFillTimestamp; - - /** - * How many permits are there now. - * No thread-safe processing is done because a particularly precise count is not required. - */ - private int permits = 0; - - public ThrottledLog(Logger log, long periodInSecond, int permitsPerPeriod) { - if (permitsPerPeriod <= 0) { - throw new IllegalArgumentException("permitsPerPeriod should be larger than 0"); - } - this.delegate = log; - this.periodInMilliSecond = periodInSecond * 1000; - this.permitsPerFilling = permitsPerPeriod; - this.permits = permitsPerFilling; - this.lastFillTimestamp = System.currentTimeMillis(); - } - - private boolean acquire() { - if (System.currentTimeMillis() - lastFillTimestamp > periodInMilliSecond) { - permits = permitsPerFilling; - lastFillTimestamp = System.currentTimeMillis(); - } - return --permits >= 0; - } - - private Object[] calculateGeneratedArgs(Object... arguments) { - Object[] args = new Object[arguments.length]; - for (int i = 0; i < arguments.length; i++) { - if (arguments[i] instanceof Supplier) { - Supplier supplier = (Supplier) arguments[i]; - args[0] = supplier.get(); - } else { - args[0] = arguments[i]; - } - } - return args; - } - - @Override - public String getName() { - return delegate.getName(); - } - - @Override - public boolean isTraceEnabled() { - return delegate.isTraceEnabled(); - } - - @Override - public boolean isTraceEnabled(Marker marker) { - return delegate.isTraceEnabled(marker); - } - - @Override - public boolean isDebugEnabled() { - return delegate.isDebugEnabled(); - } - - @Override - public boolean isDebugEnabled(Marker marker) { - return delegate.isDebugEnabled(marker); - } - - @Override - public boolean isInfoEnabled() { - return delegate.isInfoEnabled(); - } - - @Override - public boolean isInfoEnabled(Marker marker) { - return delegate.isInfoEnabled(marker); - } - - @Override - public boolean isWarnEnabled() { - return delegate.isWarnEnabled(); - } - - @Override - public boolean isWarnEnabled(Marker marker) { - return delegate.isWarnEnabled(marker); - } - - @Override - public boolean isErrorEnabled() { - return delegate.isErrorEnabled(); - } - - @Override - public boolean isErrorEnabled(Marker marker) { - return delegate.isErrorEnabled(marker); - } - - @Override - public void trace(String msg) { - if (acquire()) { - delegate.trace(msg); - } - } - - public void traceWithGeneratedArgs(Supplier msg) { - if (acquire()) { - delegate.trace(msg.get()); - } - } - - @Override - public void trace(String format, Object arg) { - if (acquire()) { - delegate.trace(format, arg); - } - } - - @Override - public void trace(String format, Object arg1, Object arg2) { - if (acquire()) { - delegate.trace(format, arg1, arg2); - } - } - - public void traceWithGeneratedArgs(String format, Supplier arg1, Supplier arg2) { - if (acquire()) { - delegate.trace(format, arg1.get(), arg2.get()); - } - } - - public void traceWithGeneratedArgs(String format, Supplier arg1, Object arg2) { - if (acquire()) { - delegate.trace(format, arg1.get(), arg2); - } - } - - public void traceWithGeneratedArgs(String format, Object arg1, Supplier arg2) { - if (acquire()) { - delegate.trace(format, arg1, arg2.get()); - } - } - - @Override - public void trace(String format, Object... arguments) { - if (acquire()) { - delegate.trace(format, arguments); - } - } - - public void traceWithGeneratedArgs(String format, Object... args) { - if (acquire()) { - delegate.trace(format, calculateGeneratedArgs(args)); - } - } - - @Override - public void trace(String msg, Throwable t) { - if (acquire()) { - delegate.trace(msg, t); - } - } - - public void traceWithGeneratedArgs(Supplier msg, Throwable t) { - if (acquire()) { - delegate.trace(msg.get(), t); - } - } - - public void traceWithGeneratedArgs(Supplier msg, Supplier t) { - if (acquire()) { - delegate.trace(msg.get(), t.get()); - } - } - - public void traceWithGeneratedArgs(String msg, Supplier t) { - if (acquire()) { - delegate.trace(msg, t.get()); - } - } - - @Override - public void trace(Marker marker, String msg) { - if (acquire()) { - delegate.trace(marker, msg); - } - } - - public void traceWithGeneratedArgs(Marker marker, Supplier msg) { - if (acquire()) { - delegate.trace(marker, msg.get()); - } - } - - @Override - public void trace(Marker marker, String format, Object arg) { - if (acquire()) { - delegate.trace(marker, format, arg); - } - } - - @Override - public void trace(Marker marker, String format, Object arg1, Object arg2) { - if (acquire()) { - delegate.trace(marker, format, arg1, arg2); - } - } - - public void traceWithGeneratedArgs(Marker marker, String format, Supplier arg1, Supplier arg2) { - if (acquire()) { - delegate.trace(marker, format, arg1.get(), arg2.get()); - } - } - - public void traceWithGeneratedArgs(Marker marker, String format, Object arg1, Supplier arg2) { - if (acquire()) { - delegate.trace(marker, format, arg1, arg2.get()); - } - } - - public void traceWithGeneratedArgs(Marker marker, String format, Supplier arg1, Object arg2) { - if (acquire()) { - delegate.trace(marker, format, arg1.get(), arg2); - } - } - - @Override - public void trace(Marker marker, String format, Object... argArray) { - if (acquire()) { - delegate.trace(marker, format, argArray); - } - } - - public void traceWithGeneratedArgs(Marker marker, String format, Object... args) { - if (acquire()) { - delegate.trace(marker, format, calculateGeneratedArgs(args)); - } - } - - @Override - public void trace(Marker marker, String msg, Throwable t) { - if (acquire()) { - delegate.trace(marker, msg, t); - } - } - - public void traceWithGeneratedArgs(Marker marker, Supplier msg, Supplier t) { - if (acquire()) { - delegate.trace(marker, msg.get(), t.get()); - } - } - - public void traceWithGeneratedArgs(Marker marker, String msg, Supplier t) { - if (acquire()) { - delegate.trace(marker, msg, t.get()); - } - } - - public void traceWithGeneratedArgs(Marker marker, Supplier msg, Throwable t) { - if (acquire()) { - delegate.trace(marker, msg.get(), t); - } - } - - @Override - public void debug(String msg) { - if (acquire()) { - delegate.debug(msg); - } - } - - public void debugWithGeneratedArgs(Supplier msg) { - if (acquire()) { - delegate.debug(msg.get()); - } - } - - @Override - public void debug(String format, Object arg) { - if (acquire()) { - delegate.debug(format, arg); - } - } - - @Override - public void debug(String format, Object arg1, Object arg2) { - if (acquire()) { - delegate.debug(format, arg1, arg2); - } - } - - public void debugWithGeneratedArgs(String format, Supplier arg1, Supplier arg2) { - if (acquire()) { - delegate.debug(format, arg1.get(), arg2.get()); - } - } - - public void debugWithGeneratedArgs(String format, Supplier arg1, Object arg2) { - if (acquire()) { - delegate.debug(format, arg1.get(), arg2); - } - } - - public void debugWithGeneratedArgs(String format, Object arg1, Supplier arg2) { - if (acquire()) { - delegate.debug(format, arg1, arg2.get()); - } - } - - @Override - public void debug(String format, Object... arguments) { - if (acquire()) { - delegate.debug(format, arguments); - } - } - - public void debugWithGeneratedArgs(String format, Object... args) { - if (acquire()) { - delegate.debug(format, calculateGeneratedArgs(args)); - } - } - - @Override - public void debug(String msg, Throwable t) { - if (acquire()) { - delegate.debug(msg, t); - } - } - - public void debugWithGeneratedArgs(Supplier msg, Throwable t) { - if (acquire()) { - delegate.debug(msg.get(), t); - } - } - - public void debugWithGeneratedArgs(Supplier msg, Supplier t) { - if (acquire()) { - delegate.debug(msg.get(), t.get()); - } - } - - public void debugWithGeneratedArgs(String msg, Supplier t) { - if (acquire()) { - delegate.debug(msg, t.get()); - } - } - - @Override - public void debug(Marker marker, String msg) { - if (acquire()) { - delegate.debug(marker, msg); - } - } - - public void debugWithGeneratedArgs(Marker marker, Supplier msg) { - if (acquire()) { - delegate.debug(marker, msg.get()); - } - } - - @Override - public void debug(Marker marker, String format, Object arg) { - if (acquire()) { - delegate.debug(marker, format, arg); - } - } - - @Override - public void debug(Marker marker, String format, Object arg1, Object arg2) { - if (acquire()) { - delegate.debug(marker, format, arg1, arg2); - } - } - - public void debugWithGeneratedArgs(Marker marker, String format, Supplier arg1, Supplier arg2) { - if (acquire()) { - delegate.debug(marker, format, arg1.get(), arg2.get()); - } - } - - public void debugWithGeneratedArgs(Marker marker, String format, Object arg1, Supplier arg2) { - if (acquire()) { - delegate.debug(marker, format, arg1, arg2.get()); - } - } - - public void debugWithGeneratedArgs(Marker marker, String format, Supplier arg1, Object arg2) { - if (acquire()) { - delegate.debug(marker, format, arg1.get(), arg2); - } - } - - @Override - public void debug(Marker marker, String format, Object... argArray) { - if (acquire()) { - delegate.debug(marker, format, argArray); - } - } - - public void debugWithGeneratedArgs(Marker marker, String format, Object... args) { - if (acquire()) { - delegate.debug(marker, format, calculateGeneratedArgs(args)); - } - } - - @Override - public void debug(Marker marker, String msg, Throwable t) { - if (acquire()) { - delegate.debug(marker, msg, t); - } - } - - public void debugWithGeneratedArgs(Marker marker, Supplier msg, Supplier t) { - if (acquire()) { - delegate.debug(marker, msg.get(), t.get()); - } - } - - public void debugWithGeneratedArgs(Marker marker, String msg, Supplier t) { - if (acquire()) { - delegate.debug(marker, msg, t.get()); - } - } - - public void debugWithGeneratedArgs(Marker marker, Supplier msg, Throwable t) { - if (acquire()) { - delegate.debug(marker, msg.get(), t); - } - } - - @Override - public void info(String msg) { - if (acquire()) { - delegate.info(msg); - } - } - - public void infoWithGeneratedArgs(Supplier msg) { - if (acquire()) { - delegate.info(msg.get()); - } - } - - @Override - public void info(String format, Object arg) { - if (acquire()) { - delegate.info(format, arg); - } - } - - @Override - public void info(String format, Object arg1, Object arg2) { - if (acquire()) { - delegate.info(format, arg1, arg2); - } - } - - public void infoWithGeneratedArgs(String format, Supplier arg1, Supplier arg2) { - if (acquire()) { - delegate.info(format, arg1.get(), arg2.get()); - } - } - - public void infoWithGeneratedArgs(String format, Supplier arg1, Object arg2) { - if (acquire()) { - delegate.info(format, arg1.get(), arg2); - } - } - - public void infoWithGeneratedArgs(String format, Object arg1, Supplier arg2) { - if (acquire()) { - delegate.info(format, arg1, arg2.get()); - } - } - - @Override - public void info(String format, Object... arguments) { - if (acquire()) { - delegate.info(format, arguments); - } - } - - public void infoWithGeneratedArgs(String format, Object... args) { - if (acquire()) { - delegate.info(format, calculateGeneratedArgs(args)); - } - } - - @Override - public void info(String msg, Throwable t) { - if (acquire()) { - delegate.info(msg, t); - } - } - - public void infoWithGeneratedArgs(Supplier msg, Throwable t) { - if (acquire()) { - delegate.info(msg.get(), t); - } - } - - public void infoWithGeneratedArgs(Supplier msg, Supplier t) { - if (acquire()) { - delegate.info(msg.get(), t.get()); - } - } - - public void infoWithGeneratedArgs(String msg, Supplier t) { - if (acquire()) { - delegate.info(msg, t.get()); - } - } - - @Override - public void info(Marker marker, String msg) { - if (acquire()) { - delegate.info(marker, msg); - } - } - - public void infoWithGeneratedArgs(Marker marker, Supplier msg) { - if (acquire()) { - delegate.info(marker, msg.get()); - } - } - - @Override - public void info(Marker marker, String format, Object arg) { - if (acquire()) { - delegate.info(marker, format, arg); - } - } - - @Override - public void info(Marker marker, String format, Object arg1, Object arg2) { - if (acquire()) { - delegate.info(marker, format, arg1, arg2); - } - } - - public void infoWithGeneratedArgs(Marker marker, String format, Supplier arg1, Supplier arg2) { - if (acquire()) { - delegate.info(marker, format, arg1.get(), arg2.get()); - } - } - - public void infoWithGeneratedArgs(Marker marker, String format, Object arg1, Supplier arg2) { - if (acquire()) { - delegate.info(marker, format, arg1, arg2.get()); - } - } - - public void infoWithGeneratedArgs(Marker marker, String format, Supplier arg1, Object arg2) { - if (acquire()) { - delegate.info(marker, format, arg1.get(), arg2); - } - } - - @Override - public void info(Marker marker, String format, Object... argArray) { - if (acquire()) { - delegate.info(marker, format, argArray); - } - } - - public void infoWithGeneratedArgs(Marker marker, String format, Object... args) { - if (acquire()) { - delegate.info(marker, format, calculateGeneratedArgs(args)); - } - } - - @Override - public void info(Marker marker, String msg, Throwable t) { - if (acquire()) { - delegate.info(marker, msg, t); - } - } - - public void infoWithGeneratedArgs(Marker marker, Supplier msg, Supplier t) { - if (acquire()) { - delegate.info(marker, msg.get(), t.get()); - } - } - - public void infoWithGeneratedArgs(Marker marker, String msg, Supplier t) { - if (acquire()) { - delegate.info(marker, msg, t.get()); - } - } - - public void infoWithGeneratedArgs(Marker marker, Supplier msg, Throwable t) { - if (acquire()) { - delegate.info(marker, msg.get(), t); - } - } - - @Override - public void warn(String msg) { - if (acquire()) { - delegate.warn(msg); - } - } - - public void warnWithGeneratedArgs(Supplier msg) { - if (acquire()) { - delegate.warn(msg.get()); - } - } - - @Override - public void warn(String format, Object arg) { - if (acquire()) { - delegate.warn(format, arg); - } - } - - @Override - public void warn(String format, Object arg1, Object arg2) { - if (acquire()) { - delegate.warn(format, arg1, arg2); - } - } - - public void warnWithGeneratedArgs(String format, Supplier arg1, Supplier arg2) { - if (acquire()) { - delegate.warn(format, arg1.get(), arg2.get()); - } - } - - public void warnWithGeneratedArgs(String format, Supplier arg1, Object arg2) { - if (acquire()) { - delegate.warn(format, arg1.get(), arg2); - } - } - - public void warnWithGeneratedArgs(String format, Object arg1, Supplier arg2) { - if (acquire()) { - delegate.warn(format, arg1, arg2.get()); - } - } - - @Override - public void warn(String format, Object... arguments) { - if (acquire()) { - delegate.warn(format, arguments); - } - } - - public void warnWithGeneratedArgs(String format, Object... args) { - if (acquire()) { - delegate.warn(format, calculateGeneratedArgs(args)); - } - } - - @Override - public void warn(String msg, Throwable t) { - if (acquire()) { - delegate.warn(msg, t); - } - } - - public void warnWithGeneratedArgs(Supplier msg, Throwable t) { - if (acquire()) { - delegate.warn(msg.get(), t); - } - } - - public void warnWithGeneratedArgs(Supplier msg, Supplier t) { - if (acquire()) { - delegate.warn(msg.get(), t.get()); - } - } - - public void warnWithGeneratedArgs(String msg, Supplier t) { - if (acquire()) { - delegate.warn(msg, t.get()); - } - } - - @Override - public void warn(Marker marker, String msg) { - if (acquire()) { - delegate.warn(marker, msg); - } - } - - public void warnWithGeneratedArgs(Marker marker, Supplier msg) { - if (acquire()) { - delegate.warn(marker, msg.get()); - } - } - - @Override - public void warn(Marker marker, String format, Object arg) { - if (acquire()) { - delegate.warn(marker, format, arg); - } - } - - @Override - public void warn(Marker marker, String format, Object arg1, Object arg2) { - if (acquire()) { - delegate.warn(marker, format, arg1, arg2); - } - } - - public void warnWithGeneratedArgs(Marker marker, String format, Supplier arg1, Supplier arg2) { - if (acquire()) { - delegate.warn(marker, format, arg1.get(), arg2.get()); - } - } - - public void warnWithGeneratedArgs(Marker marker, String format, Object arg1, Supplier arg2) { - if (acquire()) { - delegate.warn(marker, format, arg1, arg2.get()); - } - } - - public void warnWithGeneratedArgs(Marker marker, String format, Supplier arg1, Object arg2) { - if (acquire()) { - delegate.warn(marker, format, arg1.get(), arg2); - } - } - - @Override - public void warn(Marker marker, String format, Object... argArray) { - if (acquire()) { - delegate.warn(marker, format, argArray); - } - } - - public void warnWithGeneratedArgs(Marker marker, String format, Object... args) { - if (acquire()) { - delegate.warn(marker, format, calculateGeneratedArgs(args)); - } - } - - @Override - public void warn(Marker marker, String msg, Throwable t) { - if (acquire()) { - delegate.warn(marker, msg, t); - } - } - - public void warnWithGeneratedArgs(Marker marker, Supplier msg, Supplier t) { - if (acquire()) { - delegate.warn(marker, msg.get(), t.get()); - } - } - - public void warnWithGeneratedArgs(Marker marker, String msg, Supplier t) { - if (acquire()) { - delegate.warn(marker, msg, t.get()); - } - } - - public void warnWithGeneratedArgs(Marker marker, Supplier msg, Throwable t) { - if (acquire()) { - delegate.warn(marker, msg.get(), t); - } - } - - @Override - public void error(String msg) { - if (acquire()) { - delegate.error(msg); - } - } - - public void errorWithGeneratedArgs(Supplier msg) { - if (acquire()) { - delegate.error(msg.get()); - } - } - - @Override - public void error(String format, Object arg) { - if (acquire()) { - delegate.error(format, arg); - } - } - - @Override - public void error(String format, Object arg1, Object arg2) { - if (acquire()) { - delegate.error(format, arg1, arg2); - } - } - - public void errorWithGeneratedArgs(String format, Supplier arg1, Supplier arg2) { - if (acquire()) { - delegate.error(format, arg1.get(), arg2.get()); - } - } - - public void errorWithGeneratedArgs(String format, Supplier arg1, Object arg2) { - if (acquire()) { - delegate.error(format, arg1.get(), arg2); - } - } - - public void errorWithGeneratedArgs(String format, Object arg1, Supplier arg2) { - if (acquire()) { - delegate.error(format, arg1, arg2.get()); - } - } - - @Override - public void error(String format, Object... arguments) { - if (acquire()) { - delegate.error(format, arguments); - } - } - - public void errorWithGeneratedArgs(String format, Object... args) { - if (acquire()) { - delegate.error(format, calculateGeneratedArgs(args)); - } - } - - @Override - public void error(String msg, Throwable t) { - if (acquire()) { - delegate.error(msg, t); - } - } - - public void errorWithGeneratedArgs(Supplier msg, Throwable t) { - if (acquire()) { - delegate.error(msg.get(), t); - } - } - - public void errorWithGeneratedArgs(Supplier msg, Supplier t) { - if (acquire()) { - delegate.error(msg.get(), t.get()); - } - } - - public void errorWithGeneratedArgs(String msg, Supplier t) { - if (acquire()) { - delegate.error(msg, t.get()); - } - } - - @Override - public void error(Marker marker, String msg) { - if (acquire()) { - delegate.error(marker, msg); - } - } - - public void errorWithGeneratedArgs(Marker marker, Supplier msg) { - if (acquire()) { - delegate.error(marker, msg.get()); - } - } - - @Override - public void error(Marker marker, String format, Object arg) { - if (acquire()) { - delegate.error(marker, format, arg); - } - } - - @Override - public void error(Marker marker, String format, Object arg1, Object arg2) { - if (acquire()) { - delegate.error(marker, format, arg1, arg2); - } - } - - public void errorWithGeneratedArgs(Marker marker, String format, Supplier arg1, Supplier arg2) { - if (acquire()) { - delegate.error(marker, format, arg1.get(), arg2.get()); - } - } - - public void errorWithGeneratedArgs(Marker marker, String format, Object arg1, Supplier arg2) { - if (acquire()) { - delegate.error(marker, format, arg1, arg2.get()); - } - } - - public void errorWithGeneratedArgs(Marker marker, String format, Supplier arg1, Object arg2) { - if (acquire()) { - delegate.error(marker, format, arg1.get(), arg2); - } - } - - @Override - public void error(Marker marker, String format, Object... argArray) { - if (acquire()) { - delegate.error(marker, format, argArray); - } - } - - public void errorWithGeneratedArgs(Marker marker, String format, Object... args) { - if (acquire()) { - delegate.error(marker, format, calculateGeneratedArgs(args)); - } - } - - @Override - public void error(Marker marker, String msg, Throwable t) { - if (acquire()) { - delegate.error(marker, msg, t); - } - } - - public void errorWithGeneratedArgs(Marker marker, Supplier msg, Supplier t) { - if (acquire()) { - delegate.error(marker, msg.get(), t.get()); - } - } - - public void errorWithGeneratedArgs(Marker marker, String msg, Supplier t) { - if (acquire()) { - delegate.error(marker, msg, t.get()); - } - } - - public void errorWithGeneratedArgs(Marker marker, Supplier msg, Throwable t) { - if (acquire()) { - delegate.error(marker, msg.get(), t); - } - } -} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java new file mode 100644 index 0000000000000..50bccc8b6ef92 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.logger; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class VerboseLoggerThrottle { + + /** + * How many permits per filling. + */ + private final int permitsPerFilling; + + /** + * How often should the basket be filled. + */ + private final long periodInMilliSecond; + + /** + * The last time of refreshing the bucket of permits. + * No thread-safe processing is done because a particularly precise count is not required. + */ + private long lastFillTimestamp; + + /** + * How many permits are there now. + * No thread-safe processing is done because a particularly precise count is not required. + */ + private int permits; + + public VerboseLoggerThrottle(long periodInSecond, int permits) { + if (permits <= 0) { + throw new IllegalArgumentException("permitsPerPeriod should be larger than 0"); + } + this.periodInMilliSecond = periodInSecond * 1000; + this.permitsPerFilling = permits; + this.permits = permitsPerFilling; + this.lastFillTimestamp = System.currentTimeMillis(); + } + + public boolean acquire() { + if (System.currentTimeMillis() - lastFillTimestamp >= periodInMilliSecond) { + permits = permitsPerFilling; + lastFillTimestamp = System.currentTimeMillis(); + } + return --permits >= 0; + } +} diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java new file mode 100644 index 0000000000000..4dd6fabaaf241 --- /dev/null +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java @@ -0,0 +1,23 @@ +package org.apache.pulsar.logger; + +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertFalse; +import org.testng.annotations.Test; + +public class VerboseLoggerThrottleTest { + + @Test + public void testAcquire() throws Exception { + long periodInSecond = 5; + int permits = 10; + VerboseLoggerThrottle throttle = new VerboseLoggerThrottle(periodInSecond, permits); + for (int i = 0; i < permits; i++) { + assertTrue(throttle.acquire()); + } + assertFalse(throttle.acquire()); + Thread.sleep(periodInSecond * 1000); + for (int i = 0; i < permits; i++) { + assertTrue(throttle.acquire()); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 1ab481dbe79ea..8f52bd6ba0817 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -60,7 +60,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; -import org.apache.pulsar.logger.ThrottledLog; +import org.apache.pulsar.logger.VerboseLoggerThrottle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -304,15 +304,16 @@ private boolean verifyChecksum(ByteBuf headersAndPayload) { if (checksum == computedChecksum) { return true; } else { - headersAndPayload.readerIndex(readerIndex); - String hexHeadersAndPayload = ByteBufUtil.prettyHexDump(headersAndPayload); - log.error("[{}] [{}] Failed to verify checksum, client checksum: {}, broker checksum: {}", - topic, producerName, checksum, computedChecksum); - throttledLog.errorWithGeneratedArgs("[{}] [{}] Failed to verify checksum, client checksum: {}," - + " broker checksum: {}", - topic, producerName, checksum, () -> { - return null; - }); + if (verboseLoggerThrottle.acquire()) { + headersAndPayload.readerIndex(readerIndex); + String hexHeadersAndPayload = ByteBufUtil.prettyHexDump(headersAndPayload); + log.error("[{}] [{}] Failed to verify checksum, client-side checksum: {}," + + " broker-side checksum: {}, hexHeadersAndPayload: {}", + topic, producerName, checksum, computedChecksum, hexHeadersAndPayload); + } else { + log.error("[{}] [{}] Failed to verify checksum, client checksum: {}, broker checksum: {}", + topic, producerName, checksum, computedChecksum); + } return false; } } finally { @@ -863,6 +864,6 @@ public boolean isDisconnecting() { private static final Logger log = LoggerFactory.getLogger(Producer.class); - private static final ThrottledLog throttledLog = new ThrottledLog(log, 1, 3600); + private static final VerboseLoggerThrottle verboseLoggerThrottle = new VerboseLoggerThrottle(1, 3600); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index a17d4a06f02a6..5c99a80865455 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -32,6 +32,7 @@ import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.util.AbstractReferenceCounted; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; @@ -1341,6 +1342,17 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) { // msg.readerIndex is already at header-payload index, Recompute checksum for headers-payload int metadataChecksum = computeChecksum(headerFrame); long computedChecksum = resumeChecksum(metadataChecksum, msg.getSecond()); + if (log.isTraceEnabled()) { + int payloadReaderIndex = msg.getSecond().readerIndex(); + try { + log.trace("[{}] [{}] verify checksum message with id {}, metadata checksum: {}," + + " computed checksum: {}, headerFrame: {}, payload: {}", + topic, producerName, op.sequenceId, metadataChecksum, computedChecksum, + ByteBufUtil.prettyHexDump(headerFrame), ByteBufUtil.prettyHexDump(msg.getSecond())); + } finally { + msg.getSecond().readerIndex(payloadReaderIndex); + } + } return checksum == computedChecksum; } else { log.warn("[{}] [{}] checksum is not present into message with id {}", topic, producerName, From 77f4ac4151a989743a5d8c63fdbb89b79bb1e63a Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 6 Dec 2023 16:22:00 +0800 Subject: [PATCH 3/5] - --- .../logger/VerboseLoggerThrottleTest.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java index 4dd6fabaaf241..9fb550125a517 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pulsar.logger; import static org.testng.Assert.assertTrue; From 987251eceb69526879acb88ab0337acb611de395 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 6 Dec 2023 16:31:55 +0800 Subject: [PATCH 4/5] - --- .../org/apache/pulsar/logger/VerboseLoggerThrottle.java | 2 +- .../org/apache/pulsar/logger/VerboseLoggerThrottleTest.java | 6 +++--- .../java/org/apache/pulsar/broker/service/Producer.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java index 50bccc8b6ef92..aa9b9716e6b7f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java @@ -55,7 +55,7 @@ public VerboseLoggerThrottle(long periodInSecond, int permits) { this.lastFillTimestamp = System.currentTimeMillis(); } - public boolean acquire() { + public boolean tryAcquire() { if (System.currentTimeMillis() - lastFillTimestamp >= periodInMilliSecond) { permits = permitsPerFilling; lastFillTimestamp = System.currentTimeMillis(); diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java index 9fb550125a517..764e0183ec952 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/logger/VerboseLoggerThrottleTest.java @@ -30,12 +30,12 @@ public void testAcquire() throws Exception { int permits = 10; VerboseLoggerThrottle throttle = new VerboseLoggerThrottle(periodInSecond, permits); for (int i = 0; i < permits; i++) { - assertTrue(throttle.acquire()); + assertTrue(throttle.tryAcquire()); } - assertFalse(throttle.acquire()); + assertFalse(throttle.tryAcquire()); Thread.sleep(periodInSecond * 1000); for (int i = 0; i < permits; i++) { - assertTrue(throttle.acquire()); + assertTrue(throttle.tryAcquire()); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 8f52bd6ba0817..47373e1636a98 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -304,7 +304,7 @@ private boolean verifyChecksum(ByteBuf headersAndPayload) { if (checksum == computedChecksum) { return true; } else { - if (verboseLoggerThrottle.acquire()) { + if (verboseLoggerThrottle.tryAcquire()) { headersAndPayload.readerIndex(readerIndex); String hexHeadersAndPayload = ByteBufUtil.prettyHexDump(headersAndPayload); log.error("[{}] [{}] Failed to verify checksum, client-side checksum: {}," From 5102cf8ffaff5cb603fc768bb30b33e17d984e16 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 6 Dec 2023 16:32:27 +0800 Subject: [PATCH 5/5] - --- .../java/org/apache/pulsar/logger/VerboseLoggerThrottle.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java index aa9b9716e6b7f..c3d2db9916c25 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/logger/VerboseLoggerThrottle.java @@ -35,13 +35,13 @@ public class VerboseLoggerThrottle { /** * The last time of refreshing the bucket of permits. - * No thread-safe processing is done because a particularly precise count is not required. + * No thread-safe support because a particularly precise count is not required. */ private long lastFillTimestamp; /** * How many permits are there now. - * No thread-safe processing is done because a particularly precise count is not required. + * No thread-safe support because a particularly precise count is not required. */ private int permits;