This repository was archived by the owner on Aug 20, 2025. It is now read-only.
106 changes: 72 additions & 34 deletions metron-sensors/bro-plugin-kafka/README.md
@@ -1,60 +1,98 @@
Logging Bro Output to Kafka
===========================

A Bro log writer that sends logging output to Kafka. This provides a convenient means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to process the data generated by Bro.

Installation
------------

1. Install [librdkafka](https://github.com/edenhill/librdkafka), a native client library for Kafka. This plugin has been tested against the latest release of librdkafka, which at the time of this writing is v0.9.4.

    In order to use this plugin within a kerberized Kafka environment, you will also need `libsasl2` installed and will need to pass `--enable-sasl` to the `configure` script.

    ```
    curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz
    cd librdkafka-0.9.4/
    ./configure --enable-sasl
    make
    sudo make install
    ```

1. Build the plugin using the following commands.

    ```
    ./configure --bro-dist=$BRO_SRC
    make
    sudo make install
    ```

1. Run the following command to ensure that the plugin was installed successfully.

    ```
    $ bro -N Bro::Kafka
    Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)
    ```

Activation
----------

The following examples highlight different ways that the plugin can be used. Simply add the Bro script shown to your `local.bro` file (for example, `/usr/share/bro/site/local.bro`) to activate the plugin.

### Example 1

The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`.
* Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table.
* By defining `topic_name` all records will be sent to the same Kafka topic.

```
@load Bro/Kafka/logs-to-kafka.bro
redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
redef Kafka::topic_name = "bro";
redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "localhost:9092"
);
```
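To confirm that records are actually flowing, Kafka's console consumer can be pointed at the topic. This is only a sketch: the `KAFKA_HOME` path and the Zookeeper address are assumptions that must be adjusted for your environment.

```
# read from the 'bro' topic from the beginning to confirm records are
# arriving; assumes Kafka's CLI tools are installed under $KAFKA_HOME
# and Zookeeper is reachable at localhost:2181
${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic bro --from-beginning
```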

### Example 2

It is also possible to send each log stream to a uniquely named topic. The goal in this example is to send all HTTP records to a Kafka topic named `http` and all DNS records to a separate Kafka topic named `dns`.
* The `topic_name` value must be set to an empty string.
* The `$path` value of Bro's Log Writer mechanism is used to define the topic name.
* Any configuration value accepted by librdkafka can be added to the `$config` configuration table.
* Each log writer accepts a separate configuration table.

```
@load Bro/Kafka/logs-to-kafka.bro
redef Kafka::topic_name = "";
redef Kafka::tag_json = T;

event bro_init()
{
    # handles HTTP
    local http_filter: Log::Filter = [
        $name = "kafka-http",
        $writer = Log::WRITER_KAFKAWRITER,
        $config = table(
            ["stream_id"] = "HTTP::LOG",
            ["metadata.broker.list"] = "localhost:9092"
        ),
        $path = "http"
    ];
    Log::add_filter(HTTP::LOG, http_filter);

    # handles DNS
    local dns_filter: Log::Filter = [
        $name = "kafka-dns",
        $writer = Log::WRITER_KAFKAWRITER,
        $config = table(
            ["stream_id"] = "DNS::LOG",
            ["metadata.broker.list"] = "localhost:9092"
        ),
        $path = "dns"
    ];
    Log::add_filter(DNS::LOG, dns_filter);
}
```
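Because `tag_json` is enabled here, the plugin's `TaggedJSON` formatter wraps each record in a JSON envelope keyed by the log stream's `$path`. A rough Python sketch of that envelope shape (the field names in the sample record are illustrative only, not the real Bro schema):

```python
import json

def tag_record(path, record):
    # mimic the TaggedJSON formatter: nest the record under the
    # log stream's path ("http", "dns", ...)
    return json.dumps({path: record})

# an illustrative HTTP record; real Bro logs carry many more fields
print(tag_record("http", {"ts": 1500000000.0, "method": "GET", "host": "example.com"}))
```

Downstream consumers can then route on the outer key alone without parsing the full record.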

Settings
--------
@@ -147,7 +185,7 @@ For an environment where the following is true:
The Kafka topic `bro` has been given permission for the `metron` user to write:
```
# login using the metron user
kinit -kt /etc/security/keytabs/metron.headless.keytab metron@EXAMPLE.COM
${KAFKA_HOME}/kafka-broker/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:metron --topic bro
```
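With the ACL in place, the matching client-side security settings can be passed straight through `kafka_conf`. The property names below are standard librdkafka settings; the broker list, keytab path, and principal are placeholders that must be adjusted for your environment:

```
redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "node1:6667",
    ["security.protocol"] = "SASL_PLAINTEXT",
    ["sasl.kerberos.keytab"] = "/etc/security/keytabs/metron.headless.keytab",
    ["sasl.kerberos.principal"] = "metron@EXAMPLE.COM"
);
```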
30 changes: 16 additions & 14 deletions metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
@@ -16,34 +16,36 @@
#

find_path(LibRDKafka_ROOT_DIR
    NAMES include/librdkafka/rdkafkacpp.h
)

find_library(LibRDKafka_LIBRARIES
    NAMES rdkafka++
    HINTS ${LibRDKafka_ROOT_DIR}/lib
    PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
)

find_library(LibRDKafka_C_LIBRARIES
    NAMES rdkafka
    HINTS ${LibRDKafka_ROOT_DIR}/lib
    PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
)

find_path(LibRDKafka_INCLUDE_DIR
    NAMES librdkafka/rdkafkacpp.h
    HINTS ${LibRDKafka_ROOT_DIR}/include
)

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(LibRDKafka DEFAULT_MSG
    LibRDKafka_LIBRARIES
    LibRDKafka_C_LIBRARIES
    LibRDKafka_INCLUDE_DIR
)

mark_as_advanced(
    LibRDKafka_ROOT_DIR
    LibRDKafka_LIBRARIES
    LibRDKafka_C_LIBRARIES
    LibRDKafka_INCLUDE_DIR
)
2 changes: 2 additions & 0 deletions metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
@@ -47,11 +47,13 @@ find_path(OpenSSL_INCLUDE_DIR
find_library(OpenSSL_SSL_LIBRARY
    NAMES ssl ssleay32 ssleay32MD
    HINTS ${OpenSSL_ROOT_DIR}/lib
    PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
)

find_library(OpenSSL_CRYPTO_LIBRARY
    NAMES crypto
    HINTS ${OpenSSL_ROOT_DIR}/lib
    PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
)

set(OpenSSL_LIBRARIES ${OpenSSL_SSL_LIBRARY} ${OpenSSL_CRYPTO_LIBRARY}
@@ -17,3 +17,5 @@
# This is loaded when a user activates the plugin. Include scripts here that should be
# loaded automatically at that point.
#

@load ./init.bro
@@ -35,7 +35,8 @@ event bro_init() &priority=-5
{
    local filter: Log::Filter = [
        $name = fmt("kafka-%s", stream_id),
        $writer = Log::WRITER_KAFKAWRITER,
        $config = table(["stream_id"] = fmt("%s", stream_id))
    ];

    Log::add_filter(stream_id, filter);
85 changes: 54 additions & 31 deletions metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
@@ -22,25 +22,55 @@ using namespace writer;

KafkaWriter::KafkaWriter(WriterFrontend* frontend): WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL)
{
    // need thread-local copies of all user-defined settings coming from
    // bro scripting land. accessing these is not thread-safe and 'DoInit'
    // is potentially accessed from multiple threads.

    // tag_json - thread local copy
    tag_json = BifConst::Kafka::tag_json;

    // topic name - thread local copy
    topic_name.assign(
        (const char*)BifConst::Kafka::topic_name->Bytes(),
        BifConst::Kafka::topic_name->Len());

    // kafka_conf - thread local copy
    Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
    IterCookie* c = val->AsTable()->InitForIteration();
    HashKey* k;
    TableEntryVal* v;
    while ((v = val->AsTable()->NextEntry(k, c))) {

        // fetch the key and value
        ListVal* index = val->AsTableVal()->RecoverIndex(k);
        string key = index->Index(0)->AsString()->CheckString();
        string val = v->Value()->AsString()->CheckString();
        kafka_conf.insert(kafka_conf.begin(), pair<string, string>(key, val));

        // cleanup
        Unref(index);
        delete k;
    }
}
}

KafkaWriter::~KafkaWriter()
{}

bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
{
    // if no global 'topic_name' is defined, use the log stream's 'path'
    if (topic_name.empty()) {
        topic_name = info.path;
    }

    // initialize the formatter; use the thread-local copy of 'tag_json'
    if (tag_json) {
        formatter = new threading::formatter::TaggedJSON(info.path, this, threading::formatter::JSON::TS_EPOCH);
    } else {
        formatter = new threading::formatter::JSON(this, threading::formatter::JSON::TS_EPOCH);
    }

    // is debug enabled
    string debug;
    debug.assign((const char*)BifConst::Kafka::debug->Bytes(), BifConst::Kafka::debug->Len());
    bool is_debug(!debug.empty());
@@ -53,41 +83,31 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
    else {
        reporter->Info("Debug is turned off.");
    }

    // kafka global configuration
    string err;
    conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    // apply the user-defined settings to kafka
    map<string, string>::iterator i;
    for (i = kafka_conf.begin(); i != kafka_conf.end(); ++i) {
        string key = i->first;
        string val = i->second;

        // apply setting to kafka
        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
            reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
            return false;
        }
    }

    if (is_debug) {
        string key("debug");
        string val(debug);
        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
            reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
            return false;
        }
    }

// create kafka producer
@@ -104,9 +124,11 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
        reporter->Error("Failed to create topic handle: %s", err.c_str());
        return false;
    }

    if (is_debug) {
        reporter->Info("Successfully created producer.");
    }

    return true;
}

@@ -130,8 +152,9 @@ bool KafkaWriter::DoFinish(double network_time)

    // successful only if all messages delivered
    if (producer->outq_len() == 0) {
        success = true;
    } else {
        reporter->Error("Unable to deliver %0d message(s)", producer->outq_len());
    }

    delete topic;
15 changes: 15 additions & 0 deletions metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
@@ -28,6 +28,17 @@
#include "kafka.bif.h"
#include "TaggedJSON.h"

namespace RdKafka {
    class Conf;
    class Producer;
    class Topic;
}

namespace threading {
namespace formatter {
    class Formatter;
}}

namespace logging { namespace writer {

/**
@@ -54,6 +65,10 @@ class KafkaWriter : public WriterBackend {
virtual bool DoHeartbeat(double network_time, double current_time);

private:
    static const string default_topic_key;
    string stream_id;
    bool tag_json;
    map<string, string> kafka_conf;
    string topic_name;
    threading::formatter::Formatter *formatter;
    RdKafka::Producer* producer;