Bro Elastic/ELK Experiences

After some sweat, I finally have Bro integrated into the ELK stack (Elasticsearch,
Logstash, Kibana). While there’s a lot of stuff online about doing this, a
bunch of it is incomplete and/or out of date. Here is a summary of my experience
so as to hopefully save others time. I am using the latest releases of
everything (Bro 2.5.0, ELK 5.4.4).

I do plan to go further and use the Kafka plugin to pipe into Logstash,
but haven’t gotten that far yet.
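
For anyone heading that direction now, here is a minimal sketch of what the Bro side might look like, assuming the Apache Metron Kafka plugin (metron-bro-plugin-kafka) and a placeholder broker at localhost:9092 — I haven't deployed this yet, so treat it as a starting point rather than a tested config:

# Assumes the metron-bro-plugin-kafka package is installed (e.g. via bro-pkg)
@load Bro/Kafka/logs-to-kafka.bro
# Pick which Bro logs get shipped to Kafka
redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG);
redef Kafka::topic_name = "bro";
# Placeholder broker address; point it at your own cluster
redef Kafka::kafka_conf = table(["metadata.broker.list"] = "localhost:9092");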

Logstash: a lot of the examples online use complex regular expression matches to
convert the default tab-separated Bro logs. This is not necessary. All you
need to do is have Bro emit JSON:

@load tuning/json-logs

then in the logstash config file set your codec to JSON:

codec => "json"

Timestamps: This is important to get right so that the Kibana time-based functions
work properly. Again, a lot of examples are out of date. What worked for me
was to have Bro use ISO timestamps rather than the default Unix Epoch:

redef LogAscii::json_timestamps = JSON::TS_ISO8601;

then in the logstash filter section, add the following:

date {
  match => [ "ts", "ISO8601" ]
}

GeoIP: This was a pain to get right, with many questions on the ELK forums.
First, you need to add a geoip block to the filter section in the logstash
config file:

geoip {
  source => "id.resp_h"
  target => "geoip"
}

If you use the default logstash output to elasticsearch with no changes,
this works (i.e., Kibana recognizes the geoip.location field as a "geo_point").
However, all your indexes are called "logstash*", which is a pain, and causes
problems the moment you add a new bro log type. I wanted to have an index per
Bro log type, so I did the following:

In the input section of logstash, I put:

file {
  start_position => "beginning"
  type => "bro_conn_logs"
  path => "/home/nahum/conn.log"
  codec => "json"
}

Then, in the output section of logstash, I put:

if [type] == "bro_conn_logs" {
  elasticsearch {
    index => "bro-conn"
  }
}
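
Adding more Bro log types is just more of the same: another file block in the
input section and another conditional in the output section. As a sketch (the
path and index name are only examples), dns.log would look something like:

file {
  start_position => "beginning"
  type => "bro_dns_logs"
  path => "/home/nahum/dns.log"
  codec => "json"
}

and in the output section:

if [type] == "bro_dns_logs" {
  elasticsearch {
    index => "bro-dns"
  }
}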

This, however, breaks the GeoIP because it relies on the default index template
for logstash, which defines how to map the geoip field to a geo_point. The moment
you change the index name, you lose that template mapping. So, I created a new
bro template by doing the following:

1: Get the logstash template

curl -XGET localhost:9200/_template/logstash?pretty > bro.template

2: Edit bro.template to change logstash to bro
3: Delete all existing indexes
4: Install the new template into Elasticsearch

curl -XPUT 'localhost:9200/_template/bro?pretty' \
  -H 'Content-Type: application/json' -d '<bro.template>'

where <bro.template> is the inline template JSON.

5: Reload data into ES indexes via logstash

Note you must have no existing indexes before loading data, since mappings are
applied at index creation time. I.e., if there's no pre-existing template matching
your new index, Elasticsearch will automatically generate a mapping, which
doesn't handle the geoip properly, and by then it's too late.
So create the template BEFORE creating any new indexes.
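
For reference, the piece of the default template that step 2's edit needs to
preserve is the geoip mapping. In the stock Logstash 5.x template it looks
roughly like the fragment below (check it against what you got back in step 1);
after editing, the "template" pattern should match your new index names, e.g.
"bro-*" instead of "logstash-*":

"template": "bro-*",
"mappings": {
  "_default_": {
    "properties": {
      "geoip": {
        "dynamic": true,
        "properties": {
          "ip":        { "type": "ip" },
          "location":  { "type": "geo_point" },
          "latitude":  { "type": "half_float" },
          "longitude": { "type": "half_float" }
        }
      }
    }
  }
}

For step 3, something like curl -XDELETE 'localhost:9200/logstash-*' (adjusted
to whatever your existing index names actually are) clears out the old indexes.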

Thanks to Aaron Gee-Clough for answering some questions. I’m also attaching my logstash config for reference.

-Erich

(See attached file: bro-to-elastic.conf)


We are using NSQ rather than Kafka to get stuff into ELK; FYI, it works great.

+1. Thanks for sharing.

You might want to have NXLog or Filebeat as the shipper instead of Logstash on the Bro instance, as they are a lot lighter resource-wise. You could even forward from Filebeat to Redis and then have Logstash read from Redis for a quick improvement, or go the Kafka way as you mentioned.
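
As a rough sketch of the reading side of that pipeline (hostname and list key are placeholders, and this assumes Filebeat's redis output with its default event envelope, where the original log line ends up in the "message" field):

input {
  redis {
    host => "redis.example.com"   # placeholder host
    data_type => "list"
    key => "filebeat"             # must match the key the shipper writes to
    codec => "json"               # parses the Filebeat event envelope
  }
}
filter {
  # The Bro JSON line arrives inside the "message" field of the Filebeat
  # event, so parse it out into top-level fields
  json {
    source => "message"
  }
}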

When creating your elasticsearch template, make sure you designate the version field as a string. Logstash will pick up the http.log and ssl.log JSON version fields as strings, but the ssh.log version field will be recognized as a number.

Doesn’t look like you started ingesting ssh.log yet, so just a heads up.
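
A sketch of what that could look like in the _default_ mapping of the bro template (Elasticsearch 5.x uses "keyword" for exact-match, non-analyzed strings):

"properties": {
  "version": { "type": "keyword" }
}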

For those of you using ELK for Bro, how many are using a message broker like RabbitMQ or Apache Kafka? Why did you choose the message broker you did?

Thanks,

Craig Edgmand

IT Security

Oklahoma State University

Craig,

I'm currently using Redis, but I'm migrating to Kafka.

I initially chose Redis because it was super easy to set up. It's worked well for me so far, but it's limited by the amount of RAM I have in my servers. I currently run 3 servers, but they aren't actually clustered. Logstash runs on my Bro server to forward the logs to Redis, and will fail over in the event that one of my Redis servers goes down. I had to write some scripts to block incoming writes if the node got too full while still allowing the Logstash agents on the other side to read from the queue. When I'm doing ELK maintenance I have to keep a close eye on my Redis boxes; I've only got a few hours' worth of headroom on them, and if they fill up they'll crash and I lose data. Also, Redis operates as a single queue, so I can't have multiple processes consuming the same data for different uses.
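
A minimal sketch of that shipper side, assuming the stock Logstash redis output (hostnames and the list key here are placeholders): with multiple hosts listed, Logstash moves on to another entry if the one it is writing to goes down.

output {
  redis {
    host => ["redis1.example.com", "redis2.example.com", "redis3.example.com"]
    data_type => "list"
    key => "bro"
  }
}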

Migrating to Kafka will allow me to do real clustering at the queuing layer. Since it writes to disk, I have days or weeks worth of storage rather than hours, so my ELK maintenance won't be so stressful. And since it has the concept of consumer offsets, I can have the ELK system consume the data while my other custom processes consume it for other uses without impacting ELK. The downside is that it's a bit more complex to get a Kafka cluster up and running than it is to get a single Redis node up. I'm only testing it right now, but so far it seems like it's worth the effort.
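
For the consuming side, here is a sketch of the Logstash 5.x kafka input, assuming a topic named "bro" and placeholder broker addresses; a second consumer configured with a different group_id keeps its own offsets and reads the same topic independently, which is what makes the multi-consumer setup work:

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"  # placeholder broker list
    topics => ["bro"]                               # placeholder topic name
    group_id => "elk"   # a consumer with a different group_id gets its own offsets
    codec => "json"
  }
}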

If you’re ever in the Tulsa area, hit me up. I’d be happy to show you some details about how we’re running things at TU.

-Landy

You should also consider http://nsq.io/ as a replacement for Redis instead of Kafka.

Logstash or the Bro Elasticsearch plugin can write to it. It's a lot simpler to run compared to Kafka.

I agree with this. I am running NSQ on the Bro box and using Logstash to pull from it and push to ES. Works like a charm (10+ Gbps of traffic).

The timing of your email was perfect. As it turns out, last week I was preparing a presentation on using Bro with ELK. Your email helped me nail down parts of my demo configuration. I still need to integrate your geoip mapping.

If anyone is interested I posted my own setup guide and presentation on GitHub.

https://github.com/ljb2of3/techfest2017

-Landy

I used to run NSQ, but we moved to Kafka. The biggest reason is our heavy use of Apache Metron, which leverages Kafka and handles insertion into ES after normalization, enrichment, threat triage, etc.

Jon