Broker raw throughput

I'm taking a discussion about Broker performance public which we
previously had on our internal mailing list. Below, I'm pasting in
Dominik's answer to my email. Dominik, we've white-listed your email
address on bro-dev, but feel free to subscribe there to get future responses.
Apologies for the inconvenience this may have caused on your end.

While I'm at it: there's a small but important difference:

    gperf != gperftools

The former is the GNU profiler, whereas the latter is Google's
instrumentation framework. Unfortunately, the naming jungle out there is
convoluted.

    Matthias

(Context for Dominik: we are measuring the maximum throughput CAF can
deliver over a TCP connection and find that it performs an order of
magnitude below 0mq.)

I've attempted to reproduce the same benchmark with pure CAF, without
Broker, and can confirm the same order-of-magnitude difference. In fact,
the rates I observe with CAF are in the same range as Justin's
measurements.

We can get one level deeper by using brokers (I mean caf::broker actors)
directly to get rid of serialization entirely. I don't think that would
change the performance much, but it makes analyzing profiler output a
bit easier by removing unrelated components and functions from the
graph. I'm going to write a small test program for CAF in the next
couple of days.
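Roughly, I have something like the following in mind for the receiving
side (an untested sketch written from memory; the exact broker API names
may differ on the actor-system branch):

    // Sketch: a caf::io broker that only counts raw bytes per connection,
    // bypassing serialization and the BASP broker entirely.
    #include <cstdint>
    #include <iostream>
    #include <memory>

    #include "caf/all.hpp"
    #include "caf/io/all.hpp"

    using namespace caf;
    using namespace caf::io;

    behavior byte_sink(broker* self) {
      auto received = std::make_shared<uint64_t>(0);
      return {
        [=](const new_connection_msg& msg) {
          // deliver data in chunks of up to 64 KB
          self->configure_read(msg.handle, receive_policy::at_most(64 * 1024));
        },
        [=](const new_data_msg& msg) {
          *received += msg.buf.size(); // just count bytes, no deserialization
        },
        [=](const connection_closed_msg&) {
          std::cout << *received << " bytes received" << std::endl;
          self->quit();
        }
      };
    }

    int main(int argc, char** argv) {
      actor_system_config cfg{argc, argv};
      cfg.load<io::middleman>();
      actor_system system{cfg};
      system.middleman().spawn_server(byte_sink, uint16_t{6666});
    }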

From looking at the plots, I suspect that the client is the bottleneck:
it spends a third of its cycles in _sendto via
caf::io::network::stream::handle_event, while the multiplexer and the
BASP broker share the rest of the CPU time.

Dominik, does this make sense to you? Have you performed similar
measurements in the past?

So far, we have tested primarily in comparison to other messaging
frameworks for specific test cases, such as distributed computations
with CAF vs. MPI/Scala/Erlang. However, a while ago I ran a small
benchmark comparing raw throughput measured via iperf to a CAF setup and
noticed that receive performance was OK, but send performance was
lacking. This matches your findings, although I don't remember it being
a factor of 5-6 worse.

Thanks for sending me the gperf graphs. I will come back to you after
running a test series under Linux and digging through the code a bit.

    Dominik

----- Forwarded message from Matthias Vallentin <vallentin@icir.org> -----

Sender: Matthias Vallentin <matthias@vallentin.net>

caf-client.cpp (811 Bytes)

caf-server.cpp (1.23 KB)

And more from the other mailing list: I had shared flame graphs of the client and the server while running.

client.svg

server.svg

For better reproducibility, here's the Makefile that I used to drive the
experiments:

    CC = cc
    CXX = c++
    FLAGS = -O3 -g -std=c++11 -stdlib=libc++
    LIBS = -lcaf_core -lcaf_io -ltcmalloc -lprofiler

    caf-client: caf-client.cpp
        $(CXX) $(FLAGS) $< -o $@ $(LIBS)

    caf-server: caf-server.cpp
        $(CXX) $(FLAGS) $< -o $@ $(LIBS)

    bench-caf-client:
        CPUPROFILE=caf-client.prof ./caf-client 1000

    bench-caf-server:
        CPUPROFILE=caf-server.prof ./caf-server 10

    bench-caf-pprof: caf-client.prof caf-server.prof
        pprof --pdf caf-client caf-client.prof > caf-client.pdf
        pprof --pdf caf-server caf-server.prof > caf-server.pdf

On my FreeBSD box, I had to add /usr/local/include via -I and
/usr/local/lib via -L, because I installed CAF and gperftools via ports.
Since it's a headless machine without ps2pdf, we need an extra level of
indirection:

    (1) pprof --raw caf-client caf-client.prof > caf-client.raw
    (2) copy raw profile to desktop
    (3) pprof --pdf caf-client.raw > caf-client.pdf

Hope this helps,

    Matthias

Thanks for providing build scripts and sharing results.

Just a quick heads-up from me: I have implemented a simple sender/receiver pair using C sockets as well as CAF brokers (attached; works only with the current actor-system topic branch). Both sending and receiving are slower with CAF (as expected), although performance is slightly better when using the ASIO backend [1]. I'm still investigating and will hopefully get back to you later this week.

    Dominik

[1] e.g. ./caf_impl --caf#middleman.network-backend=asio -s

caf_impl.cpp (2.98 KB)

Makefile (384 Bytes)

native_impl.cpp (3.21 KB)

With most noise like serialization etc. out of the way, this is what I measured on Linux:

    native sender → native receiver:  567,520,085 bytes/s
    CAF sender    → native receiver:  511,333,973 bytes/s
    native sender → CAF receiver:     229,689,173 bytes/s
    CAF sender    → CAF receiver:     222,102,755 bytes/s

Send performance is OK, but performance drops significantly once CAF is used at the receiver. The profiler output (attached) doesn’t point to a particular function that consumes an inappropriate amount of time. So it’s either the sum of the many little functions called for each received chunk or the epoll_wait loop itself.

I have created a ticket for further progress tracking / discussion [1] as this is clearly not a Bro/Broker problem. Thank you all for reporting this and all the input you have provided.

@Matthias: FYI, I have used a new feature in CAF that allows senders to get feedback from the I/O layer for not overloading it. This allows the sender to adapt to the send rate of the network.

Dominik

[1] https://github.com/actor-framework/actor-framework/issues/432

caf-client.pdf (24.2 KB)

caf-server.pdf (25 KB)

@Matthias: FYI, I have used a new feature in CAF that allows senders
to get feedback from the I/O layer for not overloading it. This allows
the sender to adapt to the send rate of the network.

Great, it sounds like this would fix the stall/hang issues. I expect to
port Broker to the actor-system branch by the end of the month.

    Matthias

I have created a ticket for further progress tracking / discussion [1]
as this is clearly not a Bro/Broker problem. Thank you all for
reporting this and all the input you have provided.

It's good to see that the new commit improves performance. But I want
to take the perspective of Broker again, where we measure throughput in
messages per second. Before the changes, we could blast around 80K
messages/sec through two remotely connected CAF nodes. After your
changes, I am now measuring a peak rate of up to 190K messages/sec on my
FreeBSD box. That's more than double. Really cool! But: the benchmark no
longer terminates and the server quickly stops getting data, and I would
like to know why. Here is the modified actor-system code:

    // Client
    #include <iostream>

    #include "caf/all.hpp"
    #include "caf/io/all.hpp"

    using namespace caf;
    using namespace caf::io;
    using namespace std;

    int main(int argc, char** argv) {
      actor_system_config cfg{argc, argv};
      cfg.load<io::middleman>();
      actor_system system{cfg};
      auto server = system.middleman().remote_actor("127.0.0.1", 6666);
      cerr << "connected to 127.0.0.1:6666, blasting out data" << endl;
      scoped_actor self{system};
      self->monitor(server);
      for (auto i = 0; i < 1000000; ++i)
        self->send(server, i); // send each integer exactly once
      self->receive(
        [&](down_msg const& msg) {
          cerr << "server terminated" << endl;
        }
      );
      self->await_all_other_actors_done();
    }

    // Server
    #include <chrono>
    #include <iostream>
    #include <memory>

    #include "caf/all.hpp"
    #include "caf/io/all.hpp"

    using namespace caf;
    using namespace caf::io;
    using namespace std;
    using namespace std::chrono;

    CAF_ALLOW_UNSAFE_MESSAGE_TYPE(high_resolution_clock::time_point)

    behavior server(event_based_actor* self, int n = 10) {
      auto counter = make_shared<int>();
      auto iterations = make_shared<int>(n);
      self->send(self, *counter, high_resolution_clock::now());
      return {
        [=](int i) {
          ++*counter;
        },
        [=](int last, high_resolution_clock::time_point prev) {
          auto now = high_resolution_clock::now();
          auto secs = duration_cast<seconds>(now - prev);
          auto rate = (*counter - last) / static_cast<double>(secs.count());
          cout << rate << endl;
          if (rate > 0 && --*iterations == 0) // Count only when we have data.
            self->quit();
          else
            self->delayed_send(self, seconds(1), *counter, now);
        }
      };
    }

    int main(int argc, char** argv) {
      actor_system_config cfg{argc, argv};
      cfg.load<io::middleman>();
      actor_system system{cfg};
      auto serv = system.spawn(server, 10);
      system.middleman().publish(serv, 6666); // port the client connects to
    }

I invoke the server as follows:

  CPUPROFILE=caf-server.prof ./caf-server --caf#scheduler.scheduler-max-threads=4

And the client like this:

  CPUPROFILE=caf-client.prof ./caf-client --caf#scheduler.scheduler-max-threads=4 --caf#scheduler.max-throughput=10000

I've tried various parameters for the scheduler throughput, but they do
not seem to make a difference. Would you mind taking a look at what's
going on here? It looks like the "sender overload protection" you
mentioned is not working as expected.

I'm also attaching a new gperftools profiler output from the client and
server. The server is not too telling, because it was spinning idle for
a bit until I ran the client, hence the high CPU load in nanosleep.
Looking at the client, it seems that only 67.3% of the time is spent in
local_actor::resume, which would mean that the runtime adds 32.7%
overhead. That's not correct, because gperftools cannot link the second
tree on the right properly. (When compiling with -O0 instead of -O3, it
looks even worse.) Still, why is intrusive_ptr::get consuming 27.9%?

Looking at the left tree, it appears that this workload stresses the
allocator heavily:

    - 20.4% tc_malloc_skip_new_handler
    - 7% std::vector::insert in the BASP broker
    - 13.5% CAF serialization (adding two out-edges from
            basp::instance::write, 5.8 + 7.5)

Perhaps this helps you to see some more optimization opportunities.

Switching gears to your own performance measurements: it sounded like
you got gains on the order of 400% when comparing just raw byte
throughput (as opposed to message throughput). Can you give us an
intuition for how that relates to the message-throughput measurements
we have been doing?

    Matthias

caf-client-freebsd.pdf (16 KB)

caf-server-freebsd.pdf (17.7 KB)

the benchmark no longer
terminates and the server quickly stops getting data, and I would like
to know why.

I'll have a look at it.

I've tried various parameters for the scheduler throughput, but they do
not seem to make a difference. Would you mind taking a look at what's
going on here?

The throughput parameter does not apply to network inputs, so you only modify how many integers per scheduler run the server receives. You could additionally try to tweak caf#middleman.max_consecutive_reads, which configures how many new_data_msg messages a broker receives from the backend in a single shot. It makes sense to have the two separated, because one configures fairness in the scheduling and the other fairness of connection multiplexing.
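For example (using the same command-line syntax as the invocations above; the exact option spelling may differ between CAF versions):

    ./caf-server --caf#middleman.max_consecutive_reads=100 \
                 --caf#scheduler.max-throughput=10000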

It looks like the "sender overload protection" you
mentioned is not working as expected.

The new feature "merely" allows (CAF) brokers to receive messages from the backend when data is transferred. This basically uplifts TCP's backpressure. When blindly throwing messages at remote actors, there's nothing CAF could do about it. However, the new broker feedback will be one piece in the puzzle when implementing flow control in CAF later on.

I'm also attaching a new gperftools profiler output from the client and
server. The server is not too telling, because it was spinning idle for
a bit until I ran the client, hence the high CPU load in nanosleep.
Looking at the client, it seems that only 67.3% of the time is spent in
local_actor::resume, which would mean that the runtime adds 32.7%
overhead.

The call to resume() happens in the BASP broker, which dumps the messages to its output buffer. So the 67% load includes serialization, etc. 28.3% of the remaining load is accumulated in main().

Still, why is intrusive_ptr::get consuming 27.9%?

The 27.9% is accumulating all load down the path, isn't it? intrusive_ptr::get itself simply returns a pointer: https://github.com/actor-framework/actor-framework/blob/d5f43de65c42a74afa4c979ae4f60292f71e371f/libcaf_core/caf/intrusive_ptr.hpp#L128

Looking at the left tree, it appears that this workload stresses the
allocator heavily:

   - 20.4% tc_malloc_skip_new_handler
   - 7% std::vector::insert in the BASP broker
   - 13.5% CAF serialization (adding two out-edges from
           basp::instance::write, 5.8 + 7.5)

Not really surprising. You are sending integers around. Each integer has to be wrapped in a heap-allocated message which gets enqueued to an actor's mailbox. By using many small messages, you basically maximize the messaging overhead.

Switching gears to your own performance measurements: it sounded like
you got gains on the order of 400% when comparing just raw byte
throughput (as opposed to message throughput). Can you give us an
intuition for how that relates to the message-throughput measurements
we have been doing?

At the lowest level, a framework like CAF ultimately needs to efficiently manage buffers and events provided by the OS. That's the functionality of recv/send/poll/epoll and friends. That's what I was looking at, since you can't get good performance if you have problems at that level (which, as it turned out, CAF had).

Moving a few layers up, some overhead is inherent in a messaging framework. Stressing the heap (see 20% load in tc_malloc_skip_new_handler) when sending many small messages, for example.

From the gperf output (just looking at the client), I don't see that much CPU time spent in CAF itself. If I sum up CPU load from std::vector (6.2%), tcmalloc (20.4%), atomics (8%) and serialization (12.4%), I'm already at 47% out of 70% total for the multiplexer (default_multiplexer::run).

Pattern matching (caf::detail::try_match) causes less than 6% CPU load, so that does not seem to be an issue. Serialization has 12% CPU load, which probably results mostly from std::copy (unfortunately cut off after std::function). So I don't see many optimization opportunities in these components.

Tackling the "many small messages problem" isn't going to be easy. CAF could try to wrap multiple messages from the network into a single heap-allocated storage that is then shipped to an actor as a whole, but this optimization would have a high complexity.

That's of course just some thoughts after looking at the gperf output you provided. I'll hopefully have new insights after looking at the termination problem in detail.

    Dominik

You could additionally try to tweak
caf#middleman.max_consecutive_reads, which configures how many
new_data_msg messages a broker receives from the backend in a single
shot. It makes sense to have the two separated, because one configures
fairness in the scheduling and the other fairness of connection
multiplexing.

Good to know about this tuning knob. I played with a few values, from 1
to 1K, but could not find an improvement by tweaking this value alone.
Have you already performed some measurements to find the optimal
combination of parameters?

The 27.9% is accumulating all load down the path, isn't it?

Yeah, right, I must have confused the absolute vs. cumulative numbers in
this case. :-/

By using many small messages, you basically maximize the messaging
overhead.

Exactly. That is the worst-case scenario I'm trying to benchmark :-).

Tackling the "many small messages problem" isn't going to be easy. CAF
could try to wrap multiple messages from the network into a single
heap-allocated storage that is then shipped to an actor as a whole,
but this optimization would have a high complexity.

A common strategy to reduce high heap pressure involves custom
allocators, and memory pools in particular. Assuming that a single actor
produces a fixed number of message types (e.g., <= 10), one could create
one memory pool for each message type. What do you think about such a
strategy?
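
To make the idea concrete, here is a minimal sketch of the kind of
per-type pool I have in mind (purely illustrative, single-threaded, and
certainly not CAF's actual allocator):

    #include <new>
    #include <utility>
    #include <vector>

    // One pool instance per message type T: recycles fixed-size chunks via a
    // free list instead of hitting the global heap for every message.
    template <class T>
    class object_pool {
    public:
      ~object_pool() {
        for (auto ptr : free_)
          ::operator delete(ptr);
      }

      template <class... Ts>
      T* create(Ts&&... xs) {
        void* mem;
        if (free_.empty()) {
          mem = ::operator new(sizeof(T));
        } else {
          mem = free_.back();
          free_.pop_back();
        }
        return new (mem) T(std::forward<Ts>(xs)...);
      }

      void destroy(T* ptr) {
        ptr->~T();
        free_.push_back(ptr); // keep the storage for reuse
      }

    private:
      std::vector<void*> free_;
    };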

    Matthias

You could additionally try to tweak
caf#middleman.max_consecutive_reads, which configures how many
new_data_msg messages a broker receives from the backend in a single
shot. It makes sense to have the two separated, because one configures
fairness in the scheduling and the other fairness of connection
multiplexing.

Good to know about this tuning knob. I played with a few values, from 1
to 1K, but could not find an improvement by tweaking this value alone.
Have you already performed some measurements to find the optimal
combination of parameters?

I don't think there is an optimal combination for all use cases. You are always trading between fairness and throughput. The question is whether your application needs to stay responsive to multiple clients or if your workload is some form of non-interactive batch processing.

Any default value is arbitrary at the end of the day. As long as messages are distributed more-or-less evenly among actors and no actor receives hundreds of messages between scheduling cycles, the parameters don't matter anyway.

Tackling the "many small messages problem" isn't going to be easy. CAF
could try to wrap multiple messages from the network into a single
heap-allocated storage that is then shipped to an actor as a whole,
but this optimization would have a high complexity.

A common strategy to reduce high heap pressure involves custom
allocators, and memory pools in particular. Assuming that a single actor
produces a fixed number of message types (e.g., <= 10), one could create
one memory pool for each message type. What do you think about such a
strategy?

This is exactly what CAF does. A few years ago, this was absolutely necessary to get decent performance. Recently, however, standard heap allocators have gotten much better (at least on Linux). You can build CAF with --no-memory-management to see if it makes a difference on BSD.
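
For example (assuming CAF's configure front end, with the flag spelled as above):

    ./configure --no-memory-management
    make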

The optimization I meant is to not wrap each integer in its own message object, but rather make one message which then contains X integers that are transparently interpreted by the receiver as X messages. But this requires some form of "output queue" or lookahead mechanism.
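
Until something like that exists, an application can approximate the effect by batching manually. Relative to the integer benchmark above, a rough sketch (note that std::vector<int> may need to be announced via add_message_type for remote messaging):

    // Client side: ship 1000 integers per CAF message instead of one each.
    // (Requires <vector> and <numeric> for std::iota.)
    std::vector<int> batch(1000);
    for (auto i = 0; i < 1000000; i += 1000) {
      std::iota(batch.begin(), batch.end(), i); // fill with i, i+1, ...
      self->send(server, batch);                // one enqueue per 1000 integers
    }

    // Server side: one handler invocation covers the whole batch.
    [=](const std::vector<int>& xs) {
      *counter += static_cast<int>(xs.size());
    },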

    Dominik

The optimization I meant is to not wrap each integer in its own
message object, but rather make one message which then contains X
integers that are transparently interpreted by the receiver as X
messages. But this requires some form of "output queue" or lookahead
mechanism.

I can see that being a nice intrinsic performance gain. At this point,
we have too little experience with Broker to warrant such a specific
optimization, but I like this "SIMD approach" in general. I already have
some ideas regarding transparent compression/coding that could be
interesting to explore in the future.

    Matthias

Hi:

Forwarding reply to the bro-dev list: original was mistakenly posted elsewhere (sorry about that). Leaving original message content inline for context.

To me, the performance numbers themselves don't matter as much as
managing expectations does: should I *expect* to be able to pass all
of my events through broker?

The answer depends on the event type and your concrete topology, and is
hard to give in general. We can say "in our point-to-point test
scenario, our measurements show an upper bound of X events/sec for
a workload consisting of message type Y." As Broker gets more traction,
I assume we will get many more data points and a better understanding of
the performance boundaries.

Trying to express things a slightly different way, I was concerned
that the different numbers from the different libraries were being
interpreted as an apples-to-apples comparison. Modifying CAF to
achieve the same results as e.g. 0mq would, at some point and in some
way, eventually require modifying CAF to be more like 0mq. I don't
think that would be good, because 0mq and CAF aren't (and shouldn't
be, in my humble opinion) the same thing.

0mq/nanomsg are only thin wrappers around a blob of bytes, whereas CAF
provides much more than that. However, I don't think the comparison we
did was unrealistic: we looked at the overhead of sending a stream of
simple (nearly empty) messages between two remote endpoints. This "dumbs
down" CAF to a point where we're primarily stressing the messaging
subsystem, without using much of the higher-level abstractions (CAF
still has to go through its serialization layer).

After Dominik's performance tweaks, the two libraries operate within the
same order of magnitude, which strikes me as reasonable. 0mq still
outperforms CAF in terms of maximum message rate for this specific
workload, but that is not surprising at this point: 0mq has received
years of attention and optimizations specifically targeting
high-throughput scenarios.

    Matthias