Пять вещей, которые вы не знали о ... пакете java.util.concurrent. Часть 1

Многопоточное программирование с параллельными коллекциями

Написание многопоточного кода, который как обеспечивает хорошую производительность, так и защищает данные приложения от повреждения – очевидно, сложная задача. Именно для таких задач предназначен пакет java.util.concurrent. Тед Ньювард покажет вам, как классы параллельных коллекций, такие как CopyOnWriteArrayList, BlockingQueue и ConcurrentMap, изменяют стандартные классы коллекций для нужд параллельного программирования.

Тед Ньювард, Глава, Neward & Associates

Тед Ньювард - глава Neward & Associates, где он консультирует, руководит, обучает и внедряет Java, .NET, XML Services и другие платформы. Он проживает возле Сиэтла, штат Вашингтон.



23.12.2011

Об этой серии статей

Вы думаете, что знаете, как программировать на Java? На самом деле большинство разработчиков лишь поверхностно знакомятся с платформой Java, узнавая лишь то, что необходимо для выполнения работы. В этом цикле, статей Тед Ньювард углубляется в основную функциональность платформы Java, чтобы рассказать о малоизвестных фактах, знание которых может помочь при решении самых сложных задач программирования.

Параллельные коллекции были огромным вкладом в Java™ 5, однако многие Java-разработчики упустили их из вида на фоне шумихи об аннотациях и родовых типах (generics). Кроме того (и, возможно, это будет более верным), многие разработчики избегают этого пакета, так как предполагают, что он, как и проблемы, которые он призван решать, обязательно должен быть сложным.

На самом деле, в пакете java.util.concurrent содержится множество классов, которые эффективно решают многие проблемы параллелизма, не утруждая при этом вас. Читайте дальше, чтобы узнать, как классы java.util.concurrent, такие как CopyOnWriteArrayList и BlockingQueue, помогают решать больные вопросы многопоточного программирования.

1. TimeUnit

Перечисление java.util.concurrent.TimeUnit хотя и не является коллекцией, тем не менее делает код гораздо более легким для чтения. Применение TimeUnit освобождает разработчиков, использующих ваш метод или API, от необходимости работать с миллисекундами.

TimeUnit включает в себя значения для всех единиц времени – от MILLISECONDS (миллисекунды) и MICROSECONDS (микросекунды) до DAYS (дни) и HOURS (часы), т.е. оно охватывает почти все интервалы времени, которые могут понадобиться разработчику. А благодаря имеющимся методам конвертации можно, например, легко пересчитать HOURS в MILLISECONDS.


2. CopyOnWriteArrayList

Создание копии массива – это слишком дорогая операция как по времени, так и по затратам ресурсов памяти. Поэтому вместо нее разработчики часто прибегают к использованию синхронизованной коллекции ArrayList. Однако это тоже затратный вариант, так как каждый раз при обходе содержимого коллекции для обеспечения целостности данных приходится синхронизировать все операции, в том числе чтение и запись.

Это неоправданно снижает производительность в сценариях, когда имеется множество операций чтения ArrayList, но мало операций изменения.

Эту проблему можно решить с помощью замечательной коллекции CopyOnWriteArrayList. Она определяется в Javadoc как "потокобезопасный вариант коллекции ArrayList, в которой все изменяющие операции (add, set и т.д.) реализованы посредством создания свежей копии массива".

Перед каждой модификацией коллекция копирует свое содержимое в новый массив, чтобы операции чтения содержимого массива выполнялись без синхронизации (так как они никогда не работают с изменяемыми данными).

Таким образом, коллекция CopyOnWriteArrayList идеальна именно для того сценария, в котором ArrayList нас подводит: коллекции с частым чтением и редкой записью, например слушатели (Listener) для события JavaBean.


3. BlockingQueue

Интерфейс BlockingQueue является очередью (Queue), т.е. его элементы хранятся в порядке «первый пришел, первый вышел» (FIFO – first in, first out). Элементы, вставленные в коллекцию в определенном порядке, будут извлечены из нее в том же самом порядке. Также интерфейс гарантирует, что любая попытка извлечь элемент из пустой очереди заблокирует вызывающий поток до тех пор, пока в коллекции не появится элемент, который можно извлечь. Аналогично, любая попытка вставить элемент в заполненную очередь заблокирует вызывающий поток, пока в коллекции не освободится место для нового элемента.

BlockingQueue изящно решает проблему передачи элементов, собранных одним потоком, для обработки в другой поток без явных хлопот о проблемах синхронизации. Хорошим примером является способ Guarded Blocks из официального руководства Java. В нем создается ограниченный буфер с одним слотом, после чего потоки, используя ручную синхронизацию и методы wait()/notifyAll(), сигнализируют друг другу, когда в слоте имеется новый элемент для обработки и когда слот готов к помещению в него нового элемента (cм. подробности в реализации Guarded Blocks).

Хотя код из руководства Guarded Blocks работает, он выглядит длинным, путаным и не вполне интуитивно понятным. В начале развития платформы Java разработчикам приходилось иметь дело с подобным кодом, однако сейчас 2010 год – наверняка положение дел улучшилось?

В листинге 1 показана переписанная версия кода Guarded Blocks, где я использую ArrayBlockingQueue вместо написанного вручную типа Drop.

Листинг 1. BlockingQueue
import java.util.*;
import java.util.concurrent.*;

class Producer
    implements Runnable
{
    private BlockingQueue<String> drop;
    List<String> messages = Arrays.asList(
        "Mares eat oats",
        "Does eat oats",
        "Little lambs eat ivy",
        "Wouldn't you eat ivy too?");
        
    public Producer(BlockingQueue<String> d) { this.drop = d; }
    
    public void run()
    {
        try
        {
            for (String s : messages)
                drop.put(s);
            drop.put("DONE");
        }
        catch (InterruptedException intEx)
        {
            System.out.println("Interrupted! " + 
                "Last one out, turn out the lights!");
        }
    }    
}

class Consumer
    implements Runnable
{
    private BlockingQueue<String> drop;
    public Consumer(BlockingQueue<String> d) { this.drop = d; }
    
    public void run()
    {
        try
        {
            String msg = null;
            while (!((msg = drop.take()).equals("DONE")))
                System.out.println(msg);
        }
        catch (InterruptedException intEx)
        {
            System.out.println("Interrupted! " + 
                "Last one out, turn out the lights!");
        }
    }
}

public class ABQApp
{
    public static void main(String[] args)
    {
        BlockingQueue<String> drop = new ArrayBlockingQueue(1, true);
        (new Thread(new Producer(drop))).start();
        (new Thread(new Consumer(drop))).start();
    }
}

Интерфейс ArrayBlockingQueue также уважает "честность", т.е. он может давать потокам чтения и записи доступ согласно принципу «первый пришел, первый вышел». Альтернативой этому может быть более эффективная политика, в которой допускается риск простаивания некоторых потоков. (А именно, было бы более эффективным позволять потокам чтения выполняться в то время, когда другие потоки чтения владеют блокировкой, но в такой стратегии есть риск того, что постоянная вереница потоков чтения не позволит потокам записи когда-либо выполнить свою работу).

Опасайтесь ошибки!

Кстати, если вы заметили, что в реализации Guarded Blocks имеется огромная ошибка, – вы правы. Что бы случилось, если бы разработчик синхронизировал экземпляр класса Drop в методе main()?

BlockingQueue также поддерживает методы, принимающие параметр time, обозначающий, как долго потоки должны пребывать в блокированном состоянии перед тем как вернуть управление, сигнализируя о том, что добавление или извлечение элемента не удалось. Это позволяет избежать неограниченного ожидания, которое может привести к катастрофе в рабочей системе, а также легко может "подвесить" систему и принудить к перезагрузке.


4. ConcurrentMap

В Map имеется неочевидная ошибка параллелизма, которая вводит в заблуждение многих не знающих о ней Java-разработчиков. Легким решением данной проблемы является ConcurrentMap.

Когда доступ к коллекции Map осуществляется из нескольких потоков, перед сохранением пары ключ–значение часто с помощью методов containsKey() или get() определяется, имеется ли уже данный ключ в коллекции. Однако даже при использовании synchronized Map какой-либо поток может "подкрасться" и в середине процесса перехватить управление коллекцией Map. Проблема в том, что блокировка захватывается в начале вызова метода get(), затем освобождается, после чего снова захватывается в вызове метода put(). В результате может возникнуть гоночная ситуация между двумя потоками, итог которой может быть разным в зависимости от того, какой поток получит управление первым.

Если два потока вызывают метод в один и тот же момент времени, для каждого из них будет выполнена проверка, и каждый из них выполнит метод put, в результате чего значение, сохраненное первым потоком, будет потеряно. К счастью, интерфейс ConcurrentMap поддерживает несколько дополнительных методов, спроектированных для выполнения двух операций под одной блокировкой: например, метод putIfAbsent() сначала выполняет проверку, а затем помещает новое значение, только если данный ключ еще не хранится Map.


5. SynchronousQueues

SynchronousQueue, согласно Javadoc, является довольно интересной сущностью.

Блокирующая очередь, в которой каждая операция добавления должна ждать соответствующей операции удаления в другом потоке и наоборот. Синхронная очередь не имеет никакой внутренней емкости, даже емкости в один элемент.

В сущности, SynchronousQueue – это еще одна реализация упомянутого выше интерфейса BlockingQueue. Она предоставляет необычайно легковесный способ обмена одиночными элементами между потоками посредством семантики блокировки, используемой в ArrayBlockingQueue. В листинге 2 я переписал код из листинга 1 с использованием SynchronousQueue вместо ArrayBlockingQueue.

Листинг 2. SynchronousQueue
import java.util.*;
import java.util.concurrent.*;

class Producer
    implements Runnable
{
    private BlockingQueue<String> drop;
    List<String> messages = Arrays.asList(
        "Mares eat oats",
        "Does eat oats",
        "Little lambs eat ivy",
        "Wouldn't you eat ivy too?");
        
    public Producer(BlockingQueue<String> d) { this.drop = d; }
    
    public void run()
    {
        try
        {
            for (String s : messages)
                drop.put(s);
            drop.put("DONE");
        }
        catch (InterruptedException intEx)
        {
            System.out.println("Interrupted! " + 
                "Last one out, turn out the lights!");
        }
    }    
}

class Consumer
    implements Runnable
{
    private BlockingQueue<String> drop;
    public Consumer(BlockingQueue<String> d) { this.drop = d; }
    
    public void run()
    {
        try
        {
            String msg = null;
            while (!((msg = drop.take()).equals("DONE")))
                System.out.println(msg);
        }
        catch (InterruptedException intEx)
        {
            System.out.println("Interrupted! " + 
                "Last one out, turn out the lights!");
        }
    }
}

public class SynQApp
{
    public static void main(String[] args)
    {
        BlockingQueue<String> drop = new SynchronousQueue<String>();
        (new Thread(new Producer(drop))).start();
        (new Thread(new Consumer(drop))).start();
    }
}

Код этой реализации выглядит почти точно так же, однако у приложения появляется дополнительное преимущество: SynchronousQueue позволяет вставить элемент в очередь только в том случае, если имеется поток, ожидающий поступления элемента для обработки.

На практике SynchronousQueue похожа на "каналы для рандеву", имеющиеся в таких языках, как Ada или CSP. Также в некоторых других средах, в том числе .NET, такие сущности известны как "объединения" (см. раздел Ресурсы).


В заключение

Зачем добавлять параллелизм в ваши классы коллекций, когда библиотека среды выполнения Java предлагает удобные, встроенные эквиваленты? В следующей статье этого цикла мы изучим пространство имен java.util.concurrent еще подробнее.


Загрузка

ОписаниеИмяРазмер
Пример кода для этой статьиj-5things4-src.zip23КБ

Ресурсы

Комментарии

developerWorks: Войти

Обязательные поля отмечены звездочкой (*).


Нужен IBM ID?
Забыли Ваш IBM ID?


Забыли Ваш пароль?
Изменить пароль

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Профиль создается, когда вы первый раз заходите в developerWorks. Информация в вашем профиле (имя, страна / регион, название компании) отображается для всех пользователей и будет сопровождать любой опубликованный вами контент пока вы специально не укажите скрыть название вашей компании. Вы можете обновить ваш IBM аккаунт в любое время.

Вся введенная информация защищена.

Выберите имя, которое будет отображаться на экране



При первом входе в developerWorks для Вас будет создан профиль и Вам нужно будет выбрать Отображаемое имя. Оно будет выводиться рядом с контентом, опубликованным Вами в developerWorks.

Отображаемое имя должно иметь длину от 3 символов до 31 символа. Ваше Имя в системе должно быть уникальным. В качестве имени по соображениям приватности нельзя использовать контактный e-mail.

Обязательные поля отмечены звездочкой (*).

(Отображаемое имя должно иметь длину от 3 символов до 31 символа.)

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Вся введенная информация защищена.


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Технология Java
ArticleID=782752
ArticleTitle=Пять вещей, которые вы не знали о ... пакете java.util.concurrent. Часть 1
publish-date=12232011