Java development 2.0: Вторая волна разработки Java-приложений: Обмен сообщениями в облаке с помощью Amazon SQS

Платите за то, что используете, с помощью системы для организации очередей сообщений Amazon

Служба очередей Amazon Simple Queue Service (SQS) сочетает в себе все необходимое для ориентированного на обработку сообщений промежуточного ПО (MOM), но не привязывает вас к какому-либо одному языку программирования или системе. Узнайте, как использовать Amazon SQS для решения проблем установки и поддержки систем организации очередей сообщений, сохранив преимущества гибкой модели оплаты “по мере использования”, свойственной AWS.

Эндрю Гловер, президент компании, Stelligent Incorporated

Эндрю ГловерЭндрю Гловер является президентом компании Stelligent Incorporated , которая помогает другим фирмам решать проблемы качества программного обеспечения. Для этого используются эффективные стратегии тестирования и технологии непрерывной интеграции, которые позволяют коллективам разработчиков постоянно контролировать качество кода, начиная с ранних стадий разработки. Просмотрите блог Энди , там можно найти список его публикаций.



23.03.2012

Об этой серии

С момента первого появления технологии Java™ подходы к разработке ПО на этом языке существенно изменились. Благодаря появлению мощных систем с открытым исходным кодом и надежных инфраструктур для предоставления средств разработки в аренду появилась возможность быстро и дешево транслировать, тестировать, исполнять и поддерживать приложения Java. В этой серии статей, Эндрю Гловер описывает широкий спектр средств и технологий, благодаря которым стала возможной эта новая парадигма разработки на языке Java.

Очереди сообщений широко применяются в различных архитектурах и областях применения программного обеспечения, включая финансовые системы, здравоохранение и туристический бизнес. Тем не менее для применения ориентированного на обработку сообщений промежуточного ПО (MOM) — наиболее распространенного решения для обмена сообщениями в распределенных системах — требуется установить и поддерживать систему организации очередей. В этом месяце я представляю основанное на облачных вычислениях альтернативное решение: службу Amazon Simple Queue Service (SQS).

Так же, как часто имеет смысл размещать Web-приложения на Google App Engine или Amazon Elastic Beanstalk (см. Ресурсы), бывает целесообразно воспользоваться облачной системой обмена сообщениями. В обоих случаях вы сможете посвятить больше времени написанию приложения, а не установке и поддержке базовой инфраструктуры.

В данной статье описывается, каким образом Amazon SQS решает проблему установки и поддержки системы организации очередей. Также вы сможете попрактиковаться в создании очередей сообщений в SQS, а затем в записи и извлечении из них сообщений. И, наконец, я продемонстрирую, что получится, если добавить службу обмена сообщениями в Magnus, мобильное Web-приложение, которое я описывал в опубликованной в прошлом месяце статье Введение в Amazon Elastic Beanstalk.

Кто говорит? MOM.

Ориентированное на обработку сообщений промежуточное ПО, или MOM, — это термин, используемый для описания слабосвязанных систем, взаимодействующих через очереди сообщений. Компоненты такой системы не имеют тесных связей (например, через зависимости времени компиляции), а распределены по сети. Такое распределение, в котором роль среды обмена информацией играют очереди сообщений, позволяет легко масштабировать основанные на обмене сообщениями системы.

Традиционно в системе, ориентированной на обработку сообщений, архитекторы сами решают, какие компоненты будут взаимодействовать друг с другом. Поскольку все взаимодействие опирается на обмен сообщениями, сами сообщения зачастую имеют универсальный кросс-платформенный формат. В роли сообщений могут выступать простые строковые данные или даже документы в формате XML или JSON.

Поскольку архитектура MOM разъединяет компоненты и обеспечивает кросс-платформенную связь между ними, отдельные блоки могут быть разнородными. Это значит, что компоненты распределенной архитектуры могут быть написаны на разных языках, таких как Java, C# и Ruby. Кроме того, компоненты могут работать на разных платформах, таких как UNIX® и Windows®. К тому же MOM упрощает системную интеграцию. Являясь промежуточным ПО, MOM может соединять как унаследованные, так и новые системы. Это связано с тем, что в роли API между компонентами выступают простые сообщения, которые могут иметь любую форму, от документа XML до сериализованного объекта или простых строковых данных.

GAE — вот ваш MOM!

Очереди сообщений в системе MOM, подобно водопроводной системе, соединяют разные компоненты системы, обеспечивая свободный поток сообщений. Как мы увидим, превосходным примером ПО, ориентированного на обработку сообщений, является GAE.

Подобно всем хорошим MOM, Google App Engine использует для развязывания системных процессов очереди сообщений. В частности, очереди GAE позволяют отделить долго работающие процессы от Web-запросов. Используя GAE, вы выгружаете URL, соответствующие сервлетам или JSP, в очередь сообщений, откуда они выбираются и обрабатываются службами GAE. Сервлеты работают асинхронно по отношению к главной логической последовательности Web-приложения. (Дополнительную информацию о GAE можно найти в Ресурсах.)

Впрочем, постановка в очередь долго работающих процессов для управления длительностью главных процессов используется не только в GAE. Такая MOM-подобная функция предлагается и в других реализациях PaaS, таких как Heroku. Однако Amazon SQS позволяет делать это для любого Web-приложения, независимо от платформы.


Введение в Amazon SQS

Amazon SQS предлагает множество функций, которые должны быть вам знакомы, если вы пользовались очередями сообщений в JMS.

Amazon SQS - это не JMS

В очередях сообщений на платформе Java нет ничего нового, что хорошо видно на примере спецификаций JMS. JMS известна уже более десяти лет и имеет внушительный список реализаций, в том числе RabbitMQ, Apache ActiveMQ и даже IBM Websphere® MQ. Однако API Amazon SQS не использует интерфейсы JMS. Более того, он, пожалуй, значительно проще JMS, и с ним легче работать.

Amazon SQS:

  • Позволяет нескольким процессам выполнять чтение и запись в одной и той же очереди. Кроме того, он блокирует сообщения на время обработки, гарантируя, что сообщение будет обрабатываться лишь одним читающим процессом, даже если несколько процессов выполняют чтение из одной очереди.
  • Выгодно использует многократно резервированную архитектуру Amazon, обеспечивая чрезвычайно высокую степень доступности при одновременном доступе. Это также гарантирует доставку сообщения (хотя бы один раз).
  • Вам необходимо платить только за то, чем вы воспользовались. В случае Amazon SQS это значит, что вы будете платить $0,000001 за сообщение. В настоящее время AWS предлагает бесплатный уровень обслуживания, по которому первые 100000 сообщений в месяц бесплатны. Но не забывайте, что существует общая для всех продуктов AWS плата за трафик, исчисляемый в гигабайтах.

Начать работу с SQS просто, как и все в AWS. Если у вас еще нет учетной записи AWS, для начала создайте ее. Затем активируйте Amazon SQS. И, наконец, используйте интерфейс AWS Java SDK для публикации и чтения облачных сообщений! (Сам процесс написания сообщений описан ниже.)


Написание сообщений SQS

Как следует из самого названия Amazon SQS, логика чтения и записи в очередь здесь очень проста. Сначала устанавливается соединение с AWS с помощью действующего ключа доступа и секретного ключа, как показано в листинге 1:

Листинг 1. Установка соединения с AWS
AmazonSQS sqs = new AmazonSQSClient(new BasicAWSCredentials(AWS_KEY, AWS_SECRET));

Затем вам понадобится очередь. Вызов функции createQueue в AWS API, как показано в листинге 2, не обязательно создает новую очередь. Если очередь уже существует, то возвращается ее дескриптор. В SQS очереди представляют собой URL, поэтому дескриптор очереди — это тоже URL. Обратите внимание, что в AWS SDK API, Queue URL имеет тип String, а не Java URL.

Листинг 2. Получение дескриптора очереди
String url = sqs.createQueue(new CreateQueueRequest("a_queue")).getQueueUrl();

Теперь, когда у вас есть очередь, можно записывать в нее сообщения. Формат сообщения SQS подобен формату SimpleDB (см. Ресурсы), в котором сообщения имеют тип String. Однако не забывайте, что данные типа String можно легко структурировать и сделать легко поддающимися разбору, представив их в правильно сформированном формате JSON или XML.

Листинг 3. Отправка сообщений через SQS
sqs.sendMessage(new SendMessageRequest(url, "It's a wonderful life!"));

SQS — что может быть проще

Не забывайте, что первой и основной характеристикой Amazon SQS является простота, а это значит, что он не имеет дополнительных возможностей, к которым вы могли привыкнуть. Например, SQS не делает упреждающих уведомлений, поэтому процессы, читающие очередь SQS, должны периодически ее опрашивать на предмет появления новых сообщений. Это, конечно, не страшно, но это дополнительно нагружает приложение, что в некоторых случаях может оказаться неприемлемым. Эту проблему решает служба уведомлений Amazon Simple Notification Service (SNS), но ее обсуждение выходит за рамки данной статьи.

Длина сообщения ограничена и, по умолчанию, не может превышать 8 КБ. Если вам нужны сообщения большей длины, их можно разделить на части, обозначив эти части последовательными идентификаторами. Затем эти сообщения можно собрать на стороне приема.

Вот и все — для помещения сообщения в очередь SQS вам понадобятся лишь три строки кода.

Об AWS SDK

Возможно, инструментарий AWS SDK показался вам знакомым, особенно если вы читали мое введение в SimpleDB (см. Ресурсы). Поскольку все функции AWS являются Web-сервисами, все взаимодействие выполняется через HTTP. Поэтому API имитирует логические запросы с помощью Request-подобных объектов, таких как SendMessageRequest или CreateQueueRequest. В обоих случаях имена описывают назначение объекта.

Следует также отметить, что помещенные в SQS сообщения долговечны: они остаются на месте до тех пор, пока вы их не удалите. (На самом деле сообщения со временем исчезают, даже если их не удалять; по умолчанию интервал автоматического удаления равен четырем дням.) При чтении сообщения Amazon SQS использует простую стратегию блокировки — если выполняется чтение, сообщение будет недоступно для других одновременных процессов чтения в течение времени, известного как таймаут видимости сообщения. По умолчанию это время равно 30 секундам, но при необходимости вы можете его изменить.

Долговечность сообщений, живущих в инфраструктуре Amazon, внушает уверенность. Подобно SimpleDB и даже S3, компоненты в мире AWS существенно избыточны. Если читающий процесс (или процессы) будут неожиданно остановлены во время обработки сообщения, существует достаточно высокая вероятность того, что сообщение сохранится. Что еще важнее, если какой-то ресурс в сети AWS тоже решит скончаться, вы можете быть уверены, что критически важные сообщения не потеряются, — они по-прежнему будут существовать на некотором числе других машин. И, наконец, как и в случае всех других продуктов AWS, вы можете привязать физическое положение инфраструктуры своих сообщений к региону - США, Европа и т.д.


Чтение сообщений SQS

Код для записи сообщения в очередь SQS занимает всего три строки. Для чтения сообщения требуется немногим больше. При этом первые две строки остаются без изменений, учитывая, что вам нужно подключиться к AWS и получить дескриптор той же очереди. Amazon SQS не предлагает функций обратного вызова и упреждающего уведомления при поступлении сообщений. Вы должны периодически опрашивать очередь SQS и проверять, не появились ли новые сообщения. Соответственно чтение очереди SQS потребует нескольких дополнительных строк кода.

Хочу сделать одно предупреждение относительно реализации стратегии опроса: перед обработкой сообщения обязательно проверяйте, что вы действительно получили достоверное сообщение. Если вы не будете этого делать, вы рано или поздно увидите злосчастное NullPointerException.

Например, если у меня есть действующее соединение с AWS и дескриптор очереди с сообщениями, я могу получать сообщения, как показано в листинге 4:

Листинг 4. Получение сообщений через SQS
 while (true) {
  List<Message> msgs = sqs.receiveMessage(
     new ReceiveMessageRequest(url).withMaxNumberOfMessages(1)).getMessages();

  if (msgs.size() > 0) {
   Message message = msgs.get(0);
   System.out.println("Это сообщение " + message.getBody());
   sqs.deleteMessage(new DeleteMessageRequest(url, message.getReceiptHandle()));
  } else {
    System.out.println("ничего не найдено, повторная попытка через 30 секунд");
    Thread.sleep(3000); 
  }
}

В листинге 4 ссылка на sqs имеет тип AmazonSQS, что видно из листинга 1. Этот объект имеет метод receiveMessage, который принимает ReceiveMessageRequest. ReceiveMessageRequest можно настроить так, чтобы он запрашивал заданное число сообщений в очереди. В своем случае я настроил его так, чтобы он просто выбирал сообщения по одному. Независимо от количества запрошенных сообщений метод receiveMessage возвращает список типов сообщений.

Реализация стратегии опроса

Как я уже говорил, считывание SQS выполняется через опрос; причем метод receiveMessage блокировку не выполняет. Поэтому я должен убедиться, что соответствующий список List (msgs) действительно содержит сообщения. Если из очереди ничего не извлечено, то вызов функции getMessages на ReceiveMessageRequest возвратит не null, а пустой список.

Если я извлек достоверное сообщение, я могу получить его содержимое или тело, вызвав функцию getBody. Не забывайте, что как только вы получаете дескриптор действительного сообщения, SQS его блокирует. По умолчанию у меня есть 30 секунд, чтобы что-то сделать с этим сообщением. Если я хочу навсегда исключить сообщение из обработки, его надо удалить. Для этого я вызываю функцию deleteMessage, которая выполняет запрос DeleteMessageRequest.

Копии сообщения различаются по дескриптору приема, как id. Этот дескриптор непосредственно связан не с сообщением, а с событием считывания. Сообщение, которое считывалось несколько раз (например, если оно не было удалено, или если процесс чтения был неудачен), может иметь несколько разных дескрипторов приема. В результате, если вы захотите удалить сообщение, вы должны указать дескриптор приема через вызов getReceiptHandle .

Вместо того чтобы постоянно проверять, есть ли в очереди сообщение, я использовал функцию ожидания, которая делает паузу в 30 секунд, если не было получено никаких сообщений. Конечно, в некоторых случаях паузы могут быть нежелательны или, наоборот, время ожидания лучше увеличить.

Приведя эти несколько строк кода, я достаточно полно охватил возможности Amazon SQS. И хотя AWS SDK предлагает множество других функций и возможностей, приведенного кода вполне достаточно для чтения и записи сообщений в очереди SQS.

Теперь давайте посмотрим, что будет, если применить этот код.


Magnus встречается с Amazon SQS

В прошлом месяце я написал простое мобильное Web-приложение Magnus, которое использовал для демонстрации некоторых возможностей Amazon Elastic Beanstalk (см. Ресурсы). Magnus имеет чудесную функцию сохранения информации о местоположении, полученной от мобильного устройства абонента, — как раз той информации, которую многие хотели бы предоставить, а многие другие хотели бы получить.

Регистрация местоположения — это хорошо, но что людям действительно по душе, так это графики (а также блестящие кнопки со скругленными углами). Для построения графиков и анализа может потребоваться довольно сложная обработка, если при этом приходится ворошить неимоверные объемы данных. (Кто-нибудь уже пробовал Hadoop?) Справиться с этим позволяет проверенный временем принцип извлечения, преобразования и загрузки или ETL. На самом деле ETL — достаточно широкий термин, охватывающий множество вещей. (На этой аббревиатуре делаются целые компании и карьеры!) В данном случае имеется в виду, что я собираюсь проанализировать некоторые данные MongoDB и создать на основе этих данных новый документ.

Применение ETL в случае Amazon SQS

Когда речь заходит об анализе данных, существует множество возможностей выполнения запросов и предоставления ответов. Web-приложение Magnus использует лишь малую часть этого потенциала: оно извлекает и представляет данные, относящиеся к географическим координатам, времени и учетным записям пользователей. Технически Magnus анализирует широту и долготу, идентификатор учетной записи пользователя, метки времени и взаимосвязи между этими данными.

Magnus может создать графическое представление этих данных, привязав учетные записи пользователей к географическому положению (например, построив карту с маркерами, указывающими на владельца учетной записи в любой заданный момент времени). Также оно может показать перемещение владельца учетной записи в данном регионе (еще одна карта). Для предоставления такой информации используется ETL-подобный процесс, работающий в автономном режиме. Предоставление данных в реальном времени может оказаться слишком затратным с точки зрения обработки, поэтому эти функции анализа можно считать функциями квазиреального времени.

Чтобы использовать Amazon SQS в Magnus, понадобится некоторая подготовка. Во-первых, мне нужно средство получения учетных данных AWS. Мне нравится Play (см. Ресурсы),поэтому в качестве среды разработки я буду использовать именно его. Для получения учетных данных я могу использовать автоматически считываемый файл параметров доступа application.conf среды Play.

Листинг 5. Добавление конфигурационных данных AWS в файл application.conf
#AWS configuration
aws_access_key_id=1S..........MR2
aws_secret_access_key=S3.........ZM

После определения свойств я могу получить их, вызвав объект Play среды Play, как показано в листинге 6:

Листинг 6. Получение информации AWS в Play
public class Application extends Controller {

 private static final String AWS_KEY = 
    Play.configuration.get("aws_access_key_id").toString();
 private static final String AWS_SECRET = 
    Play.configuration.get("aws_secret_access_key").toString();

//....
}

Определив эту схему, я могу переходить к делу. Код в листинге 7 подобен фрагменту кода, который я использовал в своем введении в Amazon Elastic Beanstalk, опубликованном в прошлом месяце. В данном случае я просто добавил в saveLocation некоторый код, помещающий простой документ JSON в очередь с именем "locations_queue". JSON выглядит примерно следующим образом: {"id":"4d6baeb52a54f1000001"}. Идентификатор сохраненного местоположения предоставляется получателю сообщения для просмотра и анализа.

Листинг 7. Метод saveLocation для помещения сообщений в SQS
public static void saveLocation(String id, JsonObject body) throws Exception {
 String eventname = body.getAsJsonPrimitive("name").getAsString();
 double latitude = body.getAsJsonPrimitive("latitude").getAsDouble();
 double longitude = body.getAsJsonPrimitive("longitude").getAsDouble();
 String when = body.getAsJsonPrimitive("timestamp").getAsString();

 SimpleDateFormat formatter =
   new SimpleDateFormat("dd-MM-yyyy HH:mm");
 Date dt = formatter.parse(when);

 ObjectId oid = new Location(id, dt, latitude, longitude).save();

 AmazonSQS sqs = new AmazonSQSClient(new BasicAWSCredentials(AWS_KEY, AWS_SECRET));

 Map mp = new HashMap<String, String>();
 mp.put("id", oid.toString());

 String url = sqs.createQueue(new CreateQueueRequest("locations_queue")).getQueueUrl();
 sqs.sendMessage(new SendMessageRequest(url, new Gson().toJson(mp)));

 renderJSON(getSuccessMessage());
}

Свидание с Ruby?

Теперь, когда сообщения занесены в очередь SQS, мне нужно извлечь их из очереди и выполнить некоторую обработку. Как вы помните, одним из преимуществ MOM является возможность использования разнородной архитектуры. В связи с этим считыватель SQS можно написать на языке, отличном от Java; более того, он может работать на другой платформе!

Поскольку я могу написать процедуру анализа на любом языке, я собираюсь сделать это на Ruby — чтобы поддержать репутацию крутого парня.

В листинге 8 я использовал gem-компонент языка Ruby right_aws, который помог мне работать с SQS. Во многих смыслах gem можно представить как файл-контейнер. Библиотека right_aws очень похожа на Amazon SDK для Java, хотя она и менее многословна и существенно более проста в работе.

Листинг 8. Создание подключения и очереди в Ruby для SQS
require "right_aws"
#...
sqs  = RightAws::SqsGen2.new(aws_access_key_id, aws_secret_access_key)
queue = sqs.queue('locations_queue')

Как видите, две строки кода из листинга 8 устанавливают соединение с AWS и получают дескриптор моей очереди с именем 'locations_queue'.

Затем я применяю механизм опроса, показанный в листинге 9. Ссылка на @queue представляет собой ту же переменную queue из листинга 8. Однако в данном случае она определяется как часть класса. Таким образом, в листинге 9 я непосредственно переопределяю ее в экземплярную переменную с помощью синтаксиса @ языка Ruby.

Листинг 9. Обработка сообщений из SQS
def process_messages()
  while true
    msg = @queue.pop
    if !msg.nil?
      handle_message(msg) # impl of which does neat stuff
	  msg.delete
    else
      sleep 10
    end
  end
end

После того как я передал сообщение в handle_message, я могу его удалить. Если сообщение не найдено, главный поток ожидает 10 секунд. Строка !msg.nil? аналогична msg != null в коде Java. Однако в Ruby даже null является объектом. Запрос к объекту, является ли он типом nil (через вызов метода nil?), возвращает значение в виде логической переменной.


Заключение

Поскольку AWS основана на Web-сервисах, она доступна из библиотек разных платформ. Гибкость такого подхода продемонстрирована на примере приложения Magnus: я смог вставить сообщения в очередь SQS с помощью кода на языке Java, а затем извлечь их с помощью небольшой программы на языке Ruby. Одним из преимуществ архитектуры на основе очередей является то, что она подразумевает развязку компонентов.

Точно так же, как часто имеет смысл размещать Web-приложения на GAE или Amazon Elastic Beanstalk, бывает целесообразно использовать облачную систему обмена сообщениями. Amazon SQS избавляет вас от забот об установке и поддержке систем организации очередей сообщений. Вы просто создаете очередь и затем помещаете и извлекаете из нее сообщения. Об остальном позаботится Amazon.

Ресурсы

Научиться

Получить продукты и технологии

  • Amazon Simple Queue Service: прочтите справочную документацию и приступайте к работе с Amazon SQS.

Комментарии

developerWorks: Войти

Обязательные поля отмечены звездочкой (*).


Нужен IBM ID?
Забыли Ваш IBM ID?


Забыли Ваш пароль?
Изменить пароль

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Профиль создается, когда вы первый раз заходите в developerWorks. Информация в вашем профиле (имя, страна / регион, название компании) отображается для всех пользователей и будет сопровождать любой опубликованный вами контент пока вы специально не укажите скрыть название вашей компании. Вы можете обновить ваш IBM аккаунт в любое время.

Вся введенная информация защищена.

Выберите имя, которое будет отображаться на экране



При первом входе в developerWorks для Вас будет создан профиль и Вам нужно будет выбрать Отображаемое имя. Оно будет выводиться рядом с контентом, опубликованным Вами в developerWorks.

Отображаемое имя должно иметь длину от 3 символов до 31 символа. Ваше Имя в системе должно быть уникальным. В качестве имени по соображениям приватности нельзя использовать контактный e-mail.

Обязательные поля отмечены звездочкой (*).

(Отображаемое имя должно иметь длину от 3 символов до 31 символа.)

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Вся введенная информация защищена.


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Технология Java, Open source
ArticleID=806657
ArticleTitle=Java development 2.0: Вторая волна разработки Java-приложений: Обмен сообщениями в облаке с помощью Amazon SQS
publish-date=03232012