[Bro-Commits] [git/bro] topic/actor-system: First-pass broker-enabled Cluster scripting API + misc. (07ad06b)

This is coming together quite nicely. Not sure if it's stable yet, but
I'll just go ahead with some feedback I noticed looking over the new
cluster API:

    - One thing I can't quite tell is if this is still aiming to
      maintain compatibility with the old communication system, like
      by keeping the proxies and also the *_events patterns. Looking
      at setup-connections, it seems so. I'd say just go ahead and
      remove all legacy pieces. Maintaining two schemes in parallel is
      cumbersome, and I think it's fine to just force everything over
      to Broker.

    - Is the idea for the "*_topic" constants that one just picks the
      appropriate one when sending events? Like if I want to publish
      something to all workers, I'd publish to Cluster::worker_topic?
      I think that's fine, though I'm wondering if we could compress
      the API there somehow so that Cluster doesn't need to export all
      those constants individually. One idea would be a function that
      returns a topic based on node type?

    - I like the Pools! How about moving Pool with its functions out
      of the main.bro, just for clarity.

    - Looks like the hello/bye events are broadcasted by all nodes. Is
      that on purpose, or should that be limited to just one, like
      just the master sending them out? Or does it not matter and this
      provides for more redundancy?

    - create_store() vs "stores": Is the idea that I'd normally use
      create_store() and that populates the table, but I could also
      redef it myself instead of using create_store() to create more
      custom entries? If so, maybe make that a bit more explicit in
      the comments that there're two ways to configure that table.

Robin

   - One thing I can't quite tell is if this is still aiming to
     maintain compatibility with the old communication system, like
     by keeping the proxies and also the *_events patterns. Looking
     at setup-connections, it seems so. I'd say just go ahead and
     remove all legacy pieces. Maintaining two schemes in parallel is
     cumbersome, and I think it's fine to just force everything over
     to Broker.

It does keep the old functionality around if one does "redef Cluster::use_broker=F", but the "T" branch of the code doesn’t aim to maintain compatibility. For the moment, I like having the old functionality around as reference: e.g. as I port other scripts/frameworks I may find it helpful to switch back to the old version to test/compare what it was doing. I’ve made a note to remove it after I get everything working/stable.

The "use_broker=T” code branch does keep the notion of proxies the same (at least in the way the connect to other nodes in the cluster). My thought was they can conceptually still be used for the same type of stuff: data sharing and offloading other misc. analysis/calculation. And the only change to the setup I think I’d have to make is that each worker would now connect with all proxies instead of just one and proxies do not connect to each other.

I’ve also been talking with Justin and it seems like he wants the ability for there to be multiple logger nodes in a cluster w/ ability to distribute logs between them, which seems possible, but would need some API changes in Bro to get that working (e.g. change the static log topic to one returned by a user-defined function). I think he also had been expecting ‘data’ nodes to be a thing (not sure how those differ from proxies), so generally I’m worried I missed a previous discussion on what people expect the new cluster layout to look like, or maybe just no one has put forth a coherent plan/design for that yet?

   - Is the idea for the "*_topic" constants that one just picks the
     appropriate one when sending events? Like if I want to publish
     something to all workers, I'd publish to Cluster::worker_topic?

Yeah, you have the idea right.

     I think that's fine, though I'm wondering if we could compress
     the API there somehow so that Cluster doesn't need to export all
     those constants individually. One idea would be a function that
     returns a topic based on node type?

Yeah, could do that, but also don't really see the problem with exporting things individually. At least that way, the topic strings are guaranteed to be correct in the generated docs. With a function, you’d have to maintain the topic strings in two places: the docs and the internal function implementation, which may seem trivial to get right, but I’ve seen enough instances of outdated documentation that I have doubts...
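
For reference, the function variant Robin suggests would be roughly the following (just a sketch; the function name and topic strings here are placeholders, not the actual exported constants):

    # Sketch only: the name and topic strings are illustrative, not the real API.
    function node_topic(nt: Cluster::NodeType): string
        {
        if ( nt == Cluster::MANAGER )
            return "bro/cluster/manager";
        if ( nt == Cluster::WORKER )
            return "bro/cluster/worker";
        if ( nt == Cluster::PROXY )
            return "bro/cluster/proxy";
        return "bro/cluster";
        }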

   - I like the Pools! How about moving Pool with its functions out
     of the main.bro, just for clarity.

Sure.

   - Looks like the hello/bye events are broadcasted by all nodes. Is
     that on purpose, or should that be limited to just one, like
     just the master sending them out? Or does it not matter and this
     provides for more redundancy?

Mostly on purpose. The point of the “hello” message is to map broker node ID to cluster node name. E.g. node IDs provided by Broker::peer_added are a hash of a MAC address concatenated w/ process ID (hard to read and associate with a cluster node) and node names are “manager”, “worker-1”, etc. At the point where two nodes connect, I don’t think we have any information other than node IDs, and we need the node names to be able to send more directed messages, thus the broadcast. At least I don’t think there’s another way to send directed messages (e.g. based on node ID) in Bro’s current API, maybe I missed it?
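
For concreteness, the mapping boils down to something roughly like this sketch (the table name, event signatures, and the peer_lost handler are illustrative, not necessarily what the branch implements):

    # Sketch: map Broker endpoint IDs to friendly cluster node names.
    global node_names: table[string] of string;

    event Cluster::hello(name: string, id: string)
        {
        node_names[id] = name;
        }

    event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
        {
        if ( endpoint$id in node_names )
            print fmt("lost cluster node %s", node_names[endpoint$id]);
        }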

And the “bye” event is only raised locally so users can potentially handle it to know when a cluster node goes down (i.e. it gives them the friendlier node name rather than the broker node ID that you’d get from handling Broker::peer_lost).

I might generally be missing some context here: I remember broker endpoints originally being able to self-identify with the friendly names, so these new hello/bye events wouldn’t have been needed, but it didn’t seem like that functionality was around anymore.

   - create_store() vs "stores": Is the idea that I'd normally use
     create_store() and that populates the table, but I could also
     redef it myself instead of using create_store() to create more
     custom entries? If so, maybe make that a bit more explicit in
     the comments that there're two ways to configure that table.

That’s right, I’ll improve the docs.

- Jon

My thought was they can conceptually still be used for the same type
of stuff: data sharing and offloading other misc.
analysis/calculation.

Yeah, agree that we want such nodes, however I would like to switch
away from the proxy name. "proxy" had a very specific meaning with the
old communication system and calling the new nodes the same would be
confusing I think.

I’m worried I missed a previous discussion on what people expect the
new cluster layout to look like or maybe just no one has put forth a
coherent plan/design for that yet?

Justin, correct me if I'm wrong, but I don't think this has ever been
fully fleshed out. If anybody wants to propose something specific, we
can discuss, otherwise I would suggest we stay with the minimum for
now that replicates the old system as much as possible and then expand
on that going forward.

Yeah, could do that, but also don't really see the problem with
exporting things individually. At least that way, the topic strings
are guaranteed to be correct in the generated docs.

Yeah, that's true, I was mostly thinking from the perspective of
having a concise API in the export section. But either way seems fine.

the broadcast. At least I don’t think there’s another way to send
directed messages (e.g. based on node ID) in Bro’s current API, maybe
I missed it?

Ah, I misunderstood the purpose of these messages.

If I remember right we can send direct messages at the C++ level and
could expose that to Bro; or we could have nodes subscribe to a topic
that corresponds to their node ID. But not sure either would make it
much different, so nevermind.

I might generally be missing some context here: I remember broker
endpoints originally being able to self-identify with the friendly
names, so these new hello/bye events wouldn’t have been needed, but it
didn’t seem like that functionality was around anymore.

I actually don't remember. If we had it, not sure what happened to it.

Robin

My design for a new cluster layout is multiple data nodes and multiple logger nodes using the new RR and HRW pools Jon added.

It's not too much different from what we have now, just instead of doing things like statically configuring that worker-1,3,5,7 connects
to proxy-1 and worker-2,4,6,8 connect to proxy-2, workers would connect to all data nodes and loggers and use round robin/hashing
for distributing messages.

We have preliminary support for multiple loggers in broctl now; it just uses the static configuration method, so if you are running two
and one process dies, half the workers have no functioning logger.

The node.cfgs would look something like

## Multiple node cluster with redundant data/logger nodes
# manager - 1
[manager-1-logger]
host = manager1
type = logger

[manager-1-data]
host = manager1
type = data
lb_procs = 2

# manager - 2
[manager-2-logger]
host = manager2
type = logger

[manager-2-data]
host = manager2
type = data
lb_procs = 2

# worker 1
[worker-1]
host = worker1
type = worker
lb_procs = 16

...

# worker 4
[worker-4]
host = worker4
type = worker
lb_procs = 16

## 2(or more) node cluster with no SPOF:
# node - 1
[node-1-logger]
host = node1
type = logger

[node-1-data]
host = node1
type = data
lb_procs = 2

[node-1-workers]
host = worker1
type = worker
lb_procs = 16

# node - 2
[node-2-logger]
host = node2
type = logger

[node-2-data]
host = node2
type = data
lb_procs = 2

[node-2-workers]
host = worker2
type = worker
lb_procs = 16

Replicating the old system initially sounds good to me, just as long as that doesn't make it harder to expand things later.

The logger stuff should be the easier thing to change later since scripts don't deal with logger nodes directly and the
distribution would be handled in one place inside the logging framework. Multiple data nodes are a little harder to add in
later since that requires script language support and script changes for routing events across nodes.

I think for the most part the support for multiple data nodes comes down to 2 functions being required:

- a bif/function for sending an event to a data node based on the hash of a key.
  - This looks doable now with the HRW code, it's just not wrapped in a single function.

- a bif/function for efficiently broadcasting an event to all other workers (or data nodes)
  - If the current node is a data node, just send it to all workers
  - otherwise, round robin the event to a data node and have it send it to all workers minus the current node.

If &synchronized is going away script writers should be able to broadcast an event to all workers by doing something like

    Cluster::Broadcast(Cluster::WORKERS, event Foo(42));

This would replace a ton of code that currently uses things like worker2manager_events+manager2worker_events+@if ( Cluster::local_node_type() == Cluster::MANAGER )
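
(For reference, the kind of boilerplate a single call like that replaces looks roughly like the sketch below; abridged, with made-up event names Foo::seen/Foo::notify rather than anything from a real script.)

    # Old-style pattern (abridged sketch):
    redef Cluster::worker2manager_events += /Foo::seen/;
    redef Cluster::manager2worker_events += /Foo::notify/;

    @if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
    event Foo::seen(n: count)
        {
        # manager fans the information back out; the registered pattern
        # forwards Foo::notify to the workers
        event Foo::notify(n);
        }
    @endif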

Yeah, agree that we want such nodes, however I would like to switch
away from the proxy name. "proxy" had a very specific meaning with the
old communication system and calling the new nodes the same would be
confusing I think.

Agreed. There has been so much confusion over the "proxy" name that it's best to just get rid of it. Especially considering that the *exact* tasks those processes will be taking on will be slightly different.

Justin, correct me if I'm wrong, but I don't think this has ever been
fully fleshed out. If anybody wants to propose something specific, we
can discuss, otherwise I would suggest we stay with the minimum for
now that replicates the old system as much as possible and then expand
on that going forward.

Agreed on this too. Some of these changes sound like they could take a while to prototype and figure out how they would be effectively used.

   .Seth

- a bif/function for efficiently broadcasting an event to all other workers (or data nodes)
- If the current node is a data node, just send it to all workers
- otherwise, round robin the event to a data node and have it send it to all workers minus the current node.

In the case of broadcasting from a worker to all other workers, the reason why you relay via another node is only because workers are not connected to each other? Do we know that a fully-connected cluster is a bad idea? i.e. why not have a worker able to broadcast directly to all other workers if that’s what is needed?

If &synchronized is going away script writers should be able to broadcast an event to all workers by doing something like

   Cluster::Broadcast(Cluster::WORKERS, event Foo(42));

This would replace a ton of code that currently uses things like worker2manager_events+manager2worker_events+@if ( Cluster::local_node_type() == Cluster::MANAGER )

The successor to &synchronized was primarily intended to be the new data store stuff, so is there a way to map what you need onto that functionality? Or can you elaborate on an example where you think this new broadcast pattern is a better way to replace &synchronized than using a data store?

- Jon

- a bif/function for efficiently broadcasting an event to all other workers (or data nodes)
- If the current node is a data node, just send it to all workers
- otherwise, round robin the event to a data node and have it send it to all workers minus the current node.

In the case of broadcasting from a worker to all other workers, the reason why you relay via another node is only because workers are not connected to each other? Do we know that a fully-connected cluster is a bad idea? i.e. why not have a worker able to broadcast directly to all other workers if that’s what is needed?

Mostly so that workers don't end up spending all their time sending out messages when they should be analyzing packets.

If &synchronized is going away script writers should be able to broadcast an event to all workers by doing something like

  Cluster::Broadcast(Cluster::WORKERS, event Foo(42));

This would replace a ton of code that currently uses things like worker2manager_events+manager2worker_events+@if ( Cluster::local_node_type() == Cluster::MANAGER )

The successor to &synchronized was primarily intended to be the new data store stuff, so is there a way to map what you need onto that functionality? Or can you elaborate on an example where you think this new broadcast pattern is a better way to replace &synchronized than using a data store?

- Jon

I think a shared data store would work for most of the use cases where people are messing with worker2manager_events.

If all the cases of people using worker2manager_events+manager2worker_events to mimic broadcast functionality are really just
doing so to update data then it does make sense to just replace all of that with a new data store.

How would something like policy/protocols/ssl/validate-certs.bro look with intermediate_cache as a data store?

My view:

I have again and again encountered the following types of cases while doing script/pkg work:

1) manager2worker: Input framework reads external data and all workers need to see it.
  example: intel framework
2) worker2manager: workers see something and report to the manager; the manager keeps
aggregated counts to make decisions
  example: scan detection
3) worker2manager2all-workers: workers see something, send to the manager, and the manager
distributes to all workers
  example: tracking clicked URLs extracted from email

Basically, Bro has two kinds of heuristic needs:

a) Cooked data analysis and correlations - cooked data is the data which ends up
in logs, basically the entire 'protocol record' (e.g. c$http or c$smtp) -
these are the majority.

Cooked data processing functionality can also be interpreted, for simplicity, as:

  tail -f blah.log | ./python-script

  but inside bro.

b) Raw or derived data - data which you need to extract from traffic with a defined
policy of your own (e.g. URLs extracted from email by tapping into the
mime_data_all event, or MAC addresses extracted from router
advertisement/solicitation events), or something which is not yet in an ::Info
record, or a new 'thing' - this should be rare, with few use cases over time.

So in short, give me reliable events which are simply tail -f log functionality
on a data/processing node. It will reduce the number of synchronization needs by
an order of magnitude (or more).

For (b) - raw or derived data - we can keep the complexities of broker stores,
syncs, etc., but I have hopes that refined raw data could easily become its own log
and be processed as cooked data.

So a lot of the data centrality issues related to the cluster can go away with a data
node which can handle a lot of the cooked-data-related stuff for (1), (2) and in
some cases (3).

Now, while Justin's multiple data nodes idea has its merits, I am not much of a fan of it. Reason being that having multiple data nodes results in the same sets of problems - synchronization, latencies, a mess of data2worker and worker2data events, etc.

I'd love to keep things rather simple. Cooked data goes to one (or more) data nodes (data stores). Just replicate for reliability rather than pick and choose what goes where.

Just picking up some things:

> In the case of broadcasting from a worker to all other workers, the reason why you relay via another node is only because workers are not connected to each other? Do we know that a fully-connected cluster is a bad idea? i.e. why not have a worker able to broadcast directly to all other workers if that’s what is needed?

Mostly so that workers don't end up spending all their time sending out messages when they should be analyzing packets.

Yes. Also, I have seen this cause broadcast storms. That's why I have always
used the manager as a central judge on what goes. See, often the same data is seen by
all workers, so if the manager is smart, it can just send the first instance to the workers
and all other workers can stop announcing further.

Let me explain:

- I block a scanner on 3 connections.
- 3 workers see a connection each - they each report to the manager
- manager says "yep, scanner" and sends a note to all workers saying traffic from this
  IP is now uninteresting, stop reporting.
- let's say 50 workers
- total communication events = 53

If all workers send data to all workers, a scanner hitting 65,000 hosts will be a
mess inside the cluster, especially when scanners are hitting in milliseconds and not seconds.

Similar to this is another case.

Let's say

- I read 1 million blacklisted IPs from a file on the manager.
- the manager sends 1 million x 50 events (to 50 workers)
- each worker needs to report if a blacklisted IP has touched the network
- now imagine we want to keep a count of how many unique local IPs each of
  these blacklisted IPs has touched
- and at what rate, and when the first and last contact were.

(btw, I have a working script for this - so whatever new broker does, it needs
to be able to give me this functionality)

Here is a sample log:

#fields ts ipaddr ls days_seen first_seen last_seen active_for last_active hosts total_conns source
1509606970.541130 185.87.185.45 Blacklist::ONGOING 3 1508782518.636892 1509462618.466469 07-20:55:00 01-16:05:52 20 24 TOR
1509606980.542115 46.166.162.53 Blacklist::ONGOING 3 1508472908.494320 1509165782.304233 08-00:27:54 05-02:33:18 7 9 TOR
1509607040.546524 77.161.34.157 Blacklist::ONGOING 3 1508750181.852639 1509481945.439893 08-11:16:04 01-10:44:55 7 9 TOR
1509607050.546742 45.79.167.181 Blacklist::ONGOING 4 1508440578.524377 1508902636.365934 05-08:20:58 08-03:40:14 66 818 TOR
1509607070.547143 192.36.27.7 Blacklist::ONGOING 6 1508545003.176139 1509498930.174750 11-00:58:47 01-06:02:20 30 33 TOR
1509607070.547143 79.137.80.94 Blacklist::ONGOING 6 1508606207.881810 1509423624.519253 09-11:03:37 02-02:57:26 15 16 TOR

Aashish

Now, while Justin's multiple data nodes idea has its merits, I am not much of a fan of it. Reason being that having multiple data nodes results in the same sets of problems

It does not have the same problems. It may have different problems that I haven't thought of yet, but it doesn't have the same problems.

synchronization,

What synchronization problems?

latencies

Adding multiple data nodes will reduce the load on each node and lower overall latencies.

a mess of data2worker and worker2data events, etc.

You're projecting the current mess of worker2manager_events and manager2worker_events onto what I am trying to replace them with.

Having
    worker2manager_events
and
    @if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )

all over the place exists because bro doesn't have higher level methods for distributing data and events across the cluster. I am not proposing replacing that with

    worker2datanode_events
and
    @if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::DATANODE )

I'm proposing getting rid of that sort of thing entirely. No '@if cluster'. no 'redef worker2manager_events'. All gone.

I'd love to keep things rather simple. Cooked data goes to one (or more) datanodes (datastores). Just replicate for relibaility rather then pick and choose what goes where.

Then clusters will just change from having an overloaded manager process that is falling over under the load to 2 data nodes that are both failing. This is just renaming the current bottlenecks and is not a solution.

I implemented a multi data node cluster back in March on top of topic/mfischer/broker-integration. Porting my scan.bro from the manager2worker_events stuff to sending events directly to one of N data nodes was:

Remove:

    redef Cluster::worker2manager_events ...
    @if (Cluster ...
    event Scan::scan_attempt(scanner, attempt);

Add:
    
    local args = Broker::event_args(Scan::scan_attempt, scanner, attempt);
    Cluster::send_event_hashed(scanner, args);

Other than having that wrapped in a single function, it doesn't get any easier than that.
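
(i.e. ideally that pair of lines collapses into a single call along these lines; a sketch only, the exact name and signature are still up in the air:)

    # Hypothetical single-call form; assumes a publish_hrw-style wrapper that
    # takes the event and its arguments directly.
    Broker::publish_hrw(Cluster::data_pool, scanner, Scan::scan_attempt, scanner, attempt);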

- a bif/function for efficiently broadcasting an event to all other workers (or data nodes)
- If the current node is a data node, just send it to all workers
- otherwise, round robin the event to a data node and have it send it to all workers minus the current node.

In the case of broadcasting from a worker to all other workers, the reason why you relay via another node is only because workers are not connected to each other? Do we know that a fully-connected cluster is a bad idea? i.e. why not have a worker able to broadcast directly to all other workers if that’s what is needed?

Mostly so that workers don't end up spending all their time sending out messages when they should be analyzing packets.

Ok, I get what you want to avoid, though it could be interesting to actually have a fully-connected cluster in order to collect performance data on each comm. pattern and see how significant the difference is for a variety of use-cases.

Do you have a particular example you can give where you’d use this BIF/function to relay a broadcast from a worker to all other workers via a proxy?

How would something like policy/protocols/ssl/validate-certs.bro look with intermediate_cache as a data store?

global intermediate_store: Cluster::StoreInfo;

event bro_init()
  {
  intermediate_store = Cluster::create_store("ssl/validate-certs/intermediate_store");
  }

And then port the rest of that script to use broker data store api (get/put/exists calls) to access that store.
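
A rough sketch of what the lookup side might turn into, assuming the async when/get pattern of the Broker store API and that the handle is exposed as intermediate_store$store (exact names may differ; 'issuer' and 'chain' stand in for the key/value the script already computes):

    # inside whatever event currently does the table lookup
    when ( local r = Broker::get(intermediate_store$store, issuer) )
        {
        if ( r$status == Broker::SUCCESS )
            {
            # issuer already known; use r$result here
            }
        else
            Broker::put(intermediate_store$store, issuer, chain);
        }
    timeout 5sec
        {
        Reporter::warning("timed out querying intermediate cert store");
        }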

- Jon

Thanks, though I’m not sure this scenario maps well to this particular point. E.g. my impression is Justin wants a single BIF/function that can send one event from a worker to a proxy and have the proxy purely relay it to all other workers without doing anything else. So it’s solely taking the cost of sending N messages from a worker and offloading that burden to a different node.

I think your example differs because there is actually an additional task/logic being performed on the middleman that ends up reducing comm/processing requirements. i.e. it’s not just a pure relay of messages.

Or maybe I’m just not understanding what anybody wants :-)

- Jon

Mostly so that workers don't end up spending all their time sending out messages when they should be analyzing packets.

Ok, I get what you want to avoid, though it could be interesting to actually have a fully-connected cluster in order to collect performance data on each comm. pattern and see how significant the difference is for a variety of use-cases.

Do you have a particular example you can give where you’d use this BIF/function to relay a broadcast from a worker to all other workers via a proxy?

Scripts like what validate-certs does to broadcast the presence of a new intermediate cert to the other nodes in the cluster.
That script does have the added optimization on the manager side for only broadcasting the value once if a new cert is seen by two workers at the same time,
so maybe it's not the best example for a broadcast.

With explicit event destinations and load balancing across data nodes that script could look something like

function add_to_cache(key: string, value: vector of opaque of x509)
    {
    # could still do @if ( Cluster::is_enabled() ), but in a standalone setup we are the data
    # node, so this would just short circuit to raise the event locally. I'd rather broker do
    # the @if internally than have every script have to have two implementations.
    Broker::publish_hrw(Cluster::data_pool, key, SSL::new_intermediate, key, value);
    }

event SSL::new_intermediate(key: string, value: vector of opaque of x509)
    {
    if ( key in intermediate_cache )
        return;
    intermediate_cache[key] = value;
    # in a standalone setup this would just be a NOOP
    Broker::broadcast(Cluster::worker_pool, SSL::intermediate_add, key, value);
    }

event SSL::intermediate_add(key: string, value: vector of opaque of x509)
    {
    intermediate_cache[key] = value;
    }

Without the added optimization you'd just have

function add_to_cache(key: string, value: vector of opaque of x509)
    {
    intermediate_cache[key] = value;
    # in a standalone setup this would just be a NOOP
    Broker::broadcast(Cluster::worker_pool, SSL::intermediate_add, key, value);
    }

event SSL::intermediate_add(key: string, value: vector of opaque of x509)
    {
    intermediate_cache[key] = value;
    }

The optimization could be built into broker though, something like

    Broker::broadcast_magic_once_whatever(Cluster::worker_pool, key, SSL::intermediate_add, key, value);

That would hash the key, send it to a data node, then have the data node broadcast the
event while adding key to a 'recently broadcasted keys' table that only needs to buffer for 10s or so.

This would enable you to efficiently broadcast an event (once) across all workers with a single line of code.
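
The data-node side of the dedup could be as simple as this sketch (names are made up, and the broadcast call is the hypothetical one from above; &create_expire handles the ~10s buffer):

    global recently_broadcast: set[string] &create_expire=10sec;

    event SSL::new_intermediate(key: string, value: vector of opaque of x509)
        {
        if ( key in recently_broadcast )
            return;
        add recently_broadcast[key];
        # hypothetical broadcast call, as above
        Broker::broadcast(Cluster::worker_pool, SSL::intermediate_add, key, value);
        }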

In either case all of the

manager2worker_events
worker2manager_events
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )

is no longer needed.

I guess my whole point with all of this is that if the intent of a script is that an event should be seen on all workers, the script should look something like

    Broker::broadcast(Cluster::worker_pool, SSL::intermediate_add, key, value);

and not have a bunch of redefs and @ifs so that the script can eventually have

    event SSL::new_intermediate(key, value);

How would something like policy/protocols/ssl/validate-certs.bro look with intermediate_cache as a data store?

global intermediate_store: Cluster::StoreInfo;

event bro_init()
  {
  intermediate_store = Cluster::create_store("ssl/validate-certs/intermediate_store");
  }

And then port the rest of that script to use broker data store api (get/put/exists calls) to access that store.

- Jon

Does that have the same performance profile as the current method?

    if (issuer in intermediate_cache)

vs

    Broker::get(intermediate_cache, issuer)

I think you're understanding it perfectly :-)

You're right that it's often not a pure relay, but it's often the same messaging pattern of "send to all other workers,
but deduplicate it first to avoid sending the same message twice back to back".

For an example of a purely broadcast use case, see

scripts/base/frameworks/intel/cluster.bro

You can see the crazy amount of complexity around the Intel::cluster_new_item event.

Ultimately what I want is for the messaging patterns to be implemented in something bro ships, so that script
writers don't have to implement message broadcasting and relaying themselves - and end up doing it less efficiently
than bro can do it internally. Requiring script writers to write cluster and non-cluster versions peppered with @if (Cluster
means that bro is not abstracting things at the right level anymore.

Maybe with broker data stores there won't be much use for things like event broadcasting, but I feel like anything that
does need to be broadcasted should be using an explicit Broadcast function, and not things like manager2worker_events.

The optimization could be built into broker though, something like

   Broker::broadcast_magic_once_whatever(Cluster::worker_pool, key, SSL::intermediate_add, key, value);

That would hash the key, send it to a data node, then have the data node broadcast the
event

The first thing I’d suggest is a new internal broker message type that can publish a message via a topic and then on any receiving nodes re-publish via a second topic. In this way, we could distribute to all workers via a single proxy/data node as:

  local one_proxy_topic = Cluster::rr_topic(Cluster::proxy_pool, "ssl/intermediate_add");
  local e = Broker::make_event(SSL::intermediate_add, key, value);
  Broker::relay(one_proxy_topic, Cluster::worker_topic, e);

Or potentially compressed into one call:

  Cluster::relay(first_topic, second_topic, rr_key, event, varargs...);
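
So Justin's add_to_cache example from earlier would end up roughly like this (a sketch using the calls proposed above; none of this exists yet):

    function add_to_cache(key: string, value: vector of opaque of x509)
        {
        intermediate_cache[key] = value;
        # pick one proxy via round robin and have it re-publish to all workers
        local rt = Cluster::rr_topic(Cluster::proxy_pool, "ssl/intermediate_add");
        local e = Broker::make_event(SSL::intermediate_add, key, value);
        Broker::relay(rt, Cluster::worker_topic, e);
        }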

while adding key to a 'recently broadcasted keys' table that only needs to buffer for 10s or so.

This would enable you to efficiently broadcast an event (once) across all workers with a single line of code.

I’m not sure about this part. Is that just for throttling potential duplicates? Is that something you want generally or just for this particular example? I’m thinking maybe it can wait until we know we have several places where it’s actually needed/used before compressing that pattern into a single BIF/function.

I guess my whole point with all of this is that if the intent of a script is that an event should be seen on all workers, the script should look something like

   Broker::broadcast(Cluster::worker_pool, SSL::intermediate_add, key, value);

and not have a bunch of redefs and @ifs so that the script can eventually have

   event SSL::new_intermediate(key, value);

Yeah, minimizing the need for people to have to constantly implement various code paths that depend on cluster/node-type is a good goal and what I’m also aiming at.

How would something like policy/protocols/ssl/validate-certs.bro look with intermediate_cache as a data store?

global intermediate_store: Cluster::StoreInfo;

event bro_init()
  {
  intermediate_store = Cluster::create_store("ssl/validate-certs/intermediate_store");
  }

And then port the rest of that script to use broker data store api (get/put/exists calls) to access that store.

- Jon

Does that have the same performance profile as the current method?

   if (issuer in intermediate_cache)

vs

   Broker::get(intermediate_cache, issuer)

Theoretically, they’re both in-memory hash-table lookups, though the implementations are obviously very different and I don’t know how they compare in reality.

I think it’s true that many scripts could be ported to use either data stores or just explicitly exchange events. Probably the preference to use a data store would be for cases where it may be useful to keep persistent data across crashes/restarts.

- Jon

That's right! Took me some time to figure out how data should be distributed. So, I am following this thread trying to keep up with the development. Right now I don't have new ideas to contribute but as the intel framework was mentioned multiple times as an example, I thought I might sketch its communication behavior, so that we have a more complete view of that use case.

Let's assume in the beginning there is only the manager. The manager reads in the intel file and creates his "in-memory database", a quite complex table (DataStore), as well as a data structure that contains only the indicators for matching on the workers (MinDataStore).

Now, when a worker connects, he receives the current MinDataStore, sent using send_id. (Side note: I am planning to replace the sets used in the MinDataStore by Cuckoo Filters. Not sure how serialization etc. will work out using broker but if I remember correctly there is a temporary solution for now.) If the worker detects a match, he triggers match_no_items on the manager, who generates the hit by combining the seen data of the worker and the meta data of the DataStore.

At this point, if the manager functionality is distributed across multiple data nodes, we have to make sure that every data node has the right part of the DataStore to deal with the incoming hit. One could keep the complete DataStore on every data node but I think that would lead to another scheme in which a subset of workers send all their requests to a specific data node, i.e. each data node serves a part of the cluster.

Back to the current implementation. So far not that complex but there are two more cases to deal with: Inserting new intel items and removing items. A new item can be inserted on the manager or on a worker. As a new item might be just new meta data for an already existing indicator (no update of the MinDataStore needed), the manager is the only one who can handle the insert. So if inserted on a worker, the worker triggers a cluster_new_item event on the manager, who proceeds like he inserted the item. Finally, the manager only triggers cluster_new_item on the workers if the inserted item was a new indicator that has to be added to the worker's MinDataStores. Some of the complexity here is due to the fact that the same event, cluster_new_item, is used for communication in both directions (worker2manager and manager2worker). The removal of items works more or less the same with the only difference that for each direction there is a specific event (remove_item and purge_item).

Long story short: I think the built in distribution across multiple data nodes you discussed is a great idea. The only thing to keep in mind would be a suitable way of "initializing" the data nodes with the corresponding subset of data they need to handle. I guess in case of the intel framework the manager will still handle reading the intel files and might make use of the same mechanisms the workers use to distribute the ingested data to the data nodes. The only thing I am not sure about is how we can/should handle dynamic adding and removing of the data nodes.

And just to avoid misunderstandings: We won't be able to get rid of the
@if (Cluster::local_node_type() != Cluster::MANAGER/DATANODE)
statements completely as different node types have different functionality. It's just about the communication API, right?

I hope this helps when thinking about the API design :-)

Jan

The ability to write that code doesn’t go away, I think it’s just in some places we may have the ability to do something else that may be easier to understand and/or less busy-work for script-writers to implement.

As I port the handful of scripts that come with Bro, I’m generally hesitant to radically change/reorganize the way they work. For obvious comm. patterns that are repeated and can be replaced with something simpler, I’d do that, but for complex scripts I’d likely try to just do the simplest translation that gets it working.

I also want to make sure there’s a good foundation for people to then make further/larger changes and it sounds like there will be: it’s mostly a matter of changing the cluster layout a bit and maybe giving Justin a few more functions related to message patterns/distribution. Once the thread slows, I’ll post a concise summary of the particular changes that I think are needed.

- Jon

Yeah, this is where the HRW (hashing) vs RR (round robin) pool distribution methods come in.

If all data nodes had a full copy of the data store, then either distribution method would work.

Partitioning the intel data set is a little tricky since it supports subnets and hashing 10.10.0.0/16
and 10.10.10.10 won't necessarily give you the same node. Maybe subnets need to exist on all
nodes but everything else can be partitioned? There would also need to be a method for
re-distributing the data if the cluster configuration changes due to nodes being added or removed.

'Each data node serving a part of a cluster' is kind of like what we have now with proxies,
but that is statically configured and has no support for failover. I've seen cluster setups where
there are 4 worker boxes and one proxy running on each box. The problem is if one box goes down,
1/4 of the workers on the remaining 3 boxes are configured to use a proxy that no longer exists.

So minimally just having a copy of the data in another process and using RR would be an improvement.

There may be an issue with scaling out data nodes to 8+ processes for things like scan detection and sumstats,
if those 8 data nodes would also need to have a full copy of the intel data in memory. I don't know how much
memory a large intel data set takes inside a running bro process though.

Things like scan detection, sumstats, and known hosts/ports/services/certs are a lot easier to partition because by definition
they are keyed on something.

On 03/11/17 18:07, Azoff, Justin S wrote:

Partitioning the intel data set is a little tricky since it supports subnets and hashing 10.10.0.0/16
and 10.10.10.10 won't necessarily give you the same node. Maybe subnets need to exist on all
nodes but everything else can be partitioned?

Good point! Subnets are stored kind of separately to allow prefix matches anyway. However, I am a bit hesitant as it would become a quite complex setup.

There would also need to be a method for
re-distributing the data if the cluster configuration changes due to nodes being added or removed.

Right, that's exactly what I was thinking of. I guess this applies also to other use cases which will use HRW. I am just not sure whether dynamic layout changes are out of scope at the moment...

'Each data node serving a part of a cluster' is kind of like what we have now with proxies,
but that is statically configured and has no support for failover. I've seen cluster setups where
there are 4 worker boxes and one proxy running on each box. The problem is if one box goes down,
1/4 of the workers on the remaining 3 boxes are configured to use a proxy that no longer exists.

So minimally just having a copy of the data in another process and using RR would be an improvement.

There may be an issue with scaling out data nodes to 8+ processes for things like scan detection and sumstats,
if those 8 data nodes would also need to have a full copy of the intel data in memory. I don't know how much
memory a large intel data set takes inside a running bro process though.

Fully agreed! In that case it might be nice if one can define separate special purpose data nodes, e.g. "intel data nodes". But I am not sure whether this is a good idea as this might lead to complex cluster definitions and poor usability, as users need to know a bit about how the underlying mechanisms work. On the other hand this would theoretically allow completely decoupling the intel data store (e.g. interfacing a "real" database with some pybroker-scripts).

Jan

I had a similar thought, but also not sure if it’s a good idea. Example node.cfg:

[data-1]
type = data
pools = Intel::pool

[data-2]
type = data
pools = Intel::pool

[data-3]
type = data

[data-4]
type = data

So there would be two pools here: Cluster::data_pool which is already predefined by the cluster framework (and consists of all data nodes that have not been specifically assigned to other pools) and Intel::pool which is defined/registered by the intel framework. Then there’s some magic that makes broctl set up those nodes so that they will belong to any pools listed in the config file and the cluster framework will manage it from there. So this gives users more opportunity to customize, but a problem is it’s hard to say whether the default config file will end up doing something sane for all cases or if you end up with script-writers having more complicated installation instructions like “you should definitely change your node.cfg and don’t scale this pool out to more than N data nodes”.
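
On the script side, the idea would be that a framework just declares its pool and the cluster framework fills it from whatever nodes broctl assigned, roughly like this (a sketch; Cluster::Pool and the registration call are made-up names for something that doesn't exist yet):

    module Intel;

    export {
        ## Hypothetical pool dedicated to intel lookups.
        global pool: Cluster::Pool;
    }

    event bro_init() &priority=10
        {
        # made-up registration call; the real mechanism is still undecided
        pool = Cluster::register_pool("Intel::pool");
        }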

- Jon

On 03/11/17 18:07, Azoff, Justin S wrote:

Partitioning the intel data set is a little tricky since it supports subnets and hashing 10.10.0.0/16
and 10.10.10.10 won’t necessarily give you the same node. Maybe subnets need to exist on all
nodes but everything else can be partitioned?

Good point! Subnets are stored kind of separately to allow prefix matches anyway. However, I am a bit hesitant as it would become a quite complex setup.

Indeed… replication+load balancing is probably a good enough first step.

There would also need to be a method for
re-distributing the data if the cluster configuration changes due to nodes being added or removed.

Right, that’s exactly what I was thinking of. I guess this applies also to other use cases which will use HRW. I am just not sure whether dynamic layout changes are out of scope at the moment…

Other use cases are still problematic, but even without replication/redistribution the situation is still greatly improved.
Take scan detection for example:

With sumstats/scan-ng/simple-scan if the current manager host or process dies, all detection comes to a halt
until it is restarted. Once it is restarted, all state is lost so everything starts over from 0.

If there were 4 data nodes participating in scan detection, and all 4 die, same result, so this is no better or
worse than the current situation.
If only one node dies though, only 1/4 of the analysis is affected. The remaining analysis can immediately
fail over to the next node. So while it may still have to start from 0, there would only be a small hole in the analysis.

For example:

The scan threshold is 20 packets.

A scan has just started from 10.10.10.10.
10 packets into the scan, the data node that 10.10.10.10 hashes to crashes.
HRW now routes data for 10.10.10.10 to another node
30 packets into the scan, the threshold on the new node crosses 20 and a notice is raised.

Replication between data nodes could make this even more seamless, but it’s not a huge priority, at least for me.
My priority is getting the cluster to a point where things don’t grind to a halt just because one component is down.

Ignoring the worker->logger connections, it would look something like the attached layout.png

Fully agreed! In that case it might be nice if one can define separate special purpose data nodes, e.g. “intel data nodes”. But I am not sure whether this is a good idea as this might lead to complex cluster definitions and poor usability, as users need to know a bit about how the underlying mechanisms work. On the other hand this would theoretically allow completely decoupling the intel data store (e.g. interfacing a “real” database with some pybroker-scripts).

Jan

I’ve been thinking the same thing, but I hope it doesn’t come to that. Ideally people will be able
to scale their clusters by just increasing the number of data nodes without having to get into
the details about what node is doing what.

Partitioning the data analysis by task has been suggested… i.e., one data node for scan detection,
one data node for spam detection, one data node for sumstats… I think this would be very easy to
implement, but it doesn’t do anything to help scale out those individual tasks once one process can
no longer handle the load. You would just end up with something like the scan detection and spam
data nodes at 20% CPU and the sumstats node at 100% CPU.