14.3. Synchronizing Threads

In the examples we’ve looked at thus far, each thread executes without sharing data with any other threads. In the scalar multiplication program for instance, each element of the array is entirely independent from all the others, making it unnecessary for the threads to share data.

However, a thread’s ability to easily share data with other threads is one of its main features. Recall that all the threads of a multi-threaded process share the heap common to the process. In this section, we study the data sharing and protection mechanisms available to threads in detail.

Thread synchronization refers to forcing threads to execute in a particular order. While synchronizing threads can add to the run time of a program, it is often necessary to ensure program correctness. In this section, we primarily discuss how one synchronization construct (a mutex) helps ensure the correctness of a threaded program. We conclude the section with a discussion of some other common synchronization constructs, like semaphores, barriers and condition variables.

CountSort

Let’s study a slightly more complicated example called CountSort. The CountSort algorithm is a simple linear (O(N)) sorting algorithm for sorting a known small range of R values where R is much smaller than N. To illustrate how CountSort works, consider an array A of fifteen elements, all of which contain random values between 0 and 9 (ten possible values):

A = [9, 0, 2, 7, 9, 0, 1, 4, 2, 2, 4, 5, 0, 9, 1]

For a particular array, CountSort works as follows:

  1. Counts the frequency of each value in the array.

  2. Overwrites the original array by enumerating each value by its frequency.

After step 1, the frequency of each value is placed in a counts array of length 10, where the value of counts[i] is the frequency of the value i in array A. For example, since there are three 2 elements in array A, counts[2] is 3.

The corresponding counts array for the above example looks like the following:

counts = [3, 2, 3, 0, 2, 1, 0, 1, 0, 3]

Note that the sum of all the elements in the counts array is equal to the length of A, or fifteen.

Step 2 uses the counts array to overwrite A, using the frequency counts to determine the set of indices in A that store each consecutive value in sorted order. So, since the counts array indicates that there are three 0 elements and two 1 elements in array A, the first three elements of the final array will be 0, and the next two will be 1.

After running Step 2, the final array looks like the following:

A = [0, 0, 0, 1, 1, 2, 2, 2, 4, 4, 5, 7, 9, 9, 9]

Below is a serial implementation of the CountSort algorithm, with the count() (step 1) and overwrite() (step 2) functions clearly delineated. For brevity, we do not reproduce the whole program here, though you can download the source (countSort.c).

#define MAX 10 //the maximum value of an element. (10 means 0-9)

/*step 1:
 * compute the frequency of all the elements in the input array and store
 * the associated counts of each element in array counts. The elements in the
 * counts array are initialized to zero prior to the call to this function.
*/
void countElems(int *counts, int *array_A, long length) {
    int val, i;
    for (i = 0; i < length; i++) {
      val = array_A[i]; //read the value at index i
      counts[val] = counts[val] + 1; //update the corresponding location in counts
    }
}

/* step 2:
 * overwrite the input array (array_A) using the frequencies stored in the
 *  array counts
*/
void writeArray(int *counts, int *array_A) {
    int i, j = 0, amt;

    for (i = 0; i < MAX; i++) { //iterate over the counts array
        amt = counts[i]; //capture frequency of element i
        while (amt > 0) { //while all values aren't written
            array_A[j] = i; //replace value at index j of array_A with i
            j++; //go to next position in array_A
            amt--; //decrease the amount written by 1
        }
    }
}

/* main function:
 * gets array length from command line args, allocates a random array of that
 * size, allocates the counts array, the executes step 1 of the CountSort
 * algorithm (countsElem) followed by step 2 (writeArray).
*/
int main( int argc, char **argv ) {
    //code ommitted for brevity -- download source to view full file

    srand(10); //use of static seed ensures the output is the same every run

    long length = strtol( argv[1], NULL, 10 );
    int verbose = atoi(argv[2]);

    //generate random array of elements of specified length
    int *array = malloc(length * sizeof(int));
    genRandomArray(array, length);

    //print unsorted array (commented out)
    //printArray(array, length);

    //allocate counts array and initializes all elements to zero.
    int counts[MAX] = {0};

    countElems(counts, array, length); //calls step 1
    writeArray(counts, array); //calls step2

    //print sorted array (commented out)
    //printArray(array, length);

    free(array); //free memory

    return 0;
}

Running this program on an array of size fifteen yields the following output:

$ ./countSort 15 1
array before sort:
5 8 8 5 8 7 5 1 7 7 3 3 8 3 4
result after sort:
1 3 3 3 4 5 5 5 7 7 7 8 8 8 8

The second parameter to this program is a verbose flag, which indicates whether or not the program prints output. This is a useful option for larger arrays, where we may want to run the program but not necessarily print out the output.

Parallelizing countElems(): An initial attempt

CountSort consists of two primary steps, each of which benefits from being parallelized. In the remainder of the chapter, we primarily concentrate on the parallelization of step 1, or the countElems() function. Parallelizing the writeArray() function is left as an exercise to the reader.

The code block below depicts a first attempt at creating a threaded countElems() function. Parts of the code (arg parsing, error handling) are ommitted below for the sake of brevity, but the full source can be downloaded here (countElems_p.c). In the code that follows, each thread attempts to count the frequency of the array elements in its assigned component of the global array and updates a global count array with the discovered counts:

/*parallel version of step 1 (first cut) of CountSort algorithm:
 * extracts arguments from args value
 * calculates the portion of the array that thread is responsible for counting
 * computes the frequency of all the elements in assigned component and stores
 * the associated counts of each element in counts array
*/
void *countElems( void *args ) {
    struct t_arg * myargs = (struct t_arg *)args;
    //extract arguments (omitted for brevity)
    int *array = myargs->ap;
    long *counts = myargs->countp;
    //... (get nthreads, length, myid)

    //assign work to the thread
    long chunk = length / nthreads; //nominal chunk size
    long start = myid * chunk;
    long end = (myid + 1) * chunk;
    long val;
    if (myid == nthreads-1) {
        end = length;
    }

    long i;
    //heart of the program
    for (i = start; i < end; i++) {
        val = array[i];
        counts[val] = counts[val] + 1;
    }

    return NULL;
}

The main function looks nearly identical to our sample programs before:

int main( int argc, char **argv ) {

    if (argc != 4) {
        //print out usage info (ommitted for brevity)
        return 1;
    }

    srand(10); //static seed to assist in correctness check

    //parse command line arguments
    long t;
    long length = strtol(argv[1], NULL, 10);
    int verbose = atoi(argv[2]);
    long nthreads = strtol(argv[3], NULL, 10);

    //generate random array of elements of specified length
    int *array = malloc(length * sizeof(int));
    genRandomArray(array, length);

    //specify counts array and initialize all elements to zero
    long counts[MAX] = {0};

    //allocate threads and args array
    pthread_t *thread_array; //pointer to future thread array
    thread_array = malloc( nthreads * sizeof(pthread_t) ); //allocate the array
    struct t_arg *thread_args = malloc( nthreads * sizeof(struct t_arg) );

    //fill thread array with parameters
    for (t = 0; t < nthreads; t++) {
        //ommitted for brevity...
    }

    for (t = 0; t < nthreads; t++) {
        pthread_create(&thread_array[t], NULL, countElems, &thread_args[t]);
    }

    for (t = 0; t < nthreads; t++) {
        pthread_join(thread_array[t], NULL);
    }

    free(thread_array);
    free(array);

    if (verbose) {
        printf("Counts array:\n");
        printCounts(counts);
    }
    return 0;
}

For reproducibility purposes, the random number generator is seeded with a static value (10) to ensure that array (and therefore counts) always contains the same set of numbers. An additional function (printCounts()) prints out the contents of the global 'counts' array. The expectation is that, regardless of the number of threads used, the contents of the counts array should always be the same. For brevity, error handling has been removed from the listing.

Compiling the program and running it with one, two, and four threads over 10 million elements produces:

$ gcc -o countElems_p countElems_p.c -lpthread

$./countElems_p 10000000 1 1
Counts array:
999170 1001044 999908 1000431 999998 1001479 999709 997250 1000804 1000207

$./countElems_p 10000000 1 2
Counts array:
661756 661977 657828 658479 657913 659308 658561 656879 658070 657276

$./countElems_p 10000000 1 4
Counts array:
579846 580814 580122 579772 582509 582713 582518 580917 581963 581094

Note that the printed results change significantly on each run. In particular, they seem to change as we vary the number of threads! This should not happen, since our use of the static seed guarantees the same set of numbers every run. These results contradict one of the cardinal rules for threaded programs: the output of a program should be correct and consistent regardless of the number of threads used.

Since our first attempt at parallelizing countElems() doesn’t seem to be working, let’s delve deeper into what this program is doing and examine how we might fix it.

Data Races

To understand what’s going on, let’s consider an example run with two threads on two separate cores of a multicore system. Recall that the execution of any thread can be pre-empted at any time by the operating system, which means that each thread could be running different instructions of a particular functions at any given time (or possibly the same instruction). The table below shows one possible path of execution through the countElems() function. To better illustrate what is going on, we translated the line counts[val] = counts[val] + 1 into the following sequence of equivalent instructions:

  • read counts[val] and place into a register

  • modify register by incrementing it by one

  • write counts[val] with the contents of register

This is known as the read-modify-write pattern. In the example execution shown below, each thread executes on a separate core. We start inspecting the execution of the process at time step i, where both threads have a val of 1:

Table 1. A possible execution sequence of two threads running countElems. Thread 0 is running on core 0, and Thread 1 is running on core 1.
Time Thread 0 Thread 1

i

read counts[1] and place into core 0’s register

…​

i+1

increment register by one

read counts[1] and place into core 1’s register

i+2

overwrite counts[1] with contents of register

increment register by one

i+3

…​

overwrite counts[1] with contents of register

Suppose that prior to the execution sequence in shown above, counts[1] contains the value 60. In time step i, Thread 0 reads counts[1] and places the value 60 in core 0’s register. In time step i+1, while Thread 0 increments core 0’s register by one, the current value in counts[1] (60) is read into core 1’s register by Thread 1. In time step i+2, Thread 0 updates counts[1] with the value 61 while Thread 1 increments the value stored in its local register (60) by one. The end result is that during time step i+3, the value counts[1] is overwritten by Thread 1 with the value 61, not 62 as we would expect! This causes counts[1] to essentially "lose" an increment!

We refer to the scenario where two threads attempt to write to the same location in memory as a data race condition. More generally, a race condition refers to any scenario with the simultaneous execution of two operations gives an incorrect result. Note that a simultaneous read of the counts[1] location would NOT in itself constitute a race condition, since values can generally read alone from memory without issue. It was the combination of this step with the writes to counts[1] that caused the incorrect result. This read/modify/write pattern is a common source of a particular type of race condition called a data race in most threaded programs. In our discussion of race conditions and how to fix them, we focus on data races.

Atomic operations

An operation is defined as being atomic if threads perceive it as executing without interruption (in other words, as an "all or nothing" action). In some libraries, a keyword or type is used to specify that a block of computation should be treated as being atomic. In the above example, the line counts[val] = counts[val] + 1 (even if written as counts[val]++) is NOT atomic, since this line actually corresponds to several instructions at the machine level. A synchronization construct like mutual exclusion is needed to ensure there are no data races. In general, all operations should be assumed to be non-atomic unless mutual exclusion is explicitly enforced.

Keep in mind that not all execution sequences of the two threads cause a race condition. Consider the following sample execution sequence of Threads 0 and 1:

Table 2. Another possible execution sequence of two threads running countElems
Time Thread 0 Thread 1

i

read counts[1] and place into core 0’s register

…​

i+1

increment register by one

…​

i+2

overwrite counts[1] with contents of register

…​

i+3

…​

read counts[1] and place into core 1’s register

i+4

…​

increment register by one

i+5

…​

overwrite counts[1] with contents of register

In this execution sequence, Thread 1 does not read from counts[1] until after Thread 0 updates it with its new value (61). The end result is that Thread 1 reads the value 61 from counts[1] and places it into core 1’s register during time step i+3, and writes the value 62 to counts[1] in time step i+5.

To fix a data race, we must first isolate the critical section, or the subset of code that must execute atomically (in isolation) to ensure correct behavior. In threaded programs, blocks of code that update a shared resource are typically identified to be critical sections.

In the countElems() function, updates to the counts array should be put in a critical section to ensure that values are not lost due to multiple threads updating the same location in memory:

long i;
for (i = start; i < end; i++) {
    val = array[i];
    counts[val] = counts[val] + 1; //this line needs to be protected
}

Since the fundamental problem in countElems() is the simultaneous access of counts by multiple threads, a mechanism is needed to ensure that only one thread executes within the critical section at a time. Using a synchronization construct (like a mutex, which is covered in the next section) will force the threads to enter the critical section sequentially.