
Conversation

@cestella (Member)

We should support streaming enrichment data into Kafka and writing it out to HBase in a format suitable for use in either the Simple HBase Enrichment Adapter or the Simple HBase Threat Intel Adapter.

This should be fully backwards compatible with how we did parser topologies before, so you should be able to spin up a Vagrant image and see data go through the indices.

writer.init();
if(isBulk) {
  writerTransformer = config -> new ParserWriterConfiguration(config);

@merrimanr (Contributor)

Curious why a Function is used here. Why not just instantiate a WriterConfiguration object for each case and pass that to messageWriter.init? Is there a benefit to doing it this way?

@cestella (Member, Author)

The config object can change if ZooKeeper is updated, so we want an indirection here.
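
Roughly, the pattern is something like this (a minimal sketch; ParserWriterConfiguration and WriterConfiguration come from the diff above, while ParserConfigurations and the method names are my illustrative assumptions, not the exact ParserBolt code):

import java.util.function.Function;

// Minimal sketch of the indirection; not the actual ParserBolt code.
public class WriterConfigIndirection {
  private final Function<ParserConfigurations, WriterConfiguration> writerTransformer =
      config -> new ParserWriterConfiguration(config);

  // Called with the latest config each time, so a ZooKeeper-driven update
  // is picked up without re-initializing the writer.
  public WriterConfiguration currentWriterConfig(ParserConfigurations latest) {
    return writerTransformer.apply(latest);
  }
}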

@merrimanr (Contributor)

Ah I see. That is very defensive. I like it :)

@merrimanr (Contributor)

I like it. +1

@james-sirota

Where are the docs for validating this? I would like to stream in an enrichment.

@cestella (Member, Author)

cestella commented May 29, 2016

In order to validate this, you can do the following:

  • Configure a new parser. In this example I'll call it a user parser, and we'll parse some CSV data mapping username to IP by creating a file /usr/metron/0.1BETA/config/zookeeper/parsers/user.json with:
{
    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
   ,"writerClassName" : "org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter"
   ,"sensorTopic":"user"
   ,"parserConfig":
   {
     "shew.table" : "enrichment"
    ,"shew.cf" : "t"
    ,"shew.keyColumns" : "user"
    ,"shew.enrichmentType" : "user"
    ,"columns" : {
                "user" : 0
               ,"ip" : 1
                 }
   }
}
  • Add a new user enrichment type to bro data by adding ip_src_addr to hbaseEnrichment and associating user as a field type for ip_src_addr in /usr/metron/0.1BETA/config/zookeeper/enrichment/bro.json like so:
{
  "index": "bro",
  "batchSize": 5,
  "enrichment": {
    "fieldMap": {
      "geo": [ "ip_dst_addr", "ip_src_addr" ],
      "host": [ "host" ],
      "hbaseEnrichment": [ "ip_src_addr" ]
    },
    "fieldToTypeMap": {
      "ip_src_addr": [ "user" ]
    }
  },
  "threatIntel": {
    "fieldMap": {
      "hbaseThreatIntel": [ "ip_dst_addr", "ip_src_addr" ]
    },
    "fieldToTypeMap": {
      "ip_dst_addr": [ "malicious_ip" ],
      "ip_src_addr": [ "malicious_ip" ]
    }
  }
}
  • Create the Kafka topic as in the tutorials.
  • Push up the config you just created using /usr/metron/0.1BETA/bin/zk_load_configs.sh: /usr/metron/0.1BETA/bin/zk_load_configs.sh -m PUSH -z node1:2181 -i /usr/metron/0.1BETA/config/zookeeper
  • Create some reference CSV data that looks like jsirota,192.168.168.1 in a file named user.csv.
  • Use the Kafka console producer to push data into the user topic via cat user.csv | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic user
  • You should be able to check that the data gets into HBase by doing a scan 'enrichment' from the HBase shell.
  • You should also be able to check, after new data has been run through, that the data is enriched in Elasticsearch. I would suggest bouncing the enrichment topology to ensure that stale data in the caches gets flushed, but that is not strictly necessary. (The commands from these steps are consolidated in the sketch below.)
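
A minimal consolidated sketch of the commands from the steps above, assuming the node1 hostnames and 0.1BETA install paths used in this thread:

# Push the parser and enrichment configs to ZooKeeper
/usr/metron/0.1BETA/bin/zk_load_configs.sh -m PUSH -z node1:2181 -i /usr/metron/0.1BETA/config/zookeeper

# Create the reference data and stream it into the user topic
echo "jsirota,192.168.168.1" > user.csv
cat user.csv | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic user

# Verify the rows landed in the enrichment table
echo "scan 'enrichment'" | hbase shell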

@james-sirota

FYI: for some reason the Kafka topic does not always get auto-created, and I can't figure out which options cause it not to auto-create. Also, sometimes Kafka throws an error the first time you push into a topic, but when you run the same command again it works just fine. Something to keep in mind for the future: we can't rely on the Kafka producer to reliably create a topic.
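
One way to sidestep auto-creation entirely is to create the topic explicitly before producing (a sketch; the partition and replication counts here are illustrative):

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic user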

@james-sirota

On AWS the following did not work:

/usr/metron/0.1BETA/bin/start_parser_topology.sh -s user -k xxx:9092 -z xxx:2181

I got a NullPointerException:

41 [main-EventThread] INFO o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
java.lang.NullPointerException
at org.apache.metron.parsers.topology.ParserTopologyBuilder.build(ParserTopologyBuilder.java:57)
at org.apache.metron.parsers.topology.ParserTopologyCLI.main(ParserTopologyCLI.java:232)

Looking at the file here:
https://github.com/cestella/incubator-metron/blob/METRON-174/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java

It looks like it chokes on the sensor topic. However, listing the topics:

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper xxx:2181
bro
enrichments
pcap
snort
user
yaf

it looks like I have it correct. My parser topology config looks as follows:

{
  "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
  ,"writerClassName" : "org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter"
  ,"sensorTopic" : "user"
  ,"parserConfig" :
  {
    "shew.table" : "enrichment"
    ,"shew.cf" : "t"
    ,"shew.keyColumns" : "user"
    ,"shew.enrichmentType" : "user"
    ,"columns" : {
      "user" : 0
      ,"ip" : 1
    }
  }
}

And is located under /usr/metron/0.1BETA/config/zookeeper/parsers/user.json

Any suggestions?

@cestella (Member, Author)

@james-sirota Did you push the new parser config to ZooKeeper via /usr/metron/0.1BETA/bin/zk_load_configs.sh -m PUSH -z node1:2181 -i /usr/metron/0.1BETA/config/zookeeper before you tried to start the topology? If not, then you must do that; it won't read off of disk, it reads from ZooKeeper. I'll make that not an NPE so it's clearer, though.
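
Concretely, something along these lines (a sketch only; readConfigFromZookeeper is a hypothetical helper standing in for the real lookup, not the actual ParserTopologyBuilder code):

// Sketch: replace the bare NPE with an actionable error.
// readConfigFromZookeeper is a hypothetical helper for the real ZooKeeper lookup.
SensorParserConfig parserConfig = readConfigFromZookeeper(sensorType);
if (parserConfig == null) {
  throw new IllegalStateException(
      "No parser config found in ZooKeeper for sensor '" + sensorType + "'. "
      + "Push configs first via zk_load_configs.sh -m PUSH.");
}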

@cestella (Member, Author)

If you did push the config before trying to start the parser, then please confirm that the user config is in ZooKeeper by inspecting the output of /usr/metron/0.1BETA/bin/zk_load_configs.sh -m DUMP -z node1:2181

@james-sirota

/usr/metron/0.1BETA/bin/zk_load_configs.sh -m DUMP -z 1xxx:2181
log4j:WARN No appenders could be found for logger (org.apache.curator.framework.imps.CuratorFrameworkImpl).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
GLOBAL Config: global
{
  "es.clustername": "metron",
  "es.ip": "xxx",
  "es.port": "9300",
  "es.date.format": "yyyy.MM.dd.HH"
}

PARSER Config: websphere
{
  "parserClassName": "org.apache.metron.parsers.websphere.GrokWebSphereParser",
  "sensorTopic": "websphere",
  "parserConfig": {
    "grokPath": "/patterns/websphere",
    "patternLabel": "WEBSPHERE",
    "timestampField": "timestamp_string",
    "dateFormat": "yyyy MMM dd HH:mm:ss"
  }
}

PARSER Config: bluecoat
{
  "parserClassName": "org.apache.metron.parsers.bluecoat.BasicBluecoatParser",
  "sensorTopic": "bluecoat",
  "parserConfig": {}
}

PARSER Config: squid
{
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "sensorTopic": "squid",
  "parserConfig": {
    "grokPath": "/patterns/squid",
    "patternLabel": "SQUID_DELIMITED",
    "timestampField": "timestamp"
  }
}

Exception in thread "main" java.lang.RuntimeException: Unable to load {
  "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
  ,"writerClassName" : "org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter"
  ,"sensorTopic" : "user"
  ,"parserConfig" :
  {
    "shew.table" : "enrichment"
    ,"shew.cf" : "t"
    ,"shew.keyColumns" : "user"
    ,"shew.enrichmentType" : "user"
    ,"columns" : {
      "user" : 0
      ,"ip" : 1
    }
  }
}

at org.apache.metron.common.configuration.ConfigurationType.lambda$static$1(ConfigurationType.java:47)
at org.apache.metron.common.configuration.ConfigurationType$$Lambda$9/1684106402.apply(Unknown Source)
at org.apache.metron.common.configuration.ConfigurationType.deserialize(ConfigurationType.java:78)
at org.apache.metron.common.configuration.ConfigurationsUtils.lambda$dumpConfigs$0(ConfigurationsUtils.java:272)
at org.apache.metron.common.configuration.ConfigurationsUtils$$Lambda$7/785992331.visit(Unknown Source)
at org.apache.metron.common.configuration.ConfigurationsUtils.visitConfigs(ConfigurationsUtils.java:264)
at org.apache.metron.common.configuration.ConfigurationsUtils.visitConfigs(ConfigurationsUtils.java:251)
at org.apache.metron.common.configuration.ConfigurationsUtils.dumpConfigs(ConfigurationsUtils.java:271)
at org.apache.metron.common.cli.ConfigurationManager.dump(ConfigurationManager.java:115)
at org.apache.metron.common.cli.ConfigurationManager.run(ConfigurationManager.java:177)
at org.apache.metron.common.cli.ConfigurationManager.run(ConfigurationManager.java:161)
at org.apache.metron.common.cli.ConfigurationManager.main(ConfigurationManager.java:198)

Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "writerClassName" (class org.apache.metron.common.configuration.SensorParserConfig), not marked as ignorable (3 known properties: , "parserConfig", "parserClassName", "sensorTopic"])
at [Source: java.io.StringReader@23bb8443; line: 3, column: 26] (through reference chain: org.apache.metron.common.configuration.SensorParserConfig["writerClassName"])
at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:79)
at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:555)
at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:708)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1160)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:315)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:121)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2888)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2034)
at org.apache.metron.common.utils.JSONUtils.load(JSONUtils.java:71)
at org.apache.metron.common.configuration.ConfigurationType.lambda$static$1(ConfigurationType.java:45)
... 11 more

So it looks like it can't load it.

Looking in the jar it seems to exist

jar -tf metron-parsers-0.1BETA.jar | grep CSVParser
org/apache/metron/parsers/csv/CSVParser.class
com/opencsv/CSVParser$1.class
com/opencsv/CSVParser.class
com/opencsv/CSVParserBuilder.class

jar -tf metron-parsers-0.1BETA.jar | grep SimpleHbaseEnrichmentWriter
org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter$Configurations.class
org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter$KeyTransformer.class
org/apache/metron/writer/hbase/SimpleHbaseEnrichmentWriter.class

@cestella (Member, Author)

Looks like it can't find the writerClassName field. Are you sure you ran a build from this branch before the deploy?
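
For context, that Jackson error means the SensorParserConfig class in the deployed jar predates this branch. The branch's version needs a writerClassName property along these lines (a sketch; only writerClassName is confirmed by the error above, and the rest simply mirrors the three known properties it lists):

import java.util.HashMap;
import java.util.Map;

// Sketch of the bean shape Jackson expects on this branch.
public class SensorParserConfig {
  private String parserClassName;
  private String writerClassName; // new on this branch; missing from the old jar
  private String sensorTopic;
  private Map<String, Object> parserConfig = new HashMap<>();

  public String getWriterClassName() { return writerClassName; }
  public void setWriterClassName(String writerClassName) { this.writerClassName = writerClassName; }
  // ...getters/setters for the remaining properties
}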

@james-sirota

I built the jar from the branch and copied it out to an existing AWS cluster that I had.

@james-sirota

I was able to get past the previous error by uploading a new common jar. Now when the topology comes up, it processes the CSV no problem. But I only have the spout and the parser bolt come up; the writer does not come up. Also, I can't get the parser to ack anything. My file looks like this:

{
  "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
  ,"writerClassName" : "org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter"
  ,"sensorTopic" : "user"
  ,"parserConfig" :
  {
    "shew.table" : "enrichment"
    ,"shew.cf" : "t"
    ,"shew.keyColumns" : "user"
    ,"shew.enrichmentType" : "user"
    ,"columns" : {
      "user" : 0
      ,"ip" : 1
    }
  }
}

@james-sirota

Tracing through the log I got this. Let me bounce HBase and see if that helps:
2016-05-30 23:09:55.612 o.a.h.h.z.RecoverableZooKeeper [WARN] Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/meta-region-server
2016-05-30 23:09:55.612 o.a.h.h.z.RecoverableZooKeeper [ERROR] ZooKeeper getData failed after 4 attempts
2016-05-30 23:09:55.612 o.a.h.h.z.ZKUtil [WARN] hconnection-0x6013736e0x0, quorum=localhost:2181, baseZNode=/hbase Unable to get data of znode /hbase/meta-region-server
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/meta-region-server
at org.apache.zookeeper.KeeperException.create(KeeperException.java:99) ~[zookeeper-3.4.6.2.3.4.7-4.jar:3.4.6-4--1]
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51) ~[zookeeper-3.4.6.2.3.4.7-4.jar:3.4.6-4--1]
at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1155) ~[zookeeper-3.4.6.2.3.4.7-4.jar:3.4.6-4--1]
at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.getData(RecoverableZooKeeper.java:360) ~[stormjar.jar:?]
at org.apache.hadoop.hbase.zookeeper.ZKUtil.getData(ZKUtil.java:745) [stormjar.jar:?]
at org.apache.hadoop.hbase.zookeeper.MetaTableLocator.getMetaRegionState(MetaTableLocator.java:482) [stormjar.jar:?]
at org.apache.hadoop.hbase.zookeeper.MetaTableLocator.getMetaRegionLocation(MetaTableLocator.java:168) [stormjar.jar:?]
at org.apache.hadoop.hbase.zookeeper.MetaTableLocator.blockUntilAvailable(MetaTableLocator.java:600) [stormjar.jar:?]
at org.apache.hadoop.hbase.zookeeper.MetaTableLocator.blockUntilAvailable(MetaTableLocator.java:580) [stormjar.jar:?]
at org.apache.hadoop.hbase.zookeeper.MetaTableLocator.blockUntilAvailable(MetaTableLocator.java:559) [stormjar.jar:?]
at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getMetaRegionLocation(ZooKeeperRegistry.java:61) [stormjar.jar:?]
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateMeta(ConnectionManager.java:1185) [stormjar.jar:?]
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1152) [stormjar.jar:?]
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.relocateRegion(ConnectionManager.java:1126) [stormjar.jar:?]
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1331) [stormjar.jar:?]
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1155) [stormjar.jar:?]
at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:370) [stormjar.jar:?]
at org.apache.hadoop.hbase.client.AsyncProcess.submit(AsyncProcess.java:321) [stormjar.jar:?]
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:206) [stormjar.jar:?]
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:183) [stormjar.jar:?]
at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1439) [stormjar.jar:?]
at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1042) [stormjar.jar:?]
at org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter.write(SimpleHbaseEnrichmentWriter.java:271) [stormjar.jar:?]
at org.apache.metron.common.writer.BulkWriterComponent.write(BulkWriterComponent.java:96) [stormjar.jar:?]
at org.apache.metron.parsers.bolt.ParserBolt.execute(ParserBolt.java:154) [stormjar.jar:?]
at backtype.storm.daemon.executor$fn__5495$tuple_action_fn__5497.invoke(executor.clj:670) [storm-core-0.10.0.2.3.4.7-4.jar:0.10.0.2.3.4.7-4]
at backtype.storm.daemon.executor$mk_task_receiver$fn__5418.invoke(executor.clj:426) [storm-core-0.10.0.2.3.4.7-4.jar:0.10.0.2.3.4.7-4]
at backtype.storm.disruptor$clojure_handler$reify__4994.onEvent(disruptor.clj:58) [storm-core-0.10.0.2.3.4.7-4.jar:0.10.0.2.3.4.7-4]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) [storm-core-0.10.0.2.3.4.7-4.jar:0.10.0.2.3.4.7-4]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) [storm-core-0.10.0.2.3.4.7-4.jar:0.10.0.2.3.4.7-4]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) [storm-core-0.10.0.2.3.4.7-4.jar:0.10.0.2.3.4.7-4]
at backtype.storm.daemon.executor$fn__5495$fn__5508$fn__5559.invoke(executor.clj:808) [storm-core-0.10.0.2.3.4.7-4.jar:0.10.0.2.3.4.7-4]
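
Worth noting in the trace above: the client is connecting with quorum=localhost:2181. On a distributed cluster, that usually means the HBase client config isn't on the topology's classpath, so it falls back to the default quorum. A quick sanity check (the config path here is the usual HDP default and may differ on your cluster):

grep -A1 hbase.zookeeper.quorum /etc/hbase/conf/hbase-site.xml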

@james-sirota

Another interesting thing that I think is a problem: I sent it exactly 30 tuples. The spout acked 60 tuples (somehow doubling the count), and when it failed, the number of failed acks was 100. Not sure where the other 40 tuples came from, but I think it may be replaying them. We need to check all our topologies for this behavior and make sure that failed tuples are not being replayed.

@james-sirota

Now getting the following error on the bro topology trying to enrich:

2016-05-31 04:24:05.212 o.a.k.c.n.Selector [WARN] Error in I/O with ip xxxx
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_40]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_40]
at org.apache.kafka.common.network.Selector.poll(Selector.java:238) [stormjar.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [stormjar.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [stormjar.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [stormjar.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]

I am giving it the same Kafka broker that I gave to the user topology, which worked.

@cestella (Member, Author)

You sure Kafka is still up?

@cestella (Member, Author)

Try pulling data from that broker using the console consumer.
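
For example (a sketch; the ZooKeeper host is the redacted one from earlier in this thread, and user is just the topic we've been testing with):

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper xxx:2181 --topic user --from-beginning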

@james-sirota

Had a Kafka problem. Works great.
