From 2249b31211dd5856a9ae63fa03623091aeb5a983 Mon Sep 17 00:00:00 2001
From: Nick Allen
Date: Mon, 24 Apr 2017 13:35:32 -0400
Subject: [PATCH 1/2] METRON-883 Capture Bro Plugin Enhancements from bro/bro-plugins

---
 metron-sensors/bro-plugin-kafka/README.md        | 100 ++++++++++++------
 .../cmake/FindLibRDKafka.cmake                   |  30 +++---
 .../bro-plugin-kafka/cmake/FindOpenSSL.cmake     |   2 +
 .../scripts/Bro/Kafka/__load__.bro               |   2 +
 .../scripts/Bro/Kafka/logs-to-kafka.bro          |   3 +-
 .../bro-plugin-kafka/src/KafkaWriter.cc          |  85 +++++++++------
 .../bro-plugin-kafka/src/KafkaWriter.h           |  15 +++
 7 files changed, 157 insertions(+), 80 deletions(-)

diff --git a/metron-sensors/bro-plugin-kafka/README.md b/metron-sensors/bro-plugin-kafka/README.md
index 6d0582e452..0141cf1656 100644
--- a/metron-sensors/bro-plugin-kafka/README.md
+++ b/metron-sensors/bro-plugin-kafka/README.md
@@ -1,60 +1,92 @@
-Bro Logging Output to Kafka
+Logging Bro Output to Kafka
 ===========================
 
-A Bro log writer that sends logging output to Kafka. This provides a convenient
-means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to
-process the data generated by Bro.
+A Bro log writer that sends logging output to Kafka. This provides a convenient means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to process the data generated by Bro.
 
 Installation
 ------------
 
-Install librdkafka (https://github.com/edenhill/librdkafka), a native client
-library for Kafka. This plugin has been tested against the latest release of
-librdkafka, which at the time of this writing is v0.9.4. In order to support interacting
-with a kerberized kafka, you will need libsasl2 installed
+1. Install [librdkafka](https://github.com/edenhill/librdkafka), a native client library for Kafka. This plugin has been tested against the latest release of librdkafka, which at the time of this writing is v0.9.4.
 
-```
-# curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz
-# cd librdkafka-0.9.4/
-# ./configure --enable-sasl
-# make
-# sudo make install
-```
+    In order to use this plugin within a kerberized Kafka environment, you will also need `libsasl2` installed and will need to pass `--enable-sasl` to the `configure` script.
 
-Then compile this Bro plugin using the following commands.
+    ```
+    curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz
+    cd librdkafka-0.9.4/
+    ./configure --enable-sasl
+    make
+    sudo make install
+    ```
 
-```
-# ./configure --bro-dist=$BRO_SRC
-# make
-# sudo make install
-```
+1. Build the plugin using the following commands.
 
-Run the following command to ensure that the plugin was installed successfully.
+    ```
+    ./configure --bro-dist=$BRO_SRC
+    make
+    sudo make install
+    ```
 
-```
-# bro -N Bro::Kafka
-Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)
-```
+1. Run the following command to ensure that the plugin was installed successfully.
+
+    ```
+    $ bro -N Bro::Kafka
+    Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)
+    ```
 
 Activation
 ----------
 
-The easiest way to enable Kafka output is to load the plugin's
-`logs-to-kafka.bro` script. If you are using BroControl, the following lines
-added to local.bro will activate it.
+The following examples highlight different ways that the plugin can be used. Simply add the Bro script to your `local.bro` file (for example, `/usr/share/bro/site/local.bro`) as shown to activate the plugin.
+
+### Example 1
+
+The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`. Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table. By defining `topic_name` all records will be sent to the same Kafka topic.
 
 ```
 @load Bro/Kafka/logs-to-kafka.bro
-redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG);
+redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
 redef Kafka::topic_name = "bro";
 redef Kafka::kafka_conf = table(
     ["metadata.broker.list"] = "localhost:9092"
 );
 ```
 
-This example will send all HTTP, DNS, and Conn logs to a Kafka broker running on
-the localhost to a topic called `bro`. Any configuration value accepted by
-librdkafka can be added to the `kafka_conf` configuration table.
+### Example 2
+
+It is also possible to send each log stream to a uniquely named topic. The goal in this example is to send all HTTP records to a Kafka topic named `http` and all DNS records to a separate Kafka topic named `dns`. Any configuration value accepted by librdkafka can be added to the `$config` configuration table. Note that each log writer accepts a separate configuration.
+
+```
+@load Bro/Kafka/logs-to-kafka.bro
+redef Kafka::topic_name = "";
+redef Kafka::tag_json = T;
+
+event bro_init()
+{
+    # handles HTTP
+    local http_filter: Log::Filter = [
+        $name = "kafka-http",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $config = table(
+            ["stream_id"] = "HTTP::LOG",
+            ["metadata.broker.list"] = "localhost:9092"
+        ),
+        $path = "http"
+    ];
+    Log::add_filter(HTTP::LOG, http_filter);
+
+    # handles DNS
+    local dns_filter: Log::Filter = [
+        $name = "kafka-dns",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $config = table(
+            ["stream_id"] = "DNS::LOG",
+            ["metadata.broker.list"] = "localhost:9092"
+        ),
+        $path = "dns"
+    ];
+    Log::add_filter(DNS::LOG, dns_filter);
+}
+```
 
 Settings
 --------
@@ -147,7 +179,7 @@ For an environment where the following is true:
 The kafka topic `bro` has been given permission for the `metron` user to write:
 
 ```
-# login using the metron user 
+# login using the metron user
 kinit -kt /etc/security/keytabs/metron.headless.keytab metron@EXAMPLE.COM
 ${KAFKA_HOME}/kafka-broker/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:metron --topic bro
 ```
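For a kerberized Kafka cluster like the one described above, librdkafka's SASL settings can be supplied through the same `kafka_conf` table. A minimal sketch, assuming librdkafka was built with `--enable-sasl` and reusing the example principal and keytab from the README (the broker address and file paths are illustrative only):

```
@load Bro/Kafka/logs-to-kafka.bro
redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
redef Kafka::topic_name = "bro";
redef Kafka::kafka_conf = table(
    # illustrative broker address; substitute your own
    ["metadata.broker.list"] = "node1:6667",
    # standard librdkafka security properties
    ["security.protocol"] = "SASL_PLAINTEXT",
    ["sasl.kerberos.keytab"] = "/etc/security/keytabs/metron.headless.keytab",
    ["sasl.kerberos.principal"] = "metron@EXAMPLE.COM"
);
```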
diff --git a/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake b/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
index c64d8f9040..904bfff32d 100644
--- a/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
+++ b/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
@@ -16,34 +16,36 @@
 #
 
 find_path(LibRDKafka_ROOT_DIR
-  NAMES include/librdkafka/rdkafkacpp.h
+    NAMES include/librdkafka/rdkafkacpp.h
 )
 
 find_library(LibRDKafka_LIBRARIES
-  NAMES rdkafka++
-  HINTS ${LibRDKafka_ROOT_DIR}/lib
+    NAMES rdkafka++
+    HINTS ${LibRDKafka_ROOT_DIR}/lib
+    PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
 )
 
 find_library(LibRDKafka_C_LIBRARIES
-  NAMES rdkafka
-  HINTS ${LibRDKafka_ROT_DIR}/lib
+    NAMES rdkafka
+    HINTS ${LibRDKafka_ROOT_DIR}/lib
+    PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
 )
 
 find_path(LibRDKafka_INCLUDE_DIR
-  NAMES librdkafka/rdkafkacpp.h
-  HINTS ${LibRDKafka_ROOT_DIR}/include
+    NAMES librdkafka/rdkafkacpp.h
+    HINTS ${LibRDKafka_ROOT_DIR}/include
 )
 
 include(FindPackageHandleStandardArgs)
 find_package_handle_standard_args(LibRDKafka DEFAULT_MSG
-  LibRDKafka_LIBRARIES
-  LibRDKafka_C_LIBRARIES
-  LibRDKafka_INCLUDE_DIR
+    LibRDKafka_LIBRARIES
+    LibRDKafka_C_LIBRARIES
+    LibRDKafka_INCLUDE_DIR
 )
 
 mark_as_advanced(
-  LibRDKafka_ROOT_DIR
-  LibRDKafka_LIBRARIES
-  LibRDKafka_C_LIBRARIES
-  LibRDKafka_INCLUDE_DIR
+    LibRDKafka_ROOT_DIR
+    LibRDKafka_LIBRARIES
+    LibRDKafka_C_LIBRARIES
+    LibRDKafka_INCLUDE_DIR
 )
diff --git a/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake b/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
index 5ed955c956..58af5c73ae 100644
--- a/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
+++ b/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
@@ -47,11 +47,13 @@ find_path(OpenSSL_INCLUDE_DIR
 find_library(OpenSSL_SSL_LIBRARY
     NAMES ssl ssleay32 ssleay32MD
     HINTS ${OpenSSL_ROOT_DIR}/lib
+    PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
 )
 
 find_library(OpenSSL_CRYPTO_LIBRARY
     NAMES crypto
     HINTS ${OpenSSL_ROOT_DIR}/lib
+    PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
 )
 
 set(OpenSSL_LIBRARIES ${OpenSSL_SSL_LIBRARY} ${OpenSSL_CRYPTO_LIBRARY}
diff --git a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
index 12295a9197..1df1136e90 100644
--- a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
+++ b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
@@ -17,3 +17,5 @@
 # This is loaded when a user activates the plugin. Include scripts here that should be
 # loaded automatically at that point.
 #
+
+@load ./init.bro
diff --git a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
index 84e390c230..d62e03f592 100644
--- a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
+++ b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
@@ -35,7 +35,8 @@ event bro_init() &priority=-5
 {
     local filter: Log::Filter = [
         $name = fmt("kafka-%s", stream_id),
-        $writer = Log::WRITER_KAFKAWRITER
+        $writer = Log::WRITER_KAFKAWRITER,
+        $config = table(["stream_id"] = fmt("%s", stream_id))
     ];
 
     Log::add_filter(stream_id, filter);
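The `$config` entry added above is how each generated filter tells the writer which log stream it serves. Expanded by hand for a single stream, the filter this loop produces for `DNS::LOG` is equivalent to the following sketch (the literal strings are what `fmt()` yields for that stream id):

```
event bro_init() &priority=-5
{
    # hand-written equivalent of the generated filter for DNS::LOG
    local filter: Log::Filter = [
        $name = "kafka-DNS::LOG",
        $writer = Log::WRITER_KAFKAWRITER,
        $config = table(["stream_id"] = "DNS::LOG")
    ];
    Log::add_filter(DNS::LOG, filter);
}
```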
diff --git a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
index 79a85ed701..951a60c5f7 100644
--- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
+++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
@@ -22,9 +22,35 @@ using namespace writer;
 KafkaWriter::KafkaWriter(WriterFrontend* frontend):
     WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL)
 {
-    // TODO do we need this??
-    topic_name.assign((const char*)BifConst::Kafka::topic_name->Bytes(),
-        BifConst::Kafka::topic_name->Len());
+    // need thread-local copies of all user-defined settings coming from
+    // bro scripting land. accessing these is not thread-safe and 'DoInit'
+    // is potentially accessed from multiple threads.
+
+    // tag_json - thread local copy
+    tag_json = BifConst::Kafka::tag_json;
+
+    // topic name - thread local copy
+    topic_name.assign(
+        (const char*)BifConst::Kafka::topic_name->Bytes(),
+        BifConst::Kafka::topic_name->Len());
+
+    // kafka_conf - thread local copy
+    Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
+    IterCookie* c = val->AsTable()->InitForIteration();
+    HashKey* k;
+    TableEntryVal* v;
+    while ((v = val->AsTable()->NextEntry(k, c))) {
+
+        // fetch the key and value
+        ListVal* index = val->AsTableVal()->RecoverIndex(k);
+        string key = index->Index(0)->AsString()->CheckString();
+        string val = v->Value()->AsString()->CheckString();
+        kafka_conf.insert(kafka_conf.begin(), pair<string, string>(key, val));
+
+        // cleanup
+        Unref(index);
+        delete k;
+    }
 }
 
 KafkaWriter::~KafkaWriter()
@@ -32,6 +58,11 @@
 {}
 
 bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
 {
+    // if no global 'topic_name' is defined, use the log stream's 'path'
+    if(topic_name.empty()) {
+        topic_name = info.path;
+    }
+
     // initialize the formatter
     if(BifConst::Kafka::tag_json) {
         formatter = new threading::formatter::TaggedJSON(info.path, this, threading::formatter::JSON::TS_EPOCH);
@@ -39,8 +70,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
         formatter = new threading::formatter::JSON(this, threading::formatter::JSON::TS_EPOCH);
     }
 
-    // kafka global configuration
-    string err;
+    // is debug enabled
     string debug;
     debug.assign((const char*)BifConst::Kafka::debug->Bytes(), BifConst::Kafka::debug->Len());
     bool is_debug(!debug.empty());
@@ -53,41 +83,31 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
     else {
        reporter->Info( "Debug is turned off.");
     }
+
+    // kafka global configuration
+    string err;
     conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
 
     // apply the user-defined settings to kafka
-    Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
-    IterCookie* c = val->AsTable()->InitForIteration();
-    HashKey* k;
-    TableEntryVal* v;
-    while ((v = val->AsTable()->NextEntry(k, c))) {
-
-        // fetch the key and value
-        ListVal* index = val->AsTableVal()->RecoverIndex(k);
-        string key = index->Index(0)->AsString()->CheckString();
-        string val = v->Value()->AsString()->CheckString();
-
-        if(is_debug) {
-            reporter->Info("Setting '%s'='%s'", key.c_str(), val.c_str());
-        }
-        // apply setting to kafka
-        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
-            reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
-            return false;
-        }
-
-        // cleanup
-        Unref(index);
-        delete k;
+    map<string, string>::iterator i;
+    for (i = kafka_conf.begin(); i != kafka_conf.end(); ++i) {
+        string key = i->first;
+        string val = i->second;
+
+        // apply setting to kafka
+        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+            reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
+            return false;
+        }
     }
 
     if(is_debug) {
         string key("debug");
         string val(debug);
-      if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
             reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
             return false;
-      }
+        }
     }
 
     // create kafka producer
@@ -104,9 +124,11 @@
         reporter->Error("Failed to create topic handle: %s", err.c_str());
         return false;
     }
+
     if(is_debug) {
         reporter->Info("Successfully created producer.");
     }
+
     return true;
 }
@@ -130,8 +152,9 @@
 
     // successful only if all messages delivered
     if (producer->outq_len() == 0) {
-        reporter->Error("Unable to deliver %0d message(s)", producer->outq_len());
         success = true;
+    } else {
+        reporter->Error("Unable to deliver %0d message(s)", producer->outq_len());
     }
 
     delete topic;
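`DoInit` passes the plugin's `Kafka::debug` string through to librdkafka unchanged, so tracing can be toggled entirely from script land. A sketch, assuming librdkafka's comma-separated debug contexts (for example `broker`, `topic`, and `msg`):

```
@load Bro/Kafka/logs-to-kafka.bro
# enables verbose librdkafka tracing; leave this unset in production,
# since the writer itself warns that debug hurts performance
redef Kafka::debug = "broker,topic,msg";
```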
diff --git a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
index 7e77bc0de1..ad3e03fd26 100644
--- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
+++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
@@ -28,6 +28,17 @@
 #include "kafka.bif.h"
 #include "TaggedJSON.h"
 
+namespace RdKafka {
+    class Conf;
+    class Producer;
+    class Topic;
+}
+
+namespace threading {
+namespace formatter {
+    class Formatter;
+}}
+
 namespace logging { namespace writer {
 
 /**
@@ -54,6 +65,10 @@ class KafkaWriter : public WriterBackend {
     virtual bool DoHeartbeat(double network_time, double current_time);
 
 private:
+    static const string default_topic_key;
+    string stream_id;
+    bool tag_json;
+    map<string, string> kafka_conf;
     string topic_name;
     threading::formatter::Formatter *formatter;
     RdKafka::Producer* producer;

From 087533cd951ec7b17749e5d4b479dfac9f6ea42e Mon Sep 17 00:00:00 2001
From: Nick Allen
Date: Mon, 24 Apr 2017 16:55:12 -0400
Subject: [PATCH 2/2] Improved README

---
 metron-sensors/bro-plugin-kafka/README.md | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/metron-sensors/bro-plugin-kafka/README.md b/metron-sensors/bro-plugin-kafka/README.md
index 0141cf1656..31b1f54424 100644
--- a/metron-sensors/bro-plugin-kafka/README.md
+++ b/metron-sensors/bro-plugin-kafka/README.md
@@ -40,7 +40,9 @@ The following examples highlight different ways that the plugin can be used. Si
 
 ### Example 1
 
-The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`. Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table. By defining `topic_name` all records will be sent to the same Kafka topic.
+The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`.
+ * Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table.
+ * By defining `topic_name` all records will be sent to the same Kafka topic.
 
 ```
 @load Bro/Kafka/logs-to-kafka.bro
@@ -53,7 +55,11 @@ redef Kafka::kafka_conf = table(
 
 ### Example 2
 
-It is also possible to send each log stream to a uniquely named topic. The goal in this example is to send all HTTP records to a Kafka topic named `http` and all DNS records to a separate Kafka topic named `dns`. Any configuration value accepted by librdkafka can be added to the `$config` configuration table. Note that each log writer accepts a separate configuration.
+It is also possible to send each log stream to a uniquely named topic. The goal in this example is to send all HTTP records to a Kafka topic named `http` and all DNS records to a separate Kafka topic named `dns`.
+ * The `topic_name` value must be set to an empty string.
+ * The `$path` value of Bro's Log Writer mechanism is used to define the topic name.
+ * Any configuration value accepted by librdkafka can be added to the `$config` configuration table.
+ * Each log writer accepts a separate configuration table.
 
 ```
 @load Bro/Kafka/logs-to-kafka.bro