Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .arcconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"project_id": "kafka-0.8.1",
"conduit_uri": "https://code.uberinternal.com/",
"git.default-relative-commit": "origin/0.8.1",
"arc.land.onto.default": "0.8.1"
}
24 changes: 24 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,27 @@ project/sbt_project_definition.iml
.#*
rat.out
TAGS

# Directories #
build/
core/data

# OS Files #
.DS_Store

# Package Files #
*.jar
*.war
*.ear
*.db

# IntelliJ Files #
.out
.idea
*.ipr
*.iws
*.iml
*.MF

# Gradle #
.gradle
56 changes: 52 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -168,20 +168,20 @@ for ( sv in ['2_8_0', '2_9_1', '2_9_2', '2_10_1'] ) {
}
}

tasks.create(name: "jarAll", dependsOn: ['jar_core_2_8_0', 'jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_1', 'clients:jar', 'perf:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar']) {
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_8_0', 'jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_1', 'clients:jar', 'perf:jar', 'migration:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar']) {
}

tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_8_0', 'srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_1', 'clients:srcJar', 'perf:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar']) { }
tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_8_0', 'srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_1', 'clients:srcJar', 'perf:srcJar', 'migration:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar']) { }

tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_8_0', 'docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_1', 'clients:docsJar', 'perf:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar']) { }
tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_8_0', 'docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_1', 'clients:docsJar', 'perf:docsJar', 'migration:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar']) { }

tasks.create(name: "testAll", dependsOn: ['test_core_2_8_0', 'test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_1', 'clients:test']) {
}

tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_8_0', 'releaseTarGz_2_9_1', 'releaseTarGz_2_9_2', 'releaseTarGz_2_10_1']) {
}

tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_8_0', 'uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_1', 'perf:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives']) {
tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_8_0', 'uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_1', 'perf:uploadArchives', 'migration:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives']) {
}

project(':core') {
Expand Down Expand Up @@ -218,6 +218,8 @@ project(':core') {
compile 'net.sf.jopt-simple:jopt-simple:3.2'
compile 'org.xerial.snappy:snappy-java:1.0.5'

runtime 'org.jmxtrans.agent:jmxtrans-agent:1.0.8'

testCompile 'junit:junit:4.1'
testCompile 'org.easymock:easymock:3.0'
testCompile 'org.objenesis:objenesis:1.2'
Expand Down Expand Up @@ -294,6 +296,52 @@ project(':perf') {
}
}

project(':migration') {
archivesBaseName = "kafka-migration"

dependencies {
compile project(':core')
compile 'com.google.guava:guava:18.0'

testCompile 'junit:junit:4.1'
testCompile 'org.mockito:mockito-core:1.+'
testCompile 'commons-io:commons-io:2.4'

test {
testLogging {
events "passed", "skipped", "failed"
exceptionFormat = 'full'
}
}

sourceSets {
test {
output.resourcesDir = "build/classes/test"
}
}

tasks.create(name: "copyDependantLibs", type: Copy) {
into "$buildDir/dependant-libs-${scalaVersion}"
from configurations.runtime
}


jar {
dependsOn 'copyDependantLibs'
}
}

configurations {
// manually excludes some unnecessary dependencies
compile.exclude module: 'javax'
compile.exclude module: 'jms'
compile.exclude module: 'jmxri'
compile.exclude module: 'jmxtools'
compile.exclude module: 'mail'
compile.exclude module: 'netty'
}
}

project(':contrib:hadoop-consumer') {
archivesBaseName = "kafka-hadoop-consumer"

Expand Down
6 changes: 4 additions & 2 deletions config/tools-log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

log4j.rootLogger=WARN, stdout
log4j.rootLogger=WARN, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %t %-5p %c{1}:%L - %m%n

log4j.logger.com.uber.kafka=INFO


5 changes: 5 additions & 0 deletions debian/changelog
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
kafka-0.8.1 (0) unstable; urgency=low

* Create debian package

-- Norbert Hu <norbert@uber.com>  Tue, 10 Feb 2015 00:00:00 +0000
1 change: 1 addition & 0 deletions debian/compat
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
8
14 changes: 14 additions & 0 deletions debian/control
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Source: kafka-0.8.1
Section: database
Priority: extra
Maintainer: Norbert Hu <norbert@uber.com>
Build-Depends: debhelper (>= 8.0.0), openjdk-7-jdk, curl
Standards-Version: 3.9.3
Homepage: http://kafka.apache.org/

Package: kafka-0.8.1
Architecture: all
Depends: openjdk-7-jre-headless
Description: Distributed, partitioned, replicated commit log service.
Kafka is a distributed, partitioned, replicated commit log service.
It provides the functionality of a messaging system, but with a unique design.
35 changes: 35 additions & 0 deletions debian/copyright
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: kafka
Source: http://kafka.apache.org/downloads.html

Files: *
Copyright: 2013 Basho Technologies, Inc <packaging@basho.com>
License: Apache License, Version 2.0
You may not use this file except in compliance with the License.
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
.
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

Files: debian/*
Copyright: 2014 Aleksey Morarash <aleksey.morarash@gmail.com>
License: GPL-2+
This package is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
.
This package is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
.
On Debian systems, the complete text of the GNU General
Public License version 2 can be found in "/usr/share/common-licenses/GPL-2".
4 changes: 4 additions & 0 deletions debian/docs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
HEADER
LICENSE
README.md
NOTICE
43 changes: 43 additions & 0 deletions debian/rules
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/usr/bin/make -f

%:
dh $@

SLF4J_VERSION = 1.7.7
SLF4J = slf4j-$(SLF4J_VERSION)

override_dh_auto_build:
# Build with scala 2.8.0 so that it matches the scala version currently used to build
# our existing kafka 0.7.0 for our kafka7 deployment. The reason why we need this specific
# build is to run the migration tool.
./gradlew -PscalaVersion=2.8.0 jar

# Do not install init script automatically
override_dh_installinit:

DESTDIR = debian/kafka-0.8.1
override_dh_auto_install:
install -m 755 -d $(DESTDIR)/etc/kafka-leaf
install -m 644 config/*.properties $(DESTDIR)/etc/kafka-leaf
install -m 755 -d $(DESTDIR)/usr/lib/kafka-leaf
for i in `ls | grep -vE config\|debian\|perf\|examples\|gradle\|system_test`; do \
cp -r $$i $(DESTDIR)/usr/lib/kafka-leaf || exit $$?; \
done
find $(DESTDIR)/usr/lib/kafka-leaf -type f -a \
\( -name \*.java -o -name \*.class -o \
-name \*.scala -o -name \*.gradle -o -name \*.MF -o -name \*.html \) \
-print -delete
for i in `seq 10`; do \
find $(DESTDIR) -type d -empty -print -exec rmdir '{}' ';' || :; \
done
find $(DESTDIR)/usr/lib/kafka-leaf -type f -a \
\( -name README\* -o -name LICENSE -o -name NOTICE -o -name HEADER \) \
-print -delete || :
find $(DESTDIR)/usr/lib/kafka-leaf -type d -a \
\( -name test -o -name src -o -name tmp \) \
-print -exec rm -rf '{}' ';' || :
ln -s /etc/kafka-leaf $(DESTDIR)/usr/lib/kafka-leaf/config
ln -s /var/log/kafka-leaf $(DESTDIR)/usr/lib/kafka-leaf/logs
sed -i 's#/tmp/zookeeper#/var/lib/kafka-leaf/zookeeper#' $(DESTDIR)/etc/kafka-leaf/zookeeper.properties
sed -i 's#/tmp/kafka-logs#/var/lib/kafka-leaf/logs#' $(DESTDIR)/etc/kafka-leaf/server.properties
install -m 755 -d $(DESTDIR)/var/lib/kafka-leaf $(DESTDIR)/var/log/kafka-leaf
1 change: 1 addition & 0 deletions debian/source/format
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1.0
Empty file added foo
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2015 Uber Technologies, Inc. All rights reserved.
// @author Seung-Yeoul Yang (syyang@uber.com)

package com.uber.kafka.tools;

public interface Kafka7LatestOffsets extends AutoCloseable {

    /**
     * Returns the latest offset for a given Kafka 0.7 topic and partition.
     *
     * @param topic     the Kafka 0.7 topic name
     * @param partition the partition number within the topic
     * @return the latest available offset for the topic/partition pair
     */
    long get(String topic, int partition);

    /**
     * Releases any resources held by this instance. Narrows
     * {@link AutoCloseable#close()} to throw no checked exception, so
     * implementations remain usable in try-with-resources without a catch.
     */
    @Override
    void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (c) 2015 Uber Technologies, Inc. All rights reserved.
// @author Seung-Yeoul Yang (syyang@uber.com)

package com.uber.kafka.tools;

import java.lang.reflect.Constructor;
import java.lang.reflect.Method;

/**
* Wrapper around Kafka 0.7 SimpleConsumer for fetching the latest
* offset. The shenanigans with class loader is necessary because
* of class name collisions between Kafka 0.7 and 0.8.
*/
class Kafka7LatestOffsetsImpl implements Kafka7LatestOffsets {

    // Sentinel passed to getOffsetsBefore() meaning "latest available offset".
    private static final long LATEST_OFFSET = -1L;
    private static final String KAFKA_07_STATIC_SIMPLE_CONSUMER_CLASS_NAME =
        "kafka.javaapi.consumer.SimpleConsumer";
    private static final int SO_TIMEOUT_MS = 10 * 1000;
    private static final int BUFFER_SIZE_BYTES = 1000 * 1024;
    private static final String LEAF_KAFKA_07_HOST = "localhost";
    // Renamed from LEAF_KAKFA_07_PORT (typo); private, so the rename is safe.
    private static final int LEAF_KAFKA_07_PORT = 9093;

    // The Kafka 0.7 SimpleConsumer is held as a plain Object because its
    // class is only visible through the supplied class loader, never on
    // this class's own classpath (0.7/0.8 class-name collisions).
    private final Object simpleConsumer_07;
    private final Method simpleConsumerGetOffsetBeforeMethod_07;
    private final Method simpleConsumerCloseMethod_07;

    /**
     * Loads the Kafka 0.7 {@code SimpleConsumer} through {@code cl} and
     * constructs a consumer connected to the local leaf broker.
     *
     * @param cl class loader able to resolve the Kafka 0.7 classes
     * @throws Exception if the class cannot be loaded, the expected
     *         constructor or methods are missing, or construction fails
     */
    Kafka7LatestOffsetsImpl(ClassLoader cl) throws Exception {
        Class<?> simpleConsumerClass_07 = cl.loadClass(KAFKA_07_STATIC_SIMPLE_CONSUMER_CLASS_NAME);
        // Wildcard-typed Constructor avoids the raw-type warning of the original.
        Constructor<?> simpleConsumerConstructor_07 = simpleConsumerClass_07.getConstructor(
            String.class, int.class, int.class, int.class);
        simpleConsumer_07 = simpleConsumerConstructor_07.newInstance(
            LEAF_KAFKA_07_HOST, LEAF_KAFKA_07_PORT, SO_TIMEOUT_MS, BUFFER_SIZE_BYTES);
        simpleConsumerGetOffsetBeforeMethod_07 = simpleConsumerClass_07.getMethod(
            "getOffsetsBefore", String.class, int.class, long.class, int.class);
        simpleConsumerCloseMethod_07 = simpleConsumerClass_07.getMethod("close");
    }

    /**
     * Fetches the latest offset for {@code topic}/{@code partition} by
     * reflectively invoking {@code getOffsetsBefore} with the LATEST_OFFSET
     * sentinel and a max of one result.
     */
    @Override
    public long get(String topic, int partition) {
        final long[] offsets;
        try {
            offsets = (long[]) simpleConsumerGetOffsetBeforeMethod_07.invoke(
                simpleConsumer_07, topic, partition, LATEST_OFFSET, 1);
        } catch (Exception e) {
            // Include topic/partition context so reflective failures are
            // diagnosable; preserve the original exception as the cause.
            throw new RuntimeException("Failed to fetch latest offset for " +
                topic + "_" + partition, e);
        }
        if (offsets.length < 1) {
            throw new RuntimeException("Failed to find latest offset for " +
                topic + "_" + partition);
        }
        return offsets[0];
    }

    /** Closes the underlying Kafka 0.7 SimpleConsumer. */
    @Override
    public void close() {
        try {
            simpleConsumerCloseMethod_07.invoke(simpleConsumer_07);
        } catch (Exception e) {
            throw new RuntimeException("Failed to close Kafka 0.7 SimpleConsumer", e);
        }
    }

}
Loading