Switching to multiple threads, with a non-thread-safe resource

About

Your application used to be single threaded and is consuming a resource that is not thread-safe. You're splitting your application up into two or more threads. Both threads want to consume the non-thread-safe resource.

GThreadPool

The GThreadPool helps you. As a solution you can for example choose to have one extra thread to be the exclusive consumer of your resource. All other threads must ask this extra thread to solve the task. It would of course perform better if your resource was indeed thread-safe. But we know it's not and we want to solve that later, or we just don't care about performance (which is typical, else you would probably have made a state machine in stead of splitting things up into threads).

Normal locking

Normal locking the resource with a mutex works of course most of the times. Very often you end up with quite complex locking problems. I don't recommend this for a resource that is complex by itself (one that for example calls other methods in your code that might call the resource back, which could require a recursive mutex, etc).

However, I'm not trying to say that a mutex doesn't work. This sample tries to illustrate how to make one specific thread x be the exclusive consumer of your resource.

We don't want to lose the structure of our application

By that I mean that we don't want to make it more difficult for the user of our resource. The API must be exactly the same. For that we can use the decorator pattern: we will decorate the actual resource with a wrap that makes the actual resource behave in such a way that it's always used by one thread.

decorator.png

The decorator pattern, vala like syntax

namespace I {
  interface Resource {
        GLib.Object do_work (void* arg1, void* arg2);
  }

  public class AmNotThreadSafe: Resource {
        public GLib.Object do_work (void* arg1, void* arg2) {
                Perform the work
        }
  }

  public class AmThreadSafe: Resource {
        Resource real;
        public GLib.Object do_work (void* arg1, void* arg2) {
                Perform the work
                but on this.real and do it thread safe
        }
        public AmThreadSafe (Resource real) {
                this.real = real;
        }
  }
}

That IResource interface

So, to make the IResource interface, we just do this:

i-resource.vala:

namespace I {
  public interface Resource {
        public abstract GLib.Object do_work (void* arg1, void* arg2);
  }
}

This will generate i-resource.c and i-resource.h for you:

valac i-resource.vala 

With GObject and glib, a bit more complete

The file i-am-thread-safe.c

I'll use one of those crazy GObject boilerplate macros. Personally, I don't like them.

#include "i-am-thread-safe.h"

G_DEFINE_TYPE_WITH_CODE (IAmThreadSafe, 
        i_am_thread_safe, 
        G_TYPE_OBJECT,
        G_IMPLEMENT_INTERFACE (I_TYPE_RESOURCE,
                my_iface_init))

typedef struct {
        gpointer arg1, arg2;
        GObject *retval, *self;

        GCond *condition;
        GMutex *mutex;
        gboolean had_callback;
} WorkTask;

static void
i_am_thread_safe_finalize (GObject *object) 
{
        IAmThreadSafe *self = object;
        g_thread_pool_free (self->pool, TRUE, TRUE);
        g_object_unref (self->real);
}

static void
i_am_thread_safe_class_init (IAmThreadSafeClass *class)
{
        GObjectClass *object_class = G_OBJECT_CLASS (class);
        object_class->finalize = i_am_thread_safe_finalize;
}

static void 
do_the_work (gpointer data, gpointer user_data)
{
        WorkTask *task = data;
        IAmThreadSafe *self = task->self;

        /* You indeed call the interface's method on
         * self->real */

        task->retval = i_resource_do_work (self->real, 
                        task->arg1, 
                        task->arg2);

        /* Make the caller stop its waiting */

        g_mutex_lock (task->mutex);
        g_cond_broadcast (task->condition);
        task->had_callback = TRUE;
        g_mutex_unlock (task->mutex);
}


static void 
i_am_thread_safe_init (IAmThreadSafe *self)
{
        /* Because max_threads is 1, there wont ever be more 
         * than one thread being created by g_thread_pool_push, 
         * indeed */

        GThreadPool *pool = g_thread_pool_new (do_the_work, 
                                NULL, 1, TRUE, NULL);
        self->pool = pool;
}

static GObject*
i_am_thread_safe_do_work (IResource *self, gpointer arg1, gpointer arg2)
{
        WorkTask *info = g_slice_new (WorkTask);
        GObject *retval;

        task->mutex = g_mutex_new ();
        task->condition = g_cond_new ();
        task->had_callback = FALSE;

        /* When passing GObjects to another context (like 
           another thread), always add a reference and just
           take it back later. Better safe than sorry */

        task->arg1 = arg1;
        task->arg2 = arg2;
        task->self = g_object_ref (self);

        /* max_threads of pool is 1, so this wont create n threads */

        g_thread_pool_push (self->pool, task, NULL);

        /* If we don't wait here (like in non-blocking async APIs), 
         * we'll need to freeup our task and the other resources in 
         * do_the_work */

        g_mutex_lock (task->mutex);
        if (!task->had_callback)
                g_cond_wait (task->condition, task->mutex);
        g_mutex_unlock (task->mutex);

        /* But since we did wait, we can even pass a 
         * return-value */

        retval = task->retval;

        /* And we free our stuff up */

        g_object_unref (task->self);
        g_mutex_free (task->mutex);
        g_cond_free (task->condition);
        g_slice_free (WorkTask, task);

        return retval;
}

static void
my_iface_init (IResourceIface *iface)
{
        iface->do_work = i_am_thread_safe_do_work;
}

IResource *
i_am_thread_safe_new (IResource *real)
{
        IAmThreadSafe *self = g_object_new (I_TYPE_AM_THREAD_SAFE, 
                                        NULL);

        /* You probably want to do this with a property 
         * and in a private structure instead */

        self->real = g_object_ref (real);

        return self;
}

The file i-am-thread-safe.h

#ifndef __I_AM_THREAD_SAFE_H__
#define __I_AM_THREAD_SAFE_H__
#include <glib.h>
#include <glib-object.h>
#include "i-resource.h"
G_BEGIN_DECLS

#define I_TYPE_AM_THREAD_SAFE (i_am_thread_safe_get_type ())
#define I_AM_THREAD_SAFE(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), 
                I_TYPE_AM_THREAD_SAFE, IAmThreadSafe))
#define I_AM_THREAD_SAFE_CLASS(c) (G_TYPE_CHECK_CLASS_CAST ((c),
                I_TYPE_AM_THREAD_SAFE, IAmThreadSafeClass))
#define I_IS_AM_THREAD_SAFE(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), 
                I_TYPE_AM_THREAD_SAFE))
#define I_IS_AM_THREAD_SAFE_CLASS(c) (G_TYPE_CHECK_CLASS_TYPE ((o),
                I_TYPE_AM_THREAD_SAFE))
#define I_AM_THREAD_SAFE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o),
                I_TYPE_AM_THREAD_SAFE, IAmThreadSafeClass))

typedef struct IAmThreadSafe IAmThreadSafe;
typedef struct IAmThreadSafeClass IAmThreadSafeClass;

struct IAmThreadSafe {
        GObject parent_instance;
        IResource *real;
        GThreadPool *pool;
};
struct IAmThreadSafeClass {
        GObjectClass parent_class;
};

GType i_am_thread_safe_get_type (void);
IResource * i_am_thread_safe_new (IResource *real);

G_END_DECLS
#endif /* __I_AM_THREAD_SAFE_H__ */

TODO: the same example but in Vala

...

Using in C

IResource *real = i_am_not_thread_safe_new ();
IResource *safe = i_am_thread_safe_new (real);

thread1:

GObject *retval = i_resource_do_work (safe, arg1, arg2);

thread2:

GObject *retval = i_resource_do_work (safe, arg1, arg2);

threadN:

GObject *retval = i_resource_do_work (safe, arg1, arg2);

Examples where you can use this

Attic/MTWithNonThreadSafeResources (last edited 2013-11-23 01:26:29 by WilliamJonMcCann)