diff --git a/.jenkins/build.sh b/.jenkins/build.sh
index be3e585f7f..b28f022fb4 100755
--- a/.jenkins/build.sh
+++ b/.jenkins/build.sh
@@ -19,4 +19,4 @@ PRJ_HOME=`cd ${JENKINS_DIR}/..;pwd`
cd ${PRJ_HOME}
-mvn clean license:check install
+mvn clean license:check checkstyle:check install spotbugs:check
diff --git a/pom.xml b/pom.xml
index 5111debf28..d60a845b56 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,27 +25,56 @@
+ 1.8
true
- 2.4.0
- 4.1.32.Final
+
+
+ 3.4
21.0
1.18.0
- 1.7.25
- 2.0.0
- 6.14.3
- 2.10.0
2.9.8
1.48
- 3.4
- 2.22.0
+ 2.0.0
+ 2.10.0
1.18.4
- 1.4.9
+ 2.22.0
+ 4.1.32.Final
+ 2.4.0
+ 1.7.25
+ 3.1.8
1.11.2
+ 6.14.3
+ 1.4.9
3.0.rc1
+ 3.0.0
+ 3.8.0
+ 3.0.0-M1
+ 1.4.1.Final
+ 6.19
+ 3.1.8
+
+
+
+
+
+ com.github.spotbugs
+ spotbugs-annotations
+ ${spotbugs-annotations.version}
+
+
+
+
+
+
+ com.github.spotbugs
+ spotbugs-annotations
+ provided
+
+
io.grpc
grpc-core
@@ -170,19 +199,68 @@
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ ${maven-checkstyle-plugin.version}
+
+
+ com.puppycrawl.tools
+ checkstyle
+ ${puppycrawl.checkstyle.version}
+
+
+
+ src/resources/streamnative/checkstyle.xml
+ src/resources/streamnative/suppressions.xml
+ UTF-8
+ true
+ true
+ false
+ true
+
+
+
+ checkstyle
+ validate
+
+ check
+
+
+
+
+
+
+
+ com.github.spotbugs
+ spotbugs-maven-plugin
+ ${spotbugs-maven-plugin.version}
+
+ ${session.executionRootDirectory}/src/resources/streamnative/findbugsExclude.xml
+
+
maven-compiler-plugin
- 3.8.0
+ ${maven-compiler-plugin.version}
- 1.8
- 1.8
+ ${javac.target}
+ ${javac.target}
+
+ -Werror
+ -Xlint:deprecation
+ -Xlint:unchecked
+
+ -Xpkginfo:always
+
maven-surefire-plugin
- 3.0.0-M1
+ ${maven-surefire-plugin.version}
false
1
diff --git a/src/main/java/io/streamnative/kop/KafkaBrokerService.java b/src/main/java/io/streamnative/kop/KafkaBrokerService.java
index 67d7372369..fdb5ec3059 100644
--- a/src/main/java/io/streamnative/kop/KafkaBrokerService.java
+++ b/src/main/java/io/streamnative/kop/KafkaBrokerService.java
@@ -27,9 +27,8 @@
import org.apache.pulsar.common.util.netty.EventLoopUtil;
/**
- * Main class for Pulsar kafkaBroker service
+ * Main class for Pulsar kafkaBroker service.
*/
-
@Slf4j
public class KafkaBrokerService extends BrokerService {
diff --git a/src/main/java/io/streamnative/kop/KafkaChannelInitializer.java b/src/main/java/io/streamnative/kop/KafkaChannelInitializer.java
index e9e0c57316..5c73573f6e 100644
--- a/src/main/java/io/streamnative/kop/KafkaChannelInitializer.java
+++ b/src/main/java/io/streamnative/kop/KafkaChannelInitializer.java
@@ -18,12 +18,16 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
+/**
+ * A channel initializer that initialize channels for kafka protocol.
+ */
public class KafkaChannelInitializer extends ChannelInitializer {
+ static final int MAX_FRAME_LENGTH = 100 * 1024 * 1024; // 100MB
+
private final KafkaService kafkaService;
// TODO: handle TLS -- https://github.com/streamnative/kop/issues/2
private final boolean enableTls;
- final static int MAX_FRAME_LENGTH = 100 * 1024 * 1024; // 100MB
public KafkaChannelInitializer(KafkaService kafkaService, boolean enableTLS) throws Exception {
super();
@@ -34,7 +38,8 @@ public KafkaChannelInitializer(KafkaService kafkaService, boolean enableTLS) thr
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldPrepender(4));
- ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
+ ch.pipeline().addLast("frameDecoder",
+ new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new KafkaRequestHandler(kafkaService));
}
}
diff --git a/src/main/java/io/streamnative/kop/KafkaCommandDecoder.java b/src/main/java/io/streamnative/kop/KafkaCommandDecoder.java
index 8a1d613ee2..fd2529a801 100644
--- a/src/main/java/io/streamnative/kop/KafkaCommandDecoder.java
+++ b/src/main/java/io/streamnative/kop/KafkaCommandDecoder.java
@@ -32,6 +32,9 @@
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
+/**
+ * A decoder that decodes kafka requests and responses.
+ */
@Slf4j
public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
protected ChannelHandlerContext ctx;
@@ -69,8 +72,8 @@ protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf msg) {
}
protected ByteBuf responseToByteBuf(AbstractResponse response, KafkaHeaderAndRequest request) {
- try(KafkaHeaderAndResponse kafkaHeaderAndResponse
- = KafkaHeaderAndResponse.responseForRequest(request, response)) {
+ try (KafkaHeaderAndResponse kafkaHeaderAndResponse =
+ KafkaHeaderAndResponse.responseForRequest(request, response)) {
ByteBuffer serialized = kafkaHeaderAndResponse
.getResponse()
@@ -209,7 +212,10 @@ static class KafkaHeaderAndResponse implements Closeable {
private final AbstractResponse response;
private final ByteBuf buffer;
- private KafkaHeaderAndResponse(short apiVersion, ResponseHeader header, AbstractResponse response, ByteBuf buffer) {
+ private KafkaHeaderAndResponse(short apiVersion,
+ ResponseHeader header,
+ AbstractResponse response,
+ ByteBuf buffer) {
this.apiVersion = apiVersion;
this.header = header;
this.response = response;
@@ -229,11 +235,16 @@ public AbstractResponse getResponse() {
}
static KafkaHeaderAndResponse responseForRequest(KafkaHeaderAndRequest request, AbstractResponse response) {
- return new KafkaHeaderAndResponse(request.getHeader().apiVersion(), request.getHeader().toResponseHeader(), response, request.getBuffer());
+ return new KafkaHeaderAndResponse(
+ request.getHeader().apiVersion(),
+ request.getHeader().toResponseHeader(),
+ response,
+ request.getBuffer());
}
public String toString() {
- return String.format("KafkaHeaderAndResponse(header=%s,response=%s)", this.header.toStruct().toString(), this.response.toString(this.getApiVersion()));
+ return String.format("KafkaHeaderAndResponse(header=%s,response=%s)",
+ this.header.toStruct().toString(), this.response.toString(this.getApiVersion()));
}
@Override
diff --git a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java
index af528caea4..b4f9982809 100644
--- a/src/main/java/io/streamnative/kop/KafkaRequestHandler.java
+++ b/src/main/java/io/streamnative/kop/KafkaRequestHandler.java
@@ -22,6 +22,9 @@
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
+/**
+ * A request handler to handle kafka requests.
+ */
@Slf4j
public class KafkaRequestHandler extends KafkaCommandDecoder {
diff --git a/src/main/java/io/streamnative/kop/KafkaService.java b/src/main/java/io/streamnative/kop/KafkaService.java
index 25a6cde13a..011e8d7749 100644
--- a/src/main/java/io/streamnative/kop/KafkaService.java
+++ b/src/main/java/io/streamnative/kop/KafkaService.java
@@ -36,9 +36,8 @@
import org.eclipse.jetty.servlet.ServletHolder;
/**
- * Main class for Pulsar broker service
+ * Main class for Kafka-on-Pulsar broker service.
*/
-
@Slf4j
public class KafkaService extends PulsarService {
@@ -55,10 +54,12 @@ public void start() throws PulsarServerException {
ReentrantLock lock = ReflectionUtils.getField(this, "mutex");
lock.lock();
- log.info("Starting Pulsar Broker service powered by Pulsar version: '{}'", (getBrokerVersion() != null ? getBrokerVersion() : "unknown" ) );
- // TODO: add Kafka on Pulsar Verison support -- https://github.com/streamnative/kop/issues/3
try {
+ // TODO: add Kafka on Pulsar Verison support -- https://github.com/streamnative/kop/issues/3
+ log.info("Starting Pulsar Broker service powered by Pulsar version: '{}'",
+ (getBrokerVersion() != null ? getBrokerVersion() : "unknown"));
+
if (getState() != State.Init) {
throw new PulsarServerException("Cannot start the service once it was stopped");
}
@@ -143,12 +144,18 @@ public Boolean get() {
return getState() == State.Started;
}
});
- webService.addRestResources("/", VipStatus.class.getPackage().getName(), false, vipAttributeMap);
- webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap);
- webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap);
- webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap);
- webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, attributeMap);
- webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap);
+ webService.addRestResources("/",
+ VipStatus.class.getPackage().getName(), false, vipAttributeMap);
+ webService.addRestResources("/",
+ "org.apache.pulsar.broker.web", false, attributeMap);
+ webService.addRestResources("/admin",
+ "org.apache.pulsar.broker.admin.v1", true, attributeMap);
+ webService.addRestResources("/admin/v2",
+ "org.apache.pulsar.broker.admin.v2", true, attributeMap);
+ webService.addRestResources("/admin/v3",
+ "org.apache.pulsar.broker.admin.v3", true, attributeMap);
+ webService.addRestResources("/lookup",
+ "org.apache.pulsar.broker.lookup", true, attributeMap);
webService.addServlet("/metrics",
new ServletHolder(
@@ -196,10 +203,14 @@ public Boolean get() {
"acquireSLANamespace");
final String bootstrapMessage = "bootstrap service "
- + (kafkaConfig.getWebServicePort().isPresent() ? "port = " + kafkaConfig.getWebServicePort().get() : "")
- + (kafkaConfig.getWebServicePortTls().isPresent() ? "tls-port = " + kafkaConfig.getWebServicePortTls() : "")
- + (kafkaConfig.getKafkaServicePort().isPresent() ? "broker url= " + kafkaConfig.getKafkaServicePort() : "")
- + (kafkaConfig.getKafkaServicePortTls().isPresent() ? "broker url= " + kafkaConfig.getKafkaServicePortTls() : "");
+ + (kafkaConfig.getWebServicePort().isPresent()
+ ? "port = " + kafkaConfig.getWebServicePort().get() : "")
+ + (kafkaConfig.getWebServicePortTls().isPresent()
+ ? "tls-port = " + kafkaConfig.getWebServicePortTls() : "")
+ + (kafkaConfig.getKafkaServicePort().isPresent()
+ ? "broker url= " + kafkaConfig.getKafkaServicePort() : "")
+ + (kafkaConfig.getKafkaServicePortTls().isPresent()
+ ? "broker url= " + kafkaConfig.getKafkaServicePortTls() : "");
log.info("Kafka messaging service is ready, {}, cluster={}, configs={}", bootstrapMessage,
kafkaConfig.getClusterName(), ReflectionToStringBuilder.toString(kafkaConfig));
diff --git a/src/main/java/io/streamnative/kop/KafkaServiceConfiguration.java b/src/main/java/io/streamnative/kop/KafkaServiceConfiguration.java
index 0834d948ad..0e2187e5ae 100644
--- a/src/main/java/io/streamnative/kop/KafkaServiceConfiguration.java
+++ b/src/main/java/io/streamnative/kop/KafkaServiceConfiguration.java
@@ -32,7 +32,10 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
private static final String CATEGORY_KOP = "Kafka on Pulsar";
- /***** --- Kafka on Pulsar Broker configuration --- ****/
+ //
+ // --- Kafka on Pulsar Broker configuration ---
+ //
+
@FieldContext(
category = CATEGORY_KOP,
required = true,
diff --git a/src/main/java/io/streamnative/kop/KafkaStandalone.java b/src/main/java/io/streamnative/kop/KafkaStandalone.java
index d0be15509c..25e8dd4f7d 100644
--- a/src/main/java/io/streamnative/kop/KafkaStandalone.java
+++ b/src/main/java/io/streamnative/kop/KafkaStandalone.java
@@ -27,6 +27,9 @@
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+/**
+ * A standalone instance includes all the components for running Kafka-on-Pulsar.
+ */
@Slf4j
public class KafkaStandalone implements AutoCloseable {
KafkaService kafkaBroker;
@@ -58,7 +61,9 @@ public void setAdvertisedAddress(String advertisedAddress) {
this.advertisedAddress = advertisedAddress;
}
- public void setConfig(KafkaServiceConfiguration config) { this.config = config; }
+ public void setConfig(KafkaServiceConfiguration config) {
+ this.config = config;
+ }
public void setConfigFile(String configFile) {
this.configFile = configFile;
@@ -198,7 +203,7 @@ public boolean isHelp() {
public void start() throws Exception {
if (config == null) {
- System.exit(1);
+ throw new IllegalArgumentException("Null configuration is provided");
}
log.info("--- setup KafkaStandaloneStarter ---");
@@ -256,7 +261,8 @@ private void createDefaultNameSpace(URL webServiceUrl, String brokerServiceUrl,
}
if (!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
admin.namespaces().createNamespace(defaultNamespace);
- admin.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(config.getClusterName()));
+ admin.namespaces().setNamespaceReplicationClusters(
+ defaultNamespace, Sets.newHashSet(config.getClusterName()));
}
} catch (PulsarAdminException e) {
log.info("error while create default namespace: {}", e.getMessage());
diff --git a/src/main/java/io/streamnative/kop/KafkaStandaloneStarter.java b/src/main/java/io/streamnative/kop/KafkaStandaloneStarter.java
index a8854bd28e..101ba41b88 100644
--- a/src/main/java/io/streamnative/kop/KafkaStandaloneStarter.java
+++ b/src/main/java/io/streamnative/kop/KafkaStandaloneStarter.java
@@ -20,10 +20,12 @@
import java.io.FileInputStream;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+/**
+ * Starter to start kafka-on-pulsar broker.
+ */
@Slf4j
public class KafkaStandaloneStarter extends KafkaStandalone {
@@ -49,7 +51,9 @@ public KafkaStandaloneStarter(String[] args) throws Exception {
return;
}
- this.config = PulsarConfigurationLoader.create((new FileInputStream(this.getConfigFile())), KafkaServiceConfiguration.class);
+ this.config = PulsarConfigurationLoader.create(
+ new FileInputStream(this.getConfigFile()),
+ KafkaServiceConfiguration.class);
String zkServers = "127.0.0.1";
@@ -66,7 +70,7 @@ public KafkaStandaloneStarter(String[] args) throws Exception {
// Set ZK server's host to localhost
// Priority: args > conf > default
- if (argsContains(args,"--zookeeper-port")) {
+ if (argsContains(args, "--zookeeper-port")) {
config.setZookeeperServers(zkServers + ":" + this.getZkPort());
} else {
if (config.getZookeeperServers() != null) {
diff --git a/src/main/java/io/streamnative/kop/KafkaStarter.java b/src/main/java/io/streamnative/kop/KafkaStarter.java
index 01547a9ac9..de2f619eb2 100644
--- a/src/main/java/io/streamnative/kop/KafkaStarter.java
+++ b/src/main/java/io/streamnative/kop/KafkaStarter.java
@@ -24,6 +24,7 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.common.annotations.VisibleForTesting;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.FileInputStream;
import java.net.MalformedURLException;
@@ -44,6 +45,9 @@
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.bridge.SLF4JBridgeHandler;
+/**
+ * A starter to start Kafka-on-Pulsar broker.
+ */
@Slf4j
public class KafkaStarter {
@@ -55,7 +59,7 @@ private static class BrokerStarter {
private final StatsProvider bookieStatsProvider;
private final ServerConfiguration bookieConfig;
- BrokerStarter(String[] args) throws Exception{
+ BrokerStarter(String[] args) throws Exception {
StarterArguments starterArguments = new StarterArguments();
JCommander jcommander = new JCommander(starterArguments);
jcommander.setProgramName("PulsarBrokerStarter");
@@ -64,7 +68,7 @@ private static class BrokerStarter {
jcommander.parse(args);
if (starterArguments.help) {
jcommander.usage();
- System.exit(-1);
+ Runtime.getRuntime().exit(-1);
}
// init broker config
@@ -85,12 +89,12 @@ private static class BrokerStarter {
// if no argument to run bookie in cmd line, read from pulsar config
if (!argsContains(args, "-rb") && !argsContains(args, "--run-bookie")) {
- checkState(starterArguments.runBookie == false,
+ checkState(!starterArguments.runBookie,
"runBookie should be false if has no argument specified");
starterArguments.runBookie = brokerConfig.isEnableRunBookieTogether();
}
if (!argsContains(args, "-ra") && !argsContains(args, "--run-bookie-autorecovery")) {
- checkState(starterArguments.runBookieAutoRecovery == false,
+ checkState(!starterArguments.runBookieAutoRecovery,
"runBookieAutoRecovery should be false if has no argument specified");
starterArguments.runBookieAutoRecovery = brokerConfig.isEnableRunBookieAutoRecoveryTogether();
}
@@ -160,6 +164,7 @@ public void join() throws InterruptedException {
}
}
+ @SuppressFBWarnings("RU_INVOKE_RUN")
public void shutdown() {
kafkaService.getShutdownService().run();
log.info("Shut down kafkaBroker service successfully.");
@@ -182,7 +187,11 @@ public void shutdown() {
public static void main(String[] args) throws Exception {
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS");
Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
- System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s", dateFormat.format(new Date()), thread.getContextClassLoader(), thread.getName(), exception.getMessage()));
+ System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s",
+ dateFormat.format(new Date()),
+ thread.getContextClassLoader(),
+ thread.getName(),
+ exception.getMessage()));
});
BrokerStarter starter = new BrokerStarter(args);
@@ -209,19 +218,31 @@ public static void main(String[] args) throws Exception {
@VisibleForTesting
private static class StarterArguments {
- @Parameter(names = {"-c", "--kop-conf"}, description = "Configuration file for Kafka on Pulsar Broker")
- private String brokerConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/kop.conf";
-
- @Parameter(names = {"-rb", "--run-bookie"}, description = "Run Bookie together with Broker")
+ @Parameter(names = {
+ "-c", "--kop-conf"
+ }, description = "Configuration file for Kafka on Pulsar Broker")
+ private String brokerConfigFile =
+ Paths.get("").toAbsolutePath().normalize().toString() + "/conf/kop.conf";
+
+ @Parameter(names = {
+ "-rb", "--run-bookie"
+ }, description = "Run Bookie together with Broker")
private boolean runBookie = false;
- @Parameter(names = {"-ra", "--run-bookie-autorecovery"}, description = "Run Bookie Autorecovery together with kafkaBroker")
+ @Parameter(names = {
+ "-ra", "--run-bookie-autorecovery"
+ }, description = "Run Bookie Autorecovery together with kafkaBroker")
private boolean runBookieAutoRecovery = false;
- @Parameter(names = {"-bc", "--bookie-conf"}, description = "Configuration file for Bookie")
- private String bookieConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/bookkeeper.conf";
+ @Parameter(names = {
+ "-bc", "--bookie-conf"
+ }, description = "Configuration file for Bookie")
+ private String bookieConfigFile =
+ Paths.get("").toAbsolutePath().normalize().toString() + "/conf/bookkeeper.conf";
- @Parameter(names = {"-h", "--help"}, description = "Show this help message")
+ @Parameter(names = {
+ "-h", "--help"
+ }, description = "Show this help message")
private boolean help = false;
}
diff --git a/src/main/java/io/streamnative/kop/package-info.java b/src/main/java/io/streamnative/kop/package-info.java
new file mode 100644
index 0000000000..3cf3f4ddef
--- /dev/null
+++ b/src/main/java/io/streamnative/kop/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for the classes running Kafka-on-Pulsar.
+ */
+package io.streamnative.kop;
\ No newline at end of file
diff --git a/src/main/java/io/streamnative/kop/utils/ReflectionUtils.java b/src/main/java/io/streamnative/kop/utils/ReflectionUtils.java
index 0a5638d8fd..287c20b675 100644
--- a/src/main/java/io/streamnative/kop/utils/ReflectionUtils.java
+++ b/src/main/java/io/streamnative/kop/utils/ReflectionUtils.java
@@ -29,6 +29,7 @@ public final class ReflectionUtils {
* @param fieldName the private field name
* @return the value of the private field
*/
+ @SuppressWarnings("unchecked")
public static T getField(Object privateObject, String fieldName) {
try {
Field privateField = privateObject.getClass().getDeclaredField(fieldName);
diff --git a/src/main/java/org/apache/pulsar/broker/service/BrokerServiceUtil.java b/src/main/java/org/apache/pulsar/broker/service/BrokerServiceUtil.java
index 47488e6436..646ab034d6 100644
--- a/src/main/java/org/apache/pulsar/broker/service/BrokerServiceUtil.java
+++ b/src/main/java/org/apache/pulsar/broker/service/BrokerServiceUtil.java
@@ -14,12 +14,12 @@
package org.apache.pulsar.broker.service;
/**
- * Util class to access {@link BrokerService}
+ * Util class to access {@link BrokerService}.
*/
public final class BrokerServiceUtil {
/**
- * Start the stats updater for the given broker service
+ * Start the stats updater for the given broker service.
*
* @param service the broker service
* @param statsUpdateInitailDelayInSecs initial delay in seconds
diff --git a/src/main/java/org/apache/pulsar/broker/service/package-info.java b/src/main/java/org/apache/pulsar/broker/service/package-info.java
new file mode 100644
index 0000000000..5bf135c0bf
--- /dev/null
+++ b/src/main/java/org/apache/pulsar/broker/service/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for accessing broker service classes.
+ */
+package org.apache.pulsar.broker.service;
diff --git a/src/resources/streamnative/checkstyle.xml b/src/resources/streamnative/checkstyle.xml
new file mode 100644
index 0000000000..aa45b6a5e7
--- /dev/null
+++ b/src/resources/streamnative/checkstyle.xml
@@ -0,0 +1,440 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/resources/streamnative/findbugsExclude.xml b/src/resources/streamnative/findbugsExclude.xml
new file mode 100644
index 0000000000..572f91e27c
--- /dev/null
+++ b/src/resources/streamnative/findbugsExclude.xml
@@ -0,0 +1,17 @@
+
+
+
diff --git a/src/resources/streamnative/suppressions.xml b/src/resources/streamnative/suppressions.xml
new file mode 100644
index 0000000000..290e1800b0
--- /dev/null
+++ b/src/resources/streamnative/suppressions.xml
@@ -0,0 +1,47 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/test/java/io/streamnative/kop/KafkaRequestHandlerTest.java b/src/test/java/io/streamnative/kop/KafkaRequestHandlerTest.java
index b3ef172e46..6be8669d1a 100644
--- a/src/test/java/io/streamnative/kop/KafkaRequestHandlerTest.java
+++ b/src/test/java/io/streamnative/kop/KafkaRequestHandlerTest.java
@@ -39,6 +39,9 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+/**
+ * Unit test for {@link KafkaRequestHandler}.
+ */
public class KafkaRequestHandlerTest {
private KafkaService kafkaService;