After some sweat, I finally have Bro integrated into the ELK stack (Elasticsearch,
Logstash, Kibana). While there’s a lot of stuff online about doing this, a
bunch of it is incomplete and/or out of date. Here is a summary of my experience
so as to hopefully save others time. I am using the latest releases of
everything (Bro 2.5.0, ELK 5.4.4).
I do plan to go further and use the Kafka plugin to pipe into Logstash,
but haven’t gotten that far yet.
Logstash: a lot of the examples online use complex regular-expression matches to
parse the default tab-separated Bro logs. This is not necessary. All you
need is to have Bro emit JSON:
@load tuning/json-logs
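(The usual place for that @load line is local.bro.) Each conn.log entry then
becomes one JSON object per line, roughly like this (illustrative values,
abbreviated field list):

{"ts":1499696581.123456,"uid":"CHhAvVGS1DHFjwGM9","id.orig_h":"192.168.1.10","id.orig_p":51532,"id.resp_h":"93.184.216.34","id.resp_p":80,"proto":"tcp","service":"http","conn_state":"SF"}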
then in the logstash config file set your codec to JSON:
codec => "json"
Timestamps: This is important to get right so that the Kibana time-based functions
work properly. Again, a lot of examples are out of date. What worked for me
was to have Bro use ISO 8601 timestamps rather than the default Unix epoch:
redef LogAscii::json_timestamps = JSON::TS_ISO8601;
then in the logstash filter section, add the following:
date {
  match => [ "ts", "ISO8601" ]
}
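With that redef, the ts field comes out as an ISO 8601 string, e.g.
(illustrative value):

"ts":"2017-07-10T14:23:01.123456Z"

instead of an epoch float, and the date filter above parses it into @timestamp,
which is what Kibana's time-based functions key on by default.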
GeoIP: This was a pain to get right, with many questions on the ELK forums.
First, you need to add a geoip filter to the filter section in the logstash
config file:
geoip {
  source => "id.resp_h"
  target => "geoip"
}
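Putting the two pieces together, a minimal filter section looks like this:

filter {
  date {
    match => [ "ts", "ISO8601" ]
  }
  geoip {
    source => "id.resp_h"
    target => "geoip"
  }
}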
If you use the default logstash output to elasticsearch with no changes,
this works (i.e., Kibana recognizes the geoip.location field as a "geo_point").
However, all your indexes are then named "logstash-*", which is a pain, and causes
problems the moment you add a new Bro log type. I wanted to have an index per
Bro log type, so I did the following:
In the input section of logstash, I put:
file {
  start_position => "beginning"
  type => "bro_conn_logs"
  path => "/home/nahum/conn.log"
  codec => "json"
}
Then, in the output section of logstash, I put:
if [type] == "bro_conn_logs" {
  elasticsearch {
    index => "bro-conn"
  }
}
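To add another Bro log type, repeat the same pattern with a new type and index;
e.g., for dns.log (just an illustration), add to the input section:

file {
  start_position => "beginning"
  type => "bro_dns_logs"
  path => "/home/nahum/dns.log"
  codec => "json"
}

and to the output section:

if [type] == "bro_dns_logs" {
  elasticsearch {
    index => "bro-dns"
  }
}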
This, however, breaks the GeoIP, because it relies on the default logstash index
template, which defines how to map geoip.location to a geo_point. The moment you
change the index name, the template no longer matches and you lose that mapping.
So I created a new bro template by doing the following:
1: Get the logstash template
curl -XGET localhost:9200/_template/logstash?pretty > bro.template
2: Edit bro.template to change logstash to bro
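Two details to watch when editing, at least with the 5.x template APIs: the GET
response wraps the template body in an outer {"logstash": ...} object that has
to be stripped before you can PUT it back, and the index pattern needs to change
so the template matches the new index names, i.e. something like:

"template": "logstash-*"

becomes

"template": "bro-*"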
3: Delete all existing indexes
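For example, assuming all your existing indexes match the old logstash pattern:

curl -XDELETE 'localhost:9200/logstash-*?pretty'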
4: Install the new template into Elasticsearch
curl -XPUT 'localhost:9200/_template/bro?pretty' \
  -H 'Content-Type: application/json' -d '<bro.template>'
where <bro.template> is the inline template JSON.
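Equivalently, curl can read the body straight from the edited file, which avoids
pasting the JSON inline:

curl -XPUT 'localhost:9200/_template/bro?pretty' \
  -H 'Content-Type: application/json' -d @bro.template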
5: Reload data into ES indexes via logstash
Note you must have no matching indexes before loading data, since mappings are
applied from templates at index creation time. I.e., if there's no pre-existing
template that matches your new index, Elasticsearch will automatically generate
a mapping for it, which doesn't handle the geoip properly, and by then it's too late.
So create the template BEFORE creating any new indexes.
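To sanity-check after reloading, you can pull the mapping for the new index and
confirm that geoip.location comes back as a geo_point:

curl -XGET 'localhost:9200/bro-conn/_mapping?pretty'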
Thanks to Aaron Gee-Clough for answering some questions. I’m also attaching my logstash config for reference.
-Erich
(See attached file: bro-to-elastic.conf)