Threading

Requires Vala >= 0.11.0

Simple Threads with Context

/**
 * Context object for one worker thread: a label and a running counter.
 */
class MyThread {

    private string name;
    private int count = 0;

    public MyThread (string name) {
        this.name = name;
    }

    /**
     * Thread entry point.
     *
     * Prints "name: count", increments the counter and sleeps for a random
     * interval of less than 200000 microseconds. The loop has no exit, so
     * this method never returns.
     */
    public void* thread_func () {
        while (true) {
            // Post-increment: the current value is printed, then bumped.
            stdout.printf ("%s: %i\n", name, count++);

            int pause_us = Random.int_range (0, 200000);
            Thread.usleep (pause_us);
        }
    }
}

/**
 * Starts two joinable worker threads, each bound to its own MyThread
 * context, and then waits on both.
 *
 * @return 1 when threads are unsupported or a thread cannot be created,
 *         0 otherwise
 */
int main () {
    if (Thread.supported () == false) {
        stderr.printf ("Cannot run without thread support.\n");
        return 1;
    }

    var context_a = new MyThread ("A");
    var context_b = new MyThread ("B");
    int exit_code = 0;

    try {
        // Spawn both workers before joining either of them
        unowned Thread<void*> worker_a = Thread.create<void*> (context_a.thread_func, true);
        unowned Thread<void*> worker_b = Thread.create<void*> (context_b.thread_func, true);

        // The workers loop forever, so these joins never actually return
        worker_a.join ();
        worker_b.join ();
    } catch (ThreadError e) {
        stderr.printf ("%s\n", e.message);
        exit_code = 1;
    }

    return exit_code;
}

$ valac --thread threads.vala
$ ./threads

Simple Threads with Context (GLib 2.32 version)

/**
 * Data carried by a single worker thread: the name it prints and the
 * number of iterations it has completed.
 */
class MyThread {

    private string name;
    private int count = 0;

    public MyThread (string name) {
        this.name = name;
    }

    /**
     * Body of the worker thread. Loops endlessly: report, count, nap.
     * There is no exit from the loop, so no value is ever returned.
     */
    public void* thread_func () {
        while (true) {
            report ();
            count++;
            // Random pause below 200000 microseconds
            Thread.usleep (Random.int_range (0, 200000));
        }
    }

    // Writes the current "name: count" line to standard output
    private void report () {
        stdout.printf ("%s: %i\n", name, count);
    }
}

/**
 * GLib 2.32 variant: creates two named worker threads, one through the
 * throwing constructor and one through the plain constructor, and then
 * waits on both.
 *
 * @return 1 when threads are unsupported or an error is caught, 0 otherwise
 */
int main () {
    if (Thread.supported () == false) {
        stderr.printf ("Cannot run without thread support.\n");
        return 1;
    }

    var context_a = new MyThread ("A");
    var context_b = new MyThread ("B");
    int exit_code = 0;

    try {
        // Start two threads
        /* With error handling: Thread.try reports failure through the catch below */
        Thread<void*> worker_a = new Thread<void*>.try ("thread_a", context_a.thread_func);
        /* Without error handling (this variant does not need the try/catch) */
        Thread<void*> worker_b = new Thread<void*> ("thread_b", context_b.thread_func);

        // The workers loop forever, so these joins never actually return
        worker_a.join ();
        worker_b.join ();
    } catch (Error e) {
        stderr.printf ("%s\n", e.message);
        exit_code = 1;
    }

    return exit_code;
}

$ valac --thread --target-glib=2.32 threads.vala
$ ./threads

Synchronization With Mutex

This is an implementation of the dining philosophers problem, a classic multi-process synchronization problem.

/**
 * Pool of five forks shared by the philosophers.
 *
 * All access to the fork table happens with the mutex held; the condition
 * is used to wait until a wanted pair of forks is free.
 */
class Forks {

    // true means "in use"; every entry starts out false
    private bool[] taken = new bool[5];

    private Cond cond = new Cond ();
    private Mutex mutex = new Mutex ();

    // Marks both forks with the given state; caller must hold the mutex
    private void set_pair (int left, int right, bool in_use) {
        taken[left] = in_use;
        taken[right] = in_use;
    }

    /**
     * Blocks until both forks are free, then claims them.
     *
     * @param left index of the left fork
     * @param right index of the right fork
     */
    public void pick_up (int left, int right) {
        mutex.lock ();
        // Re-check after every wake-up: another thread may have claimed a fork
        while (taken[left] || taken[right]) {
            cond.wait (mutex);
        }
        set_pair (left, right, true);
        mutex.unlock ();
    }

    /**
     * Releases both forks and wakes every thread waiting in pick_up.
     *
     * @param left index of the left fork
     * @param right index of the right fork
     */
    public void lay_down (int left, int right) {
        mutex.lock ();
        set_pair (left, right, false);
        cond.broadcast ();
        mutex.unlock ();
    }
}

/**
 * One dining philosopher: alternates forever between thinking and eating,
 * taking both neighbouring forks from the shared pool before each meal.
 */
class Philosopher {

    private int number;         // seat number of this philosopher
    private int think_delay;    // thinking time passed to Thread.usleep
    private int eat_delay;      // eating time passed to Thread.usleep
    private int left;           // index of the fork on the left
    private int right;          // index of the fork on the right
    private Forks forks;        // pool shared by all philosophers

    public Philosopher (int number, int think_delay, int eat_delay, Forks forks) {
        this.number = number;
        this.think_delay = think_delay;
        this.eat_delay = eat_delay;
        this.forks = forks;

        // Seat 0 wraps around to fork 4 on its left
        if (number == 0) {
            left = 4;
        } else {
            left = number - 1;
        }
        right = (number + 1) % 5;
    }

    // Sleeps for the thinking time
    private void think () {
        Thread.usleep (think_delay);
    }

    // Takes both forks, eats for the eating time, then gives the forks back
    private void eat () {
        forks.pick_up (left, right);
        stdout.printf ("Philosopher %d starts eating...\n", number);
        Thread.usleep (eat_delay);
        forks.lay_down (left, right);
        stdout.printf ("Philosopher %d stops eating...\n", number);
    }

    /**
     * Thread entry point: an endless think/eat cycle. Never returns.
     */
    public void* run () {
        while (true) {
            think ();
            eat ();
        }
    }
}

/**
 * Seats five philosophers around one shared fork pool, starts a
 * non-joinable thread for each of them and then runs a main loop.
 *
 * Missing thread support or a failed thread creation is reported through
 * error ().
 */
int main (string[] args) {

    if (Thread.supported () == false) {
        error ("Cannot run without thread support.");
    }

    var table = new Forks ();

    // Arguments: seat number, think delay, eat delay, shared forks
    Philosopher[] philos = {
        new Philosopher (0, 100000, 500000, table),
        new Philosopher (1, 200000, 400000, table),
        new Philosopher (2, 300000, 300000, table),
        new Philosopher (3, 400000, 200000, table),
        new Philosopher (4, 500000, 100000, table)
    };

    try {
        for (int seat = 0; seat < philos.length; seat++) {
            Thread.create<void*> (philos[seat].run, false);
        }
    } catch (ThreadError e) {
        error ("%s\n", e.message);
    }

    // The threads are not joinable; the main loop is what main blocks on
    var loop = new MainLoop ();
    loop.run ();

    return 0;
}

$ valac --thread philosophers.vala
$ ./philosophers

Communication between two threads using async queues

In this example data is sent from one thread to another. This is done via GLib's AsyncQueue. The pop and push functions of AsyncQueue provide built-in locking.

/**
 * Demonstrates passing objects from a writer thread to a reader thread
 * through an AsyncQueue.
 */
class ThreadCommunication {

    private const int NUMBER_OF_MESSAGES = 200000;
    private AsyncQueue<DataBox> async_queue;

    public ThreadCommunication () {
        async_queue = new AsyncQueue<DataBox> ();
    }

    // Payload sent through the queue: a sequence number and a text
    private class DataBox {

        public int number { get; private set; }
        public string name { get; private set; }

        public DataBox (int number, string name) {
            this.number = number;
            this.name = name;
        }
    }

    // Producer: pushes NUMBER_OF_MESSAGES boxes, numbered from 0 upwards
    private void* writing_func () {
        var stopwatch = new Timer ();
        stopwatch.start ();

        int seq = 0;
        while (seq < NUMBER_OF_MESSAGES) {
            // build the payload and hand it over to the queue
            async_queue.push (new DataBox (seq, @"some text for value $seq"));
            seq++;
        }

        // report how long the pushing took
        print ("Pushed %d DataBoxes into AsyncQueue in %f s\n", NUMBER_OF_MESSAGES, stopwatch.elapsed ());
        return null;
    }

    // Consumer: pops NUMBER_OF_MESSAGES boxes and checks each one
    private void* reading_func () {
        var stopwatch = new Timer ();
        stopwatch.start ();

        int midpoint = NUMBER_OF_MESSAGES / 2;
        int expected = 0;
        while (expected < NUMBER_OF_MESSAGES) {
            // take the next box from the queue
            var received = async_queue.pop ();

            // boxes must arrive in order and with the matching text
            assert (expected == received.number);
            assert (@"some text for value $expected" == received.name);

            // print a single sample, the one in the middle
            if (midpoint == received.number) {
                print ("\tNO: %d \tTEXT: %s\n", received.number, received.name);
            }
            expected++;
        }

        // report how long the popping took
        print ("Popped %d DataBoxes from AsyncQueue in %f s\n", NUMBER_OF_MESSAGES, stopwatch.elapsed ());
        return null;
    }

    /**
     * Starts the writer and reader threads and waits for both to finish.
     * A thread-creation error is printed to stderr.
     */
    public void run () {
        try {
            unowned Thread<void*> writer = Thread.create<void*> (writing_func, true);
            unowned Thread<void*> reader = Thread.create<void*> (reading_func, true);

            // Block until both sides are done
            writer.join ();
            reader.join ();

        } catch (ThreadError e) {
            stderr.printf ("%s\n", e.message);
            return;
        }
    }
}

// Entry point: builds the demo object and runs it
void main () {
    new ThreadCommunication ().run ();
}

$ valac --thread async-queue-test.vala
$ ./async-queue-test


Vala/Examples

Projects/Vala/ThreadingSamples (last edited 2013-11-22 16:48:27 by WilliamJonMcCann)