Многопоточные структуры данных для параллельных вычислений

Часть 1, Разработка параллельных структур данных

Сейчас стало модно рассуждать о параллельных вычислениях. В этой первой из двух статей о многопоточных структурах я расскажу о проектировании параллельных структур данных в многопоточной среде с помощью библиотеки POSIX.

Арпан Сен, технический директор, Synapti Computer Aided Design Pvt Ltd

Арпан Сен (Arpan Sen) – ведущий инженер, работающий над разработкой программного обеспечения в области автоматизации электронного проектирования. На протяжении нескольких лет он работал над некоторыми функциями UNIX, в том числе Solaris, SunOS, HP-UX и IRIX, а также Linux и Microsoft Windows. Он проявляет живой интерес к методикам оптимизации производительности программного обеспечения, теории графов и параллельным вычислениям. Арпан является аспирантов в области программных систем.



06.06.2012

Введение

Итак, теперь ваш компьютер оборудован четырехъядерным процессором; и поскольку сейчас все говорят о параллельных вычислениях, вам тоже не терпится вступить в игру. Но параллельные вычисления — это не только применение мьютексов и условных переменных в произвольных функциях и методах. Один из основных инструментов, который должен иметься в арсенале разработчика на языке C++, — это возможность проектирования параллельных структур данных. Эта статья, первая в серии из двух статей, посвящена проектированию параллельных структур данных в многопоточной среде. В этой статье вы будете пользоваться библиотекой POSIX Threads (известной также как Pthreads; ссылка приведена в Ресурсах), но можно использовать и такие реализации, как Boost Threads (ссылка приведена в Ресурсах).

Предполагается, что вы обладаете базовыми знаниями структур данных и знакомы с библиотекой POSIX Threads. Также вы должны иметь представление о создании потоков, мьютексов и условных переменных. Из набора Pthreads мы достаточно часто будем использовать в приведенных примерах функции pthread_mutex_lock, pthread_mutex_unlock, pthread_cond_wait, pthread_cond_signal и pthread_cond_broadcast.


Проектирование параллельной очереди

Мы начнем с расширения одной из базовых структур данных — очереди. Ваша очередь строится на основе связного списка, а интерфейс самого списка опирается на стандартную библиотеку шаблонов (STL; см. Ресурсы). Несколько управляющих потоков могут пытаться одновременно добавить данные в очередь или удалить их, поэтому вам понадобится объект для управления синхронизацией — мьютекс. Ответственность за создание и уничтожения мьютекса несут конструктор и деструктор класса очереди, как показано в листинге 1.

Листинг 1. Связный список и параллельная очередь на основе мьютекса
#include <pthread.h>
#include <list.h> // вы можете использовать std::list или свою реализацию 

namespace concurrent { 
template <typename T>
class Queue { 
public: 
   Queue( ) { 
       pthread_mutex_init(&_lock, NULL); 
    } 
    ~Queue( ) { 
       pthread_mutex_destroy(&_lock);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    list<T> _list; 
    pthread_mutex_t _lock;
}

};

Вставка данных и удаление данных из параллельной очереди

Ясно, что вставка данных в очередь напоминает добавление данных в список, и эта операция должна контролироваться захватом мьютекса. Но что произойдет, если в очередь захотят добавить данные несколько потоков? Первый поток захватит мьютекс и добавит данные в очередь, тогда как другие будут ждать своей очереди. Решение о том, какой поток будет добавлять данные следующим после того, как первый поток освободит мьютекс, принимает операционная система. Обычно в системе Linux®, где отсутствуют потоки с приоритетами реального времени, следующим получит доступ к мьютексу и добавит данные в очередь тот поток, который ждет дольше всех. В листинге 2 показана первая рабочая версия этого кода.

Листинг 2. Добавление данных в очередь
void Queue<T>::push(const T& value ) { 
       pthread_mutex_lock(&_lock);
       _list.push_back(value);
       pthread_mutex_unlock(&_lock);
}

Аналогично выглядит код для извлечения данных из очереди, показанный в листинге 3.

Листинг 3. Извлечение данных из очереди
T Queue<T>::pop( ) { 
       if (_list.empty( )) { 
           throw ”элемент не найден”;
       }
       pthread_mutex_lock(&_lock); 
       T _temp = _list.front( );
       _list.pop_front( );
       pthread_mutex_unlock(&_lock);
       return _temp;
}

Справедливости ради следует отметить, что код в листинге 2 и листинге 3 работает хорошо. Но представьте себе такую ситуацию: у вас есть длинная очередь (скажем, больше 100000 элементов), и в некоторый момент число потоков, считывающих данные из очереди, становится значительно больше числа потоков, вносящих данные. Поскольку для внесения и извлечения данных используется один и тот же мьютекс, скорость чтения будет несколько снижаться, когда мьютекс захватывают пишущие потоки. А что если использовать два мьютекса — один для операции чтения, а другой для операции записи? В листинге 4 показан видоизмененный класс Queue (очередь).

Листинг 4. Параллельная очередь с отдельными мьютексами для чтения и записи
template <typename T>
class Queue { 
public: 
   Queue( ) { 
       pthread_mutex_init(&_rlock, NULL); 
       pthread_mutex_init(&_wlock, NULL);
    } 
    ~Queue( ) { 
       pthread_mutex_destroy(&_rlock);
       pthread_mutex_destroy(&_wlock);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    list<T> _list; 
    pthread_mutex_t _rlock, _wlock;
}

В листинге 5 показано определение метода добавления/извлечения.

Листинг 5. Операции добавления/извлечения для параллельной очереди с раздельными мьютексами
void Queue<T>::push(const T& value ) { 
       pthread_mutex_lock(&_wlock);
       _list.push_back(value);
       pthread_mutex_unlock(&_wlock);
}

T Queue<T>::pop( ) { 
       if (_list.empty( )) { 
           throw ”элемент не найден”;
       }
       pthread_mutex_lock(&_rlock);
       T _temp = _list.front( );
       _list.pop_front( );
       pthread_mutex_unlock(&_rlock);
       return _temp;
}

Проектирование параллельной очереди с блокировкой

До сих пор, если читающий поток хотел считать данные из пустой очереди, мы просто генерировали ошибку и двигались дальше. Такой подход не всегда приемлем. Может быть предпочтительнее, чтобы читающий поток подождал или заблокировался до появления данных. Очередь такого типа называется очередью с блокировкой. Что будет делать читающий процесс, если обнаружит, что очередь пуста? Одним из вариантов является периодический опрос очереди. Но поскольку такой подход не гарантирует наличия данных в очереди, он может нерационально растрачивать вычислительные ресурсы ЦП. Вместо этого рекомендуется использовать условные переменные, то есть переменные типа pthread_cond_t. Прежде чем углубиться в семантику, давайте взглянем на видоизмененное определение очереди, показанное в листинге 6.

Листинг 6. Параллельная очередь с блокировкой, использующая условные переменные
template <typename T>
class BlockingQueue { 
public: 
   BlockingQueue ( ) { 
       pthread_mutex_init(&_lock, NULL); 
       pthread_cond_init(&_cond, NULL);
    } 
    ~BlockingQueue ( ) { 
       pthread_mutex_destroy(&_lock);
       pthread_cond_destroy(&_cond);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    list<T> _list; 
    pthread_mutex_t _lock;
    pthread_cond_t _cond;
}

В листинге 7 показана видоизмененная версия операции извлечения для очереди с блокировкой.

Листинг 7. Извлечение данных из очереди
T BlockingQueue<T>::pop( ) { 
       pthread_mutex_lock(&_lock);
       if (_list.empty( )) { 
           pthread_cond_wait(&_cond, &_lock) ;
       }
       T _temp = _list.front( );
       _list.pop_front( );
       pthread_mutex_unlock(&_lock);
       return _temp;
}

Теперь, вместо того чтобы сообщать об ошибке, если очередь пуста, читающий поток блокируется по условной переменной. Опосредованно pthread_cond_wait также освобождает мьютекс _lock. Теперь давайте рассмотрим такую ситуацию: имеется два читающих потока и пустая очередь. Первый читающий поток захватывает мьютекс, понимает, что очередь пуста, и блокирует себя согласно _cond, что опосредованно освобождает мьютекс. Второй читающий поток делает то же самое. В результате вы получаете два читающих потока, ожидающих условную переменную, а мьютекс остается свободным.

Теперь взгляните на определение метода push(), показанное в листинге 8.

Листинг 8. Занесение данных в очередь с блокировкой
void BlockingQueue <T>::push(const T& value ) { 
       pthread_mutex_lock(&_lock);
       const bool was_empty = _list.empty( );
       _list.push_back(value);
       pthread_mutex_unlock(&_lock);
       if (was_empty) 
           pthread_cond_broadcast(&_cond);
}

Если изначально список был пуст, вы вызываете pthread_cond_broadcast, чтобы занести данные в список. Эта операция пробуждает все читающие потоки, ожидающие условную переменную _cond; теперь читающие потоки начинают опосредованно бороться за доступ к мьютексу, как только он освободится. Поток, который следующим получит доступ к мьютексу, определяется планировщиком операционной системы — обычно первым считывает данные тот поток, который ждет дольше всех.

Вот несколько тонкостей, относящихся к параллельным очередям с блокировкой:

  • Вместо pthread_cond_broadcast можно использовать pthread_cond_signal. Однако pthread_cond_signal разблокирует по меньшей мере один из потоков, ожидающих условную переменную, и не обязательно тот, который ждет дольше всех. И хотя работа очереди с блокировкой от этого не нарушается, применение pthread_cond_signal потенциально может привести к неприемлемо большому времени ожидания для некоторых читающих потоков.
  • Возможно спонтанное пробуждение потоков. Поэтому после пробуждения читающего потока убедитесь, что список не пуст и только после этого идите дальше. В листинге 9 показана немного измененная версия метода pop(); настоятельно рекомендуется использовать версию pop() на основе цикла while.
Листинг 9. Извлечение данных из очереди с учетом спонтанных пробуждений
T BlockingQueue<T>::pop( ) { 
pthread_cond_wait(&_lock) ; //нужен writer(s)  для захвата и ожидания условий
while(_list.empty( )) { 
pthread_cond_wait(&_cond,&_lock) ;
}
T _temp = _list.front( );
_list.pop_front( );
pthread_mutex_unlock(&_lock);
return _temp;
}

Проектирование параллельной очереди с блокировкой и таймаутами

Существует множество систем, которые, если не смогут обработать данные в течение некоторого времени, то не будут обрабатывать их вообще. Хорошим примером такой системы является бегущая строка новостного канала, отображающая текущие курсы акций, причем новые данные поступают каждые n секунд. Если некоторые предшествующие данные не удается обработать за n секунд, то лучше их отбросить и отобразить более новую информацию. Вооружившись этой идеей, давайте взглянем на концепцию параллельной очереди, в которой операции занесения и извлечения снабжены таймаутами. Это значит, что если система не сможет выполнить операцию занесения или извлечения в указанное вами время, она не будет выполняться вообще. Такой интерфейс показан в листинге 10.

Листинг 10. Параллельная очередь с ограниченными по времени операциями занесения и извлечения
template <typename T>
class TimedBlockingQueue { 
public: 
   TimedBlockingQueue ( );
    ~TimedBlockingQueue ( );
    bool push(const T& data, const int seconds);
    T pop(const int seconds); 
private: 
    list<T> _list; 
    pthread_mutex_t _lock;
    pthread_cond_t _cond;
}

Давайте начнем с ограниченного по времени метода push(). Теперь метод push() не зависит от условных переменных, что исключает лишнее ожидание. Единственной причиной задержки может быть большое количество пишущих потоков, в результате чего может пройти достаточно много времени до получения доступа к мьютексу. Так почему бы не повысить приоритет пишущего потока? Причина в том, что повышение приоритета пишущего потока не решит проблемы, если все пишущие потоки будут иметь повышенный приоритет. Вместо этого попробуйте создать новые пишущие потоки с более высокими приоритетами и передайте данные этим потокам, которые всегда будут заноситься в очередь. Такой код показан в листинге 11.

Листинг 11. Занесение данных в очередь с блокировкой и таймаутами
bool TimedBlockingQueue <T>::push(const T& data, const int seconds) {
       struct timespec ts1, ts2;
       const bool was_empty = _list.empty( );
       clock_gettime(CLOCK_REALTIME, &ts1);
       pthread_mutex_lock(&_lock);
       clock_gettime(CLOCK_REALTIME, &ts2);
       if ((ts2.tv_sec – ts1.tv_sec) <seconds) {
       was_empty = _list.empty( );
       _list.push_back(value);
       {
       pthread_mutex_unlock(&_lock);
       if (was_empty) 
           pthread_cond_broadcast(&_cond);
}

Процедура clock_gettime возвращает в структуру timespec время, прошедшее с момента epoch (подробнее это описано в Ресурсах). Эта процедура вызывается дважды — до и после получения доступа к мьютексу — чтобы по прошедшему времени определить, требуется ли дальнейшая обработка.

Извлечение данных с таймаутом сложнее занесения; заметьте, что читающий поток ждет условную переменную. Первая проверка аналогична push(). Если таймаут возникнет до того, как читающий поток получит доступ к мьютексу, то ничего делать не надо. Затем читающий поток должен убедиться в том, что он ждет условную переменную не дольше указанного таймаута (это вторая проверка, которую нужно выполнить). Если читающий поток не был пробужден другим способом, по истечении таймаута он должен пробудить себя сам и отпустить мьютекс.

Учитывая сказанное, давайте рассмотрим функцию pthread_cond_timedwait, которая используется для второй проверки. Эта функция подобна pthread_cond_wait за исключением того, что третий аргумент является абсолютным моментом времени, до которого будет ждать читающий поток, прежде чем отказаться от дальнейших попыток. Если читающий поток пробудится до таймаута, функция pthread_cond_timedwait возвратит 0. Этот код показан в листинге 12.

Листинг 12. Извлечение данных из очереди с блокировкой и таймаутами
T TimedBlockingQueue <T>::pop(const int seconds) { 
       struct timespec ts1, ts2; 
       clock_gettime(CLOCK_REALTIME, &ts1); 
       pthread_mutex_lock(&_lock);
       clock_gettime(CLOCK_REALTIME, &ts2);

       // First Check 
       if ((ts1.tv_sec – ts2.tv_sec) < seconds) { 
           ts2.tv_sec += seconds; // определяет время пробуждения
           while(_list.empty( ) && (result == 0)) { 
               result = pthread_cond_timedwait(&_cond, &_lock, &ts2) ;
           }
           if (result == 0) { // Вторая проверка 
               T _temp = _list.front( );
              _list.pop_front( );
              pthread_mutex_unlock(&_lock);
              return _temp;
          }
      }
      pthread_mutex_unlock(&lock);
      throw “timeout happened”;
}

Цикл while в листинге 12 гарантирует корректную обработку спонтанных пробуждений. И, наконец, в некоторых системах Linux clock_gettime может быть частью librt.so, поэтому может понадобиться добавить ключ –lrt в командную строку компилятора.

Применение API pthread_mutex_timedlock

Одно из слабых мест кода в листинге 11 и листинге 12 в том, что к тому моменту, когда поток наконец получит доступ к мьютексу, может уже произойти таймаут. В результате все, что ему остается — это отпустить мьютекс. Можно дополнительно оптимизировать эту ситуацию, применив API pthread_mutex_timedlock, если ваша система его поддерживает (см. Ресурсы).Эта процедура принимает два аргумента. Второй из них является абсолютным временем, по достижении которого, если доступ к мьютексу не получен, процедура возвращает ненулевое состояние. Таким образом, применение этой процедуры сокращает число ждущих потоков в системе. Ниже приведено объявление этой процедуры:

int pthread_mutex_timedlock(pthread_mutex_t *mutex,
       const struct timespec *abs_timeout);

Проектирование ограниченной параллельной очереди с блокировкой

Завершим наше обсуждение рассмотрением ограниченной параллельной очереди с блокировкой. Очередь этого типа подобна параллельной очереди с блокировкой за тем исключением, что размер очереди ограничен. Существует множество встроенных систем, память которых ограничена и для которых реально нужны очереди с ограниченными размерами.

В очереди с блокировкой только читающий поток должен ждать, если очередь пуста. В ограниченной очереди с блокировкой пишущий поток тоже должен ждать, если очередь заполнена. Внешний интерфейс такой очереди напоминает интерфейс очереди с блокировкой, как показано в листинге 13. (Заметьте, что в данном случае используется не список, а вектор. Можно использовать базовый массив C/C++, инициализировав его с подходящим размером).

Листинг 13. Ограниченная параллельная очередь с блокировкой
template <typename T>
class BoundedBlockingQueue { 
public: 
   BoundedBlockingQueue (int size) : maxSize(size) { 
       pthread_mutex_init(&_lock, NULL); 
       pthread_cond_init(&_rcond, NULL);
       pthread_cond_init(&_wcond, NULL);
       _array.reserve(maxSize);
    } 
    ~BoundedBlockingQueue ( ) { 
       pthread_mutex_destroy(&_lock);
       pthread_cond_destroy(&_rcond);
       pthread_cond_destroy(&_wcond);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    vector<T> _array; // или T* _array, если вы так предпочитаете
    int maxSize;
    pthread_mutex_t _lock;
    pthread_cond_t _rcond, _wcond;
}

Прежде чем приступить к пояснению операции занесения, давайте взглянем на код, приведенный в листинге 14.

Листинг 14. Занесение данных в ограниченную очередь с блокировкой
void BoundedBlockingQueue <T>::push(const T& value ) { 
       pthread_mutex_lock(&_lock);
       const bool was_empty = _array.empty( );
       while (_array.size( ) == maxSize) { 
           pthread_cond_wait(&_wcond, &_lock);
       } 
       _ array.push_back(value);
      pthread_mutex_unlock(&_lock);
      if (was_empty) 
          pthread_cond_broadcast(&_rcond);
}

Существует ли общая идея блокировки, применимая к другим структурам данных?

Конечно. Но оптимален ли этот подход? Нет. Давайте рассмотрим связный список, который может использоваться несколькими потоками. В отличие от очереди список не имеет единой точки вставки или удаления, и применение одного мьютекса для контроля доступа к списку позволит создать действующую, но медленную систему. Другой возможный способ реализации заключается в применении отдельных мьютексов для каждого узла, но это, очевидно, приведет к повышенному расходу системной памяти. Некоторые из этих проблем мы обсудим во второй части этой серии статей.

Первое, на что нужно обратить внимание в листинге 13 и листинге 14, — это то, что в отличие от очереди с блокировкой используется не одна, а две условные переменные. Если очередь заполнена, пишущий поток ждет условную переменную _wcond; читающий поток после извлечения данных из очереди должен известить об этом все потоки. Аналогично, если очередь пуста, читающий поток будет ждать условную переменную _rcond, а пишущий поток после вставки данных в очередь отправит уведомление всем потокам, ждущим переменную _rcond. Но что будет, если потоков, ждущих _wcond или _rcond, нет, но появляется широковещательное уведомление? К счастью, не произойдет ничего; система просто проигнорирует эти сообщения. Заметьте также, что обе условные переменные используют один и тот же мьютекс. В листинге 15 показан код метода pop() для ограниченной очереди с блокировкой.

Листинг 15. Извлечение данных из ограниченной очереди с блокировкой
T BoundedBlockingQueue<T>::pop( ) { 
       pthread_mutex_lock(&_lock);
       const bool was_full = (_array.size( ) == maxSize);
       while(_array.empty( )) { 
           pthread_cond_wait(&_rcond, &_lock) ;
       }
       T _temp = _array.front( );
       _array.erase( _array.begin( ));
       pthread_mutex_unlock(&_lock);
       if (was_full)
           pthread_cond_broadcast(&_wcond);
       return _temp;
}

Обратите внимание, что после освобождения мьютекса вызывается функция pthread_cond_broadcast. Это рекомендуется делать, потому что время ожидания читающего потока после пробуждения сокращается.


Заключение

В этом выпуске обсуждалось несколько новых типов параллельных очередей и рассматривались методы их реализаций. Список возможных вариантов этим не ограничивается. Например, можно создать очередь, в которой читающим потокам разрешено считывать данные лишь через некоторое время после их занесения. Обязательно загляните в раздел Ресурсы и ознакомьтесь с подробным описанием потоков POSIX и алгоритмами работы параллельных очередей.

Ресурсы

Научиться

Получить продукты и технологии

Обсудить

Комментарии

developerWorks: Войти

Обязательные поля отмечены звездочкой (*).


Нужен IBM ID?
Забыли Ваш IBM ID?


Забыли Ваш пароль?
Изменить пароль

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Профиль создается, когда вы первый раз заходите в developerWorks. Информация в вашем профиле (имя, страна / регион, название компании) отображается для всех пользователей и будет сопровождать любой опубликованный вами контент пока вы специально не укажите скрыть название вашей компании. Вы можете обновить ваш IBM аккаунт в любое время.

Вся введенная информация защищена.

Выберите имя, которое будет отображаться на экране



При первом входе в developerWorks для Вас будет создан профиль и Вам нужно будет выбрать Отображаемое имя. Оно будет выводиться рядом с контентом, опубликованным Вами в developerWorks.

Отображаемое имя должно иметь длину от 3 символов до 31 символа. Ваше Имя в системе должно быть уникальным. В качестве имени по соображениям приватности нельзя использовать контактный e-mail.

Обязательные поля отмечены звездочкой (*).

(Отображаемое имя должно иметь длину от 3 символов до 31 символа.)

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Вся введенная информация защищена.


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=AIX и UNIX
ArticleID=819990
ArticleTitle=Многопоточные структуры данных для параллельных вычислений
publish-date=06062012