Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .jenkins/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ PRJ_HOME=`cd ${JENKINS_DIR}/..;pwd`

cd ${PRJ_HOME}

mvn clean license:check install
mvn clean license:check checkstyle:check install spotbugs:check
104 changes: 91 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,56 @@


<properties>
<javac.target>1.8</javac.target>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<pulsar.version>2.4.0</pulsar.version>
<netty.version>4.1.32.Final</netty.version>

<!-- dependencies -->
<commons-lang3.version>3.4</commons-lang3.version>
<guava.version>21.0</guava.version>
<grpc.version>1.18.0</grpc.version>
<slf4j.version>1.7.25</slf4j.version>
<kafka.version>2.0.0</kafka.version>
<testng.version>6.14.3</testng.version>
<log4j2.version>2.10.0</log4j2.version>
<jackson.version>2.9.8</jackson.version>
<jcommander.version>1.48</jcommander.version>
<commons-lang3.version>3.4</commons-lang3.version>
<mockito.version>2.22.0</mockito.version>
<kafka.version>2.0.0</kafka.version>
<log4j2.version>2.10.0</log4j2.version>
<lombok.version>1.18.4</lombok.version>
<dockerfile-maven.version>1.4.9</dockerfile-maven.version>
<mockito.version>2.22.0</mockito.version>
<netty.version>4.1.32.Final</netty.version>
<pulsar.version>2.4.0</pulsar.version>
<slf4j.version>1.7.25</slf4j.version>
<spotbugs-annotations.version>3.1.8</spotbugs-annotations.version>
<testcontainers.version>1.11.2</testcontainers.version>
<testng.version>6.14.3</testng.version>
<!-- plugin dependencies -->
<dockerfile-maven.version>1.4.9</dockerfile-maven.version>
<license-maven-plugin.version>3.0.rc1</license-maven-plugin.version>
<maven-checkstyle-plugin.version>3.0.0</maven-checkstyle-plugin.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
<maven-surefire-plugin.version>3.0.0-M1</maven-surefire-plugin.version>
<os-maven-plugin.version>1.4.1.Final</os-maven-plugin.version>
<puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
<spotbugs-maven-plugin.version>3.1.8</spotbugs-maven-plugin.version>
</properties>

<!-- dependency definitions -->
<dependencyManagement>
<dependencies>
<!-- provided dependencies -->
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<version>${spotbugs-annotations.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- provided dependencies (available at compilation and test classpths and *NOT* packaged) -->
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
Expand Down Expand Up @@ -170,19 +199,68 @@
</dependencies>

<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>${maven-checkstyle-plugin.version}</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>${puppycrawl.checkstyle.version}</version>
</dependency>
</dependencies>
<configuration>
<configLocation>src/resources/streamnative/checkstyle.xml</configLocation>
<suppressionsLocation>src/resources/streamnative/suppressions.xml</suppressionsLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failOnViolation>true</failOnViolation>
<includeResources>false</includeResources>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<executions>
<execution>
<id>checkstyle</id>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
<version>${spotbugs-maven-plugin.version}</version>
<configuration>
<excludeFilterFile>${session.executionRootDirectory}/src/resources/streamnative/findbugsExclude.xml</excludeFilterFile>
</configuration>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<source>${javac.target}</source>
<target>${javac.target}</target>
<compilerArgs>
<compilerArg>-Werror</compilerArg>
<compilerArg>-Xlint:deprecation</compilerArg>
<compilerArg>-Xlint:unchecked</compilerArg>
<!-- https://issues.apache.org/jira/browse/MCOMPILER-205 -->
<compilerArg>-Xpkginfo:always</compilerArg>
</compilerArgs>
</configuration>
</plugin>

<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M1</version>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<reuseForks>false</reuseForks>
<forkCount>1</forkCount>
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/streamnative/kop/KafkaBrokerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@
import org.apache.pulsar.common.util.netty.EventLoopUtil;

/**
* Main class for Pulsar kafkaBroker service
* Main class for Pulsar kafkaBroker service.
*/

@Slf4j
public class KafkaBrokerService extends BrokerService {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

/**
* A channel initializer that initialize channels for kafka protocol.
*/
public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {

static final int MAX_FRAME_LENGTH = 100 * 1024 * 1024; // 100MB

private final KafkaService kafkaService;
// TODO: handle TLS -- https://github.com/streamnative/kop/issues/2
private final boolean enableTls;
final static int MAX_FRAME_LENGTH = 100 * 1024 * 1024; // 100MB

public KafkaChannelInitializer(KafkaService kafkaService, boolean enableTLS) throws Exception {
super();
Expand All @@ -34,7 +38,8 @@ public KafkaChannelInitializer(KafkaService kafkaService, boolean enableTLS) thr
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new KafkaRequestHandler(kafkaService));
}
}
21 changes: 16 additions & 5 deletions src/main/java/io/streamnative/kop/KafkaCommandDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;

/**
* A decoder that decodes kafka requests and responses.
*/
@Slf4j
public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
protected ChannelHandlerContext ctx;
Expand Down Expand Up @@ -69,8 +72,8 @@ protected KafkaHeaderAndRequest byteBufToRequest(ByteBuf msg) {
}

protected ByteBuf responseToByteBuf(AbstractResponse response, KafkaHeaderAndRequest request) {
try(KafkaHeaderAndResponse kafkaHeaderAndResponse
= KafkaHeaderAndResponse.responseForRequest(request, response)) {
try (KafkaHeaderAndResponse kafkaHeaderAndResponse =
KafkaHeaderAndResponse.responseForRequest(request, response)) {

ByteBuffer serialized = kafkaHeaderAndResponse
.getResponse()
Expand Down Expand Up @@ -209,7 +212,10 @@ static class KafkaHeaderAndResponse implements Closeable {
private final AbstractResponse response;
private final ByteBuf buffer;

private KafkaHeaderAndResponse(short apiVersion, ResponseHeader header, AbstractResponse response, ByteBuf buffer) {
private KafkaHeaderAndResponse(short apiVersion,
ResponseHeader header,
AbstractResponse response,
ByteBuf buffer) {
this.apiVersion = apiVersion;
this.header = header;
this.response = response;
Expand All @@ -229,11 +235,16 @@ public AbstractResponse getResponse() {
}

static KafkaHeaderAndResponse responseForRequest(KafkaHeaderAndRequest request, AbstractResponse response) {
return new KafkaHeaderAndResponse(request.getHeader().apiVersion(), request.getHeader().toResponseHeader(), response, request.getBuffer());
return new KafkaHeaderAndResponse(
request.getHeader().apiVersion(),
request.getHeader().toResponseHeader(),
response,
request.getBuffer());
}

public String toString() {
return String.format("KafkaHeaderAndResponse(header=%s,response=%s)", this.header.toStruct().toString(), this.response.toString(this.getApiVersion()));
return String.format("KafkaHeaderAndResponse(header=%s,response=%s)",
this.header.toStruct().toString(), this.response.toString(this.getApiVersion()));
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/streamnative/kop/KafkaRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;

/**
* A request handler to handle kafka requests.
*/
@Slf4j
public class KafkaRequestHandler extends KafkaCommandDecoder {

Expand Down
39 changes: 25 additions & 14 deletions src/main/java/io/streamnative/kop/KafkaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@
import org.eclipse.jetty.servlet.ServletHolder;

/**
* Main class for Pulsar broker service
* Main class for Kafka-on-Pulsar broker service.
*/

@Slf4j
public class KafkaService extends PulsarService {

Expand All @@ -55,10 +54,12 @@ public void start() throws PulsarServerException {
ReentrantLock lock = ReflectionUtils.getField(this, "mutex");

lock.lock();
log.info("Starting Pulsar Broker service powered by Pulsar version: '{}'", (getBrokerVersion() != null ? getBrokerVersion() : "unknown" ) );
// TODO: add Kafka on Pulsar Verison support -- https://github.com/streamnative/kop/issues/3

try {
// TODO: add Kafka on Pulsar Verison support -- https://github.com/streamnative/kop/issues/3
log.info("Starting Pulsar Broker service powered by Pulsar version: '{}'",
(getBrokerVersion() != null ? getBrokerVersion() : "unknown"));

if (getState() != State.Init) {
throw new PulsarServerException("Cannot start the service once it was stopped");
}
Expand Down Expand Up @@ -143,12 +144,18 @@ public Boolean get() {
return getState() == State.Started;
}
});
webService.addRestResources("/", VipStatus.class.getPackage().getName(), false, vipAttributeMap);
webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap);
webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap);
webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap);
webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, attributeMap);
webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap);
webService.addRestResources("/",
VipStatus.class.getPackage().getName(), false, vipAttributeMap);
webService.addRestResources("/",
"org.apache.pulsar.broker.web", false, attributeMap);
webService.addRestResources("/admin",
"org.apache.pulsar.broker.admin.v1", true, attributeMap);
webService.addRestResources("/admin/v2",
"org.apache.pulsar.broker.admin.v2", true, attributeMap);
webService.addRestResources("/admin/v3",
"org.apache.pulsar.broker.admin.v3", true, attributeMap);
webService.addRestResources("/lookup",
"org.apache.pulsar.broker.lookup", true, attributeMap);

webService.addServlet("/metrics",
new ServletHolder(
Expand Down Expand Up @@ -196,10 +203,14 @@ public Boolean get() {
"acquireSLANamespace");

final String bootstrapMessage = "bootstrap service "
+ (kafkaConfig.getWebServicePort().isPresent() ? "port = " + kafkaConfig.getWebServicePort().get() : "")
+ (kafkaConfig.getWebServicePortTls().isPresent() ? "tls-port = " + kafkaConfig.getWebServicePortTls() : "")
+ (kafkaConfig.getKafkaServicePort().isPresent() ? "broker url= " + kafkaConfig.getKafkaServicePort() : "")
+ (kafkaConfig.getKafkaServicePortTls().isPresent() ? "broker url= " + kafkaConfig.getKafkaServicePortTls() : "");
+ (kafkaConfig.getWebServicePort().isPresent()
? "port = " + kafkaConfig.getWebServicePort().get() : "")
+ (kafkaConfig.getWebServicePortTls().isPresent()
? "tls-port = " + kafkaConfig.getWebServicePortTls() : "")
+ (kafkaConfig.getKafkaServicePort().isPresent()
? "broker url= " + kafkaConfig.getKafkaServicePort() : "")
+ (kafkaConfig.getKafkaServicePortTls().isPresent()
? "broker url= " + kafkaConfig.getKafkaServicePortTls() : "");

log.info("Kafka messaging service is ready, {}, cluster={}, configs={}", bootstrapMessage,
kafkaConfig.getClusterName(), ReflectionToStringBuilder.toString(kafkaConfig));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
private static final String CATEGORY_KOP = "Kafka on Pulsar";


/***** --- Kafka on Pulsar Broker configuration --- ****/
//
// --- Kafka on Pulsar Broker configuration ---
//

@FieldContext(
category = CATEGORY_KOP,
required = true,
Expand Down
12 changes: 9 additions & 3 deletions src/main/java/io/streamnative/kop/KafkaStandalone.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;

/**
* A standalone instance includes all the components for running Kafka-on-Pulsar.
*/
@Slf4j
public class KafkaStandalone implements AutoCloseable {
KafkaService kafkaBroker;
Expand Down Expand Up @@ -58,7 +61,9 @@ public void setAdvertisedAddress(String advertisedAddress) {
this.advertisedAddress = advertisedAddress;
}

public void setConfig(KafkaServiceConfiguration config) { this.config = config; }
public void setConfig(KafkaServiceConfiguration config) {
this.config = config;
}

public void setConfigFile(String configFile) {
this.configFile = configFile;
Expand Down Expand Up @@ -198,7 +203,7 @@ public boolean isHelp() {
public void start() throws Exception {

if (config == null) {
System.exit(1);
throw new IllegalArgumentException("Null configuration is provided");
}

log.info("--- setup KafkaStandaloneStarter ---");
Expand Down Expand Up @@ -256,7 +261,8 @@ private void createDefaultNameSpace(URL webServiceUrl, String brokerServiceUrl,
}
if (!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
admin.namespaces().createNamespace(defaultNamespace);
admin.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(config.getClusterName()));
admin.namespaces().setNamespaceReplicationClusters(
defaultNamespace, Sets.newHashSet(config.getClusterName()));
}
} catch (PulsarAdminException e) {
log.info("error while create default namespace: {}", e.getMessage());
Expand Down
Loading