Broker issue when in clustered mode

Hello,

I have a Zeek script that publishes a couple of different topics using
the Zeek Broker. I've tested this on Zeek 3.1.3. I followed the Python
bindings guide here:
https://docs.zeek.org/projects/broker/en/current/python.html and it
works so long as Zeek isn't in clustered mode. This is my zeek_init():

event zeek_init()
{
        if (SNIFFPASS::broker_enable)
        {
            Broker::listen("127.0.0.1", 9999/tcp);
            Broker::auto_publish("/sniffpass/credentials_seen", SNIFFPASS::credentials_seen);
            Broker::auto_publish("/sniffpass/credentials_seen", SNIFFPASS::credentials_seen_detailed);
        }
}

When I run this in cluster mode on a single machine, Zeek fails to
start because every node, manager and workers alike, runs zeek_init()
and tries to listen on the same IP address and port:

error in main.bro, line 160: Failed to listen on 127.0.0.1:9999
(Broker::listen(SNIFFPASS::broker_host, SNIFFPASS::broker_port,
Broker::default_listen_retry))
fatal error: errors occurred while initializing

I tried moving the Broker::listen to the manager only like this:

event zeek_init()
{
    if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) {
        Broker::listen(SNIFFPASS::broker_host, SNIFFPASS::broker_port);
    }

    Broker::auto_publish("/sniffpass/credentials_seen", SNIFFPASS::credentials_seen);
    Broker::auto_publish("/sniffpass/credentials_seen", SNIFFPASS::credentials_seen_detailed);
}

With this change Zeek starts successfully in clustered mode and my
Zeek script runs. My Python script connects to the manager on
localhost:9999 successfully, but doesn't receive any events from the
manager. This is the Python script I'm using for testing:

#!/usr/bin/env python3

import broker
import sys

# Setup endpoint and connect to Zeek.
ep = broker.Endpoint()
sub = ep.make_subscriber("/sniffpass/credentials_seen")
ss = ep.make_status_subscriber(True)
ep.peer("127.0.0.1", 9999)

# Wait until connection is established.
st = ss.get()

if not (type(st) == broker.Status and st.code() == broker.SC.PeerAdded):
    print("could not connect")
    sys.exit(1)

print("Connected!")

while True:
    (t, d) = sub.get()
    event = broker.zeek.Event(d)
    print("received {}{}".format(event.name(), event.args()))

I assume the manager isn't relaying the messages it receives from the
workers, but I can't quite figure out how to get this working.

My full Zeek script is up here:
https://github.com/cybera/zeek-sniffpass/blob/master/scripts/main.bro

Any insight into how to do this properly would be greatly appreciated.

Thanks in advance!

Thinking a little bit more about this, I would assume the Manager
would need to subscribe to that topic from the workers, and then
forward those so my Python subscriber could pick them up. I tried
this:

        if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER ) {
                Broker::listen("127.0.0.1", 9999/tcp);
                Broker::subscribe("/sniffpass/credentials_seen");
                Broker::forward("/sniffpass/credentials_seen");
        }
        else if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::WORKER ) {
                Broker::auto_publish("/sniffpass/credentials_seen", SNIFFPASS::credentials_seen);
        }

This still results in no messages reaching my Python subscriber.

I'll continue researching :)

Andrew

That latest example will likely work if you just enable forwarding:

    redef Broker::forward_messages = T;

Zeek has auto-forwarding disabled by default because it's currently
easy to set up unintentional routing loops, so it's a bit safer to
explicitly "forward" events yourself: handle the event on the manager
and then call Broker::publish() to pass it along one more hop to
direct peers that also subscribe to that topic.
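
A rough sketch of that manual forwarding, in case it helps. The
Example module and its credentials_seen(username) event are
hypothetical stand-ins, since the real SNIFFPASS event signature isn't
shown in this thread; the topic, address and port are the ones from
your posts.

```zeek
module Example;

export {
    # Hypothetical event standing in for SNIFFPASS::credentials_seen;
    # the real event's parameters may differ.
    global credentials_seen: event(username: string);
}

const topic = "/sniffpass/credentials_seen";

event zeek_init()
    {
    if ( ! Cluster::is_enabled() )
        return;

    if ( Cluster::local_node_type() == Cluster::MANAGER )
        {
        # The external (Python) peer connects here.
        Broker::listen("127.0.0.1", 9999/tcp);
        # Receive the event from the workers.
        Broker::subscribe(topic);
        }
    else if ( Cluster::local_node_type() == Cluster::WORKER )
        # Workers send every locally raised event to the manager.
        Broker::auto_publish(topic, Example::credentials_seen);
    }

event Example::credentials_seen(username: string)
    {
    # Only the manager re-publishes, which sends the event one more hop
    # to direct peers subscribed to the topic (the Python script).
    if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
        Broker::publish(topic, Example::credentials_seen, username);
    }
```

The workers never subscribe to the topic here, so the re-published
event should not travel back to them.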

Also, if the Zeek manager isn't going to do anything with the event
other than forward it and you wanted to cut it out entirely as a
middle-man, you may or may not find it acceptable to do an entirely
different peering scheme such that the Zeek workers are always direct
peers of the Python script (either have the Python side listen() and
the workers connect, or have each worker listen() and the Python side
peer with each).
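
For the first variant, the worker side might look roughly like the
sketch below. It assumes the Python endpoint is already listening on
127.0.0.1:9999 (that address is only an assumption carried over from
the earlier examples) and that the code lives in the script where
SNIFFPASS::credentials_seen is declared.

```zeek
event zeek_init()
    {
    # In a cluster, only workers peer with the external listener;
    # the manager and other node types are left untouched.
    if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::WORKER )
        return;

    # Connect out to the Python endpoint (assumed address and port).
    Broker::peer("127.0.0.1", 9999/tcp);

    # Send each locally raised event to peers subscribed to the topic.
    Broker::auto_publish("/sniffpass/credentials_seen", SNIFFPASS::credentials_seen);
    }
```

With this layout the manager never sees the event, so no forwarding is
needed.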

- Jon

Thanks for your help Jon.

I like the idea of the workers connecting directly to the Python peer instead.

I was able to get it working so long as my Python binding script is
listening on a socket when I restart the workers. This is because
Broker::peer() is called in zeek_init().

I'll need to find out how to get the workers to reconnect if they
can't connect at zeek_init().

Andrew

The `Broker::peer()` function can take an argument that controls how
frequently a failed connection gets automatically retried. It may help
to set that lower than the default of 30 seconds, like 1 second or
even less. Related docs:

https://docs.zeek.org/en/current/scripts/base/frameworks/broker/main.zeek.html#id-Broker::peer
https://docs.zeek.org/en/current/scripts/base/frameworks/broker/main.zeek.html#id-Broker::default_connect_retry
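
Something along these lines, as a sketch only; the address and port
are assumptions taken from the earlier examples in this thread:

```zeek
event zeek_init()
    {
    if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::WORKER )
        # The third argument is the retry interval: if the Python side
        # isn't listening yet, the connection attempt is retried every
        # second instead of every Broker::default_connect_retry (30sec).
        Broker::peer("127.0.0.1", 9999/tcp, 1sec);
    }
```

Redef'ing `Broker::default_connect_retry` would be another route, but
it may also affect other peerings that rely on the default, so passing
the interval per call seems the less invasive choice.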

- Jon

Perfect, that helps a ton! My Python Broker listener is working
perfectly in cluster mode now.

Thanks for your help Jon.