Broker data layouts

We are currently writing code for ingesting data directly using Broker’s API. From the docs, it seems that Broker assumes that publishers and subscribers somehow agree on one layout per topic: “senders and receivers will need to agree on a specific data layout for the values exchanged, so that they interpret them in the same way.” [1]

This raises a couple of questions. Primarily: where can Broker users learn the layouts to interpret received data? There’s essentially no hope in inferring a layout, since broker::data doesn’t include any meta information such as field names. Is meta information stored somewhere? Is there a convention for how to place and retrieve such meta information in Broker’s data stores? How does Bro itself make such information available? Is there a document that lists all topics used by Bro with their respective broker::data layout?

Dominik

[1] https://bro-broker.readthedocs.io/en/stable/comm.html#exchanging-bro-events

> This raises a couple of questions. Primarily: where can Broker users learn the layouts to interpret received data?

broker/bro.hh is basically all there is right now. E.g., if you
construct a broker::bro::Event from a received broker::data, you get
access to the event name + "interpretable" arguments.
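
For illustration, a minimal sketch of that interpretation step, assuming
the Broker 1.x API where subscribers yield std::pair<topic, data>
(matching the benchmark diff quoted at the end of this thread); topic
name and peering address are made up:

#include <iostream>

#include <broker/broker.hh>
#include <broker/bro.hh>

int main() {
  broker::endpoint ep;
  auto sub = ep.make_subscriber({"/bro/events"});
  ep.peer("127.0.0.1", 9999);
  auto msg = sub.get(); // blocks until a message arrives
  // Re-interpret the opaque broker::data as a Bro event. This recovers
  // the event name and the argument values, but no field names or
  // record types.
  broker::bro::Event event(std::move(msg.second));
  std::cout << "event: " << event.name() << '\n';
  for (auto& arg : event.args())
    std::cout << "  arg: " << broker::to_string(arg) << '\n';
}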

> There’s essentially no hope in inferring a layout, since broker::data doesn’t include any meta information such as field names. Is meta information stored somewhere?

No, nothing like that is implicit in the message content.

> Is there a convention for how to place and retrieve such meta information in Broker’s data stores?

No, and any stores created by Bro don't even have such meta info. The
types stored in them are just documented like "the keys are type Foo
and values are type Bar".

> How does Bro itself make such information available?

Nothing beyond documentation or the Bro -> Broker type mapping that's
implicit in events themselves (or as given in docs for data stores).

> Is there a document that lists all topics used by Bro with their respective broker::data layout?

I don't think there's a plan to keep such a document up to date.

A basic usage premise that I'm wondering about is that none of the
current Broker usage in Bro actually seems suitable for generic/public
consumption as it is. It's maybe more an implementation detail of
doing cluster-enabled network traffic analysis, so it's also not a
primary goal to make interpretation of those communications easy for
external/direct Broker users. (You can ingest it if you want and do
the work of "manually" interpreting it all, but it maybe won't be a
stable/transparent source of data going forward.)

However, one can still use Bro + Broker to create one's own
events/stores in a way that does contain the meta information required
for easier/programmatic interpretation on the receiving side. E.g., I
think, at the moment, someone interested in ingesting data produced
by Bro is best served by explicitly defining topic names, event/data
types, and explicitly producing those messages at suitable places
within Bro scripts themselves. Then, one is in control of defining a
common/expected data format and can include whatever meta information
is necessary to help receivers interpret the data.
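
To sketch what such an agreed-upon message could look like from the
Broker side (a non-Bro endpoint publishing the same kind of event):
the topic, event name, and layout here are invented; they would be
whatever publisher and receivers agree on out of band. The
Event-to-data conversion is the same one the benchmark patch at the
end of this thread relies on.

#include <broker/broker.hh>
#include <broker/bro.hh>

int main() {
  broker::endpoint ep;
  ep.peer("127.0.0.1", 9999);
  // Agreed-upon contract: topic "/acme/conn-summary" carries the event
  // conn_summary(orig: string, resp: string, bytes: count) -- all names
  // invented for illustration.
  broker::bro::Event ev("conn_summary",
                        std::vector<broker::data>{std::string("10.0.0.1"),
                                                  std::string("10.0.0.2"),
                                                  broker::count{4096}});
  ep.publish("/acme/conn-summary", std::move(ev));
}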

Maybe there's a more standardized approach that could be worked
towards, but likely we just need more experience in understanding and
defining common use-cases for external Bro data consumption.

Or if we were just talking about Broker-only usage independent of Bro,
then I think it's still the same ideas/answers: currently it's up to
the user to decide how to encode broker::data in a way that defines
common/expected layouts + any required meta info.

Does that help at all?

- Jon

Dominik, wasn't the original idea for VAST to provide an event
description language that would create the link between the values
coming over the wire and their interpretation? Such a specification
could be auto-generated from Bro's knowledge about the events it
generates.

Also, this question is about events, not logs, right? Logs have a
different wire format and they actually come with meta data describing
their columns.

Robin

The Broker data corresponding to log entry content is also opaque at
the moment (I recall that was maybe for performance or message volume
optimization), but I suppose the same reasoning as before could apply:
this info is internal to Bro operation unless one wants to explicitly
re-publish it via their own event for external consumption.

- Jon

Yeah, but generally this is something I could see opening up. The log
structure is pretty straightforward and self-describing; it'd be
mostly a matter of cleanup and documentation to make that directly
accessible to external consumers, I think. Events, on the other hand,
are semantically tied very closely to the scripts generating them, and
also much more diverse, so that self-description doesn't really seem
feasible/useful. Republishing a relevant subset certainly sounds
better for that; or, if it's really a bulk feed that's desired, some
out-of-band mechanism to convey the schema information somehow.

Robin

> Dominik, wasn't the original idea for VAST to provide an event
> description language that would create the link between the values
> coming over the wire and their interpretation? Such a specification
> could be auto-generated from Bro's knowledge about the events it
> generates.

We were actually thinking about auto-generating the schema. But broker::data simply has no meta information that we can use. Even distinguishing records/tuples from actual lists is impossible, because broker::vector is used for both. Of course we can make a couple of assumptions (that the top-level vector is a record, for example), but then VAST users can only ever use type queries. In other words, they can ask for IP addresses in general, but not specifically for originator IPs.
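
To make the ambiguity concrete, a tiny sketch (the field names in the
comments are invented):

#include <cassert>

#include <broker/broker.hh>

int main() {
  // Meant as a record with fields [orig_h: string, resp_h: string] ...
  broker::data record = broker::vector{std::string("10.0.0.1"),
                                       std::string("10.0.0.2")};
  // ... and meant as a plain list of two strings. Both arrive as the
  // same broker::vector.
  broker::data list = broker::vector{std::string("10.0.0.1"),
                                     std::string("10.0.0.2")};
  assert(record == list); // indistinguishable without an external schema
}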

In a sense, Broker’s representation is an inverted JSON. In JSON, we have field names but hardly any type information, whereas in Broker we have (ambiguous) type information but no field names. :-)

> The Broker data corresponding to log entry content is also opaque at
> the moment (I recall that was maybe for performance or message volume
> optimization),

> Yeah, but generally this is something I could see opening up. The log
> structure is pretty straightforward and self-describing; it'd be
> mostly a matter of cleanup and documentation to make that directly
> accessible to external consumers, I think. Events, on the other hand,
> are semantically tied very closely to the scripts generating them, and
> also much more diverse, so that self-description doesn't really seem
> feasible/useful. Republishing a relevant subset certainly sounds
> better for that; or, if it's really a bulk feed that's desired, some
> out-of-band mechanism to convey the schema information somehow.

Opening that up would be great.

However, our goal was to have Broker as a source for structured data that we can import in a generic fashion for later analysis. Of course that relies on a standard / convention / best practice for making the schema programmatically accessible. Currently, it seems that we need a schema definition provided by the user offline. This will work as long as all published data for a given topic is uniform. Multiplexing multiple event types already makes things complicated, but it seems like this is actually the standard use case. OSQuery, for example, will generate different events that we then either need to separate into different topics or multiplex in a single topic and merge in some meta information. And once we mix meta information with actual data, a simple schema definition no longer cuts it. At worst, importing data from Broker requires a separate parser for each import format.
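
One conceivable convention for the multiplexing case, sketched below:
prefix each payload with a schema tag that a generic importer can
dispatch on. The tag format and all names are invented for
illustration, not an existing osquery/Broker standard.

#include <string>
#include <utility>

#include <broker/broker.hh>

// Wrap a payload together with the name of its schema, so that
// different event types can share one topic and still be told apart.
broker::data tag_message(std::string schema, broker::vector payload) {
  return broker::vector{std::move(schema), std::move(payload)};
}

int main() {
  broker::endpoint ep;
  ep.peer("127.0.0.1", 9999);
  // Two different event types multiplexed over the same topic:
  ep.publish("/acme/osquery",
             tag_message("osquery::process",
                         {broker::count{4711}, std::string("sshd")}));
  ep.publish("/acme/osquery",
             tag_message("osquery::socket",
                         {std::string("10.0.0.1"), broker::count{443}}));
}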

> broker/bro.hh is basically all there is right now

I’m a bit hesitant to rely on this header at the moment, because of:

/// A Bro log-write message. Note that at the moment this should be used only
/// by Bro itself as the arguments aren't publicly defined.

Is the API stable enough on your end at this point to make it public? Also, there are LogCreate and LogWrite events. LogCreate has the `fields_data` field (a list of field names?). Does that mean I need to receive the LogCreate event first to understand successive LogWrite events? That would mean I cannot parse logs whose LogCreate event was sent before I was able to subscribe to the topic.

    Dominik

> I’m a bit hesitant to rely on this header at the moment, because of:
>
> /// A Bro log-write message. Note that at the moment this should be used only
> /// by Bro itself as the arguments aren't publicly defined.

> Is the API stable enough on your end at this point to make it public?

The comment is just pointing out what was said about the log message
formats being opaque at the moment. It's expected only Bro will be
able to make sense of the content.

> Also, there are LogCreate and LogWrite events. LogCreate has the `fields_data` field (a list of field names?).

Yeah, there's some field info in there: names, types, optionality.
The type info in particular doesn't seem good to treat as intended
for public consumption.

> Does that mean I need to receive the LogCreate event first to understand successive LogWrite events? That would mean I cannot parse logs whose LogCreate event was sent before I was able to subscribe to the topic.

Yeah, that's one problem, but a bigger issue is that you can't parse
LogWrite at all: the content is a serialized blob whose format is
another thing not intended for public consumption.

- Jon

I guess my earlier comment might have been misleading: there's
certainly work that needs to be done to open this up. Right now, it's
probably not even realistic at all, because we still have a workaround
in place in there that uses the old (non-Broker) serialization code
for creating that blob. That was to get around a performance issue
and still needs to be addressed. As part of upgrading that, I think it
can make sense to think about documenting the format we end up
choosing.

Robin

I don't really see a way around that without substantially increasing
volume. We could send LogCreate updates regularly, so that it's easier
to synchronize with an ongoing stream.

Robin

> I don't really see a way around that without substantially increasing
> volume. We could send LogCreate updates regularly, so that it's easier
> to synchronize with an ongoing stream.

It sounds like this is critical also for regular operation: (1) when
an endpoint bootstraps slowly and the LogCreate message has already
been sent, it doesn't know what to do, and (2) when an endpoint
crashes and comes back, it may have lost the state from the initial
LogCreate.

That said, I want to make sure I understood you correctly: is it
currently impossible to parse Bro logs with Broker, because all logs
come in the LogWrite message, which is a binary blob? It sounds like
the topic /bro/logs gets both the LogCreate and LogWrite messages.

In other words, can Broker currently be used if one writes a Bro
script that publishes plain events (message type 1 in bro.hh)?

    Matthias

> It sounds like this is critical also for regular operation:

Agree. Right now a newly connecting peer gets a round of explicit
LogCreates, but that's probably not the best way forward for larger
topologies.

> is it currently impossible to parse Bro logs with Broker, because all
> logs come in the LogWrite message, which is a binary blob?

Correct. (This was different at first, but the switch was necessary
for performance. It's waiting for a better solution at this point.)

> In other words, can Broker currently be used if one writes a Bro
> script that publishes plain events (message type 1 in bro.hh)?

Yes to that. Non-Bros can exchange events (assuming they know the
schema), but not logs.

Robin

> Agree. Right now a newly connecting peer gets a round of explicit
> LogCreates, but that's probably not the best way forward for larger
> topologies.

Okay. In the future, we probably need some form of
"serialization-free" batching mechanism to ship data more efficiently.
There exist technologies like Apache Arrow, FlatBuffers, Cap'n Proto,
MsgPack, etc., all of which require building a set of values once and
then just copying them around as a binary blob on the wire.
Deserialization is not needed, because one would typically only "view"
the data through lightweight accessors.
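
As a toy illustration of that "view" idea (not tied to any of the
libraries above): values are laid out once in a flat byte buffer, and
the receiver reads fields through accessors at fixed offsets instead
of deserializing into an object graph.

#include <cstdint>
#include <cstring>
#include <string_view> // C++17

// Read-only view over a hypothetical wire layout:
// bytes [0,4): uint32 id, [4,8): uint32 msg length, [8,8+len): msg.
struct event_view {
  const char* buf;
  uint32_t id() const {
    uint32_t v;
    std::memcpy(&v, buf, sizeof(v)); // no allocation, no decode pass
    return v;
  }
  std::string_view msg() const {
    uint32_t len;
    std::memcpy(&len, buf + 4, sizeof(len));
    return {buf + 8, len}; // points into the buffer; nothing is copied
  }
};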

We're doing something similar in VAST for performance reasons, but Bro
and Broker have the exact same issues in that regard.

> In other words, can Broker currently be used if one writes a Bro
> script that publishes plain events (message type 1 in bro.hh)?

> Yes to that. Non-Bros can exchange events (assuming they know the
> schema), but not logs.

Got it.

(Unfortunately that will make our BroCon talk pretty boring in terms
of throughput analysis, because we were planning to build an
end-to-end log ingestion system based on Broker. We'll probably switch
gears a bit and focus more on the latency side, where a Bro script
publishes something to an external application and receives feedback
through an auxiliary channel.)

    Matthias

Do you guys have a sense of how the load splits up between
serialization and batching/communication? My hope has been that
batching itself can take care of the performance issues, so that we'll
be able to send logs as standard CAF messages, each one representing a
batch of N log lines. The benchmark I had created a little while ago
to examine that wasn't able to get the necessary performance out of
Broker/CAF to do that (hence the fallback to Bro's old serialization
of log messages for now, sent over CAF). But iirc, the conclusion was
that there's still room for improvement in CAF that should make this
feasible eventually. However, if you guys believe it's really CAF's
serialization that's the bottleneck, then we'll need to come up with
something else indeed.

Robin

> Okay. In the future, we probably need some form of
> "serialization-free" batching mechanism to ship data more efficiently.

> Do you guys have a sense of how the load splits up between
> serialization and batching/communication? My hope has been that
> batching itself can take care of the performance issues, so that we'll
> be able to send logs as standard CAF messages, each one representing a
> batch of N log lines. The benchmark I had created a little while ago
> to examine that wasn't able to get the necessary performance out of
> Broker/CAF to do that (hence the fallback to Bro's old serialization
> of log messages for now, sent over CAF). But iirc, the conclusion was
> that there's still room for improvement in CAF that should make this
> feasible eventually. However, if you guys believe it's really CAF's
> serialization that's the bottleneck, then we'll need to come up with
> something else indeed.

I think there are a couple of orthogonal aspects merged together here. Namely, (1) memory-mapping, (2) batching, and (3) performance of CAF's serialization.

1) Matthias threw in memory-mapping, but I’m not so sure if this is actually feasible for you. The main benefit here is to have a unified representation in memory, on disk, and on the wire. I think you’re still going to keep the ASCII log output format for Bro logs. Also, a memory-mapped format would mean dropping the current broker::data API entirely. My hunch is that you would rather not break the API immediately after releasing it to the public.

2) CAF already does batching. Ideally, Broker should not need to do any additional batching on top of that. In fact, doing the batching in user code greatly diminishes the effectiveness of CAF’s own batching, because CAF can then no longer break up chunks on its own to make efficient use of resources.

3) Serialization should really not be a bottleneck. The costly parts are shuffling bytes around in buffers and heap allocations when deserializing a broker::data. There’s no way around these two costs. Do you still remember what showed up during your investigation that triggered you to go with the blob? Because what I can see as a *much* bigger issue is *copying* overhead, not serialization. CAF streams assume that individual elements are cheap to copy, so a copy-on-write optimization for broker::data would probably have a much higher impact on performance (it’s also straightforward to implement, and CAF has re-usable pieces for that). If serialization still shows up with unreasonable costs in a profiler, however, there are ways to speed things up. The customization point here is a specialized inspect() overload for broker::data that essentially allows you to apply all the optimizations you want (and that might be used in Bro’s framework).
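
A back-of-the-envelope sketch of the copy-on-write idea, here with
std::shared_ptr for brevity (Broker itself would presumably use CAF’s
reusable pieces, e.g. caf::intrusive_cow_ptr, to avoid the extra
indirection); the wrapper name is made up:

#include <memory>
#include <utility>

#include <broker/broker.hh>

class cow_data {
public:
  explicit cow_data(broker::data d)
    : ptr_(std::make_shared<broker::data>(std::move(d))) {}

  // Copying a cow_data is one refcount bump instead of a deep copy of
  // the whole data tree -- exactly what cheap-to-copy stream elements
  // need.
  const broker::data& get() const { return *ptr_; }

  // Writers detach first; the deep copy happens only if the value is
  // actually shared at this point.
  broker::data& unshared() {
    if (ptr_.use_count() > 1)
      ptr_ = std::make_shared<broker::data>(*ptr_);
    return *ptr_;
  }

private:
  std::shared_ptr<broker::data> ptr_;
};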

I hope we’re not talking past each other. :-)

An in-depth performance analysis of Broker’s streaming layer has been on my todo list for months at this point. I hope I get something done before the Bro Workshop in Europe. Then we can hopefully discuss this in person with some reliable data.

    Dominik

> 1) Matthias threw in memory-mapping, but I’m not so sure if this is
> actually feasible for you.

Yeah, our normal use case is different; memory-mapping won't help much
with that.

> 2) CAF already does batching. Ideally, Broker should not need to do
> any additional batching on top of that.

Yep, but (3) was the problem with that:

> Do you still remember what showed up during your investigation that
> triggered you to go with the blob?

Looking back through emails: at some point Jon replaced CAF
serialization with these blobs and got substantially better
performance. He also had a patch that reproduced the effect with the
benchmark tool you wrote. I'm pasting that in below; I'm assuming it
still applies. Looks like the conclusion at that time was that it is
indeed an issue with the serialization and/or copying of the data.

> An in-depth performance analysis of Broker’s streaming layer has been
> on my todo list for months at this point. I hope I get something done
> before the Bro Workshop in Europe.

That would be great. :-)

Robin

diff --git a/tests/benchmark/broker-stream-benchmark.cc b/tests/benchmark/broker-stream-benchmark.cc
index 821ac39..26b0778 100644
--- a/tests/benchmark/broker-stream-benchmark.cc
+++ b/tests/benchmark/broker-stream-benchmark.cc
@@ -1,6 +1,7 @@
 #include <iostream>

 #include <broker/broker.hh>
+#include <broker/bro.hh>

 using std::cout;
 using std::cerr;
@@ -55,8 +56,11 @@ void publish_mode(broker::endpoint& ep, const std::string& topic_str) {
       // nop
     },
     [=](caf::unit_t&, downstream<std::pair<topic, data>>& out, size_t num) {
-      for (size_t i = 0; i < num; ++i)
-        out.push(std::make_pair(topic_str, "Lorem ipsum dolor sit amet."));
+      for (size_t i = 0; i < num; ++i) {
+        auto ev = broker::bro::Event(std::string("event_1"),
+                                     std::vector<broker::data>{42, "test"});
+        out.push(std::make_pair(topic_str, std::move(ev)));
+      }
       global_count += num;
     },
     [=](const caf::unit_t&) {