EDS.EMsgPort

The e-msgport.c file contains a few functions, notably the EMsgPort code itself, EDList, and some other thread-related or misc primitives. It is all used extensively in Evolution mail code.

EMsgPort

EMsgPort is a very handy thread primitive which implements an asynchronous message queue. It predates the GAsyncQueue code in GLib, and has one very important extra feature: the ability to wait for data to arrive on the port in a non-blocking manner.

 typedef struct _EMsgPort EMsgPort;

The port itself is a completely opaque structure to client code.

Internally, it consists of a queue of messages, access control locks, and the file descriptors used for non-blocking waits.

 struct _EMsgPort {
        EDList queue;
        int condwait;
        union {
                int pipe[2];
                struct {
                        int read;
                        int write;
                } fd;
        } pipe;
 #ifdef HAVE_NSS
        struct {
                PRFileDesc *read;
                PRFileDesc *write;
        } prpipe;
 #endif
        GCond *cond;
        GMutex *lock;
 };

EMsg

Each message sent to the port consists of a list header and an optional reply port. The semantics of the message depend on the port it is being sent to: client code simply sub-classes this structure by placing it first in its own structure and adding whatever fields it needs after it.

 typedef struct _EMsg {
        EDListNode ln;
        EMsgPort *reply_port;
 } EMsg;

The EMsg semantics are particularly simple: as soon as a message is placed on the port, none of its memory (including anything it points to) may be accessed by the calling thread. This works in both directions, so the receiving thread may access the data only until it has replied to the message. Depending on the semantics of the port, some messages may be send-and-forget, in which case the receiver frees any data associated with the message.
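The sub-classing described above can be sketched as follows. This is only an illustration: the `EDListNode` and `EMsg` declarations are local copies of the ones on this page so that the snippet compiles on its own, and `struct _fetch_msg` and `fetch_msg_handle` are hypothetical names, not part of e-msgport.c.

```c
#include <stddef.h>

/* Local copies of the declarations from this page, so the sketch stands alone. */
typedef struct _EDListNode {
        struct _EDListNode *next;
        struct _EDListNode *prev;
} EDListNode;

typedef struct _EMsgPort EMsgPort;

typedef struct _EMsg {
        EDListNode ln;
        EMsgPort *reply_port;
} EMsg;

/* Hypothetical message type: the EMsg comes first, private fields follow. */
struct _fetch_msg {
        EMsg msg;
        int folder_id;  /* input, filled in by the sender */
        int count;      /* output, filled in by the receiver */
};

/* What a receiver does: it is handed a plain EMsg and casts it back to
   its own type. This is valid because the EMsg is the first member. */
int fetch_msg_handle(EMsg *em) {
        struct _fetch_msg *m = (struct _fetch_msg *)em;

        m->count = m->folder_id * 2;
        return m->count;
}
```

Because the EMsg is the first member, a pointer to the whole message and a pointer to its EMsg are interchangeable, which is what lets a port queue messages without knowing their real type.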

Methods

The methods are all very simple and straightforward.

 EMsgPort *e_msgport_new(void);
 void e_msgport_destroy(EMsgPort *mp);

The main functions are put, get, and wait. put sends a message to a port. get returns a message if one is waiting, or simply returns NULL, so it can be used to poll the port. wait goes to sleep using a GCond until a message arrives. Using wait you can only wait on a single event or port at a time.

 void e_msgport_put(EMsgPort *mp, EMsg *msg);
 EMsg *e_msgport_wait(EMsgPort *mp);
 EMsg *e_msgport_get(EMsgPort *mp);

Then, when the receiver is finished with the message, it can reply, which sends the message back to the originator.

 void e_msgport_reply(EMsg *msg);

Using just the above functions you can write synchronous or asynchronous inter-thread processing very simply and reliably.
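To make the put/wait/reply round trip concrete, here is a minimal sketch built on plain pthreads. It is not the e-msgport.c code: `Port`, `Msg`, `port_put`, `port_wait`, `port_reply` and `serve_one` are hypothetical stand-ins, and unlike e_msgport_wait this `port_wait` also removes the message from the queue to keep the example short.

```c
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-ins for EMsgPort and EMsg. */
typedef struct _Port Port;

typedef struct _Msg {
        struct _Msg *next;
        Port *reply_port;
        int data, result;
} Msg;

struct _Port {
        Msg *head, *tail;       /* FIFO of pending messages */
        pthread_mutex_t lock;
        pthread_cond_t cond;
};

void port_init(Port *p) {
        p->head = p->tail = NULL;
        pthread_mutex_init(&p->lock, NULL);
        pthread_cond_init(&p->cond, NULL);
}

/* Append the message and wake up one sleeping waiter, if any. */
void port_put(Port *p, Msg *m) {
        pthread_mutex_lock(&p->lock);
        m->next = NULL;
        if (p->tail)
                p->tail->next = m;
        else
                p->head = m;
        p->tail = m;
        pthread_cond_signal(&p->cond);
        pthread_mutex_unlock(&p->lock);
}

/* Sleep until a message is available, then remove and return it. */
Msg *port_wait(Port *p) {
        Msg *m;

        pthread_mutex_lock(&p->lock);
        while (p->head == NULL)
                pthread_cond_wait(&p->cond, &p->lock);
        m = p->head;
        p->head = m->next;
        if (p->head == NULL)
                p->tail = NULL;
        pthread_mutex_unlock(&p->lock);
        return m;
}

/* Replying is just a put onto the message's own reply port. */
void port_reply(Msg *m) {
        if (m->reply_port)
                port_put(m->reply_port, m);
}

/* Receiver side: take one message, fill in the result, send it back. */
void serve_one(Port *p) {
        Msg *m = port_wait(p);

        m->result = m->data * 2;
        port_reply(m);
}
```

A synchronous caller does a put followed straight away by a wait on its reply port. An asynchronous caller does the put, carries on with other work, and collects the reply later. In real use `serve_one` would run in a loop in its own thread.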

But the real power is being able to wait on multiple ports or other i/o events. This can be done from a GMainLoop or from any other poll- or select-based mechanism.

You can get either a normal Linux file descriptor or an NSPR-based file descriptor (but not both on the same port). On these you simply poll for POLLIN events, and if one arrives, you can then use get to check whether anything has actually arrived.

 int e_msgport_fd(EMsgPort *mp);
 #ifdef HAVE_NSS
 struct PRFileDesc *e_msgport_prfd(EMsgPort *mp);
 #endif

Note that if you don't call these functions, no descriptor is ever allocated. This means you can set up as many ports as you wish, without fear of resource starvation, as long as you don't use the file descriptors.

Internals

Internally, each time a message is added, the port is locked and the message is just dumped on the queue. If there are any waiters, a condition is signalled. After unlocking, if a file descriptor has been set up, a single byte is always written to it as a wake-up signal.

When a message port is 'read', it is locked again and checked for any waiting messages. If one exists, and either of the pipe file descriptors is set, then a single byte is read from the pipe.

Waiting is slightly more complicated: if either pipe is set up it is used to poll on, otherwise a cond_wait is used to wait for a condition signal.
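The wake-up pipe mechanism can be sketched roughly like this. It is a simplified illustration rather than the actual e-msgport.c implementation: `MiniPort`, `MiniMsg` and the `mini_port_*` functions are hypothetical names, there is no GCond path or NSPR support, and writing catch-up bytes for messages queued before the pipe existed is an assumption of this sketch.

```c
#include <poll.h>
#include <pthread.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical miniature port, for illustration only. */
typedef struct _MiniMsg {
        struct _MiniMsg *next;
        int data;
} MiniMsg;

typedef struct _MiniPort {
        MiniMsg *head, *tail;   /* FIFO of pending messages */
        int fd[2];              /* wake-up pipe, -1 until first requested */
        pthread_mutex_t lock;
} MiniPort;

void mini_port_init(MiniPort *mp) {
        mp->head = mp->tail = NULL;
        mp->fd[0] = mp->fd[1] = -1;
        pthread_mutex_init(&mp->lock, NULL);
}

/* Write one wake-up byte; errors are ignored in this sketch. */
static void wake(int fd) {
        if (fd != -1 && write(fd, "", 1) != 1) {
                /* nothing sensible to do here */
        }
}

/* The pipe is only created on first request, so unused ports cost no
   descriptors. Assumption: one byte is written per message already queued. */
int mini_port_fd(MiniPort *mp) {
        MiniMsg *m;
        int fd;

        pthread_mutex_lock(&mp->lock);
        if (mp->fd[0] == -1 && pipe(mp->fd) == 0) {
                for (m = mp->head; m; m = m->next)
                        wake(mp->fd[1]);
        }
        fd = mp->fd[0];
        pthread_mutex_unlock(&mp->lock);
        return fd;
}

/* Queue under the lock, then write the wake-up byte after unlocking. */
void mini_port_put(MiniPort *mp, MiniMsg *m) {
        int fd;

        pthread_mutex_lock(&mp->lock);
        m->next = NULL;
        if (mp->tail)
                mp->tail->next = m;
        else
                mp->head = m;
        mp->tail = m;
        fd = mp->fd[1];
        pthread_mutex_unlock(&mp->lock);
        wake(fd);
}

/* Never sleeps on an empty port: returns NULL if nothing is queued. For
   each message removed, the matching byte is consumed from the pipe. */
MiniMsg *mini_port_get(MiniPort *mp) {
        MiniMsg *m;
        char c;

        pthread_mutex_lock(&mp->lock);
        m = mp->head;
        if (m) {
                mp->head = m->next;
                if (mp->head == NULL)
                        mp->tail = NULL;
                if (mp->fd[0] != -1 && read(mp->fd[0], &c, 1) != 1) {
                        /* ignored in this sketch */
                }
        }
        pthread_mutex_unlock(&mp->lock);
        return m;
}
```

A caller would put the descriptor from `mini_port_fd` into a `struct pollfd` with `POLLIN`, alongside any other descriptors it cares about, and call `mini_port_get` whenever poll reports the port readable. Keeping one byte in the pipe per queued message means the descriptor stays readable for exactly as long as messages are pending.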

EDList

This is identical to Evolution/Camel.Misc#Camel.DList, just with a different name.

 typedef struct _EDListNode {
        struct _EDListNode *next;
        struct _EDListNode *prev;
 } EDListNode;
 
 typedef struct _EDList {
        struct _EDListNode *head;
        struct _EDListNode *tail;
        struct _EDListNode *tailpred;
 } EDList;

Examples

See the source code, particularly evolution/mail/mail-mt.c, for extensive use of the thread and message primitives.

Example: Worker thread

This example sets up a single worker thread which then processes jobs from (any number of) other threads.

 #include <pthread.h>
 #include <time.h>
 #include <glib.h>
 #include <libedataserver/e-msgport.h>
 
 struct _msg {
        EMsg msg;
 
        int data;
        int result;
 };
 
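 void *worker_thread(void *p) {
        struct _msg *m;
        int go = 1;
        EMsgPort *port = p;
 
        while (go) {
                /* sleep until something arrives, then take it off the queue */
                e_msgport_wait(port);
                m = (struct _msg *)e_msgport_get(port);
                if (m) {
                        if (m->data == -1) {
                                /* quit message: it has no reply port, so don't reply */
                                go = FALSE;
                        } else {
                                m->result = m->data * time(0);
                                /* hand the message back to its sender */
                                e_msgport_reply((EMsg *)m);
                        }
                }
        }
 
        return NULL;
 }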
 
 void *client_thread(void *p) {
        struct _msg *m;
        EMsgPort *reply;
        EMsgPort *port = p;
        int i;
 
        m = g_malloc(sizeof(*m));
        reply = e_msgport_new();
        m->msg.reply_port = reply;
        for (i=0;i<5;i++) {
                m->data = i;
                e_msgport_put(port, (EMsg *)m);
                /* wait for the worker's reply, then take it off the reply port */
                e_msgport_wait(reply);
                e_msgport_get(reply);
        }
        e_msgport_destroy(reply);
        g_free(m);
 
        return NULL;
 }
 
 int main(int argc, char **argv) {
        EMsgPort *port;
        int i;
        pthread_t ids[5], worker;
        struct _msg quit;
 
        /* .. init .. */
 
        port = e_msgport_new();
        pthread_create(&worker, NULL, worker_thread, port);
        for (i=0;i<5;i++)
                pthread_create(&ids[i], NULL, client_thread, port);
 
        for (i=0;i<5;i++)
                pthread_join(ids[i], NULL);
 
        quit.msg.reply_port = NULL;
        quit.data = -1;
        e_msgport_put(port, (EMsg *)&quit);
        pthread_join(worker, NULL);
 
        return 0;
 }

This example also shows a few important details. Reply ports and messages can be re-used multiple times, which is far more efficient than allocating them every time. Also note that the list fields DO NOT need to be initialised, saving some setup effort.

Messages are also just data, so they can be allocated on the stack just as easily, provided they are used in a synchronous manner! The quit message doesn't need to be waited for in this case, since we have an alternative mechanism to detect thread exit, although you would normally need to wait for any stack-allocated message.

Notes

EMsgPort and EDList are based on AmigaOS's "exec.library", although exec message ports and lists also support a priority queue.

Apps/Evolution/EDS.EMsgPort (last edited 2013-08-08 22:50:06 by WilliamJonMcCann)