Rethinking Broker's blocking API

Broker's current API to receive messages is as follows:

    context ctx;
    auto ep = ctx.spawn<blocking>();
    ep.receive([&](const topic& t, const data& x) { .. });
    ep.receive([&](const status& s) { .. });

or the last two in one call:

    ep.receive(
      [&](const topic& t, const data& x) { .. },
      [&](const status& s) { .. }
    );

The idea behind this API is that it's similar to the non-blocking
endpoint API:

    auto ep = ctx.spawn<nonblocking>();
    ep.subscribe([=](const topic& t, const data& x) { .. });
    ep.subscribe([=](const status& s) { .. });

Non-blocking endpoints should be the default, because they are more
efficient due to the absence of blocking. For simplicity, the current
API also provides a non-lambda overload of receive:

    auto ep = ctx.spawn<blocking>();
    auto msg = ep.receive();
    std::cout << msg.topic() << " -> " << msg.data() << std::endl;

Users can also check whether the mailbox of the blocking endpoint
contains a message:

    // Only block if we know that we have a message.
    if (!ep.mailbox().empty())
      auto msg = ep.receive();

What I haven't considered up to now is the interaction of data and
status messages in the blocking API. Both broker::message and
broker::status are messages that linger in the endpoint's mailbox. I find
the terminology confusing, because a status instance is technically also
a message. I'd rather speak of "data messages" and "status messages" as
opposed to "messages" and "statuses". But more about the terminology
later.

There's a problem with the snippet above. If the mailbox is non-empty
because it contains a status message, the following call to receive()
would hang, because it expects a data message. The only safe solution
would be to use this form:

    if (!ep.mailbox().empty())
      ep.receive(
        [&](const topic& t, const data& x) { .. },
        [&](const status& s) { .. }
      );

The problem lies in the receive() function that returns a message. It
doesn't match the current architecture (a blocking endpoint has a single
mailbox) and is not a safe API for users.
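
To make the failure mode concrete, here is a minimal sketch of a single
shared mailbox. All names (toy_mailbox, try_receive_data, data_message,
status_message) are hypothetical stand-ins rather than Broker's actual
types, and try_receive_data() is a non-blocking substitute for receive()
so that the example terminates instead of hanging:

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <string>
#include <utility>
#include <variant>

// Hypothetical stand-ins for Broker's message types (illustration only).
struct data_message { std::string topic; std::string data; };
struct status_message { std::string what; };
using element = std::variant<data_message, status_message>;

// Toy mailbox shared by data and status messages.
struct toy_mailbox {
  std::deque<element> queue;

  bool empty() const { return queue.empty(); }

  // Non-blocking stand-in for the data-only receive(): yields a data
  // message only if one sits at the front of the queue. A blocking
  // receive() would wait at the point where this returns std::nullopt.
  std::optional<data_message> try_receive_data() {
    if (queue.empty())
      return std::nullopt;
    if (auto* dm = std::get_if<data_message>(&queue.front())) {
      data_message result = std::move(*dm);
      queue.pop_front();
      return result;
    }
    return std::nullopt; // front element is a status message
  }
};
```

With only a status message queued, empty() reports false, yet no data
message can be handed out, which is exactly the situation in which the
blocking call would hang.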

Here are some solutions I could think of:

    (1) Let receive() return a variant<message, status> instead, because
        the caller cannot know a priori what to expect. While simple to
        call, it burdens the user with type-based dispatching afterwards.
    
    (2) Specify the type of message a user wants to receive, e.g.,

          auto x = ep.receive<message>();
          auto y = ep.receive<status>();

        Here, I don't like the terminology issues I mentioned above. More
        reasonable could be

          auto x = ep.receive<data>();
          auto y = ep.receive<status>();

        where x could have type data_message with .topic() and .data(),
        and y be a direct instance of type status.

        But because callers don't know whether they'll receive a status
        or data message, this solution is only an incremental
        improvement.

    (3) Equip blocking endpoints with separate mailboxes for data and
        status messages. In combination with (2), this could lead to
        something like:

            if (!ep.mailbox<data>().empty())
              auto msg = ep.receive<data>();

            if (!ep.mailbox<status>().empty())
              auto s = ep.receive<status>();

        But now users have to keep track of two mailboxes, which is more
        error-prone and verbose.

    (4) Drop status messages unless the user explicitly asks for them.
        Do not consider them when working with an endpoint's mailbox,
        which only covers data messages.

        While keeping the semantics of ep.receive() simple, it's not
        clear how to poll for status messages. Should they just
        accumulate in the endpoint and be queryable? E.g.:

            // Bounded? Infinite?
            const std::vector<status>& xs = ep.statuses();
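
As an illustration of the dispatching burden mentioned under option (1),
the following sketch returns a std::variant from a toy receive() and
dispatches on the contained type with std::visit. The types
(data_message, status_message, toy_endpoint) and the helper describe()
are assumptions made for this example and are not part of Broker:

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <type_traits>
#include <utility>
#include <variant>

// Hypothetical stand-ins, not Broker's actual types.
struct data_message { std::string topic; std::string data; };
struct status_message { std::string what; };
using received = std::variant<data_message, status_message>;

struct toy_endpoint {
  std::deque<received> mailbox;

  // Returns whatever is next in the mailbox. A real blocking receive()
  // would wait on an empty mailbox; this toy version requires an element.
  received receive() {
    assert(!mailbox.empty());
    received next = std::move(mailbox.front());
    mailbox.pop_front();
    return next;
  }
};

// The type-based dispatch that option (1) leaves to the caller.
std::string describe(const received& r) {
  return std::visit(
    [](const auto& x) -> std::string {
      using T = std::decay_t<decltype(x)>;
      if constexpr (std::is_same_v<T, data_message>)
        return "data: " + x.topic + " -> " + x.data;
      else
        return "status: " + x.what;
    },
    r);
}
```

The call site stays simple (one receive() per message), but every
consumer needs a visitor like describe() to find out what it got.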

Ultimately, I think it's important to have a consistent API for blocking
and non-blocking endpoints. Any thoughts on how to move forward would
be appreciated.

    Matthias

> Ultimately, I think it's important to have a consistent API for blocking
> and non-blocking endpoints. Any thoughts on how to move forward would
> be appreciated.

Nice summary of the challenge! I agree that none of the options you
list sound really appealing. Here's an alternative idea: could we
change your option 1 (the variant) into always returning *both*, i.e.,
tuple<status, message>? To make that work, we'd add an additional
(say) GOT_MESSAGE status. Each time receive() gets called, it returns
whatever's internally next on deck: either a message (with status set
to GOT_MESSAGE), or a "real" status (with the tuple's message set to
null). The caller would then check the status first:

    auto next = ep.receive();

    if ( next.status() == GOT_MESSAGE )
        process_message(next.msg());
    else {
        // Status/error handling here.
    }

This would actually align pretty nicely with how blocking APIs
normally operate: returning potential errors directly with the call.
And having the client code check for errors before using the result
feels natural to me. (And it's actually the same approach we discussed for
the Bro-side API if we had tuples there.) :-)

What do you think?

Robin

> Nice summary of the challenge! I agree that none of the options you
> list sound really appealing. Here's an alternative idea: could we
> change your option 1 (the variant) into always returning *both*, i.e.,
> tuple<status, message>?

You pushed me in a new direction. Broker already frequently returns
expected<T> (e.g., for blocking data store operations), which yields
either a T or a broker::error (which is just an alias for caf::error).
How about we get rid of statuses entirely? Here are the
current status enum values:

    enum status_info : uint8_t {
      unknown_status = 0,
      peer_added, ///< Successfully peered
      peer_removed, ///< Successfully unpeered
      peer_incompatible, ///< Version incompatibility
      peer_invalid, ///< Referenced peer does not exist
      peer_unavailable, ///< Remote peer not listening
      peer_lost, ///< Lost connection to peer
      peer_recovered, ///< Re-gained connection to peer
    };

And the error values:

    enum class ec : uint8_t {
      /// The unspecified default error code.
      unspecified = 1,
      /// Version mismatch during peering.
      version_incompatible,
      /// Master with given name already exists.
      master_exists,
      /// Master with given name does not exist.
      no_such_master,
      /// The given data store key does not exist.
      no_such_key,
      /// The operation expected a different type than provided.
      type_clash,
      /// The data value cannot be used to carry out the desired operation.
      invalid_data,
      /// The storage backend failed to execute the operation.
      backend_failure,
    };

Clearly, these two enums could be merged. This would yield a natural
API for receive:

    expected<message> receive();

To be used as follows:

    auto msg = ep.receive();
    if (msg)
      return f(*msg); // unbox contained message
    switch (msg.error()) {
      default:
        cout << to_string(msg.error()) << endl;
        break;
      case status::peer_added:
        cout << "got new peer: " << msg.context() << endl;
        break;
      case status::peer_lost:
        break;
    }

This is pretty much what you suggested, Robin, just with a slight
syntactical twist. The only downside I see is that "msg.error()" could
be misleading, as we're sometimes not dealing with an error on the
Broker framework level, just with respect to the call to
expected<message>. That is, the error is that we didn't get a message,
which we expected, but something that is a status change in the global
topology, such as a new peer.

    Matthias

>     expected<message> receive();

Yeah, I like that, except for one concern:

>     switch (msg.error()) {
>       default:
>         cout << to_string(msg.error()) << endl;
>         break;
>       case status::peer_added:
>         cout << "got new peer: " << msg.context() << endl;
>         break;
>       case status::peer_lost:
>         break;
>     }

I think the name "error" is not just misleading but would also turn
out tricky to use correctly. In that switch statement, if the default
case is to handle only (real) errors, one would need to fully
enumerate all the status::* values so that they don't arrive there
too. More generally: the distinction between errors that signal
trouble with the connection and expected state changes doesn't come
through here.

What do you think about adding two methods instead of one to allow
differentiating between status updates and errors explicitly: status()
returns a joined result code like your error(); and failed() returns a
boolean indicating if that status reflects an error situation:

    auto msg = ep.receive();

    if (msg)
        return f(*msg); // unbox contained message

    if (msg.failed())
        cout << "Trouble: " << to_string(msg.status()) << endl;
    else
        cout << "Status change: " << to_string(msg.status()) << endl;

In either case one could then also use a switch to differentiate the
status() further.

Robin

> I think the name "error" is not just misleading but would also turn
> out tricky to use correctly.

Agreed.

>     auto msg = ep.receive();
>
>     if (msg)
>         return f(*msg); // unbox contained message
>
>     if (msg.failed())
>         cout << "Trouble: " << to_string(msg.status()) << endl;
>     else
>         cout << "Status change: " << to_string(msg.status()) << endl;
>
> In either case one could then also use a switch to differentiate the
> status() further.

I think this is semantically what we want. Because broker::error is just
a type alias for caf::error (and broker::expected just caf::expected),
it's currently not possible to change the API of those classes. I see
two solutions, one based on the existing vehicles and one that
introduces a new structure with three states for T, error, and status.

Here's the first. A caf::error class has three interesting member
functions:

  class error {
    uint8_t code();
    atom_value category();
    const message& context();
  };

The member category() returns the type of error. In Broker, this is
always "broker". CAF errors have "caf" as category. We could simply
split the "broker" error category into "broker-status" and
"broker-error" to distinguish the error class. We would keep the two
different enums and provide free functions for a user-friendly API.
Example:

  // -- Usage -------------------------------------------

  auto msg = ep.receive(); // expected<message>

  if (msg) {
    f(*msg); // unbox message
  } else if (is_error(msg)) {
    cout << "error: " << to_string(msg.error()) << endl;
    if (msg == error::type_clash) {
      // dispatch concrete error
    }
  } else if (is_status(msg)) {
    cout << "status: " << to_string(msg.error()) << endl;
    if (msg == status::peer_added) {
      // dispatch concrete status
    }
  } else {
    // CAF error
  }

  // -- Broker implementation ---------------------------

  using failure = caf::error;

  enum status : uint8_t { /* define status codes */ };

  enum error : uint8_t { /* define error codes */ };

  bool is_error(const failure& f) {
    return f.category() == atom("broker-error");
  }

  bool is_status(const failure& f) {
    return f.category() == atom("broker-status");
  }

  template <class T>
  bool failed(const expected<T>& x) {
    return !x && failed(x.error());
  }

  template <class T>
  bool operator==(const expected<T>& x, status s) {
    return !x && x.error() == s;
  }

The only downside here is that we're still calling msg.error() in the
case of a status. That's where the second option comes in. Let's call it
result<T> for now (it won't matter much, because most people will use it
with "auto"). A result<T> wraps an expected<T> and provides a nicer API
to distinguish errors and status more cleanly. Example:

  // -- Usage -------------------------------------------

  auto msg = ep.receive(); // result<message>

  if (msg) {
    f(*msg); // unbox T
  } else if (auto e = msg.error()) {
    cout << "error: " << to_string(*e) << endl;
    if (*e == error::type_clash) {
      // dispatch concrete error
    }
  } else if (auto s = msg.status()) {
    cout << "status: " << to_string(*s) << endl;
    if (*s == status::peer_added) {
      // dispatch concrete status
    }
  } else {
    assert(!"not possible");
  }

  // -- Broker implementation ---------------------------

  enum status : uint8_t { /* define status codes */ };

  enum error : uint8_t { /* define error codes */ };

  template <class T>
  class result {
  public:
    optional<caf::error> status() const;
    optional<caf::error> error() const;

    // Box semantics
    explicit operator bool() const;
    T& operator*();
    const T& operator*() const;
    T* operator->();
    const T* operator->() const;

  private:
    expected<T> x_;
  };
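
For illustration, here is a self-contained sketch of such a three-state
wrapper. It deviates from the synopsis above in two ways that are
assumptions of this example only: it stores a std::variant instead of
wrapping an expected<T>, and status()/error() return optional values of
two hypothetical enums (status_code, error_code) rather than
optional<caf::error>:

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <variant>

// Hypothetical codes; the concrete enumerators are placeholders.
enum class status_code : std::uint8_t { peer_added, peer_removed };
enum class error_code : std::uint8_t { type_clash, backend_failure };

template <class T>
class result {
public:
  result(T x) : state_(std::move(x)) {}
  result(status_code s) : state_(s) {}
  result(error_code e) : state_(e) {}

  // Engaged only if the result holds a status code.
  std::optional<status_code> status() const {
    if (auto* s = std::get_if<status_code>(&state_))
      return *s;
    return std::nullopt;
  }

  // Engaged only if the result holds an error code.
  std::optional<error_code> error() const {
    if (auto* e = std::get_if<error_code>(&state_))
      return *e;
    return std::nullopt;
  }

  // Box semantics: true if and only if a T is present.
  explicit operator bool() const {
    return std::holds_alternative<T>(state_);
  }

  // Precondition: static_cast<bool>(*this); otherwise std::get throws.
  T& operator*() { return std::get<T>(state_); }
  const T& operator*() const { return std::get<T>(state_); }

private:
  std::variant<T, status_code, error_code> state_;
};
```

Exactly one of the three checks (operator bool, error(), status()) is
positive for any given result, so the final else branch in the usage
example above stays unreachable.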

Thoughts?

    Matthias

I'm going back and forth between the two versions. I think I'd take
the 2nd (the custom class), though maybe with the API I had used in my
example instead (i.e., msg.failed() and then a single status() method
for both cases) as having two methods error/status returning the same
thing looks a bit odd. But not a big deal either way, any of these
options sounds fine to me.

Robin

> But not a big deal either way, any of these options sounds fine to me.

This is the synopsis for the slightly adapted message class, no other
changes:

    class message {
    public:
      /// Checks whether a message is a (topic, data) pair.
      /// @returns `true` iff the message contains a (topic, data) pair.
      explicit operator bool() const;

      /// @returns the contained topic.
      /// @pre `static_cast<bool>(*this)`
      const broker::topic& topic() const;

      /// @returns the contained data.
      /// @pre `static_cast<bool>(*this)`
      const broker::data& data() const;

      /// @returns the contained status.
      /// @pre `!*this`
      const broker::status& status() const;
    };

I opted against a .failed() member in broker::message, because it's up
to the concrete status instance to define failure or a mere status
change. A message is now either a (topic, data) pair or a status
instance.
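
A minimal sketch of how such an either/or message could behave is shown
below. It is not Broker's implementation: the std::string aliases
(topic_t, data_t), the status_t struct, and the variant-based storage
are simplifying assumptions chosen to keep the example self-contained:

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <variant>

// Hypothetical simplified stand-ins for broker::topic, broker::data,
// and broker::status.
using topic_t = std::string;
using data_t = std::string;
struct status_t { std::string what; };

class message {
public:
  message(topic_t t, data_t d)
    : state_(payload{std::move(t), std::move(d)}) {}
  explicit message(status_t s) : state_(std::move(s)) {}

  // True if and only if the message contains a (topic, data) pair.
  explicit operator bool() const {
    return std::holds_alternative<payload>(state_);
  }

  // Precondition: static_cast<bool>(*this)
  const topic_t& topic() const {
    assert(static_cast<bool>(*this));
    return std::get<payload>(state_).topic;
  }

  // Precondition: static_cast<bool>(*this)
  const data_t& data() const {
    assert(static_cast<bool>(*this));
    return std::get<payload>(state_).data;
  }

  // Precondition: !*this
  const status_t& status() const {
    assert(!static_cast<bool>(*this));
    return std::get<status_t>(state_);
  }

private:
  struct payload { topic_t topic; data_t data; };
  std::variant<payload, status_t> state_;
};
```

Callers branch once on the message itself and then use either
topic()/data() or status(), never both.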

To distinguish between the two status, I use operator bool. Since this
is almost the same as a failed() method, I have one last idea: instead
of returning const-references to topic, data, and status instances, we
could simply return non-owning const-pointers, which are non-null if and
only if the respective type is active:

    if (auto data = msg.data())
      f(*data);
    else
      g(*msg.status());

I've also updated how to do status/error handling. See the documentation
at http://bro.github.io/broker/comm.html#status-and-error-handling for
details.

    Matthias

> To distinguish between the two status, I use operator bool.

I don't see that in the "status and error handling" section. Would I
do "if (! status) { <error stuff> }"? That doesn't seem quite
intuitive. I think a method with a descriptive name would be better
here.

>     if (auto data = msg.data())
>       f(*data);
>     else
>       g(*msg.status());

For this I think I prefer the boolean version: "if (msg) { <use
msg.data()> }".

Robin

> To distinguish between the two status, I use operator bool.

> I don't see that in the "status and error handling" section. Would I
> do "if (! status) { <error stuff> }"? That doesn't seem quite
> intuitive. I think a method with a descriptive name would be better
> here.

Sorry, that was misleading. Statuses don't provide an operator bool.
They could, however, to distinguish them from errors.

>     if (auto data = msg.data())
>       f(*data);
>     else
>       g(*msg.status());

> For this I think I prefer the boolean version: "if (msg) { <use
> msg.data()> }".

Okay, I'll switch that over. I prefer the other way, though :-).

    Matthias

> Sorry, that was misleading. Statuses don't provide an operator bool.
> They could, however, to distinguish them from errors.

Mulling over this more, I'm not sure if it's clear which status codes
constitute an error. For example, there's a peer_lost and peer_recovered
status code. Is only the first an error? Some users may consider peer
churn normal.

Here's the list of all status codes:

    enum class sc : uint8_t {
      /// The unspecified default error code.
      unspecified = 1,
      /// Successfully added a new peer.
      peer_added,
      /// Successfully removed a peer.
      peer_removed,
      /// Version incompatibility.
      peer_incompatible,
      /// Referenced peer does not exist.
      peer_invalid,
      /// Remote peer not listening.
      peer_unavailable,
      /// A peering request timed out.
      peer_timeout,
      /// Lost connection to peer.
      peer_lost,
      /// Re-gained connection to peer.
      peer_recovered,
      /// Master with given name already exists.
      master_exists,
      /// Master with given name does not exist.
      no_such_master,
      /// The given data store key does not exist.
      no_such_key,
      /// The store operation timed out.
      request_timeout,
      /// The operation expected a different type than provided.
      type_clash,
      /// The data value cannot be used to carry out the desired operation.
      invalid_data,
      /// The storage backend failed to execute the operation.
      backend_failure,
    };

If we provided operator bool() for statuses, then it would be true for
peer_added, peer_removed, peer_recovered, and false for all others.
This selection seems arguable to me, which is why I'm inclined to let
users probe for specific instances themselves.

    Matthias

I see the challenge but I think we need some way to differentiate
serious errors from expected updates, otherwise we're back at writing
switch statements that need to comprehensively list all cases. One can
always post-filter if, e.g., one does consider status X not an error
even though it's flagged as such. Instead of a binary error yes/no,
what about levels along these lines: (1) Error: *we* did something
seriously wrong; (2) Warning: something seems off, including
problems with peers; and (3) Info: just an update on activity. It's
not clear-cut of course, but it would still be good to have some
default classification for cases that one doesn't handle directly (even
if it's only for logging them as error/warning/info). One could then
also compare the level directly with the status object: "if ( status
== ERROR ) ..."

Robin

> I see the challenge but I think we need some way to differentiate
> serious errors from expected updates, otherwise we're back at writing
> switch statements that need to comprehensively list all cases.

I agree that writing switch statements is not very productive. From a
user perspective, it's important to distinguish whether I can ignore
a status update, or whether I have to react to it. I would consider more
fine-grained classifications arbitrary. In particular, I find 3 levels
too complex. Users will wonder "wait, was this an info or a warning?"
and then have to go back to the documentation.

How about making it explicit by providing two types of codes, status and
errors:

    enum class sc : uint8_t {
      unspecified = 1,
      peer_added,
      peer_removed,
      peer_recovered,
    };

    enum class ec : uint8_t {
      unspecified = 1,
      peer_incompatible,
      peer_invalid,
      peer_unavailable,
      peer_lost,
      peer_timeout,
      master_exists,
      no_such_master,
      ...
    };

This is close to the original design, except that the peer_* codes are
now split into errors and status, and that we still have a single status
class to combine them both:

    auto s = msg.status();
    if (s.error())
      // Compare against error codes for details.
    else
      // Compare against status codes for details.

Or simpler: avoid the explicit distinction between error and status
codes and let the function status::error() return true for what we deem
actionable errors.
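
As a rough sketch of the two-enum variant, a single status class could
remember which kind of code it was built from and answer error()
accordingly. The enumerators below are the ones listed in this message
(the elided remainder of ec is left out); the variant-based storage and
the comparison operators are assumptions of this example, not the
actual Broker design:

```cpp
#include <cassert>
#include <cstdint>
#include <variant>

enum class sc : std::uint8_t {
  unspecified = 1,
  peer_added,
  peer_removed,
  peer_recovered,
};

// Only the enumerators spelled out above; the list is not complete.
enum class ec : std::uint8_t {
  unspecified = 1,
  peer_incompatible,
  peer_invalid,
  peer_unavailable,
  peer_lost,
  peer_timeout,
  master_exists,
  no_such_master,
};

class status {
public:
  status(sc code) : code_(code) {}
  status(ec code) : code_(code) {}

  // True if and only if the status was constructed from an error code.
  bool error() const { return std::holds_alternative<ec>(code_); }

  // Comparison against a concrete status code.
  friend bool operator==(const status& s, sc code) {
    auto* p = std::get_if<sc>(&s.code_);
    return p != nullptr && *p == code;
  }

  // Comparison against a concrete error code.
  friend bool operator==(const status& s, ec code) {
    auto* p = std::get_if<ec>(&s.code_);
    return p != nullptr && *p == code;
  }

private:
  std::variant<sc, ec> code_;
};
```

Because sc and ec are distinct types, comparing a status against a code
of the wrong kind simply yields false, even where the underlying
integer values coincide.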

    Matthias

> From a user perspective, it's important to distinguish whether I can
> ignore a status update, or whether I have to react to it.

Yep, exactly.

> I would consider more fine-grained classifications arbitrary. In
> particular, I find 3 levels too complex.

Well, weren't you just saying that two levels are arbitrary? :-) I
tried to make it a bit less arbitrary by providing more options. Your
new suggestion goes back to yes/no in terms of whether it's an error
or not, which I think is ok, but you seemed concerned about that.

>     auto s = msg.status();
>     if (s.error())
>       // Compare against error codes for details.
>     else
>       // Compare against status codes for details.

I like this (with the two separate types for error/status code, as
that makes the distinction explicit).

Could we then now also lift the error() method into the message class?
So "if (msg.error())" would be a shortcut for
"if (msg.status().error())"? (And then we'd be back where we started, I
believe.) :-)

Btw, there's one more complexity with all this: when one gets a
status, one cannot tell which operation it belongs to, as it depends
on how things got queued and processed internally. That can make it
hard to react to something more specifically. But that's something we
can't avoid with the asynchronous processing.

Robin

> Could we then now also lift the error() method into the message class?
> So "if (msg.error())" would be a shortcut for
> "if (msg.status().error())"? (And then we'd be back where we started, I
> believe.) :-)

Done.

And yes, we've come full circle. ;-) But at least we're in full
agreement now. Sometimes converging takes a little longer.

> Btw, there's one more complexity with all this: when one gets a
> status, one cannot tell which operation it belongs to, as it depends
> on how things got queued and processed internally. That can make it
> hard to react to something more specifically. But that's something we
> can't avoid with the asynchronous processing.

I think we can address this. First, the status code tells roughly what
went wrong. Second, a status has a context<T>() method that returns
additional information when available. For example, all peering-related
statuses include a broker::endpoint_info instance to figure out exactly
what peer created the issue. If we need the fine granularity, all we
need is to add the necessary context information. (This is enforced at
compile time, by the way; for a given status code, the context is either
a T or not present, but not of a different type.)
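
To sketch the idea of attaching context to a status, the example below
stores an optional endpoint_info next to the code and exposes it through
a context<T>() accessor that returns a pointer, which is null when no
context is present. Everything here is a simplified assumption for
illustration: the code enum, the endpoint_info struct with a single
node field, and the pointer-returning signature are not taken from
Broker, and the per-code compile-time check described above is not
reproduced:

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <variant>

// Hypothetical, simplified stand-in for broker::endpoint_info.
struct endpoint_info { std::string node; };

// Hypothetical subset of codes for this example.
enum class code { unspecified, peer_lost };

class status {
public:
  explicit status(code c) : code_(c) {}
  status(code c, endpoint_info info)
    : code_(c), context_(std::move(info)) {}

  code which() const { return code_; }

  // Returns the attached context, or nullptr if none is present.
  // Asking for a type the variant cannot hold is rejected at compile
  // time by std::get_if.
  template <class T>
  const T* context() const {
    return std::get_if<T>(&context_);
  }

private:
  code code_;
  std::variant<std::monostate, endpoint_info> context_;
};
```

A handler for a peering-related status can then look up the affected
peer via context<endpoint_info>() and fall back to generic handling
when the pointer is null.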

    Matthias