diff --git a/.arcconfig b/.arcconfig new file mode 100644 index 0000000000000..8903ba67eac5d --- /dev/null +++ b/.arcconfig @@ -0,0 +1,6 @@ +{ + "project_id": "kafka-0.8.1", + "conduit_uri": "https://code.uberinternal.com/", + "git.default-relative-commit": "origin/0.8.1", + "arc.land.onto.default": "0.8.1" +} diff --git a/.gitignore b/.gitignore index 553a077d031a3..db830e032cf6c 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,27 @@ project/sbt_project_definition.iml .#* rat.out TAGS + +# Directories # +build/ +core/data + +# OS Files # +.DS_Store + +# Package Files # +*.jar +*.war +*.ear +*.db + +# Intellji Files # +.out +.idea +*.ipr +*.iws +*.iml +*.MF + +# Gradle # +.gradle diff --git a/build.gradle b/build.gradle index 7ff670e5873d5..e475fe8959350 100644 --- a/build.gradle +++ b/build.gradle @@ -168,12 +168,12 @@ for ( sv in ['2_8_0', '2_9_1', '2_9_2', '2_10_1'] ) { } } -tasks.create(name: "jarAll", dependsOn: ['jar_core_2_8_0', 'jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_1', 'clients:jar', 'perf:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar']) { +tasks.create(name: "jarAll", dependsOn: ['jar_core_2_8_0', 'jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_1', 'clients:jar', 'perf:jar', 'migration:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar']) { } -tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_8_0', 'srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_1', 'clients:srcJar', 'perf:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar']) { } +tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_8_0', 'srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_1', 'clients:srcJar', 'perf:srcJar', 'migration:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar']) { } -tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_8_0', 'docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_1', 'clients:docsJar', 'perf:docsJar', 
'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar']) { } +tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_8_0', 'docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_1', 'clients:docsJar', 'perf:docsJar', 'migration:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar']) { } tasks.create(name: "testAll", dependsOn: ['test_core_2_8_0', 'test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_1', 'clients:test']) { } @@ -181,7 +181,7 @@ tasks.create(name: "testAll", dependsOn: ['test_core_2_8_0', 'test_core_2_9_1', tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_8_0', 'releaseTarGz_2_9_1', 'releaseTarGz_2_9_2', 'releaseTarGz_2_10_1']) { } -tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_8_0', 'uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_1', 'perf:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives']) { +tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_8_0', 'uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_1', 'perf:uploadArchives', 'migration:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives']) { } project(':core') { @@ -218,6 +218,8 @@ project(':core') { compile 'net.sf.jopt-simple:jopt-simple:3.2' compile 'org.xerial.snappy:snappy-java:1.0.5' + runtime 'org.jmxtrans.agent:jmxtrans-agent:1.0.8' + testCompile 'junit:junit:4.1' testCompile 'org.easymock:easymock:3.0' testCompile 'org.objenesis:objenesis:1.2' @@ -294,6 +296,52 @@ project(':perf') { } } +project(':migration') { + archivesBaseName = "kafka-migration" + + dependencies { + compile project(':core') + compile 'com.google.guava:guava:18.0' + + testCompile 'junit:junit:4.1' + testCompile 'org.mockito:mockito-core:1.+' + testCompile 
'commons-io:commons-io:2.4' + + test { + testLogging { + events "passed", "skipped", "failed" + exceptionFormat = 'full' + } + } + + sourceSets { + test { + output.resourcesDir = "build/classes/test" + } + } + + tasks.create(name: "copyDependantLibs", type: Copy) { + into "$buildDir/dependant-libs-${scalaVersion}" + from configurations.runtime + } + + + jar { + dependsOn 'copyDependantLibs' + } + } + + configurations { + // manually excludes some unnecessary dependencies + compile.exclude module: 'javax' + compile.exclude module: 'jms' + compile.exclude module: 'jmxri' + compile.exclude module: 'jmxtools' + compile.exclude module: 'mail' + compile.exclude module: 'netty' + } +} + project(':contrib:hadoop-consumer') { archivesBaseName = "kafka-hadoop-consumer" diff --git a/config/tools-log4j.properties b/config/tools-log4j.properties index 7924049014983..aea6f47330590 100644 --- a/config/tools-log4j.properties +++ b/config/tools-log4j.properties @@ -13,10 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-log4j.rootLogger=WARN, stdout +log4j.rootLogger=WARN, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %t %-5p %c{1}:%L - %m%n + +log4j.logger.com.uber.kafka=INFO diff --git a/debian/changelog b/debian/changelog new file mode 100644 index 0000000000000..d93aa3d28228d --- /dev/null +++ b/debian/changelog @@ -0,0 +1,5 @@ +kafka-0.8.1 (0) unstable; urgency=low + + * Create debian package + + -- Norbert Hu Fri, 10 Feb 2015 00:00:00 +0000 diff --git a/debian/compat b/debian/compat new file mode 100644 index 0000000000000..45a4fb75db864 --- /dev/null +++ b/debian/compat @@ -0,0 +1 @@ +8 diff --git a/debian/control b/debian/control new file mode 100644 index 0000000000000..a0af28485d403 --- /dev/null +++ b/debian/control @@ -0,0 +1,14 @@ +Source: kafka-0.8.1 +Section: database +Priority: extra +Maintainer: Norbert Hu +Build-Depends: debhelper (>= 8.0.0), openjdk-7-jdk, curl +Standards-Version: 3.9.3 +Homepage: http://kafka.apache.org/ + +Package: kafka-0.8.1 +Architecture: all +Depends: openjdk-7-jre-headless +Description: Distributed, partitioned, replicated commit log service. + Kafka is a distributed, partitioned, replicated commit log service. + It provides the functionality of a messaging system, but with a unique design. diff --git a/debian/copyright b/debian/copyright new file mode 100644 index 0000000000000..920d37f024ffc --- /dev/null +++ b/debian/copyright @@ -0,0 +1,35 @@ +Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/ +Upstream-Name: kafka +Source: http://kafka.apache.org/downloads.html + +Files: * +Copyright: 2013 Basho Technologies, Inc +License: Apache License, Version 2.0 + You may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + . + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Files: debian/* +Copyright: 2014 Aleksey Morarash +License: GPL-2+ + This package is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + . + This package is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + . + You should have received a copy of the GNU General Public License + along with this program. If not, see + . + On Debian systems, the complete text of the GNU General + Public License version 2 can be found in "/usr/share/common-licenses/GPL-2". diff --git a/debian/docs b/debian/docs new file mode 100644 index 0000000000000..86b1f713d483d --- /dev/null +++ b/debian/docs @@ -0,0 +1,4 @@ +HEADER +LICENSE +README.md +NOTICE diff --git a/debian/rules b/debian/rules new file mode 100755 index 0000000000000..3f5872c732be6 --- /dev/null +++ b/debian/rules @@ -0,0 +1,43 @@ +#!/usr/bin/make -f + +%: + dh $@ + +SLF4J_VERSION = 1.7.7 +SLF4J = slf4j-$(SLF4J_VERSION) + +override_dh_auto_build: + # Build with scala 2.8.0 so that it matches with the scala version currently to build + # our existing kafka 0.7.0 for our kafka7 deployment. The reason why we need this specific + # build is to run the migration tool. 
+ ./gradlew -PscalaVersion=2.8.0 jar + +# Do not install init script automatically +override_dh_installinit: + +DESTDIR = debian/kafka-0.8.1 +override_dh_auto_install: + install -m 755 -d $(DESTDIR)/etc/kafka-leaf + install -m 644 config/*.properties $(DESTDIR)/etc/kafka-leaf + install -m 755 -d $(DESTDIR)/usr/lib/kafka-leaf + for i in `ls | grep -vE config\|debian\|perf\|examples\|gradle\|system_test`; do \ + cp -r $$i $(DESTDIR)/usr/lib/kafka-leaf || exit $$?; \ + done + find $(DESTDIR)/usr/lib/kafka-leaf -type f -a \ + \( -name \*.java -o -name \*.class -o \ + -name \*.scala -o -name \*.gradle -o -name \*.MF -o -name \*.html \) \ + -print -delete + for i in `seq 10`; do \ + find $(DESTDIR) -type d -empty -print -exec rmdir '{}' ';' || :; \ + done + find $(DESTDIR)/usr/lib/kafka-leaf -type f -a \ + \( -name README\* -o -name LICENSE -o -name NOTICE -o -name HEADER \) \ + -print -delete || : + find $(DESTDIR)/usr/lib/kafka-leaf -type d -a \ + \( -name test -o -name src -o -name tmp \) \ + -print -exec rm -rf '{}' ';' || : + ln -s /etc/kafka-leaf $(DESTDIR)/usr/lib/kafka-leaf/config + ln -s /var/log/kafka-leaf $(DESTDIR)/usr/lib/kafka-leaf/logs + sed -i 's#/tmp/zookeeper#/var/lib/kafka-leaf/zookeeper#' $(DESTDIR)/etc/kafka-leaf/zookeeper.properties + sed -i 's#/tmp/kafka-logs#/var/lib/kafka-leaf/logs#' $(DESTDIR)/etc/kafka-leaf/server.properties + install -m 755 -d $(DESTDIR)/var/lib/kafka-leaf $(DESTDIR)/var/log/kafka-leaf diff --git a/debian/source/format b/debian/source/format new file mode 100644 index 0000000000000..d3827e75a5cad --- /dev/null +++ b/debian/source/format @@ -0,0 +1 @@ +1.0 diff --git a/foo b/foo new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/migration/src/main/java/com/uber/kafka/tools/Kafka7LatestOffsets.java b/migration/src/main/java/com/uber/kafka/tools/Kafka7LatestOffsets.java new file mode 100644 index 0000000000000..e98ddf20c8ea5 --- /dev/null +++ 
b/migration/src/main/java/com/uber/kafka/tools/Kafka7LatestOffsets.java @@ -0,0 +1,14 @@ +// Copyright (c) 2015 Uber Technologies, Inc. All rights reserved. +// @author Seung-Yeoul Yang (syyang@uber.com) + +package com.uber.kafka.tools; + +public interface Kafka7LatestOffsets { + + /** + * Returns the latest offset for a given Kafka 0.7 topic and partition. + */ + long get(String topic, int partition); + + void close(); +} diff --git a/migration/src/main/java/com/uber/kafka/tools/Kafka7LatestOffsetsImpl.java b/migration/src/main/java/com/uber/kafka/tools/Kafka7LatestOffsetsImpl.java new file mode 100644 index 0000000000000..580091543533e --- /dev/null +++ b/migration/src/main/java/com/uber/kafka/tools/Kafka7LatestOffsetsImpl.java @@ -0,0 +1,64 @@ +// Copyright (c) 2015 Uber Technologies, Inc. All rights reserved. +// @author Seung-Yeoul Yang (syyang@uber.com) + +package com.uber.kafka.tools; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; + +/** + * Wrapper around Kafka 0.7 SimpleConsumer for fetching the latest + * offset. The shenanigans with class loader is necessary because + * of class name collisions between Kafka 0.7 and 0.8. 
+ */ +class Kafka7LatestOffsetsImpl implements Kafka7LatestOffsets { + + private static final long LATEST_OFFSET = -1L; + private static final String KAFKA_07_STATIC_SIMPLE_CONSUMER_CLASS_NAME = + "kafka.javaapi.consumer.SimpleConsumer"; + private static final int SO_TIMEOUT_MS = 10 * 1000; + private static final int BUFFER_SIZE_BYTES = 1000 * 1024; + private static final String LEAF_KAFKA_07_HOST = "localhost"; + private static final int LEAF_KAKFA_07_PORT = 9093; + + private final Object simpleConsumer_07; + private final Method simpleConsumerGetOffsetBeforeMethod_07; + private final Method simpleConsumerCloseMethod_07; + + Kafka7LatestOffsetsImpl(ClassLoader cl) throws Exception { + Class simpleConsumerClass_07 = cl.loadClass(KAFKA_07_STATIC_SIMPLE_CONSUMER_CLASS_NAME); + Constructor simpleConsumerConstructor_07 = simpleConsumerClass_07.getConstructor( + String.class, int.class, int.class, int.class); + simpleConsumer_07 = simpleConsumerConstructor_07.newInstance( + LEAF_KAFKA_07_HOST, LEAF_KAKFA_07_PORT, SO_TIMEOUT_MS, BUFFER_SIZE_BYTES); + simpleConsumerGetOffsetBeforeMethod_07 = simpleConsumerClass_07.getMethod( + "getOffsetsBefore", String.class, int.class, long.class, int.class); + simpleConsumerCloseMethod_07 = simpleConsumerClass_07.getMethod("close"); + } + + @Override + public long get(String topic, int partition) { + long[] offsets = null; + try { + offsets = (long[]) simpleConsumerGetOffsetBeforeMethod_07.invoke( + simpleConsumer_07, topic, partition, LATEST_OFFSET, 1); + } catch (Exception e) { + throw new RuntimeException(e); + } + if (offsets.length < 1) { + throw new RuntimeException("Failed to find latest offset for " + + topic + "_" + partition); + } + return offsets[0]; + } + + @Override + public void close() { + try { + simpleConsumerCloseMethod_07.invoke(simpleConsumer_07); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} diff --git a/migration/src/main/java/com/uber/kafka/tools/Kafka7OffsetFixer.java 
b/migration/src/main/java/com/uber/kafka/tools/Kafka7OffsetFixer.java new file mode 100644 index 0000000000000..a40f2343e0acb --- /dev/null +++ b/migration/src/main/java/com/uber/kafka/tools/Kafka7OffsetFixer.java @@ -0,0 +1,110 @@ +// Copyright (c) 2015 Uber Technologies, Inc. All rights reserved. +// @author Seung-Yeoul Yang (syyang@uber.com) + +package com.uber.kafka.tools; + +import java.io.File; +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import org.I0Itec.zkclient.ZkClient; +import org.apache.log4j.Logger; + +import com.google.common.base.Preconditions; + +/** + * Fixes corrupt Kafka 0.7 offset using offset index file (containing a list of latest + * offsets written out periodically by a cron job). + */ +public class Kafka7OffsetFixer { + + private static final Logger LOGGER = Logger.getLogger(Kafka7OffsetFixer.class); + + private static final long OFFSET_INDEX_MAX_AGE_MS = TimeUnit.MINUTES.toMillis(10L); + + private static final int PARTITION_0 = 0; + + private static final String LEAF_KAFKA07_ZK_HOST = "localhost:2182"; + + private static final String MIRRORMAKER_PATH = "/var/mirrormaker"; + private static final String CURRENT_OFFSET_PATH = "/consumers/%s/offsets/%s/0-0"; + + private final Kafka7LatestOffsets latestOffsets; + private final ZkClient zkClient; + private final String mirrorMakerPath; + + public Kafka7OffsetFixer(Kafka7LatestOffsets latestOffsetProvider) { + this(latestOffsetProvider, MIRRORMAKER_PATH); + } + + public Kafka7OffsetFixer(Kafka7LatestOffsets latestOffsets, String mirrorMakerPath) { + this(latestOffsets, mirrorMakerPath, + MigrationUtils.get().newZkClient(LEAF_KAFKA07_ZK_HOST)); + } + + public Kafka7OffsetFixer(Kafka7LatestOffsets latestOffsets, + String mirrorMakerPath, ZkClient zkClient) { + this.latestOffsets = latestOffsets; + this.mirrorMakerPath = mirrorMakerPath; + this.zkClient = zkClient; + } + + public void close() { + try { + latestOffsets.close(); + zkClient.close(); + } catch (Exception e) { + 
LOGGER.warn("Failed to clean-up Kafka offset fixer", e); + } + } + + public static String getCurrentOffsetZkPath(String consumerGroup, String topic) { + return String.format(CURRENT_OFFSET_PATH, consumerGroup, topic); + } + + public void fixOffset(String consumerGroup, String topic) { + Preconditions.checkNotNull(topic, "Topic can't be null"); + // Load offset index. + File offsetIndexFile = new File(mirrorMakerPath, topic + "_0"); + String offsetIndexPath = offsetIndexFile.getPath(); + if (!offsetIndexFile.exists()) { + throw new RuntimeException("Offset index file doesn't exist at path: " + + offsetIndexPath); + } + // Check whether offset index is stale. + if (offsetIndexFile.lastModified() < System.currentTimeMillis() - OFFSET_INDEX_MAX_AGE_MS) { + throw new RuntimeException("Offset index file is stale. Path: " + offsetIndexPath + + ", last modified time: " + new Date(offsetIndexFile.lastModified())); + } + + LOGGER.info("Loading offset file for topic: " + topic + ", path: " + offsetIndexPath); + OffsetIndex index = OffsetIndex.load(offsetIndexPath); + + // Get current offset from kafka leaf zookeeper. + String currentOffsetPath = getCurrentOffsetZkPath(consumerGroup, topic); + LOGGER.info("Reading current offset for topic: " + topic + ", zk path: " + currentOffsetPath); + if (!zkClient.exists(currentOffsetPath)) { + throw new RuntimeException("Missing current offset for " + topic + "at zk path: " + + currentOffsetPath); + } + byte[] currentOffsetBytes = zkClient.readData(currentOffsetPath); + long currentOffset = Long.parseLong(new String(currentOffsetBytes)); + LOGGER.info("Current offset: " + currentOffset + ", topic: " + topic); + + // Find the next valid offset. + long newOffset = index.getNextOffset(currentOffset); + if (newOffset == OffsetIndex.LATEST_OFFSET) { + // Resolve latest offset to actual offset. 
+ newOffset = latestOffsets.get(topic, PARTITION_0); + LOGGER.info("Resolved latest offset to " + newOffset + ", topic " + topic); + } + + // Update the current offset in zookeeper. + byte[] newOffsetBytes = Long.toString(newOffset).getBytes(); + zkClient.writeData(currentOffsetPath, newOffsetBytes); + + LOGGER.info(String.format("Updated offset for %s from %d to %d", + topic, currentOffset, newOffset)); + } + +} diff --git a/migration/src/main/java/com/uber/kafka/tools/KafkaMigrationTool.java b/migration/src/main/java/com/uber/kafka/tools/KafkaMigrationTool.java new file mode 100644 index 0000000000000..cc059d5429200 --- /dev/null +++ b/migration/src/main/java/com/uber/kafka/tools/KafkaMigrationTool.java @@ -0,0 +1,605 @@ +package com.uber.kafka.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import joptsimple.ArgumentAcceptingOptionSpec; +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import kafka.utils.Utils; + +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; + +import com.google.common.base.Joiner; + + +/** + * kafka 0.7 to 0.8 online migration tool based on kafka.tools.KafkaMigrationTool. 
+ */ +@SuppressWarnings({"unchecked", "rawtypes"}) +public class KafkaMigrationTool +{ + private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName()); + private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer"; + private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig"; + private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream"; + private static final String KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME = "kafka.consumer.ConsumerIterator"; + private static final String KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME = "kafka.javaapi.consumer.ConsumerConnector"; + private static final String KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME = "kafka.message.MessageAndMetadata"; + private static final String KAFKA_07_MESSAGE_CLASS_NAME = "kafka.message.Message"; + private static final String KAFKA_07_WHITE_LIST_CLASS_NAME = "kafka.consumer.Whitelist"; + private static final String KAFKA_07_TOPIC_FILTER_CLASS_NAME = "kafka.consumer.TopicFilter"; + private static final String KAFKA_07_BLACK_LIST_CLASS_NAME = "kafka.consumer.Blacklist"; + private static final String KAFKA_07_CONSUMER_GROUP_PROPERTY = "groupid"; + + private static Class KafkaStaticConsumer_07 = null; + private static Class ConsumerConfig_07 = null; + private static Class ConsumerConnector_07 = null; + private static Class KafkaStream_07 = null; + private static Class TopicFilter_07 = null; + private static Class WhiteList_07 = null; + private static Class BlackList_07 = null; + private static Class KafkaConsumerIteratorClass_07 = null; + private static Class KafkaMessageAndMetatDataClass_07 = null; + private static Class KafkaMessageClass_07 = null; + + /** + * Snoops into error logs, and tracks topics whose offsets are invalid. This hack is + * necessary because while high-level consumer (i.e. 
ConsumerIterator.hasNext()) throws + * an exception when it encounters a message with invalid offset, the exception doesn't + * contain any info about the topic associated with the message. + */ + private static class InvalidMessageAppender extends AppenderSkeleton { + private static final String FETCH_RUNNABLE_ERROR_PREFIX = "error in FetcherRunnable for "; + + private final MigrationContext context; + + public InvalidMessageAppender(MigrationContext context) { + this.context = context; + } + + @Override + protected void append(LoggingEvent event) { + if (event.getLevel() != Level.ERROR) { + return; + } + String message = event.getRenderedMessage(); + if (message == null || !message.startsWith(FETCH_RUNNABLE_ERROR_PREFIX)) { + return; + } + int beginIdx = FETCH_RUNNABLE_ERROR_PREFIX.length(); + int endIdx = message.indexOf(':'); + // e.g. error in FetcherRunnable for foo:0-0: fetched offset = 12: consumed offset = 25 + String topic = message.substring(beginIdx, endIdx); + context.addTopicWithCorruptOffset(topic); + logger.info("Detected topic with invalid offset: " + topic); + } + + @Override + public void close() { } + + @Override + public boolean requiresLayout() { return false; } + } + + public static void main(String[] args) throws InterruptedException, IOException { + final MigrationContext context = new MigrationContext(); + InvalidMessageAppender foo = new InvalidMessageAppender(context); + Logger.getRootLogger().addAppender(foo); + + OptionParser parser = new OptionParser(); + ArgumentAcceptingOptionSpec consumerConfigOpt + = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. 
" + "You man specify multiple of these.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + + ArgumentAcceptingOptionSpec producerConfigOpt + = parser.accepts("producer.config", "Producer config.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + + ArgumentAcceptingOptionSpec numProducersOpt + = parser.accepts("num.producers", "Number of producer instances") + .withRequiredArg() + .describedAs("Number of producers") + .ofType(Integer.class) + .defaultsTo(1); + + ArgumentAcceptingOptionSpec zkClient01JarOpt + = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file") + .withRequiredArg() + .describedAs("zkClient 0.1 jar file required by Kafka 0.7") + .ofType(String.class); + + ArgumentAcceptingOptionSpec kafka07JarOpt + = parser.accepts("kafka.07.jar", "Kafka 0.7 jar file") + .withRequiredArg() + .describedAs("kafka 0.7 jar") + .ofType(String.class); + + ArgumentAcceptingOptionSpec kafka08ZKHostsOpt + = parser.accepts("kafka08.zookeeper.connect", "Zookeeper chroot path for the 0.8 cluster") + .withRequiredArg() + .describedAs("Zookeeper chroot path for the 0.8 cluster") + .ofType(String.class); + + ArgumentAcceptingOptionSpec numStreamsOpt + = parser.accepts("num.streams", "Number of consumer streams") + .withRequiredArg() + .describedAs("Number of consumer threads") + .ofType(Integer.class) + .defaultsTo(1); + + ArgumentAcceptingOptionSpec whitelistOpt + = parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(String.class); + + ArgumentAcceptingOptionSpec blacklistOpt + = parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(String.class); + + ArgumentAcceptingOptionSpec queueSizeOpt + = parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer") + 
.withRequiredArg() + .describedAs("Queue size in terms of number of messages") + .ofType(Integer.class) + .defaultsTo(10000); + + OptionSpecBuilder helpOpt + = parser.accepts("help", "Print this message."); + + OptionSet options = parser.parse(args); + + if (options.has(helpOpt)) { + parser.printHelpOn(System.out); + System.exit(0); + } + + checkRequiredArgs(parser, options, new OptionSpec[]{ + consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt, kafka08ZKHostsOpt}); + int whiteListCount = options.has(whitelistOpt) ? 1 : 0; + int blackListCount = options.has(blacklistOpt) ? 1 : 0; + if(whiteListCount + blackListCount != 1) { + System.err.println("Exactly one of whitelist or blacklist is required."); + System.exit(1); + } + + String kafkaJarFile_07 = options.valueOf(kafka07JarOpt); + String zkClientJarFile = options.valueOf(zkClient01JarOpt); + String consumerConfigFile_07 = options.valueOf(consumerConfigOpt); + int numConsumers = options.valueOf(numStreamsOpt); + String producerConfigFile_08 = options.valueOf(producerConfigOpt); + int numProducers = options.valueOf(numProducersOpt); + String kafka08ZKHosts = options.valueOf(kafka08ZKHostsOpt); + final List migrationThreads = new ArrayList(numConsumers); + final List producerThreads = new ArrayList(numProducers); + + try { + File kafkaJar_07 = new File(kafkaJarFile_07); + File zkClientJar = new File(zkClientJarFile); + final ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[] { + kafkaJar_07.toURI().toURL(), + zkClientJar.toURI().toURL() + }); + + /** Construct the 07 consumer config **/ + ConsumerConfig_07 = c1.loadClass(KAFKA_07_CONSUMER_CONFIG_CLASS_NAME); + KafkaStaticConsumer_07 = c1.loadClass(KAFKA_07_STATIC_CONSUMER_CLASS_NAME); + ConsumerConnector_07 = c1.loadClass(KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME); + KafkaStream_07 = c1.loadClass(KAFKA_07_CONSUMER_STREAM_CLASS_NAME); + TopicFilter_07 = c1.loadClass(KAFKA_07_TOPIC_FILTER_CLASS_NAME); + WhiteList_07 = 
c1.loadClass(KAFKA_07_WHITE_LIST_CLASS_NAME); + BlackList_07 = c1.loadClass(KAFKA_07_BLACK_LIST_CLASS_NAME); + KafkaMessageClass_07 = c1.loadClass(KAFKA_07_MESSAGE_CLASS_NAME); + KafkaConsumerIteratorClass_07 = c1.loadClass(KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME); + KafkaMessageAndMetatDataClass_07 = c1.loadClass(KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME); + + Constructor ConsumerConfigConstructor_07 = ConsumerConfig_07.getConstructor(Properties.class); + final Properties kafkaConsumerProperties_07 = new Properties(); + kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07)); + /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/ + if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) { + logger.warn("Shallow iterator should not be used in the migration tool"); + kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false"); + } + Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07); + + /** Construct the 07 consumer connector **/ + Method ConsumerConnectorCreationMethod_07 = KafkaStaticConsumer_07.getMethod("createJavaConsumerConnector", ConsumerConfig_07); + final Object consumerConnector_07 = ConsumerConnectorCreationMethod_07.invoke(null, consumerConfig_07); + Method ConsumerConnectorCreateMessageStreamsMethod_07 = ConsumerConnector_07.getMethod( + "createMessageStreamsByFilter", + TopicFilter_07, int.class); + final Method ConsumerConnectorShutdownMethod_07 = ConsumerConnector_07.getMethod("shutdown"); + Constructor WhiteListConstructor_07 = WhiteList_07.getConstructor(String.class); + Constructor BlackListConstructor_07 = BlackList_07.getConstructor(String.class); + Object filterSpec = null; + if (options.has(whitelistOpt)) { + String whitelist = MigrationUtils.get().rewriteTopicWhitelist( + kafka08ZKHosts, options.valueOf(whitelistOpt)); + logger.info("Whitelist after 
rewrite: " + whitelist); + filterSpec = WhiteListConstructor_07.newInstance(whitelist); + } else { + String blacklist = MigrationUtils.get().rewriteTopicBlacklist( + kafka08ZKHosts, options.valueOf(blacklistOpt)); + logger.info("Blacklist after rewrite: " + blacklist); + filterSpec = BlackListConstructor_07.newInstance(blacklist); + } + + Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numConsumers); + + Properties kafkaProducerProperties_08 = new Properties(); + kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08)); + kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); + // create a producer channel instead + int queueSize = options.valueOf(queueSizeOpt); + ProducerDataChannel> producerDataChannel = new ProducerDataChannel>(context, queueSize); + int threadId = 0; + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + logger.info("Shutting down migration tool..."); + try { + ConsumerConnectorShutdownMethod_07.invoke(consumerConnector_07); + } catch(Exception e) { + logger.error("Error while shutting down Kafka consumer", e); + } + for(MigrationThread migrationThread : migrationThreads) { + migrationThread.shutdown(); + } + for(ProducerThread producerThread : producerThreads) { + producerThread.shutdown(); + } + for(ProducerThread producerThread : producerThreads) { + producerThread.awaitShutdown(); + } + if (context.failed()) { + Set topics = context.getTopicsWithCorruptOffset(); + if (!topics.isEmpty()) { + String consumerGroup = kafkaConsumerProperties_07.getProperty( + KAFKA_07_CONSUMER_GROUP_PROPERTY); + logger.info("Fixing corrupt offsets for the following topics: " + + Joiner.on(", ").join(topics)); + + Kafka7OffsetFixer fixer = null; + try { + Kafka7LatestOffsets latestOffsets = new Kafka7LatestOffsetsImpl(c1); + fixer = new Kafka7OffsetFixer(latestOffsets); + for (String topic : topics) { + 
fixer.fixOffset(consumerGroup, topic); + } + } catch (Throwable t) { + logger.error("Unexpected failure when fixing corrupt offset", t); + } finally { + try { + if (fixer != null) { + fixer.close(); + } + } catch (Throwable t) { + logger.warn("Unexpected failure when closing offset fixer", t); + } + } + } + } + logger.info("Kafka migration tool shutdown successfully"); + } + }); + + // start consumer threads + logger.info("Starting " + numConsumers + " migration threads"); + for(Object stream: (List)retKafkaStreams) { + MigrationThread thread = new MigrationThread( + context, stream, producerDataChannel, threadId); + threadId ++; + thread.start(); + migrationThreads.add(thread); + } + + String clientId = kafkaProducerProperties_08.getProperty("client.id"); + // start producer threads + logger.info("Starting " + numProducers + " producer threads"); + for (int i = 0; i < numProducers; i++) { + kafkaProducerProperties_08.put("client.id", clientId + "-" + i); + ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08); + Producer producer = new Producer(producerConfig_08); + ProducerThread producerThread = new ProducerThread( + context, producerDataChannel, producer, i); + producerThread.start(); + producerThreads.add(producerThread); + } + + // Block while the migration tool is running. We need to call + // System.exit(0) below to force trigger the shutdown hook to be + // called since SimpleConsumer internally runs a user thread. 
+ while (!context.failed()) { + Thread.sleep(100L); + } + } + catch (Throwable e){ + System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e)); + logger.error("Kafka migration tool failed: ", e); + context.setFailed(); + } + + System.exit(0); + } + + private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException { + for(OptionSpec arg : required) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\""); + parser.printHelpOn(System.err); + System.exit(1); + } + } + } + + static class ProducerDataChannel { + private static final long OFFER_INTERVAL_MS = 100L; + private static final long POLL_INTERVAL_MS = 100L; + + private final MigrationContext context; + private final int producerQueueSize; + private final BlockingQueue producerRequestQueue; + + public ProducerDataChannel(MigrationContext context, int queueSize) { + this.context = context; + producerQueueSize = queueSize; + producerRequestQueue = new ArrayBlockingQueue(producerQueueSize); + } + + public void sendRequest(T data) throws InterruptedException { + while (!context.failed()) { + if (producerRequestQueue.offer(data, OFFER_INTERVAL_MS, TimeUnit.MILLISECONDS)) { + return; + } + } + throw new RuntimeException("Migration failed. Failed to offer request"); + } + + public T receiveRequest() throws InterruptedException { + while (!context.failed()) { + T data = producerRequestQueue.poll(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); + if (data != null) { + return data; + } + } + throw new RuntimeException("Migration failed. 
Failed to get poll request"); + } + + public int size() { + return producerRequestQueue.size(); + } + } + + private static class MigrationThread extends Thread { + private final Object stream; + private final ProducerDataChannel> producerDataChannel; + private final int threadId; + private final String threadName; + private final org.apache.log4j.Logger logger; + private CountDownLatch shutdownComplete = new CountDownLatch(1); + private final MigrationContext context; + + MigrationThread(MigrationContext context, Object _stream, + ProducerDataChannel> _producerDataChannel, + int _threadId) { + this.context = context; + stream = _stream; + producerDataChannel = _producerDataChannel; + threadId = _threadId; + threadName = "MigrationThread-" + threadId; + logger = org.apache.log4j.Logger.getLogger(MigrationThread.class.getName()); + this.setName(threadName); + } + + public void run() { + try { + int count = 0; + Method MessageGetPayloadMethod_07 = KafkaMessageClass_07.getMethod("payload"); + Method KafkaGetMessageMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("message"); + Method KafkaGetTopicMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("topic"); + Method ConsumerIteratorMethod = KafkaStream_07.getMethod("iterator"); + Method KafkaStreamHasNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("hasNext"); + Method KafkaStreamNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("next"); + Object iterator = ConsumerIteratorMethod.invoke(stream); + + while (!context.failed() && ((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()) { + Object messageAndMetaData_07 = KafkaStreamNextMethod_07.invoke(iterator); + Object message_07 = KafkaGetMessageMethod_07.invoke(messageAndMetaData_07); + Object topic = KafkaGetTopicMethod_07.invoke(messageAndMetaData_07); + Object payload_07 = MessageGetPayloadMethod_07.invoke(message_07); + int size = ((ByteBuffer)payload_07).remaining(); + byte[] bytes = new byte[size]; + 
((ByteBuffer)payload_07).get(bytes); + if(count % 100 == 0) { + logger.info("Migration thread " + threadId + " sending message of size " + + bytes.length + " to topic " + topic); + } + count++; + KeyedMessage producerData = new KeyedMessage((String)topic, null, bytes); + producerDataChannel.sendRequest(producerData); + } + logger.info("Migration thread " + threadName + " finished running"); + } catch (InvocationTargetException t){ + logger.fatal("Migration thread failure due to root cause ", t.getCause()); + context.setFailed(); + } catch (Throwable t){ + logger.fatal("Migration thread failure due to ", t); + context.setFailed(); + } finally { + shutdownComplete.countDown(); + } + } + + public void shutdown() { + logger.info("Migration thread " + threadName + " shutting down"); + interrupt(); + try { + shutdownComplete.await(); + } catch(InterruptedException ie) { + logger.warn("Interrupt during shutdown of MigrationThread", ie); + } + logger.info("Migration thread " + threadName + " shutdown complete"); + } + } + + static class ProducerThread extends Thread { + private final ProducerDataChannel> producerDataChannel; + private final Producer producer; + private final int threadId; + private final MigrationContext context; + private String threadName; + private org.apache.log4j.Logger logger; + private CountDownLatch shutdownComplete = new CountDownLatch(1); + private KeyedMessage shutdownMessage = new KeyedMessage("shutdown", null, null); + + public ProducerThread(MigrationContext context, + ProducerDataChannel> _producerDataChannel, + Producer _producer, int _threadId) { + this.context = context; + producerDataChannel = _producerDataChannel; + producer = _producer; + threadId = _threadId; + threadName = "ProducerThread-" + threadId; + logger = org.apache.log4j.Logger.getLogger(ProducerThread.class.getName()); + this.setName(threadName); + } + + public void run() { + try{ + while(!context.failed()) { + KeyedMessage data = producerDataChannel.receiveRequest(); + 
if(!data.equals(shutdownMessage)) { + producer.send(data); + if(logger.isDebugEnabled()) { + logger.debug("Sending message " + new String(data.message())); + } + } else { + break; + } + } + logger.info("Producer thread " + threadName + " finished running"); + } catch (Throwable t){ + logger.fatal("Producer thread failure due to ", t); + context.setFailed(); + } finally { + shutdownComplete.countDown(); + } + } + + public void shutdown() { + try { + logger.info("Producer thread " + threadName + " shutting down"); + if (!context.failed()) { + producerDataChannel.sendRequest(shutdownMessage); + } + } catch(InterruptedException ie) { + logger.warn("Interrupt during shutdown of ProducerThread", ie); + } + } + + public void awaitShutdown() { + try { + shutdownComplete.await(); + producer.close(); + logger.info("Producer thread " + threadName + " shutdown complete"); + } catch(InterruptedException ie) { + logger.warn("Interrupt during shutdown of ProducerThread", ie); + } + } + } + + /** + * A parent-last class loader that will try the child class loader first and then the parent. + * This takes a fair bit of doing because java really prefers parent-first. + */ + static class ParentLastURLClassLoader extends ClassLoader { + private ChildURLClassLoader childClassLoader; + + /** + * This class allows me to call findClass on a class loader + */ + private static class FindClassClassLoader extends ClassLoader { + public FindClassClassLoader(ClassLoader parent) { + super(parent); + } + @Override + public Class findClass(String name) throws ClassNotFoundException { + return super.findClass(name); + } + } + + /** + * This class delegates (child then parent) for the findClass method for a URLClassLoader. 
+ * We need this because findClass is protected in URLClassLoader + */ + private static class ChildURLClassLoader extends URLClassLoader { + private FindClassClassLoader realParent; + public ChildURLClassLoader( URL[] urls, FindClassClassLoader realParent) { + super(urls, null); + this.realParent = realParent; + } + + @Override + public Class findClass(String name) throws ClassNotFoundException { + try{ + // first try to use the URLClassLoader findClass + return super.findClass(name); + } + catch( ClassNotFoundException e ) { + // if that fails, we ask our real parent class loader to load the class (we give up) + return realParent.loadClass(name); + } + } + } + + public ParentLastURLClassLoader(URL[] urls) { + super(Thread.currentThread().getContextClassLoader()); + childClassLoader = new ChildURLClassLoader(urls, new FindClassClassLoader(this.getParent())); + } + + @Override + protected synchronized Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + try { + // first we try to find a class inside the child class loader + return childClassLoader.findClass(name); + } + catch( ClassNotFoundException e ) { + // didn't find it, try the parent + return super.loadClass(name, resolve); + } + } + } +} + diff --git a/migration/src/main/java/com/uber/kafka/tools/MigrationContext.java b/migration/src/main/java/com/uber/kafka/tools/MigrationContext.java new file mode 100644 index 0000000000000..142c0b8fb1e10 --- /dev/null +++ b/migration/src/main/java/com/uber/kafka/tools/MigrationContext.java @@ -0,0 +1,54 @@ +// Copyright (c) 2015 Uber Technologies, Inc. All rights reserved. +// @author Seung-Yeoul Yang (syyang@uber.com) + +package com.uber.kafka.tools; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +/** + * Context object for migration job. Used for coordination + * of migrator and producer threads. 
+ * + * This class is thread-safe. + */ +public class MigrationContext { + + private final AtomicBoolean failed; + private final Set topicsWithCorruptOffset; + + public MigrationContext() { + this.failed = new AtomicBoolean(false); + this.topicsWithCorruptOffset = Sets.newHashSet(); + } + + /** + * Returns true if migration job failed; false otherwise. + */ + public boolean failed() { + return failed.get(); + } + + public + void setFailed() { + failed.set(true); + } + + /** + * Returns a set of Kafka 0.7 topics with corrupt offsets. + */ + public Set getTopicsWithCorruptOffset() { + synchronized (topicsWithCorruptOffset) { + return ImmutableSet.copyOf(topicsWithCorruptOffset); + } + } + + public void addTopicWithCorruptOffset(String topic) { + synchronized (topicsWithCorruptOffset) { + topicsWithCorruptOffset.add(topic); + } + } +} diff --git a/migration/src/main/java/com/uber/kafka/tools/MigrationUtils.java b/migration/src/main/java/com/uber/kafka/tools/MigrationUtils.java new file mode 100644 index 0000000000000..014a80d9c34c9 --- /dev/null +++ b/migration/src/main/java/com/uber/kafka/tools/MigrationUtils.java @@ -0,0 +1,85 @@ +// Copyright (c) 2015 Uber Technologies, Inc. All rights reserved. +// @author Seung-Yeoul Yang (syyang@uber.com) + +package com.uber.kafka.tools; + +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import kafka.utils.ZkUtils; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; +import org.apache.log4j.Logger; + +import scala.collection.Iterator; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + +/** + * Utility methods for Kafka 0.7 to Kafak 0.8 migrator. 
+ */ +public class MigrationUtils { + + private static final Logger LOGGER = Logger.getLogger(MigrationUtils.class); + + private static final int ZK_CONN_TIMEOUT_MS = 5 * 1000; + private static final int ZK_SOCKET_TIMEOUT_MS = 30 * 1000; + + private static final Joiner OR_DELIMITER = Joiner.on('|'); + + private static final MigrationUtils INSTANCE = new MigrationUtils(); + + // For tests. + MigrationUtils() {} + + public static MigrationUtils get() { + return INSTANCE; + } + + public String rewriteTopicWhitelist(String kafka08ZKHosts, String whitelist) { + return getTopicList(kafka08ZKHosts, whitelist, true); + } + + public String rewriteTopicBlacklist(String kafka08ZKHosts, String blacklist) { + return getTopicList(kafka08ZKHosts, blacklist, false); + } + + public ZkClient newZkClient(String zkServers) { + return new ZkClient(zkServers, ZK_CONN_TIMEOUT_MS, ZK_SOCKET_TIMEOUT_MS, + new BytesPushThroughSerializer()); + } + + private String getTopicList(String kafka08ZKHosts, String topicList, boolean isWhitelist) { + Pattern pattern = Pattern.compile(topicList); + List allTopics = getAllTopicsInKafka08(kafka08ZKHosts); + List filteredTopics = Lists.newArrayList(); + for (String topic : allTopics) { + Matcher matcher = pattern.matcher(topic); + if (matcher.find() ^ !isWhitelist) { + filteredTopics.add(topic); + } else { + LOGGER.warn("Attempting to migrate topic that doesn't exist in " + + "kafka8, topic: " + topic); + } + } + return OR_DELIMITER.join(filteredTopics); + } + + public List getAllTopicsInKafka08(String kafka08ZKHosts) { + ZkClient zkClient = newZkClient(kafka08ZKHosts); + try { + Iterator allTopics = ZkUtils.getAllTopics(zkClient).toIterator(); + List res = Lists.newArrayList(); + while (allTopics.hasNext()) { + res.add(allTopics.next()); + } + return res; + } finally { + zkClient.close(); + } + } + +} diff --git a/migration/src/main/java/com/uber/kafka/tools/OffsetIndex.java b/migration/src/main/java/com/uber/kafka/tools/OffsetIndex.java new file mode 
/**
 * Represents a list of latest Kafka offsets in increasing order. The latest offsets for every
 * topic are periodically written to a file on every leaf Kafka host by a cron job. The file is
 * encoded as two parallel arrays: first every timestamp as a 4-byte little-endian unsigned int,
 * then every offset as an 8-byte little-endian unsigned long.
 */
public class OffsetIndex {

    /** Sentinel meaning "no indexed offset is greater than the query". */
    public static final long LATEST_OFFSET = -2;

    private static final int SIZE_OF_INT = 4;
    private static final int SIZE_OF_LONG = 8;
    // One index entry = one timestamp (4 bytes) + one offset (8 bytes).
    private static final int SIZE_OF_ENTRY = SIZE_OF_INT + SIZE_OF_LONG;

    // Offsets sorted ascending; timestamps are discarded because nothing reads them.
    private final List<Long> offsets;

    /**
     * Decodes the parallel timestamp/offset arrays.
     *
     * @param encodedOffsets raw index-file contents; must be non-empty and a whole
     *     number of 12-byte (timestamp, offset) entries
     * @throws IllegalArgumentException if the input is empty or not a whole number of entries
     */
    public OffsetIndex(byte[] encodedOffsets) {
        if (encodedOffsets.length == 0) {
            throw new IllegalArgumentException("Encoded offsets are empty");
        }
        if (encodedOffsets.length % SIZE_OF_ENTRY != 0) {
            throw new IllegalArgumentException("Invalid encoded offsets");
        }
        final int size = encodedOffsets.length / SIZE_OF_ENTRY;

        offsets = new java.util.ArrayList<Long>(size);

        // Skip the timestamp block at the front since we don't use it.
        int bufOffset = SIZE_OF_INT * size;
        int bufLen = SIZE_OF_LONG * size;
        ByteBuffer buffer = ByteBuffer.wrap(encodedOffsets, bufOffset, bufLen)
            .order(ByteOrder.LITTLE_ENDIAN);
        for (int i = 0; i < size; i++) {
            offsets.add(buffer.getLong());
        }
        Collections.sort(offsets);
    }

    /**
     * Returns the smallest indexed offset strictly greater than {@code offset},
     * or {@link #LATEST_OFFSET} if none exists.
     */
    public long getNextOffset(long offset) {
        int idx = Collections.binarySearch(offsets, offset);
        if (idx >= 0) {
            // Exact match: next element is the answer. Guard against the query
            // matching the largest indexed offset — the original unconditionally
            // read offsets.get(idx + 1) and threw IndexOutOfBoundsException there.
            return idx + 1 < offsets.size() ? offsets.get(idx + 1) : LATEST_OFFSET;
        }
        // binarySearch returns -(insertionPoint) - 1 when the key is absent.
        int insertionPoint = (idx + 1) * -1;
        return insertionPoint >= offsets.size() ? LATEST_OFFSET : offsets.get(insertionPoint);
    }

    /**
     * Loads an offset index from the file at {@code path}.
     *
     * @throws RuntimeException wrapping any I/O failure while reading the file
     */
    public static OffsetIndex load(String path) {
        try {
            // Fully qualified to avoid clashing with com.google.common.io.Files
            // imported elsewhere in this file.
            byte[] encodedOffsets =
                java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(path));
            return new OffsetIndex(encodedOffsets);
        } catch (IOException e) {
            throw new RuntimeException("Failed to read encoded bytes from " + path, e);
        }
    }
}
+// @author Seung-Yeoul Yang (syyang@uber.com) + +package com.uber.kafka.tools; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.io.Files; +import com.uber.kafka.tools.OffsetIndexTestUtil.Offset; + +/** + * Tests for {@link com.uber.kafka.tools.Kafka7OffsetFixer} + */ +public class Kafka7OffsetFixerTest { + + private static final String TEST_CONSUMER_GROUP = "test_fixer_consumer"; + private static final String TEST_TOPIC = "test_fixer_topic"; + + private Kafka7LatestOffsets latestOffsets; + private ZkClient zkClient; + private File mirrorkMakerDir; + private String mirrorMakerPath; + private String currentOffsetZkPath; + private List offsets; + private byte[] offsetsBytes; + + @Before + public void setUp() throws Exception { + latestOffsets = mock(Kafka7LatestOffsets.class); + zkClient = mock(ZkClient.class); + mirrorkMakerDir = Files.createTempDir(); + mirrorMakerPath = mirrorkMakerDir.getPath(); + currentOffsetZkPath = Kafka7OffsetFixer.getCurrentOffsetZkPath( + TEST_CONSUMER_GROUP, TEST_TOPIC); + + // Set up offset index file. + offsets = ImmutableList.of( + new Offset(3, 30L), + new Offset(4, 40L), + new Offset(5, 50L), + new Offset(1, 10L), + new Offset(2, 20L) + ); + offsetsBytes = OffsetIndexTestUtil.toByteArray(offsets); + File offsetIndexFile = new File(mirrorkMakerDir, TEST_TOPIC + "_0"); + Files.write(offsetsBytes, offsetIndexFile); + } + + @After + public void tearDown() throws IOException { + // Clean up offset index file. 
+ FileUtils.deleteDirectory(mirrorkMakerDir); + } + + @Test + public void testBasic() { + final long CURRENT_OFFSET = 25L; + + // Mock zkClient + when(zkClient.exists(currentOffsetZkPath)).thenReturn(true); + byte[] currentOffsetBytes = Long.toString(CURRENT_OFFSET).getBytes(); + when(zkClient.readData(currentOffsetZkPath)).thenReturn(currentOffsetBytes); + + // Run fixer + Kafka7OffsetFixer fixer = new Kafka7OffsetFixer( + latestOffsets, mirrorMakerPath, zkClient); + fixer.fixOffset(TEST_CONSUMER_GROUP, TEST_TOPIC); + + // Verify that the new offset should be 30 since it's the next + // biggest offset after the current offset 25 in the offset index. + byte[] newOffset = Long.toString(30L).getBytes(); + verify(zkClient).writeData(currentOffsetZkPath, newOffset); + } + + @Test + public void testFixWithLatestOffset() { + final long LATEST_OFFSET = 60L; + final long CURRENT_OFFSET = 55L; + + // Mock zkClient + when(zkClient.exists(currentOffsetZkPath)).thenReturn(true); + byte[] currentOffsetBytes = Long.toString(CURRENT_OFFSET).getBytes(); + when(zkClient.readData(currentOffsetZkPath)).thenReturn(currentOffsetBytes); + + // Mock Kafka7LatestOffsets + when(latestOffsets.get(TEST_TOPIC, 0)).thenReturn(LATEST_OFFSET); + + // Run fixer + Kafka7OffsetFixer fixer = new Kafka7OffsetFixer( + latestOffsets, mirrorMakerPath, zkClient); + fixer.fixOffset(TEST_CONSUMER_GROUP, TEST_TOPIC); + + // Verify that the new offset is latest offset since the current + // offset 55 is bigger than any other offsets in the offset index. 
+ byte[] newOffset = Long.toString(LATEST_OFFSET).getBytes(); + verify(zkClient).writeData(currentOffsetZkPath, newOffset); + } + +} diff --git a/migration/src/test/java/com/uber/kafka/tools/MigrationUtilsTest.java b/migration/src/test/java/com/uber/kafka/tools/MigrationUtilsTest.java new file mode 100644 index 0000000000000..ade3793b97856 --- /dev/null +++ b/migration/src/test/java/com/uber/kafka/tools/MigrationUtilsTest.java @@ -0,0 +1,45 @@ +// Copyright (c) 2015 Uber Technologies, Inc. All rights reserved. +// @author Seung-Yeoul Yang (syyang@uber.com) + +package com.uber.kafka.tools; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; + +/** + * Tests for {@link com.uber.kafka.tools.MigrationUtils} + */ +public class MigrationUtilsTest { + + private static final String TEST_ZK_HOSTS = "localhost:2182"; + private static final List KAFAK08_TOPICS = ImmutableList.of("a", "b", "c"); + + private MigrationUtils utils; + + @Before + public void setUp() { + utils = new MigrationUtils() { + @Override + public List getAllTopicsInKafka08(String kafka08ZKHosts) { + return KAFAK08_TOPICS; + } + }; + } + + @Test + public void testRewriteWhitelist() { + assertEquals("a", utils.rewriteTopicWhitelist(TEST_ZK_HOSTS, "a|d")); + } + + @Test + public void testRewriteBlacklist() { + assertEquals("b|c", utils.rewriteTopicBlacklist(TEST_ZK_HOSTS, "a|d")); + } + +} diff --git a/migration/src/test/java/com/uber/kafka/tools/OffsetIndexTest.java b/migration/src/test/java/com/uber/kafka/tools/OffsetIndexTest.java new file mode 100644 index 0000000000000..2c58208bbcbad --- /dev/null +++ b/migration/src/test/java/com/uber/kafka/tools/OffsetIndexTest.java @@ -0,0 +1,63 @@ +// Copyright (c) 2015 Uber Technologies, Inc. All rights reserved. 
+// @author Seung-Yeoul Yang (syyang@uber.com) + +package com.uber.kafka.tools; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.List; + +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.uber.kafka.tools.OffsetIndexTestUtil.Offset; + +/** + * Tests for {@link com.uber.kafka.tools.OffsetIndex}. + **/ +public class OffsetIndexTest { + + private List offsets; + private byte[] offsetsBytes; + + @Before + public void setUp() { + offsets = ImmutableList.of( + new Offset(3, 30L), + new Offset(4, 40L), + new Offset(5, 50L), + new Offset(1, 10L), + new Offset(2, 20L) + ); + offsetsBytes = OffsetIndexTestUtil.toByteArray(offsets); + } + + @Test + public void testSimple() { + OffsetIndex index = new OffsetIndex(offsetsBytes); + assertEquals(10L, index.getNextOffset(5L)); + assertEquals(30L, index.getNextOffset(20L)); + assertEquals(50L, index.getNextOffset(43L)); + assertEquals(OffsetIndex.LATEST_OFFSET, index.getNextOffset(55L)); + } + + @Test + public void testEmptyOffsetIndex() { + try { + OffsetIndex index = new OffsetIndex(new byte[]{}); + fail("Loading empty encoded offsets should fail"); + } catch (IllegalArgumentException e) { + // Expected. + } + } + + @Test + public void testLoadFromFile() { + String path = getClass().getResource("/offset_idx").getPath(); + OffsetIndex index = OffsetIndex.load(path); + assertEquals(9777L, index.getNextOffset(0L)); + } + +} diff --git a/migration/src/test/java/com/uber/kafka/tools/OffsetIndexTestUtil.java b/migration/src/test/java/com/uber/kafka/tools/OffsetIndexTestUtil.java new file mode 100644 index 0000000000000..26e9c250d605a --- /dev/null +++ b/migration/src/test/java/com/uber/kafka/tools/OffsetIndexTestUtil.java @@ -0,0 +1,39 @@ +// Copyright (c) 2015 Uber Technologies, Inc. All rights reserved. 
/**
 * Test helpers for building encoded offset-index fixtures: the encoding is every
 * timestamp as a little-endian 4-byte int, followed by every offset as a
 * little-endian 8-byte long (two parallel arrays).
 */
public class OffsetIndexTestUtil {

    private static final int SIZE_OF_INT = 4;
    private static final int SIZE_OF_LONG = 8;

    /** A single (timestamp, offset) index entry. */
    public static class Offset {
        public final int ts;
        public final long offset;

        public Offset(int ts, long offset) {
            this.ts = ts;
            this.offset = offset;
        }
    }

    /**
     * Encodes the given entries as the on-disk index format: the timestamp block
     * first, then the offset block, both little-endian.
     */
    public static byte[] toByteArray(List<Offset> offsets) {
        int count = offsets.size();
        ByteBuffer encoded = ByteBuffer
            .allocate(count * (SIZE_OF_INT + SIZE_OF_LONG))
            .order(ByteOrder.LITTLE_ENDIAN);
        // Single pass using absolute positions: timestamp i lands in the leading
        // int block, offset i in the trailing long block.
        int i = 0;
        for (Offset entry : offsets) {
            encoded.putInt(i * SIZE_OF_INT, entry.ts);
            encoded.putLong(count * SIZE_OF_INT + i * SIZE_OF_LONG, entry.offset);
            i++;
        }
        return encoded.array();
    }

    // Static utility holder; never instantiated.
    private OffsetIndexTestUtil() {}
}