Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions Vagrantfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -*- mode: ruby -*-
# vi: set ft=ruby :

# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = "2"

# TODO(ksweeney): RAM requirements are not empirical and can probably be significantly lowered.
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
config.vm.box = "precise64"

# The url from where the 'config.vm.box' box will be fetched if it
# doesn't already exist on the user's system.
config.vm.box_url = "http://files.vagrantup.com/precise64.box"

config.vm.define "zookeeper" do |zookeeper|
zookeeper.vm.network :private_network, ip: "192.168.50.5"
zookeeper.vm.provider :virtualbox do |vb|
vb.customize ["modifyvm", :id, "--memory", "512"]
end
zookeeper.vm.provision "shell", path: "vagrant/zk.sh"
end

config.vm.define "brokerOne" do |brokerOne|
brokerOne.vm.network :private_network, ip: "192.168.50.10"
brokerOne.vm.provider :virtualbox do |vb|
vb.customize ["modifyvm", :id, "--memory", "512"]
end
brokerOne.vm.provision "shell", path: "vagrant/broker.sh", :args => "1"
end

config.vm.define "brokerTwo" do |brokerTwo|
brokerTwo.vm.network :private_network, ip: "192.168.50.20"
brokerTwo.vm.provider :virtualbox do |vb|
vb.customize ["modifyvm", :id, "--memory", "512"]
end
brokerTwo.vm.provision "shell", path: "vagrant/broker.sh", :args => "2"
end

config.vm.define "brokerThree" do |brokerThree|
brokerThree.vm.network :private_network, ip: "192.168.50.30"
brokerThree.vm.provider :virtualbox do |vb|
vb.customize ["modifyvm", :id, "--memory", "512"]
end
brokerThree.vm.provision "shell", path: "vagrant/broker.sh", :args => "3"
end

end
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/consumer/TopicCount.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.consumer

import scala.collection._
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging}
import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, Utils}
import kafka.common.KafkaException

private[kafka] trait TopicCount {
Expand Down Expand Up @@ -142,7 +142,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient,
makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
}

def dbString = "{ \"%s\" : %d }".format(topicFilter.regex, numStreams)
def dbString = "{ \"%s\" : %d }".format(Utils.quoteJsonLiteral(topicFilter.regex), numStreams)

def pattern: String = {
topicFilter match {
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/scala/kafka/utils/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,28 @@ object Utils extends Logging {
builder.toString
}

def quoteJsonLiteral (s : String) : String =
s.map {
case '"' => "\\\""
case '\\' => "\\\\"
case '/' => "\\/"
case '\b' => "\\b"
case '\f' => "\\f"
case '\n' => "\\n"
case '\r' => "\\r"
case '\t' => "\\t"
/* We'll unicode escape any control characters. These include:
* 0x0 -> 0x1f : ASCII Control (C0 Control Codes)
* 0x7f : ASCII DELETE
* 0x80 -> 0x9f : C1 Control Codes
*
* Per RFC4627, section 2.5, we're not technically required to
* encode the C1 codes, but we do to be safe.
*/
case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int)
case c => c
}.mkString

/**
* Format a Map[String, String] as JSON object.
*/
Expand Down
31 changes: 31 additions & 0 deletions vagrant/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Apache Kafka #

Using Vagrant to get up and running.

1) Install Vagrant [http://www.vagrantup.com/](http://www.vagrantup.com/)
2) Install Virtual Box [https://www.virtualbox.org/](https://www.virtualbox.org/)

In the main kafka folder

1) ./sbt update
2) ./sbt package
3) ./sbt assembly-package-dependency
4) vagrant up

once this is done
* Zookeeper will be running 192.168.50.5
* Broker 1 on 192.168.50.10
* Broker 2 on 192.168.50.20
* Broker 3 on 192.168.50.30

When you are all up and running you will be back at a command brompt.

If you want you can login to the machines using vagrant ssh <machineName> but you don't need to.

You can access the brokers and zookeeper by their IP

e.g.

bin/kafka-console-producer.sh --broker-list 192.168.50.10:9092,192.168.50.20:9092,192.168.50.30:9092 --topic sandbox

bin/kafka-console-consumer.sh --zookeeper 192.168.50.5:2181 --topic sandbox --from-beginning
34 changes: 34 additions & 0 deletions vagrant/broker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#!/bin/bash
apt-get -y update
apt-get install -y software-properties-common python-software-properties
add-apt-repository -y ppa:webupd8team/java
apt-get -y update
/bin/echo debconf shared/accepted-oracle-license-v1-1 select true | /usr/bin/debconf-set-selections
apt-get -y install oracle-java7-installer oracle-java7-set-default

chmod a+rw /opt
cd /opt
ln -s /vagrant kafka
cd kafka
IP=$(ifconfig | grep 'inet addr:'| grep 168 | grep 192|cut -d: -f2 | awk '{ print $1}')
sed 's/broker.id=0/'broker.id=$1'/' /opt/kafka/config/server.properties > /tmp/prop1.tmp
sed 's/#advertised.host.name=<hostname routable by clients>/'advertised.host.name=$IP'/' /tmp/prop1.tmp > /tmp/prop2.tmp
sed 's/#host.name=localhost/'host.name=$IP'/' /tmp/prop2.tmp > /tmp/prop3.tmp
sed 's/zookeeper.connect=localhost:2181/'zookeeper.connect=192.168.50.5:2181'/' /tmp/prop3.tmp > /opt/server.properties

bin/kafka-server-start.sh /opt/server.properties 1>> /tmp/broker.log 2>> /tmp/broker.log &
28 changes: 28 additions & 0 deletions vagrant/zk.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#!/bin/bash
apt-get -y update
apt-get install -y software-properties-common python-software-properties
add-apt-repository -y ppa:webupd8team/java
apt-get -y update
/bin/echo debconf shared/accepted-oracle-license-v1-1 select true | /usr/bin/debconf-set-selections
apt-get -y install oracle-java7-installer oracle-java7-set-default

chmod a+rw /opt
cd /opt
ln -s /vagrant kafka
cd kafka
bin/zookeeper-server-start.sh config/zookeeper.properties 1>> /tmp/zk.log 2>> /tmp/zk.log &