Содержание


Инструменты ОС Linux для разработчиков приложений для ОС Windows. Часть 16. Поддержка многопоточности в POSIX API

Comments

В предыдущих статьях мы рассматривали API POSIX для работы с процессами и сигналами, но всё это относилось к посылке сигналов однопоточному приложению.

В этой статье мы рассмотрим возможности POSIX API для создания многопоточных приложений и модель отправки сигналов приложению из нескольких потоков, которая появилась только в POSIX 1003.b (расширение реального времени).

Параллельные потоки

Реализация потоков в Linux выполнена в соответствии с POSIX 1003.b. Все определения находятся в файле <pthread.h>, при этом необходимо учитывать, что развитие этой линии API началось не так давно и всё еще продолжается.

Список функций и определений, относящихся к потокам и представленный в POSIX API, намного шире по детализации и возможностям механизма потоков, реализованного в ядре Linux, так как именно POSIX API является общим стандартом для всех API подобного рода. Кроме того, этот механизм принципиально отличается от API потоков, реализованного в ОС Windows.

Кроме собственно определения потоков и операций с ними, в <pthread.h> описываются примитивы синхронизации и API для работы с ними в соответствии с стандартом реального времени POSIX 1003.b:

  • мьютексы — pthread_mutex_t;
  • блокировки чтения/записи — pthread_rwlock_t;
  • условные переменные — pthread_cond_t;
  • спин-блокировки — pthread_spinlock_t;
  • барьеры — pthread_barrier_t;

Создание потока

Новый поток создаётся следующим вызовом API:

int pthread_create( pthread_t *newthread, const pthread_attr_t *attr,
                    void *(*start_routine)(void*), void *arg );

Все примеры из этой статьи находятся в архиве upthread.tgz в разделе "Материалы для скачивания". В листинге 1 представлен простейший пример создания нового потока.

Листинг 1. Создание нового потока (файл ptid.с)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

void put_msg( char *title, struct timeval *tv ) {
   printf( "%02u:%06lu : %s\t: pid=%lu, tid=%lu\n",
           ( tv->tv_sec % 60 ), tv->tv_usec, title, getpid(), pthread_self() );
}

void *test( void *in ) {
   struct timeval *tv = (struct timeval*)in;
   gettimeofday( tv, NULL );
   put_msg( "pthread started", tv );
   sleep( 5 );
   gettimeofday( tv, NULL );
   put_msg( "pthread finished", tv );
   return NULL;
}

#define TCNT 5
static pthread_t tid[ TCNT ];
int main( int argc, char **argv, char **envp ) {
   pthread_t tid[ TCNT ];
   struct timeval tm;.
   int i;.
   gettimeofday( &tm, NULL );
   put_msg( "main started", &tm );
   for( i = 0; i < TCNT; i++ ) {
      int status = pthread_create( &tid[ i ], NULL, test, (void*)&tm );
         if( status != 0 ) perror( "pthread_create" ), exit( EXIT_FAILURE );
   };
   for( i = 0; i < TCNT; i++ )
      pthread_join( tid[ i ], NULL ); // это обычная техника ожидания!
   gettimeofday( &tm, NULL );
   put_msg( "main finished", &tm );
   return( EXIT_SUCCESS );
}

Как ни странно, но в Linux, в отличие от многих других UNIX систем, компиляция кода с подобным вызовом завершится ошибкой:

$ g++ ptid.cc -o ptid
/tmp/ccnW2hnx.o: In function `main':
ptid.cc:(.text+0x6e): undefined reference to `pthread_create'
collect2: выполнение ld завершилось с кодом возврата 1

Чтобы исправить её, потребуется явно включить библиотеку libpthread.so в сборку, как показано ниже

$ ls /usr/lib/*pthr*
/usr/lib/libgpgme-pthread.so.11      /usr/lib/libgpgme++-pthread.so.2.4.0
/usr/lib/libgpgme-pthread.so.11.6.6  /usr/lib/libpthread_nonshared.a
/usr/lib/libgpgme++-pthread.so.2     /usr/lib/libpthread.so

Достаточно будет добавить следующую опцию к вызову компилятора:

$ gcc ptid.c -lpthread -o ptid

Теперь можно запустить пример и проверить его вывод:

$ ./ptid
50:259188 : main started         : pid=13745, tid=3079214784
50:259362 : pthread started	: pid=13745, tid=3079211888
50:259395 : pthread started	: pid=13745, tid=3068722032
50:259403 : pthread started	: pid=13745, tid=3058232176
...
55:259532 : pthread finished	: pid=13745, tid=3047742320
55:259691 : pthread finished	: pid=13745, tid=3037252464
55:259936 : main finished        : pid=13745, tid=3079214784

Первый параметр вызова вызова pthread_create является адресом для хранения идентификатора создаваемого потока типа pthread_t, определённого в </usr/include/bits/pthread/types.h>.

Этот идентификатор принципиально отличается от идентификатора присваиваемого ядром, для получения которого предусмотрен вызов gettid() (показан вывод одновременно с выполнением примера выше):

$ ps -efL | grep ptid
UID        PID  PPID   LWP  C NLWP STIME TTY          TIME CMD
olej     13745  8859 13745  0    6 17:14 pts/10   00:00:00 ./ptid
olej     13745  8859 13746  0    6 17:14 pts/10   00:00:00 ./ptid
olej     13745  8859 13747  0    6 17:14 pts/10   00:00:00 ./ptid
olej     13745  8859 13748  0    6 17:14 pts/10   00:00:00 ./ptid
olej     13745  8859 13749  0    6 17:14 pts/10   00:00:00 ./ptid
olej     13745  8859 13750  0    6 17:14 pts/10   00:00:00 ./ptid

Этот же идентификатор потока (типа pthread_t) впоследствии можно получить и внутри самого потока вызовом pthread_self().

Параметры создания потока

Созданный поток может иметь много параметров, определяющих его поведение. Эти параметры описываются в атрибутной записи потока — параметре attr (2-й) при создании потока. Если в качестве этого параметра указывается NULL, то создаётся поток с параметрами по умолчанию. Основные определения (константы) для таких параметров:

enum  { /* присоединение: ённое — автономное состояние */
  PTHREAD_CREATE_JOINABLE, /* присоединённое выполнение */
  PTHREAD_CREATE_DETACHED  /* автономное выполнение */
};
enum { /* противодействие инверсии приоритетов: */
  PTHREAD_PRIO_NONE,     /* не используется          */
  PTHREAD_PRIO_INHERIT,  /* наследование приоритетов */
  PTHREAD_PRIO_PROTECT   /* граничные приоритеты     */
};
enum { /* наследование дисциплины диспетчирования от родителя */
  PTHREAD_INHERIT_SCHED,
  PTHREAD_EXPLICIT_SCHED
};
enum { /* окружение диспетчирования: */
  PTHREAD_SCOPE_SYSTEM,     /* в рамках системы  */
  PTHREAD_SCOPE_PROCESS     /* в рамках процесса */
};

Параметры определяются в структуре типа pthread_attr_t (атрибутная запись потока). В Linux этот тип определён в файле </usr/include/bits/pthread/types.h>, как показано ниже:

#define __SIZEOF_PTHREAD_ATTR_T 36
typedef union {
  char __size[__SIZEOF_PTHREAD_ATTR_T];
  long int __align;
} pthread_attr_t;

Эта структура обычно не используется напрямую, так как существует множество API для установки и чтения разных параметров из атрибутной записи потока.

При создании стандартной атрибутной записи потока (PTHREAD_JOINABLE, SCHED_OTHER, ...) она должна быть инициализирована вызовом pthread_attr_init(…).

После старта потока атрибутная запись уже не нужна и может быть инициализирована ещё раз (если предполагается ещё инициировать иные потоки) или уничтожена вызовом pthread_attr_destroy(…).

После создания атрибутной записи потока к ней можно применять множество функций pthread_attr_*(), подготавливающих нужный набор параметров атрибутов запускаемого потока:

  • int pthread_attr_getguardsize( const pthread_attr_t *attr, size_t *guardsize ) - получить размер охранной области, создаваемой для защиты от переполнения стека;
  • extern int pthread_attr_setguardsize( pthread_attr_t *attr, size_t guardsize) - установить размер охранной области, создаваемой для защиты от переполнения стека;
  • int pthread_attr_getinheritsched( const pthread_attr_t *attr, int *inherit ) - получить характер наследования (PTHREAD_INHERIT_SCHED, PTHREAD_EXPLICIT_SCHED) параметров для потока;
  • int pthread_attr_setinheritsched( pthread_attr_t attr, int inherit ) - установить характер наследования (PTHREAD_INHERIT_SCHED, PTHREAD_EXPLICIT_SCHED) параметров для потока;
  • int pthread_attr_getscope( const pthread_attr_t *attr, int *scope ) - получить область диспетчирования для потока (PTHREAD_SCOPE_SYSTEM, PTHREAD_SCOPE_PROCESS);
  • int pthread_attr_setscope( pthread_attr_t *attr, int scope) - установить область диспетчирования для потока (PTHREAD_SCOPE_SYSTEM, PTHREAD_SCOPE_PROCESS);
  • int pthread_attr_getstackaddr( const pthread_attr_t *attr, void **stackaddr ) - получить адрес, ранее установленный для стека;
  • int pthread_attr_setstackaddr( pthread_attr_t *attr, void *stackaddr ) - установить адрес стека, минимальный размер кадра стека PTHREAD_STACK_MIN;
  • int pthread_attr_getstacksize( const pthread_attr_t *attr, size_t *stacksize ) - получить текущий установленный минимальный размер стека;
  • int pthread_attr_setstacksize( pthread_attr_t *attr, size_t __stacksize) - добавить информацию о минимальном стеке, необходимом для старта потока; этот размер не может быть менее PTHREAD_STACK_MIN и не должен превосходить установленные в системе пределы;
  • int pthread_getattr_np( pthread_t th, pthread_attr_t *attr ) - инициализировать атрибутную запись нового потока в соответствии с атрибутной записью ранее существующего (клонирование);

По умолчанию поток создаётся, как присоединённый, но может быть позже отсоединён вызовом pthread_detach. При этом перевести его обратно в состояние присоединённости (PTHREAD_JOINABLE) будет уже невозможно.

Временные затраты на создание потока

Сравним затраты времени на создание нового процесса с затратами на создание нового процесса:

Листинг 2. Создание нового потока (файл p2-2.c)
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include "libdiag.h"

static uint64_t tim;
void* threadfunc ( void* data ) {
   tim = rdtsc() - tim;
   pthread_exit( NULL );
   return NULL;
};

int main( int argc, char *argv[] ) {
   tim = rdtsc();
   pthread_t tid;
   pthread_create( &tid, NULL, threadfunc, NULL );
   pthread_join( tid, NULL );
   printf( "thread create time : %llu\n", tim );
   exit( EXIT_SUCCESS );
};

Выполним сравнение, запустив несколько потоков и процессов. Приложение p2-1 создаёт процессы, а приложение p2-2 - потоки, запуски чередуются, чтобы уменьшить влияние кэширования страниц памяти:

$ ./p2-1
process create time : 525430
$ ./p2-2
thread create time : 314980
$ ./p2-1
process create time : 2472100
$ ./p2-2
thread create time : 362210

Результаты в обоих случаях оказались практически идентичными в пределах статистической погрешности. Поэтому можно сделать вывод, что процессы создания и потока и процесса требуют одинаковых затрат времени (вопреки многим утверждениям в учебниках).

Активность потока

Вызовом pthread_yield поток может передать управление другому потоку. Но какому потоку будет передана активность (этого процессора) — вопрос непредсказуемый! Это может быть даже тот же самый поток, только что выполнивший pthread_yield().

К такому же результату (передача активности иному потоку) приведёт и любой вызов, переводящий поток в блокированное (пассивное) состояние, например sleep(), pause() и подобные им.

Завершение потока

В POSIX API существует гораздо больше условий и возможностей для завершения потока, чем для его запуска. Созданный поток завершается в одном из следующих случаев:

  1. сам поток вызывает pthread_exit() и завершается со статусом завершения, доступным другому потоку по вызову ожидания pthread_join();
  2. поток осуществляет возврат из функции потока, это эквивалентно pthread_exit(), при этом возвращаемое значение является кодом возврата;
  3. поток завершается по pthread_cancel() (этот сценарий будет рассматриваться в следующих разделах);
  4. какой либо поток процесса вызывает exit(), или сама главная программа main завершается, при этом все порождённые потоки процесса также завершаются.

При завершении потока в нём вызывается метод pthread_exit( … ), а в вызывающем потоке, который ожидает завершения, вызывается метод pthread_join( … ).

Поведение потока при завершении определяется ещё одной группой параметров, задаваемых в атрибутной записи потока pthread_attr_t:

enum { /* cancellation — состояние завершаемости */
  PTHREAD_CANCEL_ENABLE,    /* завершение разрешено */
  PTHREAD_CANCEL_DISABLE    /* завершение запрещено */
};
enum { /* тип завершаемости */
  PTHREAD_CANCEL_DEFERRED,    /* только в точках завершаемости     */
  PTHREAD_CANCEL_ASYNCHRONOUS /* немедленное, в произвольном месте */
};

И группой соответствующих вызовов в API:

int pthread_setcancelstate( int state, int *oldstate );
int pthread_setcanceltype( int type, int *oldtype);

Остановить поток немедленно или при ближайшей возможности (в точке завершаемости) можно вызовом pthread_cancel( … ).

Вызовы для работы со стеком процедур завершения:

void pthread_cleanup_push( void(*routine)(void*), void *arg );
void pthread_cleanup_push( int exec );

На самом деле такие вызовы определены как макросы, хотя это и не меняет техники их использования:

#define pthread_cleanup_push( routine, arg )
#define pthread_cleanup_pop( execute )

При этом обязательно, чтобы использование в коде pthread_cleanup_push и pthread_cleanup_push было парным. Первый из этих вызовов добавляет новую процедуру завершения в стек, а второй — выталкивает последнюю находящуюся процедуру завершения из стека, и если параметр вызова не нулевой — выполняет эту процедуру.

Собственные данные потока

Техника создания собственных данных потоков (TSD - thread specific data) создаёт по одному экземпляру каждого вида данных. В стандарте POSIX указывается, что максимальное число видов данных (тип pthread_key_t) не превышает 128. Последовательность действий при создании TSD:

  1. Поток запрашивает pthread_key_create() для создания ключа доступа к блоку данных определённого типа; если потоку нужно иметь несколько блоков данных разной типизации (назначения), он выполняет такой вызов необходимое число раз для каждого типа.
  2. Некоторая сложность заключается в том, что запросить распределение ключа должен только один поток, первым достигший точки распределения. Последующие потоки должны только воспользоваться ранее распределённым значением ключа. Для разрешения этой сложности вводится вызов pthread_once().
  3. Теперь каждый поток, использующий подобный блок данных, должен запросить специфический экземпляр данных по pthread_getspecific() и, если он убеждается, что это NULL, то запросить распределение блока для этого значения ключа по pthread_setspecific().
  4. В дальнейшем поток (и все вызываемые из него функции) может работать со своим собственным экземпляром, запрашивая его вызовом pthread_getspecific().
  5. При завершении потока система уничтожает и его экземпляр данных. При этом вызывается деструктор пользователя, который устанавливается при создании ключа pthread_key_create(). Деструктор (как функция) является общим для всех экземпляров данных во всех потоках для этого значения ключа (pthread_key_t), но он получает значение указателя на индивидуальный экземпляр данных завершаемого потока в виде входного параметра.

В листинге 3 представлен пример, в котором демонстрируются разного рода данные, которые могут использоваться потоком:

  • параметр, передаваемый функции потока в стеке (и точно так же локальные данные функции потока);
  • глобальные данные, доступные всем потокам;
  • экземпляр собственных данных потока.
Листинг 3. Использование собственных данных потока (файл own.c)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

static int global = 0;
static  pthread_key_t key;
typedef struct data_bloc {                 // наш собственный тип данных
   pthread_t tid;
} data_t;

void put_msg( int param ) {
   printf( "global=%u , parameter=%u , own=%lu\n",
           global, param, ((data_t*)pthread_getspecific( key ))->tid );
}

static pthread_once_t once = PTHREAD_ONCE_INIT;
static void destructor( void* db ) {       // деструктор собственных данных
   data_t *p = (data_t*)db;
   free( p );
}

static void once_creator( void ) { // создаёт единый на процесс ключ для данных data_
   pthread_key_create( &key, destructor );
}

void* thread_proc( void *data ) {   // функция потока
   int param = (int)data;
   global++;
   pthread_once( &once,  once_creator );  // гарантия уникальности создания ключа
   pthread_setspecific( key, malloc( sizeof( data_t ) ) );
   data_t *db = pthread_getspecific( key );
   db->tid = pthread_self();
   put_msg( param );
   return NULL;
}

int main( int argc, char **argv, char **envp ) {
#define TCNT 5
   pthread_t tid[ TCNT ];
   int i;
   for( i = 0; i < TCNT; i++ )
      pthread_create( &tid[ i ], NULL, thread_proc, (void*)( i + 1 ) );
   for( i = 0; i < TCNT; i++ ).
      pthread_join( tid[ i ], NULL );
   return( EXIT_SUCCESS );
}

Выполним несколько запусков приложения, и получим различающиеся результаты для двух последовательно выполненных прогонов.

$ ./own
global=1 , parameter=4 , own=3047005040
global=2 , parameter=3 , own=3057494896
global=3 , parameter=1 , own=3078474608
global=4 , parameter=5 , own=3036515184
global=5 , parameter=2 , own=3067984752
$ ./own
global=4 , parameter=1 , own=3078527856
global=5 , parameter=4 , own=3042863984
global=4 , parameter=3 , own=3057548144
global=4 , parameter=2 , own=3068038000
global=5 , parameter=5 , own=3030383472

Сигналы в потоках

В POSIX нельзя отправлять сигналы отдельным потокам внутри процесса, так как сигналы направляются только процессу в целом, как оболочке, обрамляющей несколько потоков. Точно так же, для каждого сигнала может быть переопределена функция-обработчик, но это переопределение действует глобально в рамках процесса.

Тем не менее, каждый из потоков (в том числе и главный поток процесса main()) могут независимо определить (в терминах модели надёжной обработки сигналов, сигнальных наборов) собственную маску реакции на сигналы. Благодаря этому в POSIX API возможно:

  • распределить потоки, ответственные за обработку каждого сигнала (замаскировать нечувствительность к отдельным сигналам);
  • динамически изменять потоки, в которых (в контексте которых) обрабатывается реакция на сигнал;
  • создавать обработчики сигналов в виде отдельных специальных потоков.

Ниже показан многопоточный пример (3 потока запускаются из главного потока main), в котором направляемая извне (из другой консоли) последовательность повторяемого сигнала поочерёдно обрабатывается каждым из дочерних потоков по 1-му разу, после чего реакция на сигнал блокируется.

Листинг 4. Обработка сигналов в потоках (файл s6.cc)
#include <iostream>
#include <iomanip>
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
using namespace std;

static void handler( int signo, siginfo_t* info, void* context ) {
   cout << "sig=" << signo << "; tid=" << pthread_self() << endl;
};

sigset_t sig;
void* threadfunc ( void* data ) {
   sigprocmask( SIG_UNBLOCK, &sig, NULL );
   while( true ) {
      pause();
      sigprocmask( SIG_BLOCK, &sig, NULL );
   }
   return NULL;
};

int main() {
   const int thrnum = 3;
   sigemptyset( &sig );
   sigaddset( &sig, SIGRTMIN );
   sigprocmask( SIG_BLOCK, &sig, NULL );
   cout << "main + " << thrnum << " threads : waiting fot signal " << SIGRTMIN
        << "; pid=" << getpid() << "; tid(main)=" << pthread_self() << endl;
   struct sigaction act;
   act.sa_mask = sig;
   act.sa_sigaction = handler;
   act.sa_flags = SA_SIGINFO;
    if( sigaction( SIGRTMIN, &act, NULL ) < 0 ) perror( "set signal handler: " );
   pthread_t pthr;
   for( int i = 0; i < thrnum; i++ )
      pthread_create( &pthr, NULL, threadfunc, NULL );
   pause();
};

Запустим данный пример и изучим полученные результаты:

$ ./s6
main + 3 threads : waiting fot signal 34; pid=7455; tid(main)=3078510288
sig=34; tid=3078503280
sig=34; tid=3068013424
sig=34; tid=3057523568
^C
$ kill -34 7455
$ kill -34 7455
$ kill -34 7455
$ kill -34 7455
$ kill -34 7455

Можно сделать следующие выводы:

  • главный поток процесса не реагирует на получаемые извне сигналы, его реакция изначально заблокирована;
  • tid потока, который принимает каждый последующий сигнал, отличается от предыдущего;
  • после 3-х реакций, обслуженных в каждом из 3-х потоков, реагирование на этот сигнал прекращается (блокируется).

Пользуясь гибкостью API и расширениями реального времени POSIX 1003.b, можно реализовать обработку получаемых сигналов в отдельных потоках, вообще без специальных обработчиков, правда, с некоторыми ограничениями на операции в контексте сигнального обработчика. В листинге 4 процесс приостанавливается до тех пор, пока поток обработчика сигналов не сообщит о завершении.

Листинг 5. Альтернативный подход к обработке сигналов (файл sigthr.c)
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <signal.h>
#include <pthread.h>

int quitflag = 0;
sigset_t mask;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t wait = PTHREAD_COND_INITIALIZER;

void* threadfunc ( void* data ) {
   int signo;
   while( 1 ) {
      if( sigwait( &mask, &signo ) != 0 )
         perror( "sigwait:" ), exit( EXIT_FAILURE );
      switch( signo ) {
         case SIGINT:
            printf( "  ... signal SIGINT\n" );
            break;
         case SIGQUIT:
            printf( "  ... signal SIGQUIT\n" );
            pthread_mutex_lock( &lock );
            quitflag = 1;
            pthread_mutex_unlock( &lock );
            pthread_cond_signal( &wait );
            return NULL;
         default:
            printf( "undefined signal %d\n", signo ), exit( EXIT_FAILURE );
      }
   };
};

int main() {
   printf( "process started with PID=%d\n", getpid() );
   sigemptyset( &mask );
   sigaddset( &mask, SIGINT );
   sigaddset( &mask, SIGQUIT );
   sigset_t oldmask;
   if( sigprocmask( SIG_BLOCK, &mask, &oldmask ) < 0 )
      perror( "signals block:" ), exit( EXIT_FAILURE );
   pthread_t tid;
   if( pthread_create( &tid, NULL, threadfunc, NULL ) != 0 )
      perror( "thread create:" ), exit( EXIT_FAILURE ); ;
   pthread_mutex_lock( &lock );
   while( 0 == quitflag )
      pthread_cond_wait( &wait, &lock );
   pthread_mutex_unlock( &lock );
   /* SIGQUIT был перехвачен, но к этому моменту снова заблокирован */
   if( sigprocmask( SIG_SETMASK, &oldmask, NULL ) < 0 )
      perror( "signals set:" ), exit( EXIT_FAILURE );
   return EXIT_SUCCESS;
};

Изменение флага quitflag производится под защитой мьютекса lock, чтобы главный поток не мог пропустить изменение значения флага.

Проверим, что наше приложение по-прежнему справляется с обработкой сигналов:

$ ./sigthr
^C  ... signal SIGINT
^C  ... signal SIGINT
^C  ... signal SIGINT
^\  ... signal SIGQUIT

Заключение

В этой статье мы рассмотрели сложную тему: создание многопоточных приложений средствами POSIX API и работа с сигналами в контексте потоков.

В следующей статье мы завершим знакомство со специфическими возможностями POSIX API на примере использования библиотек ввода / вывода.


Ресурсы для скачивания


Похожие темы


Комментарии

Войдите или зарегистрируйтесь для того чтобы оставлять комментарии или подписаться на них.

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Linux, Open source
ArticleID=969423
ArticleTitle=Инструменты ОС Linux для разработчиков приложений для ОС Windows. Часть 16. Поддержка многопоточности в POSIX API
publish-date=04242014