Программирование на Python: Часть 9. Процессы и потоки

Параллельное программирование становится в последнее время жизненной необходимостью, которая диктуется темпами развития многоядерных процессоров. Одним из вариантов организации параллельного программирования является многопоточное программирование.

Сергей Яковлев, Консультант, независимый специалист

Яковлев Сергей — независимый разработчик с многолетним опытом прикладного и системного программирования; вносит вклад в развитие open-source на своем персональном сайте www.iakovlev.org. Консультант.



02.09.2010

С появлением многоядерных процессоров стала общеупотребительной практика распространять нагрузку на все доступные ядра. Существует два основных подхода в распределении нагрузки: использование процессов и потоков.

Использование нескольких процессов фактически означает использование нескольких программ, которые выполняются независимо друг от друга. Программно это решается с помощью системных вызовов exec и fork. Такой подход создает большие неудобства в управлении обмена данными между этими программами.

В качестве альтернативы существует другой подход – создание многопоточных программ. Обмен данными между потоками существенно упрощается. Но управление такими программами усложняется, и вся ответственность ложится на программиста.

Сегодня мы рассмотрим следующие темы.

  • Как работают процессы.
  • Как работают потоки в питоне.
  • Создание потока.
  • Очереди (Queue).
  • Блокировки (Lock).

1. Как работают процессы

В питоне есть стандартный модуль subprocess, который упрощает управление другими программами, передавая им опции командной строки и организуя обмен данными через каналы (pipe). Мы рассмотрим пример, в котором пользователь запускает программу из командной строки, которая в свою очередь запустит несколько дочерних программ. В данном примере два скрипта – рarent.py и child.py. Запускается parent.py. Child.py выступает в роли аргумента command, который передается в запускаемый процесс. У этого процесса есть стандартный вход, куда мы передаем два аргумента – поисковое слово и имя файла. Мы запустим два экземпляра программы child.py, каждый экземпляр будет искать слово word в своем файле – это будут файлы исходников самих программ. Запись на стандартный вход осуществляет модуль subprocess. Каждый процесс пишет результат своего поиска в консоль. В главном процессе мы ждем, пока все child не закончат свою работу.

Код parent.py:

import os
import subprocess
import sys


child = os.path.join(os.path.dirname(__file__), "./child.py")
word  = 'word'
file = ['./parent.py','./child.py']

pipes = []
for i in range(0,2):
  command = [sys.executable, child]
  pipe = subprocess.Popen(command, stdin=subprocess.PIPE)
  pipes.append(pipe)
  pipe.stdin.write(word.encode("utf8") + b"\n")
  pipe.stdin.write(file[i].encode("utf8") + b"\n")
  pipe.stdin.close()

while pipes:
    pipe = pipes.pop()
    pipe.wait()

Код child.py:

import sys

word = sys.stdin.readline().rstrip()
filename = sys.stdin.readline().rstrip()

try:
  with open(filename, "rb") as fh:
    while True:
      current = fh.readline()
      if not current:
          break
      if (word in current ):
          print("find: {0} {1}".format(filename,word))
except :
    pass

2. Как работают потоки в питоне

Если нужно, чтобы ваше приложение выполняло несколько задач в одно и то же время, то можете воспользоваться потоками (threads). Потоки позволяют приложениям выполнять в одно и то же время множество задач. Многопоточность (multi-threading) важна во множестве приложений, от примитивных серверов до современных сложных и ресурсоёмких игр.

Когда в одной программе работают несколько потоков, возникает проблема разграничения доступа потоков к общим данным. Предположим, что есть два потока, имеющих доступ к общему списку. Первый поток может делать итерацию по этому списку:

  for x in L

а второй в этот момент начнет удалять значения из этого списка. Тут может произойти все что угодно: программа может упасть, или мы просто получим неверные данные.

Решением в этом случае является применение блокировки. При этом доступ к заблокированному списку будет иметь только один поток, второй будет ждать, пока блокировка не будет снята.

Применение блокировки порождает другую проблему – дедлок (deadlock) – мертвая блокировка. Пример дедлока: имеется два потока и два списка. Первый поток блокирует первый список, второй поток блокирует второй список. Первый поток изнутри первой блокировки пытается получить доступ к уже заблокированному второму списку, второй поток пытается проделать то же самое с первым списком. Получается неопределенная ситуация с бесконечным ожиданием. Эту ситуации легко описать, на практике все гораздо сложнее.

Вариантом решения проблемы дедлоков является политика определения очередности блокировок. Например, в предыдущем примере мы должны определить, что блокировка первого списка идет всегда первой, а уже потом идет блокировка второго списка.

Другая проблема с блокировками – в том, что несколько потоков могут одновременно ждать доступа к уже заблокированному ресурсу и при этом ничего не делать. Каждая питоновская программа всегда имеет главный управляющий поток.

Питоновская реализация многопоточности ограниченная. Интерпретатор питона использует внутренний глобальный блокировщик (GIL), который позволяет выполняться только одному потоку. Это сводит на нет преимущества многоядерной архитектуры процессоров. Для многопоточных приложений, которые работают в основном на дисковые операции чтения/записи, это не имеет особого значения, а для приложений, которые делят процессорное время между потоками, это является серьезным ограничением.


3. Создание потока

Для создания потоков мы будем использовать стандартный модуль threading. Есть два варианта создания потоков:

вызов функции

  threading.Thread()

вызов класса

  threading.Thread

Следующий пример показывает, как к потоку приаттачить функцию через вызов функции:

import threading
import time
def clock(interval):
    while True:
        print("The time is %s" % time.ctime())
        time.sleep(interval)
t = threading.Thread(target=clock, args=(15,))
t.daemon = True
t.start()

Пример на создание потока через вызов класса: в конструкторе обязательно нужно вызвать конструктор базового класса. Для запуска потока нужно выполнить метод start() объекта-потока, что приведет к выполнению действий в методе run():

import threading
import time
class ClockThread(threading.Thread):
    def __init__(self,interval):
        threading.Thread.__init__(self)
        self.daemon = True
        self.interval = interval
    def run(self):
        while True:
            print("The time is %s" % time.ctime())
            time.sleep(self.interval)
t = ClockThread(15)
t.start()

Для управления потоками существуют методы:

start() – дает потоку жизнь.

run() –этот метод представляет действия, которые должны быть выполнены в потоке.

join([timeout]) – поток, который вызывает этот метод, приостанавливается, ожидая завершения потока, чей метод вызван. Параметр timeout (число с плавающей точкой) позволяет указать время ожидания (в секундах), по истечении которого приостановленный поток продолжает свою работу независимо от завершения потока, чей метод join был вызван. Вызывать join() некоторого потока можно много раз. Поток не может вызвать метод join() самого себя. Также нельзя ожидать завершения еще не запущенного потока.

getName() – возвращает имя потока.

setName(name) – присваивает потоку имя name.

isAlive() – возвращает истину, если поток работает (метод run() уже вызван).

isDaemon() – возвращает истину, если поток имеет признак демона.

setDaemon(daemonic) – устанавливает признак daemonic того, что поток является демоном.

activeCount() – возвращает количество активных в настоящий момент экземпляров класса Thread. Фактически это len(threading.enumerate()).

currentThread() – возвращает текущий объект-поток, т.е. соответствующий потоку управления, который вызвал эту функцию.

enumerate() – возвращает список активных потоков.


4. Очереди (Queue)

В следующем примере будет решена аналогичная задача, что и в предыдущем примере с процессами: будут запущены три потока, каждый из которых будет работать по принципу утилиты grep. Имеется глобальный ресурс – work_queue – список файлов для поиска, который мы положим в очередь. Для этого будет использован объект Queue, который имеет встроенную блокировку:

import threading
import Queue

class Worker(threading.Thread):

    def __init__(self, work_queue, word):
        super(Worker,self).__init__()
        self.work_queue = work_queue
        self.word = word

    def run(self):
        try:
            filename = self.work_queue.get()
            self.process(filename)
        finally:
            pass

    def process(self, filename):
        previous = "
        current=True
        with open(filename, "rb") as fh:
            while current:
                current = fh.readline()
                if not current: break
                current = current.decode("utf8", "ignore")
                if self.word in current :
                    print("find {0}: {1}".format(self.word,filename))
                previous = current

word = 'import'
filelist = ['./file1.py','./file2.py','./file3.py']
work_queue = Queue.Queue()
for filename in filelist:
    work_queue.put(filename)
for i in range(3):
    worker = Worker(work_queue, word)
    worker.start()

5. Блокировки (Lock)

В следующем примере будут созданы три потока, каждый из которых будет считывать стартовую страницу по указанному Web-адресу. В примере имеется глобальный ресурс – список урлов – url_list – доступ к которому будет блокироваться с помощью блокировки threading.Lock(). Объект Lock имеет методы:

acquire([blocking=True]) – делает запрос на запирание замка. Если параметр blocking не указан или является истиной, то поток будет ожидать освобождения замка.

Если параметр не был задан, метод не возвратит значения.

Если blocking был задан и истинен, метод возвратит True (после успешного овладения замком).

Если блокировка не требуется (т.е. задан blocking=False), метод вернет True, если замок не был заперт и им успешно овладел данный поток. В противном случае будет возвращено False.

release() – запрос на отпирание замка.

locked() – возвращает текущее состояние замка (True – заперт, False – открыт).

import threading
from urllib import urlopen

class WorkerThread(threading.Thread):
  def __init__(self,url_list,url_list_lock):
    super(WorkerThread,self).__init__()
    self.url_list=url_list
    self.url_list_lock=url_list_lock
    
  def run(self):
    while (1):
      nexturl = self.grab_next_url()
      if nexturl==None:break
      self.retrieve_url(nexturl)
        
  def grab_next_url(self):
    self.url_list_lock.acquire(1)
    if len(self.url_list)<1:
      nexturl=None
    else:
      nexturl = self.url_list[0]
      del self.url_list[0]
    self.url_list_lock.release()
    return nexturl  
        
        
  def retrieve_url(self,nexturl):
    text = urlopen(nexturl).read()
    print text
    print '################### %s #######################' % nexturl
    
url_list=['http://linux.org.ru','http://kernel.org','http://python.org']
url_list_lock = threading.Lock()
workerthreadlist=[]
for x in range(0,3):
  newthread = WorkerThread(url_list,url_list_lock)
  workerthreadlist.append(newthread)
  newthread.start()
for x in range(0,3):
  workerthreadlist[x].join()

Заключение

Параллельное программирование становится в последнее время жизненной необходимостью, которая диктуется темпами развития многоядерных процессоров. Одним из вариантов организации параллельного программирования является многопоточное программирование. В обычной программе действует всего один поток управления, а в многопоточной одновременно могут работать несколько потоков.

В многопоточной программе усложняется контроль за обменом данных между потоками. Глобальные ресурсы необходимо предохранять от одновременного доступа со стороны нескольких потоков, чтобы не нарушить их целостности. В этой статье были рассмотрены инструменты контроля глобальных данных – блокировки, очереди.

Приведенные примеры проверялись на версии питона 2.6.

< Предыдущая статья. Следующая статья >

Комментарии

developerWorks: Войти

Обязательные поля отмечены звездочкой (*).


Нужен IBM ID?
Забыли Ваш IBM ID?


Забыли Ваш пароль?
Изменить пароль

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Профиль создается, когда вы первый раз заходите в developerWorks. Информация в вашем профиле (имя, страна / регион, название компании) отображается для всех пользователей и будет сопровождать любой опубликованный вами контент пока вы специально не укажите скрыть название вашей компании. Вы можете обновить ваш IBM аккаунт в любое время.

Вся введенная информация защищена.

Выберите имя, которое будет отображаться на экране



При первом входе в developerWorks для Вас будет создан профиль и Вам нужно будет выбрать Отображаемое имя. Оно будет выводиться рядом с контентом, опубликованным Вами в developerWorks.

Отображаемое имя должно иметь длину от 3 символов до 31 символа. Ваше Имя в системе должно быть уникальным. В качестве имени по соображениям приватности нельзя использовать контактный e-mail.

Обязательные поля отмечены звездочкой (*).

(Отображаемое имя должно иметь длину от 3 символов до 31 символа.)

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Вся введенная информация защищена.


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Linux, Open source
ArticleID=515198
ArticleTitle=Программирование на Python: Часть 9. Процессы и потоки
publish-date=09022010