Queueing in Broker?

One more Broker idea: I'm thinking we should add a queuing mechanism
to Broker that buffers outgoing messages for a while when a peer goes
down. Once it comes back up, we'd pass them on. That way an endpoint
could restart, for example, without us losing data.

I'm not immediately sure how/where we'd integrate that. For outgoing
messages, we could add it to the transparent reconnect. However, for
incoming connections, where the local endpoint doesn't have a notion
of "that peer should be coming back", it might not be as
straightforward?

Robin

One more Broker idea: I'm thinking we should add a queuing mechanism
to Broker that buffers outgoing messages for a while when a peer goes
down. Once it comes back up, we'd pass them on. That way an endpoint
could restart, for example, without us losing data.

Yes!

I'm not immediately sure how/where we'd integrate that. For outgoing
messages, we could add it to the transparent reconnect. However, for
incoming connections, where the local endpoint doesn't have a notion
of "that peer should be coming back", it might not be as
straightforward?

I can imagine that being able to define queue length and queue (byte) size for consumers and producers might be interesting.

As a producer:
  Keep up to 1000 messages and/or 1MByte of data.
As a consumer:
  Only be willing to receive up to the 1000 most recent messages or up to 1MByte of data.

I still haven't spent time with the Broker API to see if these thoughts actually make sense though. :-)
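
To make that concrete, here is a rough sketch of what such per-peer knobs could look like. All names and defaults below are made up for illustration; nothing like this exists in the Broker API today.

  #include <cstddef>

  // Hypothetical knobs only; not part of the actual Broker API.
  struct buffer_policy {
    std::size_t max_messages = 1000;     // cap on number of queued messages
    std::size_t max_bytes = 1024 * 1024; // cap on total queued payload (1 MByte)
  };

  // A peering could carry one policy per direction.
  struct peer_buffering {
    buffer_policy producer; // what we keep for a peer that is down
    buffer_policy consumer; // what backlog we accept when reconnecting
  };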

   .Seth

One more Broker idea: I'm thinking we should add a queuing mechanism
to Broker that buffers outgoing messages for a while when a peer goes
down. Once it comes back up, we'd pass them on. That way an endpoint
could restart, for example, without us losing data.

If the goal is to prevent loss of data, then don't we need more than
just buffering, like message acknowledgements from the peer?

E.g., you can think your peer is up, send a message, then immediately
find out it went offline and so the message got lost "in the middle".

You would also need to keep the message buffered until receiving an
ACK from *all* peers that are subscribed (and the subscription list is
a potentially moving target)?

And if you still planned on message routing/auto-forwarding being more
widely used, I think you would want to buffer the message while the
longest subscribed *path* has a down node?

I'm not immediately sure how/where we'd integrate that. For outgoing
messages, we could add it to the transparent reconnect. However, for
incoming connections, where the local endpoint doesn't have a notion
of "that peer should be coming back", it might not be as
straightforward?

Yeah, I'm also unclear if there's any way you can tell if the peer is
supposed to be permanent vs. transient in some cases.

Last observation is that I think any of these types of changes would
be to the internal messaging pattern/protocol and so maybe reasonable
to change/improve in subsequent releases in a way that's transparent
to users.

- Jon

If the goal is to prevent loss of data, then don't we need more than
just buffering, like message acknowledgements from the peer?

Yeah, I wouldn't see it as bullet-proof reliability, rather as a best
effort "let's not needlessly drop stuff on the floor" kind of thing. I'm
thinking less here of the cluster setting (where things can get
complex and we'd usually restart everything anyway), and more of
external agents streaming stuff into Bro, like with the osquery
plugin. If one needs to restart the receiving-side Bro, it would be
nice to not just drop any activity reported in the meantime. With that
perspective, it would really need just a bit of buffering of
messages that cannot be sent out right now. And if in the end they
still don't make it, that's not the end of the world.
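
To sketch what I mean (types and limits invented here, this isn't
actual Broker code): a bounded FIFO per down peer that silently drops
the oldest entries once a limit is hit, and flushes on reconnect.

  #include <cstddef>
  #include <deque>
  #include <string>

  // Stand-in for a serialized Broker message; for the sketch only.
  using broker_message = std::string;

  class pending_buffer {
  public:
    pending_buffer(std::size_t max_messages, std::size_t max_bytes)
      : max_messages_(max_messages), max_bytes_(max_bytes) {}

    // Called while the peer is down; never blocks, never fails hard.
    void enqueue(broker_message msg) {
      bytes_ += msg.size();
      queue_.push_back(std::move(msg));
      // Best effort: once either limit is exceeded, drop from the front.
      while (queue_.size() > max_messages_ || bytes_ > max_bytes_) {
        bytes_ -= queue_.front().size();
        queue_.pop_front();
      }
    }

    // Called once the peer is back: hand everything to the real send path.
    template <class SendFn>
    void flush(SendFn send) {
      for (auto& msg : queue_)
        send(msg);
      queue_.clear();
      bytes_ = 0;
    }

  private:
    std::deque<broker_message> queue_;
    std::size_t bytes_ = 0;
    std::size_t max_messages_;
    std::size_t max_bytes_;
  };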

And if you still planned on message routing/auto-forwarding being more
widely used, I think you would want to buffer the message while the
longest subscribed *path* has a down node?

I was thinking to do the buffering at the routing/hop-level. The
message would get as far as it can at first. If a peer is down that a
node would have normally forwarded to, it'd buffer for a bit until
that comes back (but I realize this makes it even more fuzzy which
peers to wait for: in a flexible topology peers could come and go all
the time; see below).

That said, I'm now wondering if such buffering functionality should
really be located inside CAF, as that's in charge of low-level message
propagation.

Yeah, I'm also unclear if there's any way you can tell if the peer is
supposed to be permanent vs. transient in some cases.

We could make that an explicit endpoint option: "for this peer, on
disconnect buffer stuff it would normally receive until it comes back
(subject to some limits)". We may need a better way to identify the
same peer though; just the IP probably wouldn't work well. Maybe through
some ID/name sent during the handshake? One would need to configure
such a name for peers when turning on the buffering.
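
In code I'd picture the option roughly like this; all names here
(peer_config, peer_id, ...) are just invented to illustrate the idea:

  #include <cstddef>
  #include <string>
  #include <unordered_map>

  // Sketch only: buffering is opt-in per peer and keyed by a stable
  // name announced in the handshake, rather than by IP address.
  struct peer_config {
    std::string peer_id;               // name the peer sends in the handshake
    bool buffer_on_disconnect = false; // opt-in: hold messages while it's down
    std::size_t max_messages = 1000;
    std::size_t max_bytes = 1024 * 1024;
  };

  // One entry per configured peer; when a handshake carries a known
  // peer_id, the buffered backlog for it gets replayed.
  using peer_table = std::unordered_map<std::string, peer_config>;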

Last observation is that I think any of these types of changes would
be to the internal messaging pattern/protocol and so maybe reasonable
to change/improve in subsequent releases in a way that's transparent
to users.

Yeah, nothing to get in immediately; this still needs some thinking. I'm
getting the sense though that we'll need it for some applications,
osquery being the main one on my mind.

Robin

Producer-side it should be easy to enforce limits, but consumer-side
it seems more difficult, as it would need either some kind of
handshake or a notion of what data represents buffered activity. Do you
think consumer-side is important? We already cannot prevent a peer
from sending too much data during normal operation either.

Robin

And if you still planned on message routing/auto-forwarding being more
widely used, I think you would want to buffer the message while the
longest subscribed *path* has a down node?

I was thinking to do the buffering at the routing/hop-level. The
message would get as far as it can at first. If a peer is down that a
node would have normally forwarded to, it'd buffer for a bit until
that comes back (but I realize this makes it even more fuzzy which
peers to wait for: in a flexible topology peers could come and go all
the time; see below).

That said, I'm now wondering if such buffering functionality should
really be located inside CAF, as that's in charge of low-level message
propagation.

CAF already implements cumulative ACKs. Combine this with send buffers, snapshotting and a cluster manager and you have fault-tolerant pipelines with automatic redeployment/failover - in theory. That’s all up in the air of course, since we don’t have the manpower to fully flesh this out at the moment. However, many prerequisites are already there (such as ACKs on a per-batch level and customization points in stream managers to deal with errors) that we could leverage for this use case.

I think your use case is simple enough that we can make a few additions to CAF and then implement this in Broker-land. Let me outline a solution here:
- on disconnect, keep the outbound path alive
- add new data to path’s buffer up to maximum (or timeout)
- include some form of unique identifier (host name? configured ID?) in handshakes
- rebind and resume sends on an outbound path if a client reconnects

An outbound path in a CAF stream is essentially a buffer with additional state for batch ID and credit bookkeeping. Does that outlined solution make sense? This would have "at least once" semantics, so the receiving peer can receive messages twice for anything it already processed but didn’t have the chance to ACK. Just pointing it out.
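
To illustrate the mechanics with made-up types (this is not CAF’s actual outbound path class): every element keeps a sequence number until it is cumulatively ACKed, and after a rebind everything still unacknowledged is resent, which is exactly where the duplicates can come from.

  #include <cstdint>
  #include <deque>
  #include <string>

  // Illustration only; not CAF's real stream machinery.
  struct outbound_path_sketch {
    struct entry {
      std::uint64_t seq;   // monotonically increasing per path
      std::string payload;
    };

    std::uint64_t next_seq = 0;
    std::deque<entry> unacked; // kept across a disconnect, up to some limit/timeout

    // Send and remember the element until it is cumulatively ACKed.
    template <class SendFn>
    void send(std::string payload, SendFn do_send) {
      entry e{next_seq++, std::move(payload)};
      do_send(e);
      unacked.push_back(std::move(e));
    }

    // Cumulative ACK: everything up to and including seq has arrived.
    void ack_up_to(std::uint64_t seq) {
      while (!unacked.empty() && unacked.front().seq <= seq)
        unacked.pop_front();
    }

    // After the peer reconnects and rebinds to this path: resend the backlog.
    // The receiver may see duplicates for anything it processed but never ACKed.
    template <class SendFn>
    void resume(SendFn do_send) {
      for (auto& e : unacked)
        do_send(e);
    }
  };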

Disclaimer: I’m weeks away from finishing work in my topic/streaming branch. After that point it’s straightforward to give you a scaffold for this.

Yeah, I'm also unclear if there's any way you can tell if the peer is
supposed to be permanent vs. transient in some cases.

We could make that an explicit endpoint option: "for this peer, on
disconnect buffer stuff it would normally receive until it comes back
(subject to some limits)". We may need a better way to identify the
same peer though; just the IP probably wouldn't work well. Maybe through
some ID/name sent during the handshake? One would need to configure
such a name for peers when turning on the buffering.

Yes, I think a custom ID via the caf-application.ini is the simplest solution. Using the hostname is an option too, as long as users make sure hostnames in their network are unique.

Last observation is that I think any of these types of changes would
be to the internal messaging pattern/protocol and so maybe reasonable
to change/improve in subsequent releases in a way that's transparent
to users.

Yeah, nothing to get in immediately; this still needs some thinking. I'm
getting the sense though that we'll need it for some applications,
osquery being the main one on my mind.

That’s good to know. I will keep this in mind as a topic for later, when my topic branch is merged back to master.

    Dominik

I think your use case is simple enough that we can make a few additions to CAF and then implement this in Broker-land. Let me outline a solution here:

Yeah, that sounds like a good plan to me and should make the remaining
parts on the Broker side pretty straightforward.

This would have "at least once" semantics, so the receiving peer can
receive messages twice for anything it already processed but didn’t
have the chance to ACK. Just pointing it out.

Hmm ... Need to think about that. More than once could be a problem
for some use cases; we might need to add a way to recognize duplicates.
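
One simple scheme I could see (just a sketch, nothing decided): if
senders stamp messages with a per-sender sequence number anyway, the
receiver only needs to remember the highest number it has processed
per sender.

  #include <cstdint>
  #include <string>
  #include <unordered_map>

  // Sketch of duplicate detection for at-least-once delivery; names invented.
  class dedup_filter {
  public:
    // Returns true if the message is new and should be processed,
    // false if it is a replay of something already seen from this sender.
    bool accept(const std::string& sender_id, std::uint64_t seq) {
      auto it = last_seen_.find(sender_id);
      if (it != last_seen_.end() && seq <= it->second)
        return false; // duplicate from a resent backlog
      last_seen_[sender_id] = seq;
      return true;
    }

  private:
    std::unordered_map<std::string, std::uint64_t> last_seen_;
  };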

Robin