Broker::publish API

The other day when merging Johanna's code to clusterize the
configuration framework, I noticed this code in there:

     # [Send id=val to everyone else]

     Broker::publish(change_topic, Config::cluster_set_option, ID, val, location);

     if ( Cluster::local_node_type() != Cluster::MANAGER )
         Broker::relay(change_topic, change_topic, Config::cluster_set_option, ID, val, location);

It took me a bit to understand that ... The goal here is that a change
in a configuration value gets propagated out to all nodes in the
cluster. The Broker::publish() sends it to a node's immediate
neighbors, but not further. That means that for workers it goes (only)
to their manager; for the manager, it goes to all workers. If
we're not a manager, we then separately (through Broker::relay()) ask
our neighbors (that's the manager) to forward the change to *their*
neighbors (that's the other workers), without re-raising it locally.

I remember we have discussed this API before, but I wanted to bring it
up again as I keep finding it confusing. I believe the code above
could be simplified by using the newer Broker::publish_and_relay(),
which was added to combine the two operations. Still, I'm realizing
now that I don't like thinking about this in terms of separate
publishing and relaying operations.

It won't become any easier once we add multi-hop routing to the mix
(which is in the works). And on top of all that, we also have
Cluster::publish_rr, Cluster::publish_hrw, Cluster::relay_rr, and
Cluster::relay_hrw -- another set of separate publishing & relay
options.

I'm wondering if we should give it another try to simplify this API
while we still can (i.e., before 2.6 goes out). To me, the most
intuitive publish operation is "send to topic T and propagate to
everybody subscribed to that topic". I'd structure the API around
that, making it the behavior of the main publish function, which
would simply be:

    Broker::publish(topic, args);

That would send to all neighbors, which then process locally and relay
to their neighbors. Right now, that would propagate just across one
hop, but once we have multihop it would get broadcast out more
broadly.
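
For instance, the config snippet from the top of this mail would then
collapse to a single call. A sketch of the proposed semantics (not the
current API):

    # Hypothetical: with relaying on by default, every receiver
    # processes the update locally and forwards it on, so the separate
    # Broker::relay() branch above disappears.
    Broker::publish(change_topic, Config::cluster_set_option, ID, val, location);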

To support the other use cases, we can then add modifiers & functions
to tweak this default, e.g. (a combined sketch follows this list):

    - Give publish() another argument "relay: bool &default=T" to prevent
      it from going beyond the immediate receiver. Or maybe instead:
      "relay_hops: int &default=-1" to specify the max number of hops
      to relay across, with -1 meaning no limit. (I recall concerns
      about loops being too easy to create; we could set the default
      here to F/0 to default to no forwarding, although conceptually I
      don't really like that :slight_smile:)

    - Give publish() another argument "relay_topic: string &default=""
      to change the topic when relaying on the 1st hop.

    - Give publish() another argument "process_on_relays: bool &default=T"
      to change whether a relaying hop also sees the event locally.

    - Add a second function publish_pool() that has all the same
      options, but receives a pool type instead of a topic (just an
      enum: RR, HRW).
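
Pulling those modifiers together, the declarations might look roughly
like this (purely illustrative; the argument names are just the ones
from the list above):

    # Illustrative sketch only -- not an existing declaration.
    global publish: function(topic: string, args: any,
                             relay_hops: int &default=-1,      # -1 = unlimited
                             relay_topic: string &default="",  # rewrite topic on 1st hop
                             process_on_relays: bool &default=T);

    type PoolType: enum { RR, HRW };

    global publish_pool: function(pool: PoolType, args: any,
                                  relay_hops: int &default=-1,
                                  process_on_relays: bool &default=T);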

What I'm not quite sure about is whether some of these modifiers are
better left for the receiver to specify (e.g., whether to raise events
received on a given topic locally, or just forward them). I can see
it either way.

Robin

> I'm wondering if we should give it another try to simplify this API
> while we still can (i.e., before 2.6 goes out). To me, the most
> intuitive publish operation is "send to topic T and propagate to
> everybody subscribed to that topic". I'd structure the API around
> that, making it the behavior of the main publish function, which
> would simply be:
>
>     Broker::publish(topic, args);
>
> That would send to all neighbors, which then process locally and relay
> to their neighbors. Right now, that would propagate just across one
> hop, but once we have multihop it would get broadcast out more
> broadly.

Can you remind/clarify what's meant by "multihop"? I thought:

Broker already has manual multihop if you set up subscriptions on all
relevant nodes on the path yourself. Bro doesn't use it right now.

Broker does not yet have automatic multihop where subscriptions are
globally flooded automatically.

A difference between "manual multihop" and "automatic multihop" would
be that in the latter, some relaying nodes may not actually hold a
subscription to the message they are relaying and so, in the case of
Bro events, I think they would not process them locally.

    - Give publish() another argument "relay: bool &default=T" to prevent
      it from going beyond the immediate receiver. Or maybe instead:
      "relay_hops: int &default=-1" to specify the max number of hops
      to relay across, with -1 meaning no limit.

Going with the generalized approach of configurable number of hops per
message via "relay_hops" from the start would be better than finding
out we need it later.

Possibly a downside is that you now need to store the original hop limit
in addition to the current TTL in each message if you want to detect the
"is 1st hop" condition for the "relay_topic" option below.

> (I recall concerns
> about loops being too easy to create; we could set the default
> here to F/0 to default to no forwarding, although conceptually I
> don't really like that :slight_smile:)

It's maybe both a concern and a reality -- Bro clusters currently
contain cycles (e.g. worker -> manager -> proxy -> worker).

    - Give publish() another argument "relay_topic: string &default=""
      to change the topic when relaying on the 1st hop.

    - Give publish() another argument "process_on_relays: bool &default=T"
      to change whether a relaying hop also sees the event locally.

Those seem fine to me.

> - Add a second function publish_pool() that has all the same
>   options, but receives a pool type instead of a topic (just an
>   enum: RR, HRW).

What's the goal of the enums instead of just publish_hrw() and publish_rr()?

Instead of the API being 2 functions, it then seems like 2 enums that
are never used elsewhere + 1 function that now always branches
internally.

- Jon

This would do weird things on workers, since they connect to both the manager and proxies.

Worker 1 would send to its neighbors [manager, proxy1, proxy2], but then those 3 nodes would
relay to all of the other workers. The TTL would stop the propagation, but you'd still end up sending
3 copies of the same message to each worker.
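
To make the duplication concrete, a rough trace for a hypothetical
cluster with one manager, two proxies, and two workers:

    # worker-1 publishes M to its immediate neighbors:
    #     worker-1 -> manager, proxy-1, proxy-2
    # each of those then relays M to all of *its* neighbors:
    #     worker-2 <- manager, worker-2 <- proxy-1, worker-2 <- proxy-2
    # i.e. worker-2 receives 3 copies of M before the TTL kicks in.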

I do agree that there's room for a lot of simplification; for example, a worker broadcasting a message efficiently to all
other workers needs to do something like this from the docs:

        Cluster::relay_rr(Cluster::proxy_pool, "example_key",
                          Cluster::worker_topic, worker_to_workers,
                          Cluster::node + " (via a proxy)");

But a lot of that could have defaults:

- Most use cases would want to relay through the default proxy pool.
- Since round robin is in use, the key shouldn't matter.

The round robin part itself is really an implementation detail for proxy load balancing and maybe not something that
should be exposed in the API. Now that I think of it, I'm not sure why one would ever use relay_hrw over relay_rr.

Removing a lot of that gets close to what you are suggesting:

    Cluster::relay(Cluster::worker_topic, worker_to_workers, Cluster::node + " (via a proxy)");

which is I guess just

    Cluster::relay(topic, args)

like you said.
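
A wrapper along those lines might look like this -- just a sketch, and
it assumes Cluster::relay_rr() would accept an event prepacked via
Broker::make_event() in place of the trailing event arguments:

    # Hypothetical convenience wrapper, not part of the current API:
    # relay via the default proxy pool, with a fixed key, since round
    # robin ignores the key anyway.
    function relay(relay_topic: string, args: Broker::Event)
        {
        Cluster::relay_rr(Cluster::proxy_pool, "any-key", relay_topic, args);
        }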

> I do agree that there's room for a lot of simplification; for example, a worker broadcasting a message efficiently to all
> other workers needs to do something like this from the docs:
>
>         Cluster::relay_rr(Cluster::proxy_pool, "example_key",
>                           Cluster::worker_topic, worker_to_workers,
>                           Cluster::node + " (via a proxy)");
>
> But a lot of that could have defaults:
>
> - Most use cases would want to relay through the default proxy pool.
> - Since round robin is in use, the key shouldn't matter.

At the moment, one could write their own wrapper function around that
if they find it too verbose and always want to use certain defaults?

> The round robin part itself is really an implementation detail for proxy load balancing and maybe not something that
> should be exposed in the API. Now that I think of it, I'm not sure why one would ever use relay_hrw over relay_rr.

Theoretically, a more favorable load distribution that's consistent
over time? e.g. if you do RR of the same messaging pattern from
multiple nodes, you could have waves of "randomly" overlapping loads
on the relayer-node since everyone is cycling through all the proxies
at their own rate when choosing the relayer. With HRW, you'd stick
with the same relayer over time and only change on outages, but
everyone should have chosen their relayer in a uniformly distributed
fashion.
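
For intuition, HRW ("highest random weight") boils down to something
like the following sketch; hash_score() is a stand-in for whatever
hash the real implementation uses:

    # Hypothetical scoring function, e.g. a hash over (key, node).
    global hash_score: function(key: string, node: string): count;

    # Every sender computes the same scores, so all of them
    # independently pick the same proxy for a given key, and the
    # choice only changes when that proxy goes away.
    function hrw_pick(key: string, proxies: vector of string): string
        {
        local best = 0;
        local best_score: count = 0;

        for ( i in proxies )
            {
            local s = hash_score(key, proxies[i]);
            if ( i == 0 || s > best_score )
                {
                best = i;
                best_score = s;
                }
            }

        return proxies[best];
        }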

- Jon

> I do agree that there's room for a lot of simplification; for example, a worker broadcasting a message efficiently to all
> other workers needs to do something like this from the docs:
>
>         Cluster::relay_rr(Cluster::proxy_pool, "example_key",
>                           Cluster::worker_topic, worker_to_workers,
>                           Cluster::node + " (via a proxy)");
>
> But a lot of that could have defaults:
>
> - Most use cases would want to relay through the default proxy pool.
> - Since round robin is in use, the key shouldn't matter.

> At the moment, one could write their own wrapper function around that
> if they find it too verbose and always want to use certain defaults?

Yeah... the wrapper would be trivial. Should Bro include it, so that the API that scripts use is simpler?

> The round robin part itself is really an implementation detail for proxy load balancing and maybe not something that
> should be exposed in the API. Now that I think of it, I'm not sure why one would ever use relay_hrw over relay_rr.
>
> Theoretically, a more favorable load distribution that's consistent
> over time? e.g. if you do RR of the same messaging pattern from
> multiple nodes, you could have waves of "randomly" overlapping loads
> on the relayer-node since everyone is cycling through all the proxies
> at their own rate when choosing the relayer. With HRW, you'd stick
> with the same relayer over time and only change on outages, but
> everyone should have chosen their relayer in a uniformly distributed
> fashion.
>
> - Jon

I'd expect that round robin would give the most uniform load distribution: for N proxies, each proxy
would see 1/N of the relay messages. But I guess in general round robin isn't the best load-balancing mechanism,
since it doesn't take into account the responsiveness of each proxy. With some of the information CAF provides, it
may be possible to also support weighted round robin. That way, if a proxy node doesn't die outright but starts
having issues for one reason or another, relay_rr could avoid sending it messages.

> Broker does not yet have automatic multihop where subscriptions are
> globally flooded automatically.

Yep, that's what I meant: dynamic multihop where each node tracks what
its peers are subscribing to, and forwards messages independent of its
own subscriptions.

> Possibly a downside is that you now need to store the original hop limit
> in addition to the current TTL in each message if you want to detect the
> "is 1st hop" condition for the "relay_topic" option below.

Yeah, that's right. Actually I think ideally the 1st hop wouldn't have
any special role anyways if we didn't need that "relay_topic".

> It's maybe both a concern and a reality -- Bro clusters currently
> contain cycles (e.g. worker -> manager -> proxy -> worker).

True, although it's not cycles in the connection topology that matter,
it's cycles in topic subscriptions. I need to think about this a bit
more (and I need to remind myself what our topics currently look like),
but could we set up topics so that even in a cluster, messages don't
go into a cycle?

Is there a summary somewhere of what events & topics the cluster nodes
are currently exchanging?

> - Add a second function publish_pool() that has all the same
> options, but receives a pool type instead of a topic (just an
> enum: RR, HRW).

> What's the goal of the enums instead of just publish_hrw() and publish_rr()?

Similar to what Justin wrote, it would more directly express the
intent, with less emphasis on the mechanism; we could set a
default to whatever we recommend people normally use; and it'd be more
extensible.

> At the moment, one could write their own wrapper function around that
> if they find it too verbose and always want to use certain defaults?

They could, but my general point is that it'd be nice to have a simple
API that covers the most common use cases directly and intuitively.
Then let people change defaults if they have to and know what they are
doing.

Robin

Maybe. We can see how it fits in the mix of what Robin suggested:

  # Supports variadic args in place of Broker::Event.
  Broker::publish(topic: string, args: Broker::Event,
                  relay_topic: string &default="",
                  process_on_relayer: bool &default=F)

  # Supports variadic args in place of Broker::Event.
  Cluster::publish(pool: Cluster::pool, key: any, strategy: enum,
                   args: Broker::Event,
                   relay_topic: string &default="",
                   process_on_relayer: bool &default=F)

  # Supports variadic args in place of Broker::Event. Uses the proxy
  # pool and RR method w/ an arbitrary, internal key by default.
  Cluster::publish_via_proxy(relay_topic: string, args: Broker::Event)

That last one being the wrapper you're asking for. Also, I compressed
the ideas of having separate "relay: bool" / "relay_hops: int" and
"relay_topic: string" args -- a non-empty relay topic implicitly means
you want to enable relaying on the receiving node. Thinking more
about the original idea of giving the number of relay hops: it may be
better to leave that until Broker multihop is more robust, and to let
its automatic forwarding mechanisms take care of those scenarios,
whereas a "relay" is a simple mechanism at the Bro application level
(it has its own unique message format) that serves to aid load-balancing
use-cases (rather than routing use-cases).
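
For illustration, the worker-to-workers broadcast from earlier in the
thread would then be (hypothetical, following the sketched signatures
above):

    # Relay through an arbitrary proxy; the proxy just forwards to the
    # worker topic and doesn't process the event itself.
    Cluster::publish_via_proxy(Cluster::worker_topic,
                               Broker::make_event(worker_to_workers,
                                   Cluster::node + " (via a proxy)"));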

- Jon

> True, although it's not cycles in the connection topology that matter,
> it's cycles in topic subscriptions.

Right, good point.

> I need to think about this a bit
> more (and I need to remind myself what our topics currently look like)

I think we just have the "broadcast_topic" to which all nodes
subscribe, but not sure if there's more.

> but could we set up topics so that even in a cluster, messages don't
> go into a cycle?

I don't see why not, but it takes planning and prudence on everyone's
part (including users) to not break that rule.

I'd be more comfortable if one could automate answering the question:
"if I add a subscription to a given node in the network, will I create
a cycle?".

- Jon

> I don't see why not, but it takes planning and prudence on everyone's
> part (including users) to not break that rule.

Yeah, the question is whether we can pre-configure the cluster so that
users don't need to worry about it most of the time.

> I'd be more comfortable if one could automate answering the question:
> "if I add a subscription to a given node in the network, will I create
> a cycle?".

Hmm ... What about a test mode where we'd spin up a dummy cluster
(similar to what the btests do), have each node send a message to all
subscribed topics, and watch for TTL violations?

Robin

Seems clunky and could get dicey -- subscriptions that are
transient/dynamic may not be well-tested, and you'd probably want to
guarantee that sending such a dummy message actually does not result
in any side-effects at the Bro layer. If nodes start raising random
events at unusual/unintended times, I start to doubt the stability of
things.

- Jon

Agreed. :slight_smile: It'd just be a heuristic to catch some obvious errors. I
don't think there's more we can do; we can't really catch loops
statically by looking at the code.

Robin

Can somebody remind me what the use-case is for changing the topic on
relay? Grepping over our standard scripts, I see only one use of
relay(), and that's the one above.

Robin

Another use is hidden within Cluster::relay_rr():

event Intel::new_item(item: Item) &priority=5
    {
    if ( Cluster::proxy_pool$alive_count == 0 )
        Broker::publish(indicator_topic, Intel::insert_indicator, item);
    else
        Cluster::relay_rr(Cluster::proxy_pool, "Intel::new_item_relay_rr",
                          indicator_topic, Intel::insert_indicator, item);
    }

That is, if the manager is currently connected to some proxy, it picks
one to do the work of distributing the event to workers. The manager
sends 1 message instead of N.

I don't know if there's currently other use-cases for Broker::relay
specifically, but Cluster::relay_rr/Cluster::relay_hrw is essentially
an extension of that which just also does the work of choosing the
initial topic based upon a given pool and partition strategy.

Might have been Justin who originally pointed out the potential for
avoiding manager overload in this way.

- Jon

Yeah, though at least from an API perspective this is different: The
caller gives relay_rr() only one topic to send to (indicator_topic).
It's then using a different topic internally to get it over to the
proxy first, but that feels more like an implementation detail. So in
that sense I would argue that this is not a use-case for the Broker
API letting users change the topic on relay. (I'm not saying that that
capability can't be useful, I'm just still looking for actual use
cases.)

I have another question about this specific case: we use relay_rr()
only for sending Intel::insert_indicator. Intel::remove_indicator gets
published normally through auto_publish(). Why the difference?

Robin

So I went through the exercise of collecting this information: what
connections do we have between nodes, who's subscribing to what, and
who's publishing what; see the attached PDF. This is based on all the
standard scripts, with some special cases ignored (like the control
framework).

I'm not fully sure yet what to conclude from this, but a few quick
observations:

    - The main topics are bro/cluster/<node-type> and
      bro/cluster/node/<name>. For these we wouldn't have a problem
      with loops if we enabled automatic, topic-driven forwarding as
      far as I can see.

    - bro/cluster/broadcast seems to be the main case with a looping
      problem, because everybody subscribes to it. It's hardly used,
      though (bro/config/change is used similarly).

    - Relaying is hardly used.

    - There are a couple of script-specific topics where I'm wondering
      if these could switch to using bro/cluster/<node-type> instead
      (bro/intel/*, bro/irc/dcc_transfer_update). In other words: when
      clusterizing scripts, prefer not to introduce new topics.

    - There's a lot of checks in publishing code of the type "if I am
      (not) of node type X".

    - Pools are used for two different things: 1. the known-* scripts
      pick a proxy to process and log the information; whereas 2. the
      Intel scripts pick a proxy just as a relay to broadcast stuff
      out, reducing load. The 1st application is a good use of pools,
      but the 2nd feels like it should be handled differently.

Need to mull over this more, thoughts welcome.

Overall I have to say I found it pretty hard to follow this all
because we don't have much consistency right now in how scripts
structure their communication. That's not surprising, given that we're
just starting to use all this, but it suggests that we have room for
improvement in our abstractions. :slight_smile:

Robin

Broker Communication.pdf (31.9 KB)

To be honest, I have somehow lost track of the discussion. From what I can recall, it's about simplifying the API in the light of multi-hop routing, which is not fully functional yet.

Regarding multi-hop routing, I am not even sure what the actual goal is that we are currently aiming at. However, from a conceptual perspective I think "routing" either needs routing algorithms or strict conventions about how the network that messages are routed through is structured. So, what would a "deep cluster" look like and what kind of message flows do we expect in there?

Some comments on the observations:

> - The main topics are bro/cluster/<node-type> and
>   bro/cluster/node/<name>. For these we wouldn't have a problem
>   with loops if we enabled automatic, topic-driven forwarding as
>   far as I can see.

How does forwarding work if I add another node type? Do we assume a certain cluster structure here? If yes: Is that a valid assumption?

> - bro/cluster/broadcast seems to be the main case with a looping
>   problem, because everybody subscribes to it. It's hardly used,
>   though (bro/config/change is used similarly).

The topic concept is a multicast scheme, isn't it? Having broadcast functionality on top of that feels odd. However, it's limited to the cluster topic. This leads me to the question of which domains we operate on. If I think of messages, I start to think about a cluster, but that might be only one domain of application. I think it would be good to define the layers of abstraction more precisely here.

> - There are a couple of script-specific topics where I'm wondering
>   if these could switch to using bro/cluster/<node-type> instead
>   (bro/intel/*, bro/irc/dcc_transfer_update). In other words: when
>   clusterizing scripts, prefer not to introduce new topics.

From my understanding this would mean going back to the old communication patterns. What's the point of having topics if we don't use them?

> - There's a lot of checks in publishing code of the type "if I am
>   (not) of node type X".

That's something I would have expected. I don't think this is necessarily an indicator of bad design. Having these kinds of checks means that roles are somehow fixed and responsibilities are explicitly codified.

> - Pools are used for two different things: 1. the known-* scripts
>   pick a proxy to process and log the information; whereas 2. the
>   Intel scripts pick a proxy just as a relay to broadcast stuff
>   out, reducing load. The 1st application is a good use of pools,
>   but the 2nd feels like it should be handled differently.

I think we should be careful about introducing too many abstractions. Communication patterns tend to be complex, and the more of that complexity is hidden, the easier it will be to generate misunderstandings. For example, in the case of the intel framework, proxy nodes might be able to implement some more logic than just relaying at some point. Having the relay abstraction would mean dealing with two different levels of abstraction regarding intel on proxy nodes in this case.

> Overall I have to say I found it pretty hard to follow this all
> because we don't have much consistency right now in how scripts
> structure their communication. That's not surprising, given that we're
> just starting to use all this, but it suggests that we have room for
> improvement in our abstractions. :slight_smile:

I totally agree here! I think it could help to come up with some more use cases to identify the best abstractions.

Jan

Potentially no reason other than no one reviewed whether it had the
potential to be optimized in a similar way. E.g., I first ported
scripts in a direct fashion without trying to change too much
structurally about comm. patterns or doing any optimization, except in
cases where a change was specifically talked about. I only recall
Justin having called out Intel::insert_indicator, so it got changed.

- Jon

How much is due to new API usage and how much is due to things mainly
being a direct port of old communication patterns (which I guess were
written by various people over extended lengths of time, and so there's
inconsistencies to be expected)? Or due to being a mishmash of both
old and new?

- Jon

I think these 2 are somewhat related. Since there weren't higher-level things like relaying, in order to relay
a message from one worker to all other workers you had to jump through hoops with worker2manager and
manager2worker events and often lots of @if stuff.

There's also a bunch of places that I think were written standalone first and then updated to work on a cluster in
place, resulting in some awkwardness... like notice/main.bro:

function NOTICE(n: Notice::Info)
    {
    if ( Notice::is_being_suppressed(n) )
        return;

    @if ( Cluster::is_enabled() )
        if ( Cluster::local_node_type() == Cluster::MANAGER )
            Notice::internal_NOTICE(n);
        else
            {
            n$peer_name = n$peer_descr = Cluster::node;
            Broker::publish(Cluster::manager_topic, Notice::cluster_notice, n);
            }
    @else
        Notice::internal_NOTICE(n);
    @endif
    }

event Notice::cluster_notice(n: Notice::Info)
    {
    NOTICE(n);
    }

So on a worker, calling NOTICE publishes a cluster_notice event that then re-calls NOTICE on the manager,
which then does the right thing. You end up with a single small function with nested @if/if that works 3 different ways.

But if this was written in a more 'cluster by default' way, it would just look like:

function NOTICE(n: Notice::Info)
    {
    if ( Notice::is_being_suppressed(n) )
        return;

    n$peer_name = n$peer_descr = Cluster::node;
    Broker::publish(Cluster::manager_topic, Notice::cluster_notice, n);
    }

event Notice::cluster_notice(n: Notice::Info)
    {
    if ( Notice::is_being_suppressed(n) )
        return;

    Notice::internal_NOTICE(n);
    }

Which, other than the suppression check, has no branching at all.

Broker::publish could possibly be optimized for standalone mode to raise the event directly if not being run in a cluster.
The only small downside is that on a standalone node you'd call is_being_suppressed twice; you could always add an @if if you
really wanted, but is_being_suppressed is just a set lookup.
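
For comparison, here's what the 'cluster by default' NOTICE would look
like with that optimization done at the script level instead (same
logic, just short-circuiting locally when there's no cluster):

    function NOTICE(n: Notice::Info)
        {
        if ( Notice::is_being_suppressed(n) )
            return;

        n$peer_name = n$peer_descr = Cluster::node;

        if ( Cluster::is_enabled() )
            Broker::publish(Cluster::manager_topic, Notice::cluster_notice, n);
        else
            # Standalone: raise the event locally instead of publishing.
            event Notice::cluster_notice(n);
        }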

Then this stuff would be a good use for efficient relaying/broadcasting instead of making the manager do all the work:

    Broker::auto_publish(Cluster::worker_topic, Notice::begin_suppression);
    Broker::auto_publish(Cluster::proxy_topic, Notice::begin_suppression);

> From what I can recall, it's about simplifying the API in the light of
> multi-hop routing, which is not fully functional yet.

To level up a bit, what I'm hoping for is that we can find some easy
ways to simplify the API a bit more now, with an eye towards dynamic
multi-hop coming later. I don't know if it'll still work out before
2.6, but changing the API later is more painful.

We don't need to (and indeed can't) solve multi-hop topologies right
now; I think nobody really has the use cases clear in their heads yet.
But if we could simplify the API a bit more for our current use cases,
in a way that may extend to multihop naturally later, that would
probably save us some headaches at that point.

> How does forwarding work if I add another node type?

That's actually something I realized yesterday: we don't have direct
worker-to-worker communication right now, correct? A worker cannot
just publish to "bro/cluster/workers".

> Do we assume a certain cluster structure here? If yes: Is that a valid
> assumption?

I think it's safe to assume we have the cluster structure under our
own control; it's whatever we configure it to be. That's something
that's easier to change later than the API itself. Said differently:
we can always adjust the connections and topics that we set up by
default; it's much harder to change how the publish() function works.

> From my understanding this would mean going back to the old
> communication patterns. What's the point of having topics if we don't
> use them?

Let me try to phrase it differently: If there's already a topic for a
use case, it's better to use it. That's easier and less error-prone.
So if, e.g., I want to send my script's data to all workers,
publishing to bro/cluster/worker will do the job. And that will even
automatically adapt if things get more complex later. For example, I
can see having multiple otherwise independent clusters sharing a
communication channel. In that case, we could internally change the
topic to "bro/cluster/<cluster-id>/workers", and everybody using the
predefined worker topic would still reach "their" workers without any
further changes.
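
Sketch of that mechanism (the multi-cluster layout is hypothetical):

    # Today: scripts only ever reference the predefined constant.
    const worker_topic = "bro/cluster/worker" &redef;

    # A future multi-cluster setup could then rewrite the topic
    # underneath them (cluster_id here is made up):
    const cluster_id = "cluster-1";
    redef worker_topic = "bro/cluster/" + cluster_id + "/worker";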

> That's something I would have expected. I don't think this is
> necessarily an indicator of bad design.

Maybe it's a *necessary* design, but that doesn't make it nice. :wink: It
makes it very hard to follow the logic; when reading through the
scripts I got lost multiple times because some "@if I-am-a-manager"
was somewhere half a page earlier, disabling the code I was currently
looking at for most nodes. We probably can't totally avoid that, but
the less the better.

Robin