Scheduling events vs. using &expire_func?

I have an aggregation policy where I am trying to keep counts of the number of
connections an IP made in a cluster setup.

For now, I am using tables on the workers and the manager, and using &expire_func to
trigger worker2manager and manager2worker events.

Everything works great until the tables grow to more than 1 million entries, after which
the expire functions start clogging the manager and slowing it down.

Example Timers lines from prof.log on the manager:

1523636760.591416 Timers: current=57509 max=68053 mem=4942K lag=0.44s
1523636943.983521 Timers: current=54653 max=68053 mem=4696K lag=168.39s
1523638289.808519 Timers: current=49623 max=68053 mem=4264K lag=1330.82s
1523638364.873338 Timers: current=48441 max=68053 mem=4162K lag=60.06s
1523638380.344700 Timers: current=50841 max=68053 mem=4369K lag=0.47s

So instead of using &expire_func, I can probably try schedule {}, but I am not
sure how scheduling events is any different internally from scheduling
expire_funcs.

I'd like to think/guess that scheduling events is probably less taxing, but I
wanted to check with the greater group for thoughts, especially for insights into
their internal processing queues.

Thanks,
Aashish

> I have an aggregation policy where I am trying to keep counts of the number of
> connections an IP made in a cluster setup.
>
> For now, I am using tables on the workers and the manager, and using &expire_func to
> trigger worker2manager and manager2worker events.
>
> Everything works great until the tables grow to more than 1 million entries, after which
> the expire functions start clogging the manager and slowing it down.
>
> Example Timers lines from prof.log on the manager:
>
> 1523636760.591416 Timers: current=57509 max=68053 mem=4942K lag=0.44s
> 1523636943.983521 Timers: current=54653 max=68053 mem=4696K lag=168.39s
> 1523638289.808519 Timers: current=49623 max=68053 mem=4264K lag=1330.82s
> 1523638364.873338 Timers: current=48441 max=68053 mem=4162K lag=60.06s
> 1523638380.344700 Timers: current=50841 max=68053 mem=4369K lag=0.47s
>
> So instead of using &expire_func, I can probably try schedule {}, but I am not
> sure how scheduling events is any different internally from scheduling
> expire_funcs.

There's a single timer per table that continuously triggers incremental iteration over fixed-size chunks of the table, looking for entries to expire. The relevant options that you can tune here are (see the sketch after this list):

* `table_expire_interval`
* `table_incremental_step`
* `table_expire_delay`
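
For example, a sketch of redef'ing these (the values are illustrative only, and the semantics are roughly as commented):

# Illustrative values -- tune against your own insertion/expiration rates.
redef table_expire_interval = 1 sec;    # how often the per-table expiration timer fires
redef table_incremental_step = 1000;    # how many entries each firing examines
redef table_expire_delay = 0.01 secs;   # pause before moving on to the next chunk of entries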

> I'd like to think/guess that scheduling events is probably less taxing, but I
> wanted to check with the greater group for thoughts, especially for insights into
> their internal processing queues.

I'm not clear on exactly how your code would be restructured around scheduled events, though I'm guessing that if you just did one event per entry that needs to be expired, it's not going to be better. You would then have one timer per table entry (up from a single timer), or possibly more depending on the expiration scheme (e.g. if it's expiring on something other than creation time, you're going to need a way to invalidate previously scheduled events). A sketch of that per-entry approach follows.
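
As a minimal sketch of what the per-entry approach would look like (the table, timeout, and function names are made up for illustration):

# Hypothetical: one pending timer per table entry, instead of one per table.
global conn_counts: table[addr] of count;
global expire_entry: event(ip: addr);

event expire_entry(ip: addr)
    {
    # Same work an &expire_func would do, just driven by its own timer.
    if ( ip in conn_counts )
        delete conn_counts[ip];
    }

function track(ip: addr)
    {
    if ( ip !in conn_counts )
        conn_counts[ip] = 0;

    ++conn_counts[ip];

    # Every call schedules another timer, so repeated activity for the same
    # IP piles up additional pending events.
    schedule 5 mins { expire_entry(ip) };
    }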

Ultimately, you'd likely still have the same number of equivalent function calls (whatever work you're doing in the &expire_func would still need to happen). Given the way table expiration is implemented, my guess is that the actual work required to call and evaluate the &expire_func code becomes too great at some point, so maybe first try decreasing `table_incremental_step` or reducing the work that you need to do in the &expire_func.

With the new features in the upcoming broker-enabled cluster framework (soon to be merged into git/master), I'd suggest a different way to think about structuring the problem: you could Rendezvous Hash the IP addresses across proxies, with each one managing expiration in just its own table. That way, the storage/computation can be uniformly distributed and you should be able to simply adjust the number of proxies to fit the required scale.

- Jon

For now, I am resorting to the &expire_func route only. I think that by using some more
heuristics in the workers' expire functions for more aggregated stats, I am able to
shed load from the manager, so the manager doesn't need to track ALL potential scanners.

Let's see; I am running the new code for a few days to see if it works without exhausting memory.

Yes, certainly, the following changes did address the manager network_time()
stall issues:

redef table_expire_interval = 0.1 secs;
redef table_incremental_step = 25;

Useful observation: if you want to expire a lot of entries from a table/set,
expire few but expire often.

I still need to determine the limits of both table_incremental_step and
table_expire_interval, and whether this works for a million or millions of entries.

> on the expiration scheme (e.g. if it's expiring on something other than creation
> time, you're going to need a way to invalidate previously scheduled
> events).

Actually, in this case I was thinking more in terms of letting the scheduled event
kick in, and in that event deciding either to schedule the next one or to delete the
entry from the table.
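
A rough sketch of that self-rescheduling variant (the idle-check logic and names are made up for illustration):

# Hypothetical: the scheduled event itself decides whether to reschedule or delete.
global last_seen: table[addr] of time;
global check_entry: event(ip: addr);

event check_entry(ip: addr)
    {
    if ( ip !in last_seen )
        return;

    if ( network_time() - last_seen[ip] > 10 mins )
        delete last_seen[ip];                    # idle long enough: drop the entry
    else
        schedule 10 mins { check_entry(ip) };    # still active: check again later
    }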

> actual work required to call and evaluate the &expire_func code becomes too
> great at some point, so maybe first try decreasing `table_incremental_step`

Yes, it seems like that. I still don't know at what point. In previous runs it
appears to be after the table had 1.7-2.3 million entries. But then I don't think it's a
function of the counts, but of how much RAM I've got on the system. Somewhere in that
range is when the manager ran out of memory. However (as stated above), I was able
to come up with a little heuristic which still allows me to keep track of
really slow scanners while not burdening the manager, instead letting the load be on the
workers. The simple observation that really slow scanners aren't going to have a lot
of connections allows keeping those in the (few) worker tables. This would
potentially be a problem if there really were a LOT of very slow scanners, but,
still, those all get divided by the number of workers we run.

> I'd suggest a different way to think about
> structuring the problem: you could Rendezvous Hash the IP addresses across
> proxies, with each one managing expiration in just its own table. That
> way, the storage/computation can be uniformly distributed and you should be
> able to simply adjust the number of proxies to fit the required scale.

I think the above might work reasonably well.

So previously I was making the manager keep counts of potential scanners, but now I'm
moving that work to the workers instead. The new model would let us just move all this
to the proxy(ies), and the proxies can decide whether to delete or to send to the manager
for aggregation.

I suppose, given that proxies don't process packets, it will be cheaper to do
all this work there.

The only thing that bothers me is that scan detection is a complicated problem only because
of the distribution of data in a cluster. It's a much simpler problem if we could just do a
tail -f conn.log | ./python-script

So yes, we can shed load from manager -> workers -> proxies. I'll try this
approach. But I think I am also going to try (with the new broker-enabled cluster)
the approach of sending all connections to one proxy/data-store and just doing the
aggregation there, and see if that works out (the tail -f conn.log |
./python-script approach). Admittedly, this needs more thinking to get the right
architecture in the new cluster era!

Thanks,
Aashish

> For now, I am resorting to the &expire_func route only. I think that by using some more
> heuristics in the workers' expire functions for more aggregated stats, I am able to
> shed load from the manager, so the manager doesn't need to track ALL potential scanners.
>
> Let's see; I am running the new code for a few days to see if it works without exhausting memory.
>
> Yes, certainly, the following changes did address the manager network_time()
> stall issues:
>
> redef table_expire_interval = 0.1 secs;
> redef table_incremental_step = 25;
>
> Useful observation: if you want to expire a lot of entries from a table/set,
> expire few but expire often.
>
> I still need to determine the limits of both table_incremental_step and
> table_expire_interval, and whether this works for a million or millions of entries.

That should probably work unless you are adding new table entries at a rate higher than 250/second.
You may need to tweak those so that table_incremental_step divided by table_expire_interval is at
least the rate of new entries.
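
For example, with the settings quoted above (the numbers are illustrative):

# Expiration throughput ~= table_incremental_step / table_expire_interval.
# 25 entries per 0.1 sec => ~250 entries/sec expired.
# If new entries arrive at ~1000/sec, scale the step (or shrink the interval):
redef table_expire_interval = 0.1 secs;
redef table_incremental_step = 100;    # 100 / 0.1 sec => ~1000 entries/sec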

> Yes, it seems like that. I still don't know at what point. In previous runs it
> appears to be after the table had 1.7-2.3 million entries. But then I don't think it's a
> function of the counts, but of how much RAM I've got on the system. Somewhere in that
> range is when the manager ran out of memory. However (as stated above), I was able
> to come up with a little heuristic which still allows me to keep track of
> really slow scanners while not burdening the manager, instead letting the load be on the
> workers. The simple observation that really slow scanners aren't going to have a lot
> of connections allows keeping those in the (few) worker tables. This would
> potentially be a problem if there really were a LOT of very slow scanners, but,
> still, those all get divided by the number of workers we run.

How are you tracking slow scanners on the workers? If you have 50 workers and you
are not distributing the data between them, there's only a 1 in 50 chance that you'll
see the same scanner twice on the same worker, a 1 in 2,500 chance that you'd see
3 packets in a row on the same worker... and 1 in 125,000 for 4 in a row.

>> I'd suggest a different way to think about
>> structuring the problem: you could Rendezvous Hash the IP addresses across
>> proxies, with each one managing expiration in just its own table. That
>> way, the storage/computation can be uniformly distributed and you should be
>> able to simply adjust the number of proxies to fit the required scale.

> I think the above might work reasonably well.

It does, it works great.

> So previously I was making the manager keep counts of potential scanners, but now I'm
> moving that work to the workers instead. The new model would let us just move all this
> to the proxy(ies), and the proxies can decide whether to delete or to send to the manager
> for aggregation.

There's no need; you just aggregate it on the proxies, and the manager never sees anything.

> I suppose, given that proxies don't process packets, it will be cheaper to do
> all this work there.
>
> The only thing that bothers me is that scan detection is a complicated problem only because
> of the distribution of data in a cluster. It's a much simpler problem if we could just do a
> tail -f conn.log | ./python-script

That doesn't simplify anything; it just moves the problem. You can only tail the single
conn.log because the logger aggregated the records from all the workers. If the manager
is running out of memory tracking all of the scanners, then the single instance of the python
script is going to run into the same issue at some point.

> So yes, we can shed load from manager -> workers -> proxies. I'll try this
> approach. But I think I am also going to try (with the new broker-enabled cluster)
> the approach of sending all connections to one proxy/data-store and just doing the
> aggregation there, and see if that works out (the tail -f conn.log |
> ./python-script approach). Admittedly, this needs more thinking to get the right
> architecture in the new cluster era!

No... this is just moving the problem again. If your manager is running out of memory and you
move everything to one proxy, that proxy is just going to have the same problem.

The fix is to use the distributed message routing features that I've been talking about for a while
(and that Jon implemented in the actor-system branch!).

The entire change to switch simple-scan from aggregating all scanners on a single manager to
aggregating scanners across all proxies (which can be on multiple machines) is swapping

         event Scan::scan_attempt(scanner, attempt);

with

        Cluster::publish_hrw(Cluster::proxy_pool, scanner, Scan::scan_attempt, scanner, attempt);

(with some @ifdefs to make it work on both versions of bro)
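
A rough sketch of how that swap might be guarded so one script runs on both versions (the @ifdef condition and the surrounding fragment are assumptions, not the exact simple-scan patch):

# Hypothetical guard: use publish_hrw when the broker-based cluster framework
# provides it, otherwise fall back to the old single-manager event path.
@ifdef ( Cluster::publish_hrw )
    Cluster::publish_hrw(Cluster::proxy_pool, scanner, Scan::scan_attempt, scanner, attempt);
@else
    event Scan::scan_attempt(scanner, attempt);
@endif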

Justin,

> How are you tracking slow scanners on the workers? If you have 50 workers and you
> are not distributing the data between them, there's only a 1 in 50 chance that you'll
> see the same scanner twice on the same worker, a 1 in 2,500 chance that you'd see
> 3 packets in a row on the same worker... and 1 in 125,000 for 4 in a row.

Yes, that was the observation and the idea. For really slow scanners (there won't be too many,
too soon, obviously), let each one's start time be tracked on the workers. The
ODDS of hitting the same worker are low, so the burden of tracking start times for
thousands of slow scanners is distributed fairly evenly across the 10/50 workers *instead*
of the manager having to store all of this.

So 600K slow scanners means |manager_table| += 600K vs. |worker_table| = 600K/50,
so the memory burden is more distributed.
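
(Concretely, assuming 50 workers: 600K / 50 = 12K entries per worker table, versus 600K entries held on the manager.)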

I checked some numbers on my end: since midnight we flagged 172K scanners while
tracking 630K potential scanners which will eventually be flagged.

The issue isn't flagging these. The issue was being accurate about the very first
time we saw an IP connect to us and keeping that in memory - this stat is not needed,
but it is good to have.

>> I'd suggest a different way to think about
>> structuring the problem: you could Rendezvous Hash the IP addresses across
>> proxies, with each one managing expiration in just its own table. That
>> way, the storage/computation can be uniformly distributed and you should be
>> able to simply adjust the number of proxies to fit the required scale.
>
> That doesn't simplify anything; it just moves the problem. You can only tail the single
> conn.log because the logger aggregated the records from all the workers. If the manager
> is running out of memory tracking all of the scanners, then the single instance of the python
> script is going to run into the same issue at some point.

I agree, but we digress from the actual issue here; see below.

> So yes, we can shed load from manager -> workers -> proxies. I'll try this
> approach. But I think I am also going to try (with the new broker-enabled cluster)
> the approach of sending all connections to one proxy/data-store and just doing the
> aggregation there, and see if that works out (the tail -f conn.log |
> ./python-script approach). Admittedly, this needs more thinking to get the right
> architecture in the new cluster era!

> No... this is just moving the problem again. If your manager is running out of memory and you
> move everything to one proxy, that proxy is just going to have the same problem.

I think we've talked about this on and off for roughly 2 years. I am probably
misunderstanding, or maybe being unclear. The issue is the complexity of aggregation, due
to clusterization, in scan detection. Now, you can use many proxies, many data
nodes, etc., but as long as the distributed nature of the data is there, aggregation in
realtime is a problem. The data needs to be concentrated in one place. A tail -f
conn.log is data concentrated in one place.

Now, it's a separate issue that a conn.log entry is 5+ seconds late, which can
already miss a significant scan, etc.

> The fix is to use the distributed message routing features that I've been talking about for a while
> (and that Jon implemented in the actor-system branch!).
>
> The entire change to switch simple-scan from aggregating all scanners on a single manager to
> aggregating scanners across all proxies (which can be on multiple machines) is swapping

Aggregating across all proxies is still distributing data around. So the way I
see it, you are moving the problem around :) But as I said, I don't know in more detail how
this works since I haven't tried the new broker stuff just yet.

>          event Scan::scan_attempt(scanner, attempt);
> with
>         Cluster::publish_hrw(Cluster::proxy_pool, scanner, Scan::scan_attempt, scanner, attempt);
> (with some @ifdefs to make it work on both versions of bro)

I am *really* looking forward to trying this stuff out in the new broker model.

Aashish

> conn.log because the logger aggregated the records from all the workers. If the manager
> is running out of memory tracking all of the scanners, then the single instance of the python
> script is going to run into the same issue at some point.

Oh, I totally forgot to add an important point.

The issue isn't memory to begin with. The issue was that 2 million entries in a table can
result in the expire function being slow, clogging the event queue, resulting in a stalled
network_time() and the manager lagging significantly behind.

Holding potential scanners on the workers helps me break the manager table down into
much smaller tables across the workers.

> How are you tracking slow scanners on the workers? If you have 50 workers and you
> are not distributing the data between them, there's only a 1 in 50 chance that you'll

Exactly, that's what keeps the worker table sizes small, until there are enough
connections to flag something as a scanner. Note: the worker tables only keep
IP -> start_time.

They report a potential scanner to the manager, but the manager doesn't need to keep that
in a table. I think I use a combination of a bloomfilter and a hyperloglog there to
scale to millions easily.
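
A rough sketch of how that might look with Bro's built-in probabilistic data structures (the event name and sizing parameters are made up; only the bloomfilter/hyperloglog BIFs are from the standard library):

# Hypothetical manager-side tracking without a per-IP table.
global seen_scanners: opaque of bloomfilter;
global scanner_count: opaque of cardinality;

# Hypothetical event raised by workers when they flag a potential scanner.
global potential_scanner: event(ip: addr);

event bro_init()
    {
    # ~0.1% false-positive rate, sized for a few million addresses (illustrative).
    seen_scanners = bloomfilter_basic_init(0.001, 5000000);
    scanner_count = hll_cardinality_init(0.01, 0.95);
    }

event potential_scanner(ip: addr)
    {
    if ( bloomfilter_lookup(seen_scanners, ip) > 0 )
        return;    # already counted (modulo bloom filter false positives)

    bloomfilter_add(seen_scanners, ip);
    hll_cardinality_add(scanner_count, ip);
    }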

Note 2: this is to generate the scan_summary, not to flag a scanner.

this thing:

#fields ts scanner state detection start_ts end_ts detect_ts detect_latency total_conn total_hosts_scanned duration scan_rate country_code region city distance event_peer
#types time addr enum string time time time interval count count interval double string string string double string
1522090899.384438 122.224.112.162 Scan::DETECT LandMine 1522090219.816163 1522090744.317532 1522090744.317532 524.501369 19 19 524.501369 27.605335 CN 02 Hangzhou 6243.450744 bro
1522094550.487969 122.224.112.162 Scan::UPDATE LandMine 1522090219.816163 1522094128.634672 1522090744.317532 524.501369 110 109 3908.818509 35.86072 CN 02 Hangzhou 6243.450744 bro
1522098156.871486 122.224.112.162 Scan::UPDATE LandMine 1522090219.816163 1522097984.861365 1522090744.317532 524.501369 225 227 7765.045202 34.207248 CN 02 Hangzhou 6243.450744 bro
1522101784.996068 122.224.112.162 Scan::UPDATE LandMine 1522090219.816163 1522101081.946002 1522090744.317532 524.501369 359 365 10862.129839 29.75926 CN 02 Hangzhou 6243.450744 bro
1522354414.224635 122.224.112.162 Scan::FINISH LandMine 1522090219.816163 1522103520.055414 1522090744.317532 524.501369 488 507 13300.239251 26.233214 CN 02 Hangzhou 6243.450744 bro

Aashish

I think the part that you are missing is that the data is PARTITIONED across the proxies.

So say you saw a few scan attempts:

attacker.1 -> local.1:22
attacker.2 -> local.1:22
attacker.1 -> local.2:22
attacker.3 -> local.1:80
attacker.4 -> local.1:23
attacker.2 -> local.3:80
attacker.1 -> local.3:22

To do scan detection they need to be grouped into a table like this:

attacker.1 -> [local.1:22, local.2:22, local.3:22]
attacker.2 -> [local.1:22, local.3:80]
attacker.3 -> [local.1:80]
attacker.4 -> [local.1:23]

This is how scan-ng and simple-scan work now.
Currently this needs to be aggregated in a single place (the manager), but with publish_hrw, bro
will do something like this:

hash(attacker.1) == proxy-1
hash(attacker.2) == proxy-2
hash(attacker.3) == proxy-2
hash(attacker.4) == proxy-1

Which will distribute the data across the proxies and you will end up with:

Data stored on proxy-1:
attacker.1 -> [local.1:22, local.2:22, local.3:22]
attacker.4 -> [local.1:23]

Data stored on proxy-2:
attacker.2 -> [local.1:22, local.3:80]
attacker.3 -> [local.1:80]

Each proxy will have a consistent view of a single scanner, but will not need to store all the data
for all scanners.
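
To make that concrete, a minimal sketch of both sides (the record type, table, and threshold are illustrative assumptions; `Cluster::publish_hrw`, `Cluster::proxy_pool`, and the HRW partitioning are the real mechanism described above):

@load base/frameworks/cluster

module Scan;

export {
    type Attempt: record {
        victim: addr;
        victim_port: port;
    };

    global scan_attempt: event(scanner: addr, attempt: Attempt);
}

# Worker side: route each attempt to whichever proxy the scanner hashes to.
function report_attempt(scanner: addr, attempt: Attempt)
    {
    Cluster::publish_hrw(Cluster::proxy_pool, scanner, Scan::scan_attempt, scanner, attempt);
    }

# Proxy side: each proxy only receives the scanners that hash to it, so this
# table holds just its partition of the global state.
global attempts: table[addr] of set[Attempt] &create_expire = 1 hr;

event Scan::scan_attempt(scanner: addr, attempt: Attempt)
    {
    if ( scanner !in attempts )
        attempts[scanner] = set();

    add attempts[scanner][attempt];

    if ( |attempts[scanner]| == 20 )    # illustrative threshold
        print fmt("scan detected: %s touched %d host/port pairs",
                  scanner, |attempts[scanner]|);
    }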