diff --git a/Vagrantfile b/Vagrantfile new file mode 100644 index 0000000000000..a53de78c4dfbc --- /dev/null +++ b/Vagrantfile @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# -*- mode: ruby -*- +# vi: set ft=ruby : + +# Vagrantfile API/syntax version. Don't touch unless you know what you're doing! +VAGRANTFILE_API_VERSION = "2" + +# TODO(ksweeney): RAM requirements are not empirical and can probably be significantly lowered. +Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| + config.vm.box = "precise64" + + # The url from where the 'config.vm.box' box will be fetched if it + # doesn't already exist on the user's system. + config.vm.box_url = "http://files.vagrantup.com/precise64.box" + + config.vm.define "zookeeper" do |zookeeper| + zookeeper.vm.network :private_network, ip: "192.168.50.5" + zookeeper.vm.provider :virtualbox do |vb| + vb.customize ["modifyvm", :id, "--memory", "512"] + end + zookeeper.vm.provision "shell", path: "vagrant/zk.sh" + end + + config.vm.define "brokerOne" do |brokerOne| + brokerOne.vm.network :private_network, ip: "192.168.50.10" + brokerOne.vm.provider :virtualbox do |vb| + vb.customize ["modifyvm", :id, "--memory", "512"] + end + brokerOne.vm.provision "shell", path: "vagrant/broker.sh", :args => "1" + end + + config.vm.define "brokerTwo" do |brokerTwo| + brokerTwo.vm.network :private_network, ip: "192.168.50.20" + brokerTwo.vm.provider :virtualbox do |vb| + vb.customize ["modifyvm", :id, "--memory", "512"] + end + brokerTwo.vm.provision "shell", path: "vagrant/broker.sh", :args => "2" + end + + config.vm.define "brokerThree" do |brokerThree| + brokerThree.vm.network :private_network, ip: "192.168.50.30" + brokerThree.vm.provider :virtualbox do |vb| + vb.customize ["modifyvm", :id, "--memory", "512"] + end + brokerThree.vm.provision "shell", path: "vagrant/broker.sh", :args => "3" + end + +end diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index a3eb53e826211..7698260c5f1ac 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -19,7 +19,7 @@ package kafka.consumer import scala.collection._ import org.I0Itec.zkclient.ZkClient -import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging} +import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, Utils} import kafka.common.KafkaException private[kafka] trait TopicCount { @@ -142,7 +142,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient, makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) } - def dbString = "{ \"%s\" : %d }".format(topicFilter.regex, numStreams) + def dbString = "{ \"%s\" : %d }".format(Utils.quoteJsonLiteral(topicFilter.regex), numStreams) def pattern: String = { topicFilter match { diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 60a019c5018b5..5eae7215da58a 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -476,6 +476,28 @@ object Utils extends Logging { builder.toString } + def quoteJsonLiteral (s : String) : String = + s.map { + case '"' => "\\\"" + case '\\' => "\\\\" + case '/' => "\\/" + case '\b' => "\\b" + case '\f' => "\\f" + case '\n' => "\\n" + case '\r' => "\\r" + case '\t' => "\\t" + /* We'll unicode escape any control characters. These include: + * 0x0 -> 0x1f : ASCII Control (C0 Control Codes) + * 0x7f : ASCII DELETE + * 0x80 -> 0x9f : C1 Control Codes + * + * Per RFC4627, section 2.5, we're not technically required to + * encode the C1 codes, but we do to be safe. + */ + case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int) + case c => c + }.mkString + /** * Format a Map[String, String] as JSON object. */ diff --git a/vagrant/README.md b/vagrant/README.md new file mode 100644 index 0000000000000..ba07e00e2dfd9 --- /dev/null +++ b/vagrant/README.md @@ -0,0 +1,31 @@ +# Apache Kafka # + +Using Vagrant to get up and running. + +1) Install Vagrant [http://www.vagrantup.com/](http://www.vagrantup.com/) +2) Install Virtual Box [https://www.virtualbox.org/](https://www.virtualbox.org/) + +In the main kafka folder + +1) ./sbt update +2) ./sbt package +3) ./sbt assembly-package-dependency +4) vagrant up + +once this is done +* Zookeeper will be running 192.168.50.5 +* Broker 1 on 192.168.50.10 +* Broker 2 on 192.168.50.20 +* Broker 3 on 192.168.50.30 + +When you are all up and running you will be back at a command brompt. + +If you want you can login to the machines using vagrant ssh but you don't need to. + +You can access the brokers and zookeeper by their IP + +e.g. + +bin/kafka-console-producer.sh --broker-list 192.168.50.10:9092,192.168.50.20:9092,192.168.50.30:9092 --topic sandbox + +bin/kafka-console-consumer.sh --zookeeper 192.168.50.5:2181 --topic sandbox --from-beginning diff --git a/vagrant/broker.sh b/vagrant/broker.sh new file mode 100755 index 0000000000000..9be2a3414e380 --- /dev/null +++ b/vagrant/broker.sh @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/bin/bash +apt-get -y update +apt-get install -y software-properties-common python-software-properties +add-apt-repository -y ppa:webupd8team/java +apt-get -y update +/bin/echo debconf shared/accepted-oracle-license-v1-1 select true | /usr/bin/debconf-set-selections +apt-get -y install oracle-java7-installer oracle-java7-set-default + +chmod a+rw /opt +cd /opt +ln -s /vagrant kafka +cd kafka +IP=$(ifconfig | grep 'inet addr:'| grep 168 | grep 192|cut -d: -f2 | awk '{ print $1}') +sed 's/broker.id=0/'broker.id=$1'/' /opt/kafka/config/server.properties > /tmp/prop1.tmp +sed 's/#advertised.host.name=/'advertised.host.name=$IP'/' /tmp/prop1.tmp > /tmp/prop2.tmp +sed 's/#host.name=localhost/'host.name=$IP'/' /tmp/prop2.tmp > /tmp/prop3.tmp +sed 's/zookeeper.connect=localhost:2181/'zookeeper.connect=192.168.50.5:2181'/' /tmp/prop3.tmp > /opt/server.properties + +bin/kafka-server-start.sh /opt/server.properties 1>> /tmp/broker.log 2>> /tmp/broker.log & diff --git a/vagrant/zk.sh b/vagrant/zk.sh new file mode 100755 index 0000000000000..3cc8dfedcce8d --- /dev/null +++ b/vagrant/zk.sh @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/bin/bash +apt-get -y update +apt-get install -y software-properties-common python-software-properties +add-apt-repository -y ppa:webupd8team/java +apt-get -y update +/bin/echo debconf shared/accepted-oracle-license-v1-1 select true | /usr/bin/debconf-set-selections +apt-get -y install oracle-java7-installer oracle-java7-set-default + +chmod a+rw /opt +cd /opt +ln -s /vagrant kafka +cd kafka +bin/zookeeper-server-start.sh config/zookeeper.properties 1>> /tmp/zk.log 2>> /tmp/zk.log & \ No newline at end of file