Тонкости использования языка Python: Часть 4. Параллельное исполнение

Статья является частью миницикла, посвященного тонким вопросам применения Python и разработки приложений на этом языке. В статье разбираются различные механизмы для реализации параллельного исполнения ветвей кода, доступные в Python.

Олег Цилюрик, преподаватель тренингового отделения, Global Logic

Фото автораОлег Иванович Цилюрик, много лет был разработчиком программного обеспечения в крупных центрах разработки: ВНИИ РТ, НПО "Дельта", КБ ПМ. Последние годы работал над проектами в области промышленной автоматики, IP телефонии и коммуникаций. Автор нескольких книг. Преподаватель тренингового отделения международной софтверной компании Global Logic.



05.12.2013

Введение

Существует большое количество публикаций, описывающих каким образом можно обеспечить параллельное выполнение различных ветвей кода в Python. В последующих двух статьях мы подробно опишем эти возможности и проанализируем, к каким последствиям может привести их применение.

Параллельное исполнение в современных инструментальных средствах реализуется двумя фундаментальными механизмами: параллельными процессами и параллельными потоками (thread), а все остальные способы являются комбинациями этих вариантов.

Мы начнём изучение аспектов реализации параллельного выполнения в Python именно с потоков, чтобы затем вернуться к процессам. Смысл выбранного порядка рассмотрения, отличающийся от используемого в большинстве публикаций, станет ясен в ходе дальнейшего изложения.

Понятия процесс (process) и поток (thread) окончательно разделились, когда аппаратные средства процессоров начали поддерживать защищённый режим с контролем прав (для x86, например, начиная с 386), а операционные системы смогли реализовать изолированные адресные пространства для каждой пользовательской задачи (на основе этой защищённости). Для оборудования, не контролирующего доступ (например, PDP-11, AVR), и в операционных системах, не разграничивающих адресных пространств (pSOS, VxWorks) эти два понятия (process и thread) вырождаются в единую сущность, в некоторых системах называемую задача (task).

Потоки

Механизм потоков в Python реализуется несколькими различающимися модулями, входящими в состав стандартной поставки Python, о чём часто не упоминается в публикациях. Как минимум, это низкоуровневый механизм из модуля thread и механизм более высокого уровня из модуля threading. Именно последний модуль чаще всего имеется в виду при обсуждении реализации потоков в Python.

Какой бы механизм не использовался (модуль threading экспортирует свои базовые элементы из низкоуровневого thread — ниже будут показаны оба), общие принципы работы с потоками в Python остаются неизменными. Потоки в Python не могут быть реализованы с использованием механизмов операционной системы с вытесняющей многозадачностью (preemptive multitasking) и диспетчированием по системному таймеру.

Это связано с тем, что код приложения должен выполняться внутри виртуальной машины интерпретатора Python, который сам по себе не является многопоточным и может исполнять только один поток программы. Интерпретатор Python сам переключает потоки в исполняемом коде, но может делать это только после завершения очередного оператора исполняемого байт-кода (virtual instructions, как это названо в документации), а реально выполняет эту операцию границе N. операторов байт-кода, где N обычно равно 100. Но это значение можно изменить вызовом setcheckinterval() из модуля sys. В листинге 1 представлено приложение idt.py, в котором используется вызов setcheckinterval(). (Полный код можно найти в архиве python_parallel.tgz в разделе "Материалы для скачивания").

Листинг 1. Диагностика и изменение интервала переключения потоков
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys

print( 'версия Python {}.{}.{} на платформе {}'. \
       format( sys.version_info.major, sys.version_info.minor, \
               sys.version_info.micro, sys.platform ) )

first = True; switch = True
while( True ):
    try:
        print( 'интервал *checkinterval : {} '.format( sys.getcheckinterval() ) )
        print( 'интервал *switchinterval : {} сек.'.format( sys.getswitchinterval() ) )
    except AttributeError:
        if first: print( '*switchinterval не реализованы в этой версии Python' ); 
    switch = False
    else:
        if first: print( '*checkinterval объявлено устаревшими в этой версии Python' )
    first = False
    while( True ):
        try: val = int( input( 'введите checkinterval : ' ) ); break;
        except ValueError: print( 'checkinterval должно быть целочисленным' )
        except KeyboardInterrupt: print(); sys.exit( 0 )
    sys.setcheckinterval( val )
    if not switch: continue
    while( True ):
        try: val = float( input( 'введите switchinterval : ' ) ); break;
        except ValueError: print( 'switchinterval должно быть вещественным' )
        except KeyboardInterrupt: print(); sys.exit( 0 )
    sys.setswitchinterval( val )

Начиная с версии Python 3.2 вызов setcheckinterval() был объявлен устаревшим (deprecated) и заменён вызовом setswitchinterval(). Если старый метод устанавливает интервал переключения как число виртуальных инструкций, то новая реализация определяет его в секундах, из-за чего изменяется модель поведения. Но в обоих случаях устанавливается только минимальная граница, на которой может произойти переключение, а реально момент переключения может наступить значительно позже, например, если выполняется продолжительная внутренняя инструкция или вызов метода.

Запустим данное приложение в разных версиях интерпретатора Python (программа сама диагностирует версию, а не совсем эстетичные обозначения в тесте, вида *checkinterval подчёркивают, что за ними скрыты по 2 библиотечных вызова, set...() и get...() соответственно):

Для версии 2 мы получим:

$ python idt.py
версия Python 2.7.3 на платформе linux2
интервал *checkinterval : 100
*switchinterval не реализованы в этой версии Python
введите checkinterval : 30
интервал *checkinterval : 30
введите checkinterval : 200
интервал *checkinterval : 200
введите checkinterval : ^C()

А для версии 3:

$ python3 idt.py
версия Python 3.2.3 на платформе linux2
интервал *checkinterval : 100
интервал *switchinterval : 0.005 сек.
*checkinterval объявлено устаревшими в этой версии Python
введите checkinterval : 30
введите switchinterval : .01
интервал *checkinterval : 30
интервал *switchinterval : 0.01 сек.
введите checkinterval : 77
введите switchinterval : .1
интервал *checkinterval : 77
интервал *switchinterval : 0.09999999999999999 сек.
введите checkinterval : ^C

В интерпретаторе Python используется глобальная блокировка интерпретатора (Global Interpreter Lock — GIL), которая захватывается на время выполнения очередного потока, и никакой другой поток не может быть активирован, пока эта блокировка не будет освобождена. В этой модели нет ничего крамольного, но нужно хорошо представлять последствия, к которым может привести её применение и уметь пользоваться предоставляемыми возможностями. К детальному обсуждению этой модели мы вернёмся позже, уже после рассмотрения техники создания потоков в Python.

Реализация низкого уровня

Реализация низкого уровня представлена модулем thread стандартной библиотеки Python (в версии 3 этот модуль называется _thread), описание которого редко встречается в документации и литературе. Этот модуль обладает ограниченными возможностями для синхронизации, предлагая единственный примитив LockType (аналог бинарного мютекса), содержащий только методы для захвата (acquire()) и освобождения (release()). Тем не менее, даже на одном этом модуле можно построить развитое многопоточное приложение, как показано в листинге 2 (файл tlspeed.py в архиве python_parallel.tgz):

Листинг 2. Запуск нескольких потоков и ожидание их завершения
#!/usr/bin/python -O
# -*- coding: utf-8 -*-
import getopt
import sys
import time
try:
    import thread as thr
except ImportError:
    import _thread as thr

debuglevel = 0
threadnum = 2
delay = 1
active = 0
numt = 0                      # текущее число активных дочерних потоков
lock = thr.allocate_lock()    # блокировка доступа к числу активных дочерних потоков
wait = thr.allocate_lock()    # блокировка ожидания завершения всех дочерних потоков
barier = { 'numt' : numt, 'lock' : lock, 'wait' : wait }

def delay_in_cycle( delay = 1.0 ):
    t = time.time()
    while time.time() - t < delay :
        pass

def thrfun( delay, num, tstart ):
    st = time.time() - tstart
    barier[ 'numt' ] = barier[ 'numt' ] + 1  
    barier[ 'lock' ].release()
    ss = '\t{} : {} <= старт: {:14.11f}'.format( num, id, st )
    if not active : time.sleep( delay )     # пауза
    else : delay_in_cycle( delay )          # или активное ожидание
    barier[ 'lock' ].acquire()
    barier[ 'numt' ] = barier[ 'numt' ] - 1
    st = time.time() - tstart               # время завершения потока
    print( '{} - финиш: {:14.11f}'.format( ss, st ) )
    if 0 == barier[ 'numt' ] :
        barier[ 'wait' ].release()
    barier[ 'lock' ].release()
    return

opts, args = getopt.getopt( sys.argv[1:], "vt:d:a" )
for opt, arg in opts: # опции (ключи) командной строки (-v, -t, -d, -a)
    if opt[ 1: ] == 'v' : debuglevel = debuglevel + 1
    if opt[ 1: ] == 't' : threadnum = int( arg )
    if opt[ 1: ] == 'd' : delay = int( arg )
    if opt[ 1: ] == 'a' : active = 1
if debuglevel > 0 :
    print( opts )
    print( args )
    print( debuglevel )
    print( threadnum )

barier[ 'wait' ].acquire()   # захват блокировки завершения
for n in range( threadnum ): # запуск threadnum потоков
    barier[ 'lock' ].acquire()
    id = thr.start_new_thread( thrfun, ( delay, n, time.time() ) )
    print( "\t{} : {} => запуск".format( n, id ) )
barier[ 'wait' ].acquire()  # ожидание завершения всех потоков
print( 'завершены все {} потоков \
        завершается ожидавший главный поток'.format( threadnum ) )

Это приложение успешно выполняется как в версии 2, так и в версии 3 (для этого пришлось усложнить процесс импортирования модуля thread или _thread в зависимости от версии):

$ python tlspeed.py  -t3 -d2
        0 : -1220105408 => запуск
        1 : -1229980864 => запуск
        2 : -1240466624 => запуск
        0 : -1220105408 <= старт:  0.00027799606 - финиш:  2.00136685371
        1 : -1229980864 <= старт:  0.00010585785 - финиш:  2.00126695633
        2 : -1240466624 <= старт:  0.00015902519 - финиш:  2.00125098228
завершены все 3 потоков,
завершается ожидавший главный поток
$ python3 tlspeed.py  -t3 -d2
        0 : -1220469952 => запуск
        1 : -1229980864 => запуск
        2 : -1240466624 => запуск
        0 : -1220469952 <= старт:  0.00017404556 - финиш:  2.00208616257
        1 : -1229980864 <= старт:  0.00010085106 - финиш:  2.00201487541
        2 : -1240466624 <= старт:  0.00019097328 - финиш:  2.00192403793
завершены все 3 потоков
завершается ожидавший главный поток

В этой модели поток создаётся и сразу же запускается на выполнение (подобно pthread_t в POSIX) вызовом start_new_thread(). В приложении запускается несколько потоков (число потоков — опция -t), которые выполняются некоторое время (число секунд — опция -d), а главный поток ожидает их завершения. Опцией -a операцией выполнения потоков можно сделать не пассивное ожидание, а выдержку на активных процессорных циклах.

За счёт бедности средств синхронизации, для того, чтобы дождаться окончания дочерних потоков, требуется создавать искусственные конструкции, типа контейнера (структуры) barier в программе, работающей по типу счётного семафора. В этом и заключается слабость низкоуровневого механизма потоков Python.

Реализация высокого уровня

Реализация высокого уровня — это модуль threading стандартной библиотеки Python, который чаще всего и имеют в виду, когда говорят о потоках в Python. Реализуемая в нём модель является надстройкой над рассмотренной выше, поэтому поведение потоков не будет заметно различаться. Но модуль threading предоставляет гораздо больше возможностей и примитивов синхронизации (Lock, RLock, Condition, Semaphore, Event, Queue). С помощью этого модуля функциональность, аналогичную представленной в листинге 2, можно реализовать гораздо проще, короче и понятнее, как показано в листинге 3. Полный код примера находиться в файле thspeed.py в архиве python_parallel.tgz.

Листинг 3. Запуск нескольких потоков и ожидание их завершения
#!/usr/bin/python -O
# -*- coding: utf-8 -*-
import getopt
import sys
import threading
import time

debuglevel = 0
threadnum = 2
delay = 1
active = 0

def delay_in_cycle( delay = 1.0 ):
    t = time.time()
    while time.time() - t < delay :
        pass

def thrfun( *args ):   
    st = time.time() - args[ 2 ]             # время старта потока
    ss = '\t{} : {} <= старт: {:14.11f}'. \
         format( args[ 1 ], threading.currentThread().getName(), st )
    if not active : time.sleep( args[ 0 ] )  # пауза
    else : delay_in_cycle( args[ 0 ] )       # или активное ожидание
    st = time.time() - args[ 2 ]             # время завершения потока
    print( '{} - финиш: {:14.11f}'.format( ss, st ) )
    return

opts, args = getopt.getopt( sys.argv[1:], "vt:d:a" )
for opt, arg in opts:     # опции (ключи) командной строки (-v, -t, -d, -a)
    if opt[ 1: ] == 'v' : debuglevel = debuglevel + 1
    if opt[ 1: ] == 't' : threadnum = int( arg )
    if opt[ 1: ] == 'd' : delay = int( arg )
    if opt[ 1: ] == 'a' : active = 1
if debuglevel > 0 :
    print( opts )
    print( args )
    print(  debuglevel )
    print(  threadnum )

threads = []
for n in range( threadnum ): # создание и запуск потоков
    parm = [ delay, n, 0 ]
    t = threading.Thread( target=thrfun, args=parm )
    threads.append( t )
    t.setDaemon( 1 )
    print( "\t{} : {} => запуск".format( n, t.getName() ) )
    parm[ 2 ] = time.time()
    t.start()
for n in range( threadnum ): # ожидание завершения всех потоков
    threads[ n ].join()
print( 'завершены все {} потоков \
        завершается ожидавший главный поток'.format( threadnum ) )

В этой модели объект потока создаётся вызовом конструктора класса Thread, но его запуск на выполнение производится отдельно вызовом метода start() этого объекта. В данном случае запуск потока, по сравнению с низкоуровневой моделью, упрощается, но передача параметров в потоковую функцию напротив усложняется. Как и в предыдущем случае, показанный код выполняется с одинаковыми результатами в обеих версиях Python:

$ python thspeed.py  -t3 -d2
        0 : Thread-1 => запуск
        1 : Thread-2 => запуск
        2 : Thread-3 => запуск
        0 : Thread-1 <= старт:  0.00039386749 - финиш:  2.00249791145
        1 : Thread-2 <= старт:  0.00040698051 - финиш:  2.00256109238
        2 : Thread-3 <= старт:  0.00021982193 - финиш:  2.00245380402
завершены все 3 потоков
завершается ожидавший главный поток
$ python3 thspeed.py  -t3 -d2
        0 : Thread-1 => запуск
        1 : Thread-2 => запуск
        2 : Thread-3 => запуск
        0 : Thread-1 <= старт:  0.00151085854 - финиш:  2.00366187096
        1 : Thread-2 <= старт:  0.00321602821 - финиш:  2.00535202026
        2 : Thread-3 <= старт:  0.00187206268 - финиш:  2.00381302834
завершены все 3 потоков
завершается ожидавший главный поток

Параллельные процессы

Аналогичного поведения можно добиться и с помощью параллельных процессов. Такие решения естественны для операционных системы семейства UNIX. В листинге 3 представлен пример реализации, использующей механизмы операционной системы. Полный код примера можно найти в файле fork.py в архиве python_parallel.tgz.

Листинг 4. Запуск нескольких дочерних процессов
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import time
import sys
import getopt

delay = 1
procnum = 2
debuglevel = 0

opts, args = getopt.getopt( sys.argv[1:], "p:d:v" )
for opt, arg in opts:     # опции (ключи) командной строки (-v, -t, -d, -a)
    if opt[ 1: ] == 'v' : debuglevel = debuglevel + 1
    if opt[ 1: ] == 't' : threadnum = int( arg )
    if opt[ 1: ] == 'd' : delay = int( arg )
    if opt[ 1: ] == 'a' : active = 1

childs = []
if debuglevel :
     print( 'родительский процесс {}'.format( os.getpid() ) )
for i in range( 0, procnum ) :
    tim = time.time();
    try :
        pid = os.fork();
    except :
        print( 'error: create child process' ); sys.exit( 33 )
    if pid == 0 :
        trun = time.time() - tim;
        if debuglevel :
           print( 'дочерний процесс {} - задержка на запуск: {:14.11f}'. \
                  format( os.getpid(), trun ) )
        time.sleep( delay )
        trun = time.time() - tim;
        if debuglevel :
           print( 'дочерний процесс {} - время завершения: {:14.11f}'. \
                  format( os.getpid(), trun ) )
        sys.exit( 3 )
    if pid > 0 :
        childs.append( pid )
        if debuglevel :
            print( '{}: создан новый дочерний процесс {}'.format( os.getpid(), pid ) )
print( 'ожидание завершения дочерних процессов ...' )
for p in childs :
    pid, status = os.wait()
    if debuglevel :
        print( 'код завершения процесса {} = {}'.\
                    format( pid, os.WEXITSTATUS( status ) ) )
print( 'все порождённые процессы успешно завершены' )

Это приложение также хорошо исполняется в обеих версиях Python:

$ python fork.py -v
родительский процесс 13612
13612: создан новый дочерний процесс 13613
13612: создан новый дочерний процесс 13614
ожидание завершения дочерних процессов ...
дочерний процесс 13614 - задержка на запуск:  0.00063419342
дочерний процесс 13613 - задержка на запуск:  0.00130796432
дочерний процесс 13614 - время завершения:  1.00162100792
дочерний процесс 13613 - время завершения:  1.00252485275
код завершения процесса 13613 = 3
код завершения процесса 13614 = 3
все порождённые процессы успешно завершены
$ python3 fork.py -v
родительский процесс 13647
13647: создан новый дочерний процесс 13648
дочерний процесс 13648 - задержка на запуск:  0.00055909157
13647: создан новый дочерний процесс 13649
ожидание завершения дочерних процессов ...
дочерний процесс 13649 - задержка на запуск:  0.00042104721
дочерний процесс 13648 - время завершения:  1.00205016136
дочерний процесс 13649 - время завершения:  1.00123310089
код завершения процесса 13648 = 3
код завершения процесса 13649 = 3
все порождённые процессы успешно завершены

В данном примере (как и в случаях с потоками) проставляются временные метки, обозначающие задержки при запуске дочерних параллельных ветвей. Как можно заметить, временные затраты на создание и запуск нового процесса или потока для исполняющей системы Python практически одинаковы. Это можно объяснить тем, что основное время съедает интерпретирующая система, а не выполнение системных вызовов POSIX. Этот эффект также встречается и при программировании на языке C: "тяжесть" параллельных процессов проявляется не в скорости их создания или переключения контекста в мультизадачной среде, а в сложности и медленности межпроцессных взаимодействий (IPC) в изолированных (защищённых) адресных пространствах.

Безусловно, не следует забывать и то, что число одновременно запущенных процессов в операционной системе может быть ограничено (это зависит от объёма ресурсов, доступных системе, главным образом, оперативной памяти):

$ python fork.py -p721
ожидание завершения дочерних процессов ...
все порождённые процессы успешно завершены
$ python fork.py -p722
error: create child process
$ echo $?
33

Заключение

В данной статье мы познакомились с особенностями реализации параллельного исполнения в среде Python. Однако представленные подходы не могут применяться с одинаковым успехом в различных операционных системах. В следующей статье мы рассмотрим принципы создания многопоточного Python-кода, способного запускаться в любой операционной системе.


Загрузка

ОписаниеИмяРазмер
параллельность в Pythonpython_parallel.tgz14KB

Ресурсы

Комментарии

developerWorks: Войти

Обязательные поля отмечены звездочкой (*).


Нужен IBM ID?
Забыли Ваш IBM ID?


Забыли Ваш пароль?
Изменить пароль

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Профиль создается, когда вы первый раз заходите в developerWorks. Информация в вашем профиле (имя, страна / регион, название компании) отображается для всех пользователей и будет сопровождать любой опубликованный вами контент пока вы специально не укажите скрыть название вашей компании. Вы можете обновить ваш IBM аккаунт в любое время.

Вся введенная информация защищена.

Выберите имя, которое будет отображаться на экране



При первом входе в developerWorks для Вас будет создан профиль и Вам нужно будет выбрать Отображаемое имя. Оно будет выводиться рядом с контентом, опубликованным Вами в developerWorks.

Отображаемое имя должно иметь длину от 3 символов до 31 символа. Ваше Имя в системе должно быть уникальным. В качестве имени по соображениям приватности нельзя использовать контактный e-mail.

Обязательные поля отмечены звездочкой (*).

(Отображаемое имя должно иметь длину от 3 символов до 31 символа.)

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Вся введенная информация защищена.


  • Bluemix

    Узнайте больше информации о платформе IBM Bluemix, создавайте приложения, используя готовые решения!

  • developerWorks Premium

    Эксклюзивные инструменты для построения вашего приложения. Узнать больше.

  • Библиотека документов

    Более трех тысяч статей, обзоров, руководств и других полезных материалов.

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Open source
ArticleID=956100
ArticleTitle=Тонкости использования языка Python: Часть 4. Параллельное исполнение
publish-date=12052013