Scaling out bro cluster communication

I've been thinking about ideas for how to better scale out bro cluster communication and how that would look in scripts.

Scaling out sumstats and known hosts/services/certs detection will require script language or bif changes.

What I want to make possible is client side load balancing and failover for worker -> manager/datanode communication.

I have 2 ideas for how things could work.

## The explicit form, new bifs like:

      send_event(dest: string, event: any);
      send_event_hashed(dest: string, hash_key: any, event: any);

      send_event("datanode", Scan::scan_attempt(scanner, attempt));
      send_event_hashed("datanode", scanner, Scan::scan_attempt(scanner, attempt));

## A super magic awesome implicit form

    global scan_attempt: event(scanner: addr, attempt: Attempt)
        &partition_via=func(scanner: addr, attempt: Attempt) { return scanner; } ;
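
With the attribute, a worker would presumably keep raising the event exactly as it does today, and the routing would happen behind the scenes:

    # Raised normally on a worker; delivery to the right node would be implicit.
    event Scan::scan_attempt(scanner, attempt);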

The implicit form fits better with how bro currently works, but I think the explicit form would ultimately make cluster aware scripts simpler.

The choice hinges on the difference between implicit and explicit communication.

Currently all bro cluster communication is implicit:

* You send logs to the logger/manager node by calling Log::write
* You send notices to the manager by calling NOTICE
* You can share data between nodes by marking a container as &synchronized.
* You can send data to the manager by redef'ing Cluster::worker2manager_events

The last two are what we need to replace/extend.
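
For reference, a minimal sketch of the &synchronized style that this would replace (the table here is just illustrative):

    # Implicit sharing today: every node sees updates to this table.
    global scan_attempts: table[addr] of count &synchronized;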

As an example, in my scan.bro I want to send scan attempts up to the manager for correlation, so this means:

    # define event
    global scan_attempt: event(scanner: addr, attempt: Attempt);
    
    # route it to the manager
    redef Cluster::worker2manager_events += /Scan::scan_attempt/;
    
    # only handle it on the manager
    @if ( Cluster::local_node_type() == Cluster::MANAGER )
    event Scan::scan_attempt(scanner: addr, attempt: Attempt)
        {
        add_scan_attempt(scanner, attempt);
        }
    @endif

and then later in the worker code, finally

    # raise the event to send it down to the manager.
    event Scan::scan_attempt(scanner, attempt);

If bro communication were more explicit, the script would just be

    # define event and handle on all nodes
    global scan_attempt: event(scanner: addr, attempt: Attempt);
    event Scan::scan_attempt(scanner: addr, attempt: Attempt)
        {
        add_scan_attempt(scanner, attempt);
        }

    # send the event directly to the manager node
    send_event("manager", Scan::scan_attempt(scanner, attempt));

Things like scan detection and known hosts/services tracking are easily partitioned, so if you had two datanodes for analysis:

    if (hash(scanner) % 2 == 0)
      send_event("datanode-0", Scan::scan_attempt(scanner, attempt));
    else
      send_event("datanode-1", Scan::scan_attempt(scanner, attempt));

Which would be wrapped in a function:

      send_event_hashed("datanode", scanner, Scan::scan_attempt(scanner, attempt));

that would handle knowing how many active nodes there are and doing proper consistent hashing/failover, something like this:

    function send_event_hashed(dest: string, hash_key: any, ev: any) {
        local nodes = Cluster::active_nodes[dest];  # or whatever
        local idx = hash(hash_key) % |nodes|;
        local node_name = nodes[idx]$name;
        send_event(node_name, ev);
    }

What I want to make possible is client side load balancing and failover for worker -> manager/datanode communication.

Yes! Load balancing and failover are great goals for this stuff.

## A super magic awesome implicit form

    global scan_attempt: event(scanner: addr, attempt: Attempt)
        &partition_via=func(scanner: addr, attempt: Attempt) { return scanner; } ;

I'm not sure how much I like this model, but I'd need to think about it a bit more still. I agree that on the surface it feels magic and awesome but I'm worried we could get ourselves into situations that aren't easily resolvable with this model.

The implicit form fits better with how bro currently works, but I think the explicit form would ultimately make cluster aware scripts simpler.

Agree on both points.

    # define event and handle on all nodes
    global scan_attempt: event(scanner: addr, attempt: Attempt);
    event Scan::scan_attempt(scanner: addr, attempt: Attempt)
        {
        add_scan_attempt(scanner, attempt);
        }

    # send the event directly to the manager node
    send_event("manager", Scan::scan_attempt(scanner, attempt));

I do like the look of making this more explicit. The implicit event sharing behavior makes some things that feel like they should be easy end up being really difficult. Do you have thoughts on how you'd handle cases where you want the manager to send an event to all workers or all data nodes?

Another thing I think we need to address is having this behavior seamlessly fall back if someone isn't running a cluster. Do you expect your idea to do that? I know that in the current programming model, making something cluster-aware while still working outside a cluster can make it painful to create the right abstraction.

  .Seth

What I want to make possible is client side load balancing and
failover for worker -> manager/datanode communication.

This is an important part of future Bro deployments.

Before delving into script code, I would like to get a better
understanding of the underlying concepts and communication patterns.
Once we have a clear picture what workloads we need to support, we can
make architectural choices. Finally, the API falls out at the end.

Concretely: can you describe (without Bro script code) what "client-side
load-balancing and failover" means? Who is the client and what state
needs to be resilient to failure? I don't think we have a working
definition of "data node" either. My hunch is that they are involved in
MapReduce computation and perhaps represent the reducers, but I'm not
sure.

    Matthias

    # define event and handle on all nodes
    global scan_attempt: event(scanner: addr, attempt: Attempt);
    event Scan::scan_attempt(scanner: addr, attempt: Attempt)
        {
        add_scan_attempt(scanner, attempt);
        }

    # send the event directly to the manager node
    send_event("manager", Scan::scan_attempt(scanner, attempt));

I do like the look of making this more explicit. The implicit event sharing behavior makes some things that feel like they should be easy end up being really difficult. Do you have thoughts on how you'd handle cases where you want the manager to send an event to all workers or all data nodes?

Hmm, perhaps there would be multiple functions:

* One for sending an event to all nodes of a type
* One for sending an event to a specific node
* One for sending an event to one type of node based on a hash function

Currently bro only does the first one (though since there is only one manager or data node, events sent to data nodes only ever go to a single node).

Not being able to send events directly to an individual node also prevents bro scripts from doing RPC-type queries. A worker can send the manager a query, but the manager can only raise a reply event that is sent to all workers.
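
As a rough sketch of the request/reply pattern that per-node addressing would enable (the query/reply event names, the requester parameter, and the get_attempts helper are all made up; Cluster::node is the node's own name):

    # Worker side: ask the manager about one scanner, identifying ourselves by name.
    send_event("manager", Scan::query_scanner(Cluster::node, scanner));

    # Manager side: reply only to the node that asked.
    event Scan::query_scanner(requester: string, scanner: addr)
        {
        send_event(requester, Scan::query_reply(scanner, get_attempts(scanner)));
        }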

Another thing I think we need to address is having this behavior seamlessly fall back if someone isn't running a cluster. Do you expect your idea to do that? I know that in the current programming model, making something cluster-aware while still working outside a cluster can make it painful to create the right abstraction.

.Seth

For falling back, if

    send_event("manager", Scan::scan_attempt(scanner, attempt));

were run on the manager node, it could skip broker and just raise the event locally.
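
A minimal sketch of the same fallback done on the caller side instead, using the cluster framework's Cluster::is_enabled() check (send_event is still the proposed bif):

    if ( Cluster::is_enabled() )
        send_event("manager", Scan::scan_attempt(scanner, attempt));
    else
        # Standalone: skip broker entirely and raise the event locally.
        event Scan::scan_attempt(scanner, attempt);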

Currently bro has cluster-specific code in intel, netcontrol, notice, openflow, packet-filter, and sumstats, so the current event system doesn't always just magically work on a cluster. I don't think explicit send_event functions would change that at all.

Plus, I'm not even sure if special-casing a non-cluster makes sense anymore.

For example, scan detection on a single node doesn't need to do any cluster communication; it can just manage everything locally. But the code that handles scan detection is extremely simple: it consumes scan_attempt events and raises notices. What if a dedicated actor thread were started to handle the scan_attempt event? Then the code could do something like:

    send_event("scan_aggregator", Scan::scan_attempt(scanner, attempt));

Even on a single process instance, this could distribute the event to a thread dedicated to handling this work.

Yes.. exactly like reducers.

In this case, the clients are the workers and the servers are the manager/logger/data nodes.

I want to send events containing data up to data nodes so they can be aggregated, but I don't want the data node to be a single point of failure or bottleneck.

Scan detection doesn't require coordination; the data just needs to be partitioned by source address.

This also applies for the following; a short sketch follows the list:

* Known hosts (partition on host)
* Known services (partition on host or host+service)
* Known certs (partition on cert hash)
* Intel (partition on seen value)
* Notices (partition on identifier)
* DHCP (partition on mac address)
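
A couple of these written with the proposed bif (the event names are hypothetical; only the hash keys matter):

    # Hypothetical events just to show the partition keys.
    send_event_hashed("datanode", host, Known::host_seen(host));
    send_event_hashed("datanode", cert_hash, Known::cert_seen(cert_hash, cert));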

As far as state goes, the data nodes COULD replicate their state to the other data nodes, but that's a whole separate issue.

Initially the goal would just be to fail over from one data node to the next in the case of an outage. State on that data node would be lost if it wasn't replicated, but new work could still be performed instead of the system grinding to a halt.

Hi,

I was able to put together a prototype of the functionality I had in mind.

I learned a bit more about broker, including the Broker::send_event function vs. the auto_event method.

send_event doesn't let you pick the node you want to send data to, but it does let you pick the queue. By creating multiple nodes and subscribing each node to its own queue, I was able to achieve the end result I wanted. It boiled down to this:

    function send_event_hashed(key: any, args: Broker::EventArgs)
    {
        local destination_count = node_count; # FIXME: how to figure out dynamically
        local dest = 1 + md5_hash_count(key) % destination_count;
        local queue = fmt("bro/data/%s", dest);
        print fmt("Send hash(%s)=%s: %s", key, queue, args);
        Broker::send_event(queue, args);
    }
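
The receiving side is just each data node subscribing to its own queue; roughly something like this (data_node_index is a made-up per-node value, and Broker::subscribe_to_events is the topic-prefix subscription call from the same broker API):

    event bro_init()
    {
        # data_node_index would identify this particular data node.
        Broker::subscribe_to_events(fmt("bro/data/%s", data_node_index));
    }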

I have the full example here https://github.com/JustinAzoff/broker_distributed_events

It implements a fake known hosts and scan detection policy.

The main things to figure out are:

* How to work out the proper node_count at runtime. I think on a real bro cluster the Cluster namespace has the data I need for this, including which nodes are reachable.

* How to handle one node becoming unreachable or a new node showing up. Ideally bro would use a form of consistent ring hashing.

If this were worked out, and implemented for logging as well, you could run a bro cluster with 2 'manager' nodes and have a fully functioning cluster even if one of them died.

As is, I can probably use this on our test cluster to run 4 data nodes and distribute scan detection to 4 cpu cores.

The example doesn't show it, but for things like the known hosts tracking it would be useful if the data could be replicated to the other data nodes. Because the sender-side hash based distribution also acts to de-duplicate the data, the replication would not be latency sensitive. It would not have the problem that the current known hosts policy has where 2 nodes can detect and log a new host before the data synchronizes. As long as the data replicated before a node outage occurred, you would get consistent logs.

I got this to work on a real cluster running the broker integration branch!

It took me a while to figure out that a change somewhere made Broker::publish_topic required for sending messages to other nodes, and that to raise notices on a datanode, bro needs:

    redef datanode2manager_events += {"Notice::cluster_notice"};

The main screwy thing is that I couldn't figure out a consistent way to enumerate the data nodes[1], so I just hardcoded the index in the name:

    [datanode-0]
    type=datanode
    host=bro-test

    [datanode-1]
    type=datanode
    host=bro-test

etc.
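
One way around it might be to lean on that naming convention and derive the index from the node's own name, purely as a sketch:

    # Parse the trailing number out of names like "datanode-3".
    local parts = split_string(Cluster::node, /-/);
    local idx = to_count(parts[|parts| - 1]);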

Once I had the base functions in place, I changed my scan.bro from

    #this is a worker2manager event
    event Scan::scan_attempt(scanner, attempt);

to

    local args = Broker::event_args(scan_attempt, scanner, attempt);
    Cluster::send_event_hashed(scanner, args);

And... it just worked. I have 8 data nodes running, each handling 1/8th of the scan detection aggregation.
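
For completeness, the handler side on the data nodes keeps the same shape as the earlier manager-only example, assuming the broker-integration branch's datanode node type:

    # Each data node only sees its hash slice of scanners.
    @if ( Cluster::local_node_type() == Cluster::DATANODE )
    event Scan::scan_attempt(scanner: addr, attempt: Attempt)
        {
        add_scan_attempt(scanner, attempt);
        }
    @endif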