Trouble passing a message between two local endpoints.

Hi all,

I’m trying to pass a message between two local endpoints, using only the C-interface in broker.h:

#include <stdio.h>

#include “broker.h”

int main (int argc, char* argv) {

broker_init(0);

broker_endpoint* ep1 = broker_endpoint_create_with_flags(“ep1”, 3);

broker_endpoint* ep2 = broker_endpoint_create_with_flags(“ep2”, 3);

broker_peering* p = broker_endpoint_peer_locally(ep2, ep1);

broker_string* bs = broker_string_create(“”);

broker_message_queue* q = broker_message_queue_create(bs, ep2);

broker_string* msg_str = broker_string_create(“Hello, World!\n”);

broker_string* topic = broker_string_create(“test”);

broker_data* msg = broker_data_from_string(msg_str);

broker_vector* vec = broker_vector_create();

int res = broker_vector_insert(vec, msg, 0L);

res = broker_endpoint_send(ep1, topic, vec);

broker_deque_of_message* msg_list = broker_message_queue_want_pop(q);

size_t num_msgs = broker_deque_of_message_size(msg_list);

printf(“There are %ld messages.\n”, num_msgs);

}

And I’m failing:

davids-air-2:broker-haskell dbanas$ ./a.out

There are 0 messages.

Does anyone see what I’m doing wrong?

Thanks!
-db

There’s a few race conditions to be aware of:

1) For the sample code you made, you want to establish/create the message queues attached to an endpoint before initiating peerings/connections with other endpoints. The point here is to allow topic advertisements/subscriptions to be established before actually sending any messages.

2) If you request a message to be sent before the connection w/ a peer is actually established, it may just get dropped because it’s seen that no one is interested in that message. In your example, you can check the outgoing/incoming connection status queues to wait for the connection to establish. Using the blocking “need” version of the function works fine as a convenience in this case, but I’d think it more typical in real code to use the non-blocking “want” version and to have integrated queues into event loop (e.g. select(), poll(), etc.)

3) Using the non-blocking “want” version of popping the message queue doesn’t give any time for the message to actually be sent and arrive at the peer endpoint. Either integrate into an event loop or just use the blocking “need” version to wait for the message to arrive.

Here’s an example of revising your code w/ those 3 suggestions:

#include <stdio.h>
#include "broker.h"

int main (int argc, char* argv[]) {
    broker_init(0);
    broker_endpoint* ep1 = broker_endpoint_create_with_flags("ep1", 3);
    broker_endpoint* ep2 = broker_endpoint_create_with_flags("ep2", 3);
    broker_string* bs = broker_string_create("");
    broker_message_queue* q = broker_message_queue_create(bs, ep2);
    
    broker_peering* p = broker_endpoint_peer_locally(ep2, ep1);

    const broker_outgoing_connection_status_queue* ocsq =
            broker_endpoint_outgoing_connection_status(ep2);
    broker_deque_of_outgoing_connection_status_delete(
        broker_outgoing_connection_status_queue_need_pop(ocsq));

    broker_string* msg_str = broker_string_create("Hello, World!\n");
    broker_string* topic = broker_string_create("test");
    broker_data* msg = broker_data_from_string(msg_str);
    
    broker_vector* vec = broker_vector_create();
    int res = broker_vector_insert(vec, msg, 0L);
                   res = broker_endpoint_send(ep1, topic, vec);
    
    broker_deque_of_message* msg_list = broker_message_queue_need_pop(q);
    size_t num_msgs = broker_deque_of_message_size(msg_list);
    
    printf("There are %ld messages.\n", num_msgs);
}

Hope that helps.

- Jon

Hi Jon,

Thanks for your reply!

I implemented your recommended changes, recompiled, and re-ran, but am getting the same result:

davids-air-2:broker-haskell dbanas$ ./a.out
There are 0 messages.

Any thoughts?

Thanks!
-db

It shouldn’t be possible for broker_message_queue_need_pop() to return zero messages (it’s a bug if it does). Double check your revised code against what I posted and that you compiled/ran that version. Else post the exact code you ran and other details you may think would help reproduce those results (OS, compiler version, Broker version, CAF version, etc.).

- Jon

Hi Jon,

Thanks for your reply!

I implemented your recommended changes, recompiled, and re-ran, but am getting the same result:

davids-air-2:broker-haskell dbanas$ ./a.out
There are 0 messages.

Any thoughts?

Thanks!
-db

Thanks, Jon.

Commands:

davids-air-2:broker-haskell dbanas$ touch test.c
davids-air-2:broker-haskell dbanas$ gcc test.c -lbroker
davids-air-2:broker-haskell dbanas$ ./a.out
There are 0 messages.

Code:

#include <stdio.h>
#include <stdlib.h>
#include “…/bro/aux/broker/broker/broker.h”

void my_exit (char* msg) {
printf("%s", msg);
printf("\n");
exit(-1);
}

int main (int argc, char* argv[]) {
int res = broker_init(0);
if(res) my_exit(“broker_init() failed!”);

broker_endpoint* ep1 = broker_endpoint_create_with_flags(“ep1”, 3);
if(!ep1) my_exit(“Failed to create first endpoint!”);

broker_endpoint* ep2 = broker_endpoint_create_with_flags(“ep2”, 3);
if(!ep2) my_exit(“Failed to create second endpoint!”);

broker_string* bs = broker_string_create("");
if(!bs) my_exit(“Failed to create topic string!”);

broker_message_queue* q = broker_message_queue_create(bs, ep2);
if(!q) my_exit(“Failed to create message queue!”);

broker_peering* p = broker_endpoint_peer_locally(ep2, ep1);
if(!p) my_exit(“Failed to create peering!”);

const broker_outgoing_connection_status_queue* ocsq =
broker_endpoint_outgoing_connection_status(ep2);
if(!ocsq) my_exit(“Failed to create status queue!”);

broker_deque_of_outgoing_connection_status_delete(
broker_outgoing_connection_status_queue_need_pop(ocsq));

broker_string* msg_str = broker_string_create(“Hello, World!\n”);
if(!msg_str) my_exit(“Failed to create message string!”);

broker_string* topic = broker_string_create(“test”);
if(!topic) my_exit(“Failed to create topic string!”);

broker_data* msg = broker_data_from_string(msg_str);
if(!msg) my_exit(“Failed to create message data!”);

broker_vector* vec = broker_vector_create();
if(!vec) my_exit(“Failed to create message vector!”);

res = broker_vector_insert(vec, msg, 0L);
if(!res) my_exit(“Failed to insert into vector!”);

res = broker_endpoint_send(ep1, topic, vec);
if(!res) my_exit(“Failed to send message!”);

broker_deque_of_message* msg_list = broker_message_queue_want_pop(q);
if(!msg_list) my_exit(“Failed to pop queue!”);

size_t num_msgs = broker_deque_of_message_size(msg_list);
printf(“There are %ld messages.\n”, num_msgs);
}

-db

This ends up checking for messages while the one you just sent is still in-flight. Try changing it to use “broker_message_queue_need_pop(q)”. That version will block until at least one message can be retrieved from the queue.

Alternatively, what you may want in a real application is integrate the “want_pop” version into a poll()/select() loop so you get signaled when something is actually available to retrieve.

- Jon

That worked!:

Davids-MacBook-Air-2:broker-haskell dbanas$ ./a.out
There are 1 messages.

Thanks, so much, for hand holding me through this, Jon!

Have a great weekend,
-db