14.3. Synchronizing Threads
In the examples we’ve looked at thus far, each thread executes without sharing data with any other thread. In the scalar multiplication program, for instance, each element of the array is entirely independent of all the others, making it unnecessary for the threads to share data.
However, a thread’s ability to easily share data with other threads is one of its main features. Recall that all the threads of a multi-threaded process share the heap common to the process. In this section, we study the data sharing and protection mechanisms available to threads in detail.
Thread synchronization refers to forcing threads to execute in a particular order. While synchronizing threads can add to the run time of a program, it is often necessary to ensure program correctness. In this section, we primarily discuss how one synchronization construct (a mutex) helps ensure the correctness of a threaded program. We conclude the section with a discussion of some other common synchronization constructs, like semaphores, barriers and condition variables.
CountSort
Let’s study a slightly more complicated example called CountSort. The CountSort algorithm is a simple linear (O(N)) sorting algorithm for sorting a known small range of R values where R is much smaller than N. To illustrate how CountSort works, consider an array A of fifteen elements, all of which contain random values between 0 and 9 (ten possible values):
A = [9, 0, 2, 7, 9, 0, 1, 4, 2, 2, 4, 5, 0, 9, 1]
For a particular array, CountSort works as follows:
1. Counts the frequency of each value in the array.
2. Overwrites the original array by enumerating each value by its frequency.
After step 1, the frequency of each value is placed in a counts array of length 10, where the value of counts[i] is the frequency of the value i in array A. For example, since there are three 2 elements in array A, counts[2] is 3.
The corresponding counts array for the above example looks like the following:
counts = [3, 2, 3, 0, 2, 1, 0, 1, 0, 3]
Note that the sum of all the elements in the counts array is equal to the length of A, or fifteen.
Step 2 uses the counts array to overwrite A, using the frequency counts to determine the set of indices in A that store each consecutive value in sorted order. So, since the counts array indicates that there are three 0 elements and two 1 elements in array A, the first three elements of the final array will be 0, and the next two will be 1.
After running Step 2, the final array looks like the following:
A = [0, 0, 0, 1, 1, 2, 2, 2, 4, 4, 5, 7, 9, 9, 9]
Below is a serial implementation of the CountSort algorithm, with the countElems() (step 1) and writeArray() (step 2) functions clearly delineated. For brevity, we do not reproduce the whole program here, though you can download the source (countSort.c).
#define MAX 10 //the maximum value of an element. (10 means 0-9)
/* step 1:
 * compute the frequency of all the elements in the input array and store
 * the associated counts of each element in array counts. The elements in the
 * counts array are initialized to zero prior to the call to this function.
 */
void countElems(int *counts, int *array_A, long length) {
    int val, i;

    for (i = 0; i < length; i++) {
        val = array_A[i]; //read the value at index i
        counts[val] = counts[val] + 1; //update the corresponding location in counts
    }
}
/* step 2:
 * overwrite the input array (array_A) using the frequencies stored in the
 * array counts
 */
void writeArray(int *counts, int *array_A) {
    int i, j = 0, amt;

    for (i = 0; i < MAX; i++) { //iterate over the counts array
        amt = counts[i]; //capture frequency of element i
        while (amt > 0) { //while copies of i remain to be written
            array_A[j] = i; //replace value at index j of array_A with i
            j++; //go to next position in array_A
            amt--; //decrease the amount remaining to write by 1
        }
    }
}
/* main function:
 * gets array length from command line args, allocates a random array of that
 * size, allocates the counts array, then executes step 1 of the CountSort
 * algorithm (countElems) followed by step 2 (writeArray).
 */
int main( int argc, char **argv ) {
    //code omitted for brevity -- download source to view full file

    srand(10); //use of static seed ensures the output is the same every run

    long length = strtol( argv[1], NULL, 10 );
    int verbose = atoi(argv[2]);

    //generate random array of elements of specified length
    int *array = malloc(length * sizeof(int));
    genRandomArray(array, length);

    //print unsorted array (commented out)
    //printArray(array, length);

    //allocate counts array and initialize all elements to zero
    int counts[MAX] = {0};

    countElems(counts, array, length); //calls step 1
    writeArray(counts, array); //calls step 2

    //print sorted array (commented out)
    //printArray(array, length);

    free(array); //free memory

    return 0;
}
Running this program on an array of size fifteen yields the following output:
$ ./countSort 15 1
array before sort: 5 8 8 5 8 7 5 1 7 7 3 3 8 3 4
result after sort: 1 3 3 3 4 5 5 5 7 7 7 8 8 8 8
The second parameter to this program is a verbose flag, which indicates whether or not the program prints output. This is a useful option for larger arrays, where we may want to run the program but not necessarily print out the output.
Parallelizing countElems(): An initial attempt
CountSort consists of two primary steps, each of which benefits from being parallelized. In the remainder of the chapter, we primarily concentrate on the parallelization of step 1, or the countElems() function. Parallelizing the writeArray() function is left as an exercise for the reader.
The code block below depicts a first attempt at creating a threaded countElems() function. Parts of the code (argument parsing, error handling) are omitted below for the sake of brevity, but the full source can be downloaded here (countElems_p.c). In the code that follows, each thread attempts to count the frequency of the array elements in its assigned component of the global array and updates a global counts array with the discovered counts:
/* parallel version of step 1 (first cut) of CountSort algorithm:
 * extracts arguments from args value
 * calculates the portion of the array that the thread is responsible for counting
 * computes the frequency of all the elements in its assigned component and stores
 * the associated counts of each element in the counts array
 */
void *countElems( void *args ) {
    struct t_arg *myargs = (struct t_arg *)args;

    //extract arguments (omitted for brevity)
    int *array = myargs->ap;
    long *counts = myargs->countp;
    //... (get nthreads, length, myid)

    //assign work to the thread
    long chunk = length / nthreads; //nominal chunk size
    long start = myid * chunk;
    long end = (myid + 1) * chunk;
    long val;
    if (myid == nthreads-1) {
        end = length;
    }

    long i;
    //heart of the program
    for (i = start; i < end; i++) {
        val = array[i];
        counts[val] = counts[val] + 1;
    }

    return NULL;
}
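The definition of struct t_arg is not shown in this listing. Based on the fields that countElems() uses (ap, countp, and the omitted nthreads, length, and myid values), it presumably looks something like the following sketch; the exact names and types in countElems_p.c may differ:

struct t_arg {
    int *ap;        //pointer to the shared input array
    long *countp;   //pointer to the shared (global) counts array
    long nthreads;  //total number of threads
    long length;    //length of the input array
    long myid;      //this thread's logical id (0 to nthreads-1)
};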
The main function looks nearly identical to those in our earlier sample programs:
int main( int argc, char **argv ) {
    if (argc != 4) {
        //print out usage info (omitted for brevity)
        return 1;
    }

    srand(10); //static seed to assist in correctness check

    //parse command line arguments
    long t;
    long length = strtol(argv[1], NULL, 10);
    int verbose = atoi(argv[2]);
    long nthreads = strtol(argv[3], NULL, 10);

    //generate random array of elements of specified length
    int *array = malloc(length * sizeof(int));
    genRandomArray(array, length);

    //specify counts array and initialize all elements to zero
    long counts[MAX] = {0};

    //allocate threads and args array
    pthread_t *thread_array; //pointer to future thread array
    thread_array = malloc( nthreads * sizeof(pthread_t) ); //allocate the array
    struct t_arg *thread_args = malloc( nthreads * sizeof(struct t_arg) );

    //fill thread array with parameters
    for (t = 0; t < nthreads; t++) {
        //omitted for brevity...
    }

    for (t = 0; t < nthreads; t++) {
        pthread_create(&thread_array[t], NULL, countElems, &thread_args[t]);
    }

    for (t = 0; t < nthreads; t++) {
        pthread_join(thread_array[t], NULL);
    }

    free(thread_array);
    free(array);

    if (verbose) {
        printf("Counts array:\n");
        printCounts(counts);
    }

    return 0;
}
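The loop that fills thread_args is omitted above; given a struct t_arg like the sketch shown earlier, its body would presumably assign each thread its shared pointers and logical id, roughly as follows (a guess at the omitted code, not necessarily the exact version in countElems_p.c):

for (t = 0; t < nthreads; t++) {
    thread_args[t].ap = array;       //all threads share the input array
    thread_args[t].countp = counts;  //and the global counts array
    thread_args[t].nthreads = nthreads;
    thread_args[t].length = length;
    thread_args[t].myid = t;         //unique id determines each thread's chunk
}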
For reproducibility purposes, the random number generator is seeded with a static value (10) to ensure that array (and therefore counts) always contains the same set of numbers. An additional function (printCounts()) prints out the contents of the global counts array. The expectation is that, regardless of the number of threads used, the contents of the counts array should always be the same. For brevity, error handling has been removed from the listing.
Compiling the program and running it with one, two, and four threads over 10 million elements produces:
$ gcc -o countElems_p countElems_p.c -lpthread

$ ./countElems_p 10000000 1 1
Counts array:
999170 1001044 999908 1000431 999998 1001479 999709 997250 1000804 1000207

$ ./countElems_p 10000000 1 2
Counts array:
661756 661977 657828 658479 657913 659308 658561 656879 658070 657276

$ ./countElems_p 10000000 1 4
Counts array:
579846 580814 580122 579772 582509 582713 582518 580917 581963 581094
Note that the printed results change significantly on each run. In particular, they seem to change as we vary the number of threads! This should not happen, since our use of the static seed guarantees the same set of numbers every run. These results contradict one of the cardinal rules for threaded programs: the output of a program should be correct and consistent regardless of the number of threads used.
Since our first attempt at parallelizing countElems() doesn’t seem to be working, let’s delve deeper into what this program is doing and examine how we might fix it.
Data Races
To understand what’s going on, let’s consider an example run with two threads on two separate cores of a multicore system. Recall that the execution of any thread can be pre-empted at any time by the operating system, which means that each thread could be running different instructions of a particular function at any given time (or possibly the same instruction). The table below shows one possible path of execution through the countElems() function. To better illustrate what is going on, we translated the line counts[val] = counts[val] + 1 into the following sequence of equivalent instructions:
- read counts[val] and place into a register
- modify the register by incrementing it by one
- write counts[val] with the contents of the register
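In C, this decomposition can be made explicit with a temporary variable (a rough stand-in for the register used by the compiled code; tmp is introduced here purely for illustration):

long tmp = counts[val]; //(1) read counts[val] into a temporary (the register)
tmp = tmp + 1;          //(2) modify: increment the temporary by one
counts[val] = tmp;      //(3) write the temporary back to counts[val]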
This is known as the read-modify-write pattern. In the example execution shown below, each thread executes on a separate core. We start inspecting the execution of the process at time step i, where both threads have a val of 1:
| Time | Thread 0                                        | Thread 1                                        |
|------|-------------------------------------------------|-------------------------------------------------|
| i    | read counts[1] and place into core 0’s register | …                                               |
| i+1  | increment register by one                       | read counts[1] and place into core 1’s register |
| i+2  | overwrite counts[1] with contents of register   | increment register by one                       |
| i+3  | …                                               | overwrite counts[1] with contents of register   |
Suppose that prior to the execution sequence shown above, counts[1] contains the value 60. In time step i, Thread 0 reads counts[1] and places the value 60 in core 0’s register. In time step i+1, while Thread 0 increments core 0’s register by one, the current value in counts[1] (60) is read into core 1’s register by Thread 1. In time step i+2, Thread 0 updates counts[1] with the value 61 while Thread 1 increments the value stored in its local register (60) by one. The end result is that during time step i+3, counts[1] is overwritten by Thread 1 with the value 61, not 62 as we would expect! This causes counts[1] to essentially "lose" an increment!
We refer to the scenario in which two threads attempt to write to the same location in memory as a data race condition. More generally, a race condition refers to any scenario in which the simultaneous execution of two operations gives an incorrect result. Note that a simultaneous read of the counts[1] location would not in itself constitute a race condition, since values can generally be read concurrently from memory without issue. It was the combination of the reads with the writes to counts[1] that caused the incorrect result. This read-modify-write pattern is a common source of a particular type of race condition, called a data race, in threaded programs. In our discussion of race conditions and how to fix them, we focus on data races.
Atomic operations
An operation is defined as being atomic if threads perceive it as executing without interruption (in other words, as an "all or nothing" action). In some libraries, a keyword or type is used to specify that a block of computation should be treated as being atomic. In the above example, the line counts[val] = counts[val] + 1 is not atomic: even though it is a single line of C, it corresponds to multiple instructions at the machine level, and its execution can be interrupted partway through.
Keep in mind that not all execution sequences of the two threads cause a race condition. Consider the following sample execution sequence of Threads 0 and 1:
| Time | Thread 0                                        | Thread 1                                        |
|------|-------------------------------------------------|-------------------------------------------------|
| i    | read counts[1] and place into core 0’s register | …                                               |
| i+1  | increment register by one                       | …                                               |
| i+2  | overwrite counts[1] with contents of register   | …                                               |
| i+3  | …                                               | read counts[1] and place into core 1’s register |
| i+4  | …                                               | increment register by one                       |
| i+5  | …                                               | overwrite counts[1] with contents of register   |
In this execution sequence, Thread 1 does not read from counts[1] until after Thread 0 updates it with its new value (61). The end result is that Thread 1 reads the value 61 from counts[1] and places it into core 1’s register during time step i+3, and writes the value 62 to counts[1] in time step i+5.
To fix a data race, we must first isolate the critical section, or the subset of code that must execute atomically (in isolation) to ensure correct behavior. In threaded programs, blocks of code that update a shared resource are typically identified to be critical sections.
In the countElems() function, updates to the counts array should be placed in a critical section to ensure that values are not lost due to multiple threads updating the same location in memory:
long i;

for (i = start; i < end; i++) {
    val = array[i];
    counts[val] = counts[val] + 1; //this line needs to be protected
}
Since the fundamental problem in countElems() is the simultaneous access of counts by multiple threads, a mechanism is needed to ensure that only one thread executes within the critical section at a time. Using a synchronization construct (like a mutex, which is covered in the next section) will force the threads to enter the critical section sequentially.
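As a preview, one possible shape of the protected loop wraps the update with a POSIX mutex (a minimal sketch; declaring, sharing, and destroying the mutex properly is covered in the next section):

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //one mutex shared by all threads

//inside countElems():
for (i = start; i < end; i++) {
    val = array[i];
    pthread_mutex_lock(&mutex);    //enter the critical section
    counts[val] = counts[val] + 1; //only one thread at a time executes this line
    pthread_mutex_unlock(&mutex);  //exit the critical section
}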