Одновременное исполнение на платформе JVM

Основы одновременного исполнения в Java и в Scala

Одновременное исполнение в языке Java и дополнительные возможности, предоставляемые языком Scala

Comments

Серия контента:

Этот контент является частью # из серии # статей: Одновременное исполнение на платформе JVM

Следите за выходом новых статей этой серии.

Этот контент является частью серии:Одновременное исполнение на платформе JVM

Следите за выходом новых статей этой серии.

Десятилетия непрерывного быстрого повышения скорости процессоров завершились примерно в конце 20 века. С тех пор производители процессоров повышают производительность процессорных чипов преимущественно посредством увеличения количества процессорных ядер, а не посредством повышения их тактовой частоты. Сегодня многоядерные системы являются нормой для всех компьютеризированных устройств — от сотовых телефонов до корпоративных серверов — и эта тенденция с большой вероятностью будет лишь продолжаться и усиливаться. Разработчикам во все большей степени необходимо учитывать в своем программном коде наличие нескольких ядер для удовлетворения потребностей в производительности.

В этом цикле статей рассматриваются некоторые современные подходы к программированию одновременного исполнения на языках Scala и Java, в том числе реализация в языке Java тех идей, которые уже применяются в языке Scala и в других языках на базе JVM. Эта первая статья цикла содержит базовую информацию, позволяющую получить более широкое представление о программировании одновременного исполнения на платформе JVM. В частности, в статье рассматривается несколько современных методик – как для Java 7, так и для Scala. Вы научитесь использовать Java-классы ExecutorService и ForkJoinPool для упрощения программирования одновременного исполнения. Вы также получите вводную информацию о некоторых базовых средствах Scala, которые расширяют возможности программирования одновременного исполнения по сравнению с теми, которые доступны в "чистом" Java. Вы также увидите, как разные подходы влияют на производительность одновременно исполняемых программ. В последующих статьях данного цикла будут рассмотрены усовершенствования Java 8 в сфере одновременного исполнения, а также расширения (в том числе инструментарий Akka) для масштабируемого программирования на Java и на Scala.

Поддержка одновременного исполнения в Java

Поддержка одновременного исполнения свойственна Java с самых ранних дней существования этой платформы, при этом чистая реализация потоков и синхронизация обеспечивает ей преимущество относительно конкурирующих языков. Язык Scala основан на Java и исполняется на платформе JVM с непосредственным доступом ко всей среде исполнения Java (в том числе ко всем средствам поддержки одновременного исполнения). Таким образом, перед изучением возможностей Scala я для начала кратко рассмотрю возможности, уже имеющиеся в языке Java.

Базовая поточная обработка в Java

При программировании на языке Java создание и использование потоков осуществляется достаточно просто. Потоки представляются с помощью класса java.lang.Thread, а код, подлежащий исполнению потоком, имеет форму экземпляра java.lang.Runnable. При необходимости вы можете создать в приложении большое количество потоков — вплоть до нескольких тысяч. При наличии нескольких доступных ядер JVM использует их для одновременного исполнения нескольких потоков; если количество потоков превышает количество ядер, то потоки используют эти ядра совместно.

Координация действий потоков сопровождается значительными сложностями. Одна из сложностей обусловлена тем, что компилятору Java и машине JVM разрешается изменять порядок операций в вашем коде — до тех пор все элементы вашей программы остаются непротиворечивыми. Например, если две операции сложения используют различные переменные, то компилятор или JVM могут выполнить операции в обратном порядке относительно указанного вами — при условии, что программа не использует ни одну из этих сумм до того, как обе эти операции будут выполнены. Такие гибкие возможности переупорядочивания операций помогают повысить производительность Java-кода, однако гарантия непротиворечивости действует лишь в пределах одного потока. Аппаратные средства тоже могут порождать проблемы с потоками. Современные системы используют несколько уровней кэш-памяти; эта кэш-память в общем случае не является идентичной для всех ядер системы. Когда одно ядро изменяет какое-либо значение в памяти, другие ядра могут не сразу увидеть это изменение.

Вследствие проблем, возникающих, когда один поток работает с данными, измененными другим потоком, порядком взаимодействия этих потоков необходимо управлять явно. В Java предусмотрены специальные операции для такого управления. Эти операции вводят упорядочивание в представлениях данных, которые видны разным потокам. Простейший вариант состоит в том, что потоки используют ключевое слова synchronized при обращении к объекту. Когда поток синхронизируется на каком-либо объекте, это поток получает монопольный доступ к блокировке, которая является уникальной для вышеуказанного объекта. Если другой поток уже удерживает эту блокировку, то поток, который хочет получить ее, должен ждать (блокироваться) до тех пор, пока эта блокировка не будет снята. Когда поток возобновляет исполнение в блоке кода, обозначенного как synchronized Java гарантирует, что этот поток "видит" все, записанное другими потоками, которые ранее удерживали эту же блокировку — но только те данные, которые были записаны этими потоками до момента времени, когда они сняли эту блокировку посредством оставления своего собственного блока в состоянии synchronized Эта гарантия распространяется на переупорядочение операций, выполняемых как компилятором или JVM, так и аппаратной кэш-памятью. Как следствие, внутренняя часть блока synchronized является островком устойчивости в вашем коде, в котором потоки могут поочередно исполняться, взаимодействовать и обмениваться информацией, причем совершенно безопасно.

Использование ключевого слова volatile с переменной обеспечивает несколько более слабую форму безопасного взаимодействия между потоками. Ключевое слово synchronized гарантирует, что при получении блокировки вы будете видеть результаты операций сохранения другого потока, и что другие потоки, получающие эту блокировку после вас, увидят результаты ваших операции сохранения. Ключевое слово volatile разбивает эту гарантию на две отдельных части. Если поток осуществляет запись в volatile-переменную, то сначала очищаются все его предшествующие записи в эту точку. Если поток читает переменную, то он видит не только значение, записанное в этой переменной, но также и все другие значения, записанные "пишущим" потоком. Таким образом, чтение volatile-переменной обеспечивает ту же разновидность гарантии памяти, что и вход в блок synchronized, а запись volatile-переменной дает ту же разновидность гарантии памяти, что и выход из блока synchronized. Однако есть и существенное различие: операция чтения или записи volatile-переменной никогда не осуществляет блокировку.

Абстрагирование одновременного исполнения в Java

Синхронизация весьма полезна, и многие многопоточные приложения разработаны на Java с использованием лишь простых synchronized-блоков. Но координирование потоков может оказаться непростым делом, особенно когда разработчик имеет дело со многими потоками и со многими блокировками. При этом трудно гарантировать, что потоки взаимодействуют только безопасными способами и что вы избегаете потенциальных взаимных блокировок (когда два или более потока ждут друг от друга снятия блокировки, без чего они не способны продолжать свое исполнение). Абстракции, которые поддерживают одновременное исполнение без необходимости непосредственного манипулирования потоками и блокировками, дают разработчикам более эффективные способы для управления распространенными вариантами использования.

Иерархия java.util.concurrent включает разновидности коллекций, поддерживающие параллельный доступ, классы-оболочки (wrapper) для атомарных операций и примитивы синхронизации. Многие из этих классов были созданы для поддержки неблокирующего доступа, что избавляет от проблем с взаимной блокировкой и обеспечивает более эффективную поточную обработку. Эти классы облегчают определение и регулирование взаимодействий между потоками, однако они все еще страдают от некоторой сложности базовой модели поточной обработки.

Две абстракции из состава пакета java.util.concurrent поддерживают более "отвлеченный" подход к реализации одновременного исполнения, основанный на использовании: интерфейса Future<T>, а также интерфейсов Executor и ExecutorService. В свою очередь, эти связанные интерфейсы лежат в основе многих расширений Scala и Akka для поддержки одновременного исполнения в Java, поэтому имеет смысл рассмотреть эти интерфейсы и их реализацию более подробно.

Интерфейс Future<T>— это "держатель" для значения типа T, характерной особенностью которого является то, что это значение в общем случае недоступно до некоторого момента после создания Future. Это значение является результатом асинхронной операции, которая допускает одновременное исполнение. Поток, получающий Future, может вызывать методы, решающие следующие задачи:

  • Проверка доступности значения
  • Ожидание доступности значения
  • Извлечение значения, когда оно является доступным
  • Отмена операции, если значение больше не является необходимым

Конкретные реализации интерфейса Future структурированы для поддержки различных способов обращения с асинхронной операцией.

Интерфейс Executor— это абстракция "чего-либо", исполняющего задачи. Это "что-либо" в конечном итоге и будет потоком, однако описываемый интерфейс скрывает подробности того, как этот поток осуществляет исполнение. Полезность интерфейса Executor как такового ограниченна; субинтерфейс ExecutorService предоставляет методы расширения для управления завершением и для создания Future-объектов для результатов задач. Все стандартные реализации "корневого" интерфейса Executor также реализуют субинтерфейс ExecutorService, поэтому на практике корневой интерфейс можно игнорировать.

Потоки потребляют сравнительно много ресурсов, поэтому имеет смысл повторно использовать их, а не выделять их однократно с последующим отбрасыванием. Интерфейс ExecutorService упрощает разделение работы между потоками, а также обеспечивает автоматическое повторное использование потоков, что облегчает программирование и повышает производительность. Реализация ThreadPoolExecutor интерфейса ExecutorService управляет пулом потоков с целью выполнения задач.

Применение одновременного исполнения в Java

Практическое применение одновременного исполнения нередко включает в себя задачи, которым требуются взаимодействия с внешними контрагентами (с пользователем, с хранилищем, с другими системами и т. д.), независимыми от вашей основной логики обработки. Такой тип применения сложно свести к простому примеру, поэтому для демонстрации одновременного исполнения специалисты зачастую используют простые задачи с большим объемом вычислений, такие как математические расчеты или сортировка. Я буду использовать подобный пример.

Задача состоит в отыскании ближайшего известного слова для неизвестного введенного ("входящего") слова, где термин "ближайшее" определяется по значению показателя под названием расстояние Левенштейна (другое название — расстояние редактирования): представляющее собой минимальное количество операций добавления, удаления и изменения символов, требуемых для преобразования введенного слова в известное слово. Я использую код, основанный на примере из статьи расстояния Левенштейна в Википедии, с целью вычисления расстояния Левенштейна для каждого известного слова и возвращения наилучшего соответствия (или неопределенного результата, если расстояние одинаково для нескольких известных слов).

В листинге 1 показан Java-код для вычисления расстояния Левенштейна. В процессе этого вычисления генерируется матрица, у которой количество строк и столбцов соответствует размерам двух сравниваемых текстов плюс единица в каждом измерении. С целью повышения эффективности в этой реализации используется два массива, размеры которых соответствуют целевому тексту. Эти массивы представляют последовательные строки матрицы. На каждом прохождении производится перестановка этих массивов, поскольку для вычисления следующей строки мне нужны только значения из непосредственно предшествующей строки.

Листинг 1. Вычисление расстояния Левенштейна в Java
/**
 * Вычисление расстояния редактирования от targetText до известного слова.
 *
 * @param word known word
 * @param v0 int array of length targetText.length() + 1
 * @param v1 int array of length targetText.length() + 1
 * @return distance
 */
private int editDistance(String word, int[] v0, int[] v1) {
    
    // initialize v0 (prior row of distances) as edit distance for empty 'word'
    for (int i = 0; i < v0.length; i++) {
        v0[i] = i;
    }
    
    // calculate updated v0 (current row distances) from the previous row v0
    for (int i = 0; i < word.length(); i++) {
        
        // first element of v1 = delete (i+1) chars from target to match empty 'word'
        v1[0] = i + 1;
        
        // use formula to fill in the rest of the row
        for (int j = 0; j < targetText.length(); j++) {
            int cost = (word.charAt(i) == targetText.charAt(j)) ? 0 : 1;
            v1[j + 1] = minimum(v1[j] + 1, v0[j + 1] + 1, v0[j] + cost);
        }
        
        // swap v1 (current row) and v0 (previous row) for next iteration
        int[] hold = v0;
        v0 = v1;
        v1 = hold;
    }
    
    // return final value representing best edit distance
    return v0[targetText.length()];
}

При наличии большого количества известных слов для сравнения с неизвестным введенным словом (а также многоядерной системы) для ускорения обработки можно применить одновременное исполнение. Разбейте коллекцию известных слов на несколько фрагментов и обработайте каждый из этих фрагментов как отдельную задачу. Изменяя количество слов в каждом фрагменте, вы можете с легкостью изменять гранулярность разбиения задачи, чтобы увидеть ее влияние на общую производительность. В листинге 2 показан Java-код для вычисления на основе фрагментов, взятый из класса ThreadPoolDistance в примере кода. В листинге 2 используется стандартный интерфейс ExecutorService с количеством потоков, сответствующим количеству доступных процессоров.

Листинг 2. Пофрагментное вычисление расстояния в Java с несколькими потоками
private final ExecutorService threadPool;
private final String[] knownWords;
private final int blockSize;

public ThreadPoolDistance(String[] words, int block) {
    threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    knownWords = words;
    blockSize = block;
}

public DistancePair bestMatch(String target) {
    
    // build a list of tasks for matching to ranges of known words
    List<DistanceTask> tasks = new ArrayList<DistanceTask>();

    int size = 0;
    for (int base = 0; base < knownWords.length; base += size) {
        size = Math.min(blockSize, knownWords.length - base);
        tasks.add(new DistanceTask(target, base, size));
    }
    DistancePair best;
    try {
        
        // pass the list of tasks to the executor, getting back list of futures
        List<Future<DistancePair>> results = threadPool.invokeAll(tasks);
        
        // find the best result, waiting for each future to complete
        best = DistancePair.WORST_CASE;
        for (Future<DistancePair> future: results) {
            DistancePair result = future.get();
            best = DistancePair.best(best, result);
        }
        
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    }
    return best;
}

/**
 * Shortest distance task implementation using Callable.
 */
public class DistanceTask implements Callable<DistancePair>
{
    private final String targetText;
    private final int startOffset;
    private final int compareCount;
    
    public DistanceTask(String target, int offset, int count) {
        targetText = target;
        startOffset = offset;
        compareCount = count;
    }
    
    private int editDistance(String word, int[] v0, int[] v1) {
        ...
    }

    /* (non-Javadoc)
     * @see java.util.concurrent.Callable#call()
     */
    @Override
    public DistancePair call() throws Exception {
        
        // directly compare distances for comparison words in range
        int[] v0 = new int[targetText.length() + 1];
        int[] v1 = new int[targetText.length() + 1];
        int bestIndex = -1;
        int bestDistance = Integer.MAX_VALUE;
        boolean single = false;
        for (int i = 0; i < compareCount; i++) {
            int distance = editDistance(knownWords[i + startOffset], v0, v1);
            if (bestDistance > distance) {
                bestDistance = distance;
                bestIndex = i + startOffset;
                single = true;
            } else if (bestDistance == distance) {
                single = false;
            }
        }
        return single ? new DistancePair(bestDistance, knownWords[bestIndex]) :
            	new DistancePair(bestDistance);
    }
}

Метод bestMatch() в листинге 2 формирует список экземпляров DistanceTask, а затем передает этот список в интерфейс ExecutorService. Эта форма вызова ExecutorService использует параметр типа Collection<? extends Callable<T>>, представляющий подлежащие исполнению задачи. Этот вызов возвращает список интерфейсов Future<T>, представляющий результаты исполнения. ExecutorService асинхронно заполняет эти результаты значениями, возвращенными посредством вызова метода call() для каждой задачи. В этом случае типом T является DistancePair— объект в виде простого значения для расстояния и соответствующего слова или только расстояния, если уникальное соответствие не найдено.

Исходный поток исполнения в методе bestMatch() поочередно ждет завершения каждого Future, аккумулируя лучший результат и возвращая его после завершения. Когда выполнение DistanceTask осуществляется несколькими потоками, первоначальный поток ждет только части результатов. Остальная часть результатов вычисляется одновременно с теми результатами, которых ждет первоначальный поток.

Производительность одновременного исполнения

Чтобы полностью задействовать все процессоры, доступные в системе, необходимо задать в конфигурации интерфейса ExecutorService количество потоков не менее количества имеющихся процессоров. Кроме того, на исполнение в ExecutorService необходимо передать как минимум столько задач, сколько процессоров имеется в системе. На практике для повышения производительности вам, вероятно, потребуется значительно больше задач, чем количество имеющихся процессоров. В этом случае процессоры будут постоянно загружены поочередным выполнением задач, а простаивать будут лишь в конце. Однако поскольку имеют место издержки — на создание задач и интерфейсов future, на переключение потоков между задачами и, наконец, на возврат результатов задач — необходимо формировать задачи достаточно больших размеров, чтобы издержки были пропорционально малыми.

На рис. 1 показано, как меняется измеренная производительность в зависимости от количества задач при исполнении тестового кода на четырехъядерной системе AMD с Oracle Java 7 под управлением 64-разрядной Linux®. Каждое входное слово поочередно сравнивается с 12564 известными словами, при этом каждая задача находит наилучшее соответствие внутри набора известных слов. Весь набор из 933 входящих слов с орфографическими ошибками обрабатывается многократно, с паузами между проходами для стабилизации состояния JVM. Наилучшее время после 10 проходов используется в графике. Как показано на рис. 1, производительность (измеряемая количеством входящих слов в секунду) является стабильной в диапазоне разумных размеров фрагмента (от 256 до чуть более 1024) и резко снижается только в зонах экстремальных значений, в которых задачи становятся или очень малыми, или очень большими. Финальное значение (для фрагмента величиной 16384 слов) создает лишь одну задачу и поэтому демонстрирует производительность для однопоточного режима.

Рисунок 1. Производительность ThreadPoolDistance
Chart that shows performance of ThreadPoolDistance across a range of block sizes
Chart that shows performance of ThreadPoolDistance across a range of block sizes

Fork-Join

В выпуске Java 7 появилась другая реализация ExecutorService, а именно: класс ForkJoinPool. Класс ForkJoinPool предназначен для эффективного манипулирования задачами, которые могут неоднократно разбиваться на подзадачи, с использованием для задач класса RecursiveAction (когда задача не создает результата) или класса RecursiveTask<T> (когда задача имеет результат типа T). Класс RecursiveTask<T> предоставляет удобный способ для консолидации результатов исполнения подзадач (см. листинг 3).

Листинг 3. Пример класса RecursiveTask<DistancePair>
private ForkJoinPool threadPool = new ForkJoinPool();

private final String[] knownWords;

private final int blockSize;

public ForkJoinDistance(String[] words, int block) {
    knownWords = words;
    blockSize = block;
}

public DistancePair bestMatch(String target) {
    return threadPool.invoke(new DistanceTask(target, 0, knownWords.length, knownWords));
}

/**
 * Shortest distance task implementation using RecursiveTask.
 */
public class DistanceTask extends RecursiveTask<DistancePair>
{
    private final String compareText;
    private final int startOffset;
    private final int compareCount;
    private final String[] matchWords;
    
    public DistanceTask(String from, int offset, int count, String[] words) {
        compareText = from;
        startOffset = offset;
        compareCount = count;
        matchWords = words;
    }
    
    private int editDistance(int index, int[] v0, int[] v1) {
        ...
    }
    
    /* (non-Javadoc)
     * @see java.util.concurrent.RecursiveTask#compute()
     */
    @Override
    protected DistancePair compute() {
        if (compareCount > blockSize) {
            
            // split range in half and find best result from bests in each half of range
            int half = compareCount / 2;
            DistanceTask t1 = new DistanceTask(compareText, startOffset, half, matchWords);
            t1.fork();
            DistanceTask t2 = new DistanceTask(compareText, startOffset + half,
                compareCount - half, matchWords);
            DistancePair p2 = t2.compute();
            return DistancePair.best(p2, t1.join());
        }
        
        // directly compare distances for comparison words in range
        int[] v0 = new int[compareText.length() + 1];
        int[] v1 = new int[compareText.length() + 1];
        int bestIndex = -1;
        int bestDistance = Integer.MAX_VALUE;
        boolean single = false;
        for (int i = 0; i < compareCount; i++) {
            int distance = editDistance(i + startOffset, v0, v1);
            if (bestDistance > distance) {
                bestDistance = distance;
                bestIndex = i + startOffset;
                single = true;
            } else if (bestDistance == distance) {
                single = false;
            }
        }
        return single ? new DistancePair(bestDistance, knownWords[bestIndex]) :
            new DistancePair(bestDistance);
    }
}

На рис. 2 производительность кода ForkJoin, показанного в листинге 3, сравнивается с производительностью кода ThreadPool, показанного в листинге 2. Код ForkJoin гораздо стабильнее во всем диапазоне размеров фрагмента; его производительно значительно падает только в случае единственного блока (другими словами, когда исполнение является однопоточным). Стандартный код ThreadPool демонстрирует более высокую производительность только при размерах фрагмента 256 и 1024.

Рисунок 2. Производительность ThreadPoolDistance в сравнении с производительностью ForkJoinDistance
Chart that compares the performance of ThreadPoolDistance to that of  ForkJoinDistance across range of block sizes
Chart that compares the performance of ThreadPoolDistance to that of ForkJoinDistance across range of block sizes

Как показывают эти результаты, если вы в состоянии настроить размеры задачи в приложении с целью повышения производительности, то стандартный класс ThreadPool предоставляет несколько больше возможностей, чем ForkJoin. Однако при этом необходимо понимать, что "зона наилучших показателей" для ThreadPool зависит от задачи, от количества доступных процессоров и, возможно, от других особенностей вашей системы. В общем случае ForkJoin обеспечивает превосходную производительность с минимальной необходимостью настройки, поэтому рекомендуется использовать ForkJoin везде, где этой возможно.

Основы одновременного исполнения в Scala

Язык Scala расширяет язык программирования и среду исполнения Java по нескольким направлениям, включая добавление новых и более простых способов для осуществления одновременного исполнения. Для начинающих Scala-версия Future<T> является гораздо более гибкой, чем Java-версия. Вы можете создавать future-объекты непосредственно из блоков кода, а также присоединять к future обратные вызовы для завершения обработки. В листинге 4 показаны примеры использования future в Scala. Сначала этот код определяет метод futureInt() для предоставления Future<Int> по требованию, а затем использует future тремя различными способами.

Листинг 4. Пример Scala-кода Future<T>
import ExecutionContext.Implicits.global

val lastInteger = new AtomicInteger
def futureInt() = future {
  Thread sleep 2000
  lastInteger incrementAndGet
}

// use callbacks for completion of futures
val a1 = futureInt
val a2 = futureInt
a1.onSuccess {
    case i1 => {
      a2.onSuccess {
        case i2 => println("Sum of values is " + (i1 + i2))
      }
    }
}
Thread sleep 3000

// use for construct to extract values when futures complete
val b1 = futureInt
val b2 = futureInt
for (i1 <- b1; i2 <- b2) yield println("Sum of values is " + (i1 + i2))
Thread sleep 3000

// wait directly for completion of futures
val c1 = futureInt
val c2 = futureInt
println("Sum of values is " + (Await.result(c1, Duration.Inf) +
  Await.result(c2, Duration.Inf)))

В первом примере в листинге 4 замыкания обратных вызовов присоединяются к двум future-объектам, вследствие чего после завершения исполнения обоих этих future-объектов сумма двух результирующих значений выводится на консоль ("распечатывается"). Обратные вызовы вкладываются непосредственно в future-объект в порядке своего создания, однако они будут работать точно так же, если вы измените этот порядок. Если future-объект уже завершен к тому моменту, когда вы присоединяете обратный вызов, то обратный вызов все равно исполняется, хотя и без гарантии, что это произойдет немедленно. Исходный поток исполнения приостанавливается на строке Thread sleep 3000, чтобы future-объекты смогли завершиться до перехода к следующему примеру.

Во втором примере демонстрируется использование Scala-конструкции for comprehension с целью асинхронного извлечения значений из future-объекта и их применения непосредственно в выражении. Scala-конструкцию for comprehension можно использовать для выражения сложных комбинаций операций (map, filter, flatMap, and foreach). В большинстве случае она используется с различными формами коллекций, однако future-объект Scala реализует те же самые монадные методы, которые используются для обращения к значениям коллекций. Таким образом, future-объект можно использовать в качестве особой разновидности коллекций, которая содержит не более одного значения (и даже может не содержать этого одного значения до некоторого момента в будущем). В данном случае утверждение for дает указание взять результаты future-объектов и использовать значения этих результатов в выражении. "За кадром" эта технология генерирует в значительной степени тот же код, что и в первом примере, однако ее запись в форме линейного кода дает более простое и понятное выражение. Как и в первом примере, исходный поток исполнения приостанавливается, чтобы future-объекты смогли завершиться до перехода к следующему примеру.

В третьем примере для получения результатов future-объектов используется блокирующее ожидание. Это эквивалентно тому, как работает future-объект Java, хотя в случае Scala вызов специального метода Await.result(), принимающего параметр максимального времени ожидания, делает блокировку ожидания явной.

Код в листинге 4 не передает future в класс ExecutorService или в его эквивалент очевидным образом, поэтому если вы не работали с Scala, то можете не понять, каким образом исполняется код, обеспечивающий работу future-объекта. Ответ на этот вопрос находится в верхней строке кода в листинге 4: import ExecutionContext.Implicits.global. API-интерфейсы Scala нередко используют implicit значения для параметров, которые будут часто повторно использоваться в блоке программного кода. Конструкт future { } требует, чтобы ExecutionContext был доступен в виде неявного параметра. В данном случае ExecutionContext— это Scala-оболочка для Java-класса ExecutorService; он используется таким же образом для исполнения задач с применением одного или нескольких управляемых потоков.

Помимо этих базовых операций с future-объектами, язык Scala также предоставляет способ для преобразования любой коллекции в коллекцию, использующую параллельное программирование. После того как вы преобразуете свою коллекцию в параллельную форму, любая стандартная Scala-операция с коллекцией (map, filter, fold), применяемая к этой коллекции, будет автоматически выполняться в параллельном режиме везде, где это возможно (соответствующий пример приведен как часть кода в листинге 7, в котором ищется наилучшее соответствие для слова с использованием Scala).

Обработка ошибок

Future-объекты и в Java, и в Scala должны предусматривать обработку ошибок. В случае Java (точнее, Java 7), future-объект способен выдать исключение ExecutionException вместо возвращения результата. Приложения могут определять собственные субклассы ExecutionException для определенных типов ошибок или связывать исключения в цепочку с целью передачи сведений об этих ошибках дальше, однако на этом их гибкость исчерпывается.

Future-объект Scala обеспечивает более гибкую обработку ошибок. Есть два способа завершения future-объекта Scala: успешное со значением результата (в предположении, что таковой ожидается), или с ошибкой (с ассоциированным с ней Throwable). Завершение future-объекта также можно обработать несколькими способами. В листинге 4 метод onSuccess используется с целью присоединения обратных вызовов для обработки успешного завершения future. Можно также использовать для обработки любой формы завершения параметр onComplete (который обертывает результат или throwable как Try с целью учета обоих вариантов) или параметр onFailure для обработки определенной ошибки. Такая гибкость future-объектов Scala распространяется на все операции, которые можно выполнить с использованием future, что позволяет интегрировать обработку ошибок непосредственно в программный код.

Кроме того, Scala-класс Future<T> имеет тесно связанный с ним класс Promise<T>. Future-объект — это держатель для результата, который может стать доступным в некоторый момент (а может и не стать — не существует никакой гарантии, что future-объект когда-либо завершится). После завершения future-объекта полученный результат является фиксированным и неизменяемым. Promise — это другая сторона этого соглашения: это однократный присваиваемый держатель для результата, представленный в форме значения результата или throwable. Вы можете получить future из promise, а когда результат настроен на какой-либо promise, он также настроен и на соответствующий future.

Применение одновременного исполнения в Scala

Теперь, когда вы познакомились с некоторыми базовыми концепциями одновременного исполнения в Scala, пора рассмотреть программный код для задачи по нахождению расстояния Левенштейна. В листинге 5 показана более-менее характерная Scala-реализация вычисления расстояния Левенштейна, которая в основном соответствует Java-коду в листинге 1, но имеет функциональный стиль.

Листинг 5. Вычисление расстояния Левенштейна в Scala
val limit = targetText.length
/** Вычисление расстояния редактирования от targetText до известного слова.
  *
  * @param word known word
  * @param v0 int array of length targetText.length + 1
  * @param v1 int array of length targetText.length + 1
  * @return distance
  */
def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = {

  val length = word.length

  @tailrec
  def distanceByRow(rnum: Int, r0: Array[Int], r1: Array[Int]): Int = {
    if (rnum >= length) r0(limit)
    else {

      // first element of r1 = delete (i+1) chars from target to match empty 'word'
      r1(0) = rnum + 1

      // use formula to fill in the rest of the row
      for (j <- 0 until limit) {
        val cost = if (word(rnum) == targetText(j)) 0 else 1
        r1(j + 1) = min(r1(j) + 1, r0(j + 1) + 1, r0(j) + cost);
      }

      // recurse with arrays swapped for next row
      distanceByRow(rnum + 1, r1, r0)
    }
  }

  // initialize v0 (prior row of distances) as edit distance for empty 'word'
  for (i <- 0 to limit) v0(i) = i

  // recursively process rows matching characters in word being compared to find best
  distanceByRow(0, v0, v1)
}

Код в листинге 5 использует метод хвостовой рекурсии distanceByRow() для каждого вычисления значения строки. Этот метод сначала проверяет, сколько строк было вычислено, и если полученное число соответствует количеству символов в проверяемом слове, возвращает результирующее расстояние. В противном случае он вычисляет новые значения строк, а затем завершается, вызывая себя рекурсивно для вычисления следующей строки (переключая два массива строк в ходе этого процесса, чтобы новые значения текущих строк передавались корректно). Scala преобразует методы хвостовой рекурсии в эквивалентные Java-циклы while, благодаря чему сохраняется подобие Java-коду.

Тем не менее между этим кодом и кодом на Java есть одно значительное различие. В коде, показанном в листинге 5, конструкции for comprehension используют замыкания. Нынешние JVM-машины обрабатывают замыкания не всегда эффективно (более подробная информация содержится в статье Why is using for/foreach on a Range slow?), в результате чего они добавляют значительные издержки к самому внутреннему циклу вычислений. Соответственно, в написанном здесь виде код в листинге 5 работает медленнее, чем Java-версия. В листинге 6 показан переписанный код, в котором вместо конструкций for comprehension применяются добавленные методы хвостовой рекурсии. Эта версия гораздо многословнее, однако ее производительность находится на одном уровне с Java-версией.

Листинг 6. Программный код вычислений, реструктурированный с целью повышения производительности
val limit = targetText.length

/** Вычисление расстояния редактирования от targetText до известного слова.
  *
  * @param word known word
  * @param v0 int array of length targetText.length + 1
  * @param v1 int array of length targetText.length + 1
  * @return distance
  */
def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = {

  val length = word.length
  
  @tailrec
  def distanceByRow(row: Int, r0: Array[Int], r1: Array[Int]): Int = {
    if (row >= length) r0(limit)
    else {

      // first element of v1 = delete (i+1) chars from target to match empty 'word'
      r1(0) = row + 1

      // use formula recursively to fill in the rest of the row
      @tailrec
      def distanceByColumn(col: Int): Unit = {
        if (col < limit) {
          val cost = if (word(row) == targetText(col)) 0 else 1
          r1(col + 1) = min(r1(col) + 1, r0(col + 1) + 1, r0(col) + cost)
          distanceByColumn(col + 1)
        }
      }
      distanceByColumn(0)

      // recurse with arrays swapped for next row
      distanceByRow(row + 1, r1, r0)
    }
  }

  // initialize v0 (prior row of distances) as edit distance for empty 'word'
  @tailrec
  def initArray(index: Int): Unit = {
    if (index <= limit) {
      v0(index) = index
      initArray(index + 1)
    }
  }
  initArray(0)

  // recursively process rows matching characters in word being compared to find best
  distanceByRow(0, v0, v1)
}

В листинге 7 показан Scala-код для выполнения варианта пофрагментного вычисления расстояния, подобного тому, который осуществляет Java-код в листинге 2. Метод bestMatch() находит наилучшее соответствие для целевого текста внутри определенной группы слов, обрабатываемой экземпляром класса Matcher, с использованием метода хвостовой рекурсии best() при сканировании по словам. Классы *Distance создают несколько экземпляров Matcher, по одному для каждой группы слов, а затем координируют исполнение и комбинирование результатов matcher.

Листинг 7. Пофрагментное вычисление расстояния с несколькими потоками в Scala
class Matcher(words: Array[String]) {

  def bestMatch(targetText: String) = {

    val limit = targetText.length
    val v0 = new Array[Int](limit + 1)
    val v1 = new Array[Int](limit + 1)
    
    def editDistance(word: String, v0: Array[Int], v1: Array[Int]) = {
      ...
    }

    @tailrec
    /** Сканирование всех известных слов в наборе с целью нахождения наилучшего соответствия.
      *  
      * @param index next word index
      * @param bestDist minimum distance found so far
      * @param bestMatch unique word at minimum distance, or None if not unique
      * @return best match
      */
    def best(index: Int, bestDist: Int, bestMatch: Option[String]): DistancePair =
      if (index < words.length) {
        val newDist = editDistance(words(index), v0, v1)
        val next = index + 1
        if (newDist < bestDist) best(next, newDist, Some(words(index)))
        else if (newDist == bestDist) best(next, bestDist, None)
        else best(next, bestDist, bestMatch)
      } else DistancePair(bestDist, bestMatch)

    best(0, Int.MaxValue, None)
  }
}

class ParallelCollectionDistance(words: Array[String], size: Int) extends TimingTestBase {

  val matchers = words.grouped(size).map(l => new Matcher(l)).toList
  
  def shutdown = {}
  
  def blockSize = size

  /** Нахождение наилучшего результата для всех matcher с использованием параллельной коллекции. */
  def bestMatch(target: String) = {
    matchers.par.map(m => m.bestMatch(target)).
      foldLeft(DistancePair.worstMatch)((a, m) => DistancePair.best(a, m))
  }
}

class DirectBlockingDistance(words: Array[String], size: Int) extends TimingTestBase {

  val matchers = words.grouped(size).map(l => new Matcher(l)).toList
  
  def shutdown = {}
  
  def blockSize = size

  /** Нахождение наилучшего результата для всех matcher с использованием прямого ожидания блокировок. */
  def bestMatch(target: String) = {
    import ExecutionContext.Implicits.global
    val futures = matchers.map(m => future { m.bestMatch(target) })
    futures.foldLeft(DistancePair.worstMatch)((a, v) =>
      DistancePair.best(a, Await.result(v, Duration.Inf)))
  }
}

Два класса *Distance в листинге 7 демонстрируют разные способы координации исполнения и комбинирования результатов Matcher. Класс ParallelCollectionDistance использует вышеупомянутую Scala-функциональность параллельных коллекций для сокрытия деталей параллельных вычислений; ему требуется лишь простой foldLeft для объединения результатов.

Класс DirectBlockingDistance является несколько более явным; он создает список future-объектов, а затем применяет foldLeft к этому списку с ожиданием вложенных блокировок для каждого отдельного результата.

Еще раз о производительности

Каждая из реализаций *Distance в листинге 7 представляет собой разумный подход к обработке результатов Matcher (и этим перечень разумных подходов далеко не исчерпывается: в Примере кода приведено несколько других реализаций, которые я опробовал в своих экспериментах, но не включил в эту статью). В данном случае основным поводом для беспокойства является производительность, поэтому на рис. 3 показаны показатели этих двух реализаций в сравнении с показателями Java-кода ForkJoin.

Рисунок 3. Производительность ForkJoinDistance в сравнении с производительностью альтернатив на Scala
Chart that compares the performance ForkJoinDistance to that of Scala alternatives
Chart that compares the performance ForkJoinDistance to that of Scala alternatives

Как видно из рис. 3, Java-код ForkJoin в целом демонстрирует более высокую производительность, чем любая из Scala-реализаций, хотя DirectBlockingDistance обеспечивает наилучшую производительность при фрагментах размером в 1024 слова. Обе Scala-реализации имеют более высокую производительность, чем код ThreadPool из листинга 1, в большей части диапазона размеров фрагмента.

Эти результаты для производительности являются лишь иллюстративными и далеко не окончательными. Если вы выполните тесты на время исполнения в своей системе, то с большой вероятностью увидите различия в относительной производительности, особенно при ином количестве процессорных ядер. Кроме того, если бы я хотел получить максимально возможную производительность для задачи определения расстояния, я бы оптимизировал код. Я мог бы отсортировать известные слова по длине и начать со сравнения с вводимыми словами такой же длины (поскольку расстояние редактирования всегда не меньше разности длины слов). Я также мог бы разу прекращать вычисления расстояния, если результат превысил лучшее ранее полученное значение. Тем не менее при всей относительной простоте алгоритма этот эксперимент весьма полезен — он показывает, как одновременное исполнение операций позволяет повысить производительность и как влияют на результат разные подходы влияют на распределение работы.

Помимо производительности, интерес представляет сравнение двух Scala-версий управляющего кода в листинге 7 с Java-кодом в листинге 2 и листинге 3. Scala-код значительно короче и понятнее, чем Java-код (в предположении, что вы понимаете Scala!). Scala и Java хорошо взаимодействуют друг с другом, как показывает полный пример кода для этой статьи: Scala-код выполняет тесты на время исполнения для Scala-кода и для Java-кода, а Java-код, в свою очередь, работает непосредственно с фрагментами Scala-кода. Благодаря этой простоте взаимодействия Scala-код можно ввести в существующую базу Java-кода без крупномасштабных изменений. Зачастую имеет смысл на начальном этапе использовать Scala для высокоуровневого управления Java-кодом. Это позволит полностью задействовать мощные выразительные возможности Scala без какого-либо существенного влияния на производительность со стороны замыканий или преобразований.

Простота Scala-кода ParallelCollectionDistance в листинге 7 особенно впечатляет. Использование этого подхода позволяет полностью абстрагировать одновременное исполнение — написанная вами программа будет выглядеть как однопоточное приложение, но при этом она будет пользоваться возможностями нескольких процессоров. Хорошая новость для специалистов, которым нравится простота этого подхода, но которые не желают или не имеют возможности перейти на использование Scala в процессе разработки: выпуск Java 8 привносит подобную функциональность непосредственно в Java-программирование.

Будущие статьи данного цикла

Итак, вы познакомились с основами одновременного исполнения в Java и в Scala. В следующей статье данного цикла демонстрируется, каким образом Java 8 улучшает поддержку одновременного исполнения Java-кода (а потенциально, в более длительной перспективе — и Scala-кода). Многие изменения Java 8 будут выглядеть знакомыми — многие концепции, на которых базируется одновременное исполнение в Scala, включены в Java 8 — поэтому вы вскоре сможете применить некоторые приемы Scala в своем обычном Java-коде. Прочтите следующую статью, чтобы узнать подробности.


Ресурсы для скачивания


Похожие темы


Комментарии

Войдите или зарегистрируйтесь для того чтобы оставлять комментарии или подписаться на них.

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Технология Java
ArticleID=985390
ArticleTitle=Одновременное исполнение на платформе JVM: Основы одновременного исполнения в Java и в Scala
publish-date=10072014