Broker performance tuning

Hi,
I am using a Zeek (6.1.1) script to send data to a Python script over Broker. The VM has 2 CPU cores, and both the Zeek and Python processes run across both cores. Zeek is able to send data without problems using Broker::publish(sync_topic, sync_cmd, c).

On the Python endpoint, however, we receive messages at a very slow pace. When I pin the Python script to a separate core, the speed increases a little, but it still lags significantly.

For example, Zeek sends 250 messages in about 3 seconds, while the Python endpoint takes 18-20 seconds to receive them.

How can I debug the issue or tune the performance so that the Python endpoint receives the messages faster?

I have tried max_threads=16 in the Python script, but it doesn't help much.

Details:
Python-side snippet:

cfg = broker.Configuration()
cfg.max_threads = 16

self.ep = broker.Endpoint(cfg)

while True:
    statuses = self.ss.poll()
    for s in statuses:
        if s.code() in (broker.SC.PeerLost, broker.SC.EndpointUnreachable):
            print("Connection reinitialized.")
            continue

    # Busy poll for a message or later status
    msg = self.sub.get(0.0000001)

    if msg is None:
        continue

    (topic, msgData) = msg
    self.msgCounter += 1
    print(f"MsgCount:{self.msgCounter}")

On the publisher side, I publish a message for each event by calling:
Broker::publish(sync_topic, sync_cmd, c); print "SENT CONNECTION info", c;
The pcap produces 252 such events, and publishing them takes only 2-3 seconds, but the Python poll picks the messages up late: as you can see, I am just polling and printing the message count, and it takes about 18 seconds. What changes or tuning should I try to improve the Broker communication performance?

Thanks
Biswa

Thanks for the edit.
On the sender/publisher side I have also tried the following, although the sender is already fast. I am not sure how to tune the Broker enqueue and dequeue.
redef Broker::aggressive_polls = 4;
redef Broker::aggressive_interval = 1;
redef Broker::max_threads = 16;
redef Broker::scheduler_policy = "stealing";
These also don't help.

Hi @awelzel, can you please comment?

Hey @biswa61 - I’ve tested with the latest Zeek master and a 6.1.1 container image (you should really update to 7.1) and event throughput appears fine. I’m getting ~6.3k events per second with the following test program:

# test.zeek
global hello: event(c: count);

event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
        {
        print "peer_added", endpoint, msg;
        }

event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
        {
        print "peer_lost", endpoint, msg;
        }

global c = 0;
event tick()
        {
        Broker::publish("/topic/test", hello, c);
        ++c;
        schedule 0.1msec { tick() };
        }

event zeek_init()
        {
        Broker::listen("127.0.0.1", 9999/tcp);
        event tick();
        }

#!/usr/bin/env python
# client.py
#
# export PYTHONPATH=/opt/zeek-dev-prod2/lib/zeek/python/
import broker

def main():
    print(broker)
    cfg = broker.Configuration()
    cfg.max_threads = 16

    with broker.Endpoint(cfg) as ep, \
        ep.make_subscriber("/topic/test") as sub, \
            ep.make_status_subscriber(True) as ss:
                print("peer")
                ep.peer("127.0.0.1", 9999)

                while True:
                    st = ss.poll()
                    if st:
                        print(st)

                    su = sub.get(0.0000001)
                    if su is None:
                        continue

                    print(su)


if __name__ == "__main__":
    main()

# Terminal 1
$ zeek test.zeek

# Terminal 2
$ taskset -c 1 python3 client.py | pv -l > /dev/null
43,3k 0:00:07 [6,32k/s] [ <=> ]

If you see drastically less, is this maybe a debug build? Do you use some custom Python version? Is the Python process running at 100% CPU? If so, could you profile it?

If you also get many thousands of events per second, then the processing bottleneck is probably somewhere else.

Hope this helps,
Arne


Oh…

Is the c here a connection record? Don’t do that :sweat_smile: The connection has a lot of state attached to it. It shouldn’t bring things to a halt, but if it’s sufficient to send c$uid or c$id, do that instead.
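A minimal sketch of that idea, assuming sync_topic and sync_cmd exist as in the original script; the slimmed-down event and the connection_state_remove trigger here are only illustrative:

# Hypothetical slim event: carry only the connection's uid and id.
global sync_cmd_slim: event(uid: string, id: conn_id);

event connection_state_remove(c: connection)
        {
        # Publish just the identifiers instead of the whole connection record.
        Broker::publish(sync_topic, sync_cmd_slim, c$uid, c$id);
        }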

Ah, yes.

Broker::publish(sync_topic, sync_cmd, c); print "SENT CONNECTION info", c;

It might still make sense to use the posted scripts to test what throughput you're seeing on your system, then see if steering away from publishing the full connection record makes it much faster.


Thanks for the reply. Yes, it is due to the size of the connection record we are sending over Broker, but is there any way to optimize Broker performance even when sending the whole connection record?

Could you share your pcap? Are you running with base scripts only?

But it's probably much better not to send the whole connection record. Instead, introduce a new record type that contains just the fields that are needed by the Python application. Surely it can't need everything?
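As a rough sketch of that approach (the SyncInfo record, its fields, and publish_sync_info are hypothetical names; pick whichever synchrophasor fields the Python side actually consumes, and reuse the existing sync_topic):

# Hypothetical slim record carrying only the fields the Python application needs.
type SyncInfo: record {
        uid: string;
        id: conn_id;
        data_frame_count: count &optional;
        data_rate: count &optional;
};

# The published event now carries the slim record instead of the full connection.
global sync_info_cmd: event(info: SyncInfo);

function publish_sync_info(c: connection)
        {
        Broker::publish(sync_topic, sync_info_cmd, SyncInfo($uid=c$uid, $id=c$id));
        }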


Yes, this is PMU data for the synchrophasor protocol, so it is large. I was using the pcaps below; I filled the required structures just under the existing Zeek connection record and sent it over Broker.
pcap

FYI, this is the connection data being sent over Broker:

`
DATA FRAME, [id=[orig_h=192.168.0.20, orig_p=36835/tcp, resp_h=192.168.0.241, resp_p=4712/tcp], orig=[size=18, state=4, num_pkts=3, num_bytes_ip=182, flow_label=0, l2_addr=00:09:6b:93:7b:83], resp=[size=134, state=4, num_pkts=2, num_bytes_ip=116, flow_label=0, l2_addr=00:30:a7:00:0d:3a], start_time=1218021007.698753, duration=3.0 msecs 777.980804 usecs, service={
SYNCHROPHASOR_TCP
}, history=ShADad, uid=C4HWQO2mg4iPUqg23e, tunnel=, vlan=, inner_vlan=, dpd=, dpd_state=, service_violation={

}, removal_hooks=, conn=, extract_orig=F, extract_resp=F, thresholds=, dce_rpc=, dce_rpc_state=, dce_rpc_backing=, dhcp=, dnp3=, dns=, dns_state=, ftp=, ftp_data_reuse=F, ssl=, http=, http_state=, irc=, krb=, ldap=, modbus=, mqtt=, mqtt_state=, mysql=, ntlm=, ntp=, quic=, radius=, rdp=, rfb=, sip=, sip_state=, snmp=, smb_state=, smtp=, smtp_state=, socks=, ssh=, synchrophasor_proto=tcp, synchrophasor=[ts=1218021007.70023, uid=C4HWQO2mg4iPUqg23e, id=[orig_h=192.168.0.20, orig_p=36835/tcp, resp_h=192.168.0.241, resp_p=4712/tcp], proto=, version={
1
}, data_stream_id={
241
}, history=2D, frame_size_min=18, frame_size_max=134, frame_size_tot=13724, data_frame_count=251, data_rate={
50
}], synchrophasor_cmd=, synchrophasor_hdr=, synchrophasor_cfg=[ts=1218021007.702531, uid=C4HWQO2mg4iPUqg23e, id=[orig_h=192.168.0.20, orig_p=36835/tcp, resp_h=192.168.0.241, resp_p=4712/tcp], proto=, frame_type=CFG2, frame_size=134, header_time_stamp=, cont_idx=0, pmu_count_expected=1, pmu_count_actual=1, data_rate=50, cfg_frame_id=cGETt54yO0A], synchrophasor_data=[ts=1218021012.705214, uid=C4HWQO2mg4iPUqg23e, id=[orig_h=192.168.0.20, orig_p=36835/tcp, resp_h=192.168.0.241, resp_p=4712/tcp], proto=, frame_type=Data, frame_size=54, header_time_stamp=, pmu_count_expected=1, pmu_count_actual=1, data_frame_id=dLKFec3SD7m], synchrophasor_cfg_detail=, synchrophasor_data_detail=[ts=1218021012.705214, uid=C4HWQO2mg4iPUqg23e, id=[orig_h=192.168.0.20, orig_p=36835/tcp, resp_h=192.168.0.241, resp_p=4712/tcp], proto=tcp, frame_type=Data, header_time_stamp=1217606735.12, data_frame_id=dLKFec3SD7m, pmu_idx=0, trigger_reason=0, unlocked_time=0, pmu_time_quality=0, data_modified=F, config_change=F, pmu_trigger_pickup=T, data_sorting_type=F, pmu_sync_error=F, data_error_indicator=0, est_rectangular_real=[126.005814, 133.931183, -86704.1875, 86583.65625], est_rectangular_imaginary=[-100044.539062, -100037.671875, 49918.132812, 50130.394531], est_polar_magnitude=[0.0, 0.0, 0.0, 0.0], est_polar_angle=[0.0, 0.0, 0.0, 0.0], freq_dev_mhz=0.0, rocof=0.0, analog_data=, digital=]]
`

Just to make sure: you're “only” loading the synchrophasor analyzer package and then attempting to publish the full c: connection record to Python? There are no other scripts involved?

I'd still argue it'd be more efficient to create a separate record containing just the fields that you need, but 18 seconds for 252 events does sound like something is quite slow.

Yes, actually for IEC 104 we have a similarly big structure, but performance is fine. Here in synchrophasor there is a lot of vector and set data, plus floating-point numbers, though I thought this shouldn't create any issue for Broker communication.
Yes, I will shorten the structure as you suggested, and that should increase the performance. I tried putting just a few fields into a custom record and saw line-rate performance.
Thanks