Развертывание масштабируемых Web-сервисов и их поддержка с помощью Flume

Генерируемые системой log-файлы могут оказаться полезными при выявлении причин различных программных или аппаратных неисправностей. Из полученной информации можно извлечь полезные сведения, которые могут помочь усовершенствовать системную архитектуру, предотвратить падение производительности системы и снизить время ее простоев. В последнее время организации стали использовать данные log-файлов для понимания сути различных проблем. Flume представляет собой отказоустойчивую распределенную службу для эффективного сбора, накопления и перемещения в централизованное хранилище больших объемов данных log-файлов. Из этой статьи вы узнаете о том, как можно использовать Flume для развертывания простого распределенного Web-сервиса в кластере Hadoop.

Зафар Гилани, стажер-исследователь, Telefonica Research

Zafar GilaniЗафар Гилани (Zafar Gilani) – стажер-исследователь в компании Telefonica Research. Занимается распределенными вычислениями и обработкой и анализом больших данных. Раньше Зафар занимался поддержкой Infiniband в MPJ Express и работал научным сотрудником Национальной ускорительной лаборатории SLAC.



11.11.2013

Архитектура Flume

Flume – это распределенная, надежная и служба высогой готовности для сбора, накопления и перемещения в централизованное хранилище больших объемов потоковых данных, получаемых из множества источников.

Рисунок 1. Архитектура Flume
Архитектура Flume

Событие Flume можно рассматривать как фрагмент потока данных, обладающий определенной полезной нагрузкой (измеряемой в байтах) и необязательным набором строковых атрибутов. Агент Flume – это JVM-процесс, содержащий компоненты, через которые события передаются из внешних источников к следующему узлу назначения.

Решение InfoSphere® BigInsights™ позволяет выполнять непрерывный анализ и хранение потоковых данных с малым временем задержки. Для конфигурирования процессов-агентов и процессов-сборщиков, описанных выше, можно использовать продукт InfoSphere Streams (см. раздел Ресурсы). Можно поступить по-другому: использовать Flume для сбора данных из удаленных источников, а сборщик, отвечающий за хранение данных в распределенной файловой системе (DFS), сконфигурировать на сервере InfoSphere BigInsights. В этой статье мы будем использовать Flume одновременно и в качестве агента, и в качестве сборщика, а хранилищем будет являться кластер с распределенной файловой системой Hadoop (Hadoop Distributed File System, HDFS).


Модель информационного потока

Агент Flume состоит из трех главных компонентов: источник данных (source), информационный канал (channel) и получатель данных (sink). Источник принимает события, передаваемые ему внешними источниками, например, Web-службами. Внешний источник передает события в Flume в требуемом формате. Когда источник данных Flume получает событие, он сохраняет его в одном или нескольких каналах. Канал является пассивным хранилищем, в котором событие хранится до тех пор, пока не будет передано получателю. Например, для локальной файловой системы используется файловый канал; получатель извлекает событие из канала и помещает его во внешнее хранилище (например, в файловую систему HDFS) или направляет в источник данных следующего агента Flume (т. е. передает событие следующему узлу). Источник и получатель данных каждого отдельного агента работают асинхронно, поэтапно обрабатывая события в информационном канале.

В зависимости от поставленных задач источник может работать с различными форматами. Например, для приема событий Avro от клиентов Avro можно использовать источник Flume Avro, который образует половину механизма многоуровнего сбора данных Flume. Внутренние механизмы этого источника используются для прослушивания и обработки событий приемопередатчика NettyTransceiver. Источник Avro можно совместить со встроенным получателем AvroSink для создания топологий многоуровневого сбора данных. Другие распространенные форматы сетевых потоков, поддерживаемые Flume – Thrift, Syslog и Netcat.


Avro

Avro – это формат сериализации данных, разработанный компанией Apache. Это инфраструктура на базе RPC, широко используемая в продуктах Apache (например, Flume и Hadoop) для передачи и сохранения данных (см. раздел Ресурсы). Инфраструктура Avro предоставляет структуры данных с широкими возможностями, компактный и быстрый двоичный формат данных, а также простую интеграцию с динамическими языками, такими как C++, Java™, Perl и Python. В качестве языка описания интерфейса (Interface Description Language, IDL) для определения типов данных и протоколов в Avro используется JSON.

Работа Avro основана на схемах, которые хранятся вместе с данными. Это позволяет быстро и просто упорядочивать данные, поскольку в этом случае отсутствуют накладные затраты на обработку каждого значения. При вызове удаленной процедуры (RPC) обмен схемой происходит во время установления связи между клиентом и сервером. Поскольку в Avro используется JSON, соответствие между полями легко устанавливается.


Надежность, восстанавливаемость и многоузловые потоки

Для гарантированной доставки событий во Flume используется механизм на основе транзакций. Каждое событие представляет собой отдельную транзакцию, и все события последовательно размещаются в каналах каждого агента. Каждое событие в потоке передается либо следующему агенту (например, в источник "bar" одноименного агента на рис. 2), либо в конечное хранилище (например, в файловую систему HDFS). События удаляются из канала только после сохранения в канале следующего агента или в конечном хранилище; таким образом события находятся в очереди до тех пор, пока не будет получено подтверждение об их успешном сохранении. В этом алгоритме задействованы источник и получатель данных, которые помещают информацию о доставке или сохранении событий в генерируемые каналом транзакции. Такой механизм обеспечивает сквозную надежность потока для одноузловой доставки сообщений в Flume.

Восстанавливаемость обеспечивается за счет того, что все события в канале выстраиваются в очередь; это позволяет избегать сбоев при восстановлении. В Flume поддерживается использование надежного файлового канала на базе локальной файловой системы (все данные надежно сохраняются в постоянной памяти). Если в работе системы возникает сбой или ошибка, то при использовании файлового канала все потерянные события могут быть восстановлены. Существует также канал, в котором очередь событий хранится в оперативной памяти; данный способ обеспечивает более высокую скорость работы, но в случае возникновения сбоя все события, находящиеся в памяти, будет невозможно восстановить.

В Flume можно создавать многоузловые потоки, в которых события передаются между несколькими агентами, прежде чем достигнуть конечной точки. В таких конфигурациях получатель данных предыдущего узла и источник данных следующего узла используют транзакции для гарантированного сохранения данных в канале следующего узла.

Рисунок 2. Многоузловые потоки
Многоузловые потоки

Архитектура системы

В этом разделе я покажу, как настроить масштабируемый Web-сервис с помощью Flume. Для этого нам потребуется написать код для чтения RSS-каналов. Также нам потребуется сконфигурировать агенты и сборщики данных Flume, которые будут получать данные RSS-каналов и сохранять их в файловой системе HDFS.

InfoSphere BigInsights Quick Start Edition

InfoSphere BigInsights Quick Start Edition – это доступная для загрузки бесплатная версия InfoSphere BigInsights, разработанная компанией IBM на базе Hadoop. Эта версия позволяет ознакомиться со всеми возможностями (такими как Big SQL, текстовый анализ и BigSheets), которые компания IBM разработала для расширения функциональности Open Source-проекта Hadoop. Извлечь максимальную пользу от работы с Hadoop вам помогут обучающие материалы, включая различные инструкции, пошаговые руководства и видеоматериалы. Вы сможете работать с большими объемами данных без ограничений по времени или объему. Смотрите видеоматериалы, изучайте руководства (в формате PDF) и загрузите BigInsights Quick Start Edition прямо сейчас.

Конфигурация агента Flume хранится в локальном текстовом файле, похожем на файл свойств Java. Один конфигурационный файл может содержать конфигурации одного или нескольких агентов. Для каждого агента задаются свойства источника, получателя и канала, а также определяются связи между ними, формирующие потоки данных.

Для источника Avro требуется указать имя или IP-адрес узла, а также номер порта для получения данных. Для канала памяти можно задать максимальный размер очереди, а для получателя HDFS требуется указать URI файловой системы и путь для создания файлов. Получатель Avro может быть перенаправляющим получателем (avro-forward-sink), передающим данные следующему агенту Flume.

Идея нашего примера заключается в создании миниатюрной системы Flume для сбора данных из распределенных RSS-каналов (log-данных). Мы будем использовать агентов в качестве узлов, получающих данные (в нашем случае это будут данные RSS-каналов) от считывателя RSS-каналов. Эти агенты будут передавать RSS-данные узлу-сборщику, который будет сохранять их в кластере HDFS. В нашем примере мы будем использовать два узла-агента Flume, один узел-сборщик и три узла кластера HDFS. В таблице 1 указаны источники и получатели данных для узлов-агентов и узла-сборщика.

Таблица 1. Источники и получатели данных для узлов-агентов и узлов-сборщиков
УзлыИсточникПолучатель
АгентRSS-каналСборщик
СборщикАгентыHDFS

На рисунке 3 изображена архитектура нашей многоузловой системы с двумя агентами, сборщиком и кластером HDFS. Web-канал RSS (его код будет представлен ниже) является источником Avro для обоих агентов, а данные RSS-канала размещаются в канале оперативной памяти. Как только RSS-данные накапливаются в каналах памяти двух агентов, получатели Avro начинают передавать эти события источнику Avro узла-сборщика. Для передачи данных в кластер HDFS узел-сборщик использует канал памяти и HDFS-получатель. Конфигурации агентов и сборщика будут представлены ниже.

Рисунок 3. Обзор архитектуры многоузловой системы
Обзор архитектуры многоузловой системы

Давайте посмотрим, как можно создать простой сервис для чтения новостей с помощью Flume. В листинге 1 представлен Java-код приложения для чтения Web-каналов RSS компании BBC. Как вы, возможно, уже знаете, RSS – это семейство стандартизированных форматов Web-каналов, используемых для публикации часто обновляющихся материалов – например, блогов, заголовков новостей, аудио- и видеоматериалов. В RSS используется модель подписки на публикации, в которой осуществляется регулярная проверка обновлений содержимого RSS-каналов, на которые вы подписаны.

В листинге 1 используются API-интерфейсы Java Net and Javax XML, при помощи которых выполняется чтение и предварительная обработка содержимого URL-источников модели W3C Document, после чего информация записывается в канал Flume.

Листинг 1. Java-код приложения для чтения RSS (RSSReader.java)
import java.net.URL;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.CharacterData;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

public class RSSReader {
  private static RSSReader instance = null;
  private RSSReader() {
  }
  public static RSSReader getInstance() {
    if(instance == null) {
      instance = new RSSReader();
    }
    return instance;
  }
  public void writeNews() {
    try {
      DocumentBuilder builder = DocumentBuilderFactory.newInstance().
newDocumentBuilder();
      URL u = new URL("http://feeds.bbci.co.uk/news/world/rss.xml
?edition=uk#");
      Document doc = builder.parse(u.openStream());
      NodeList nodes = doc.getElementsByTagName("item");
      for(int i=0;i<nodes.getLength();i++) {
        Element element = (Element)nodes.item(i);
        System.out.println("Title: " + getElementValue(element,"title"));
        System.out.println("Link: " + getElementValue(element,"link"));
        System.out.println("Publish Date: " + getElementValue(element,"pubDate"));
        System.out.println("author: " + getElementValue(element,"dc:creator"));
        System.out.println("comments: " + getElementValue(element,"wfw:comment"));
        System.out.println("description: " + getElementValue(element,"description"));
        System.out.println();
      }
    } catch(Exception ex) {
      ex.printStackTrace();
    }
  }
  private String getCharacterDataFromElement(Element e) {
    try {
      Node child = e.getFirstChild();
      if(child instanceof CharacterData) {
        CharacterData cd = (CharacterData) child;
        return cd.getData();
      }
    } catch(Exception ex) {
    }
    return "";
  }
  protected float getFloat(String value) {
    if(value != null && !value.equals("")) {
      return Float.parseFloat(value);
    }
    return 0;
  }
  protected String getElementValue(Element parent,String label) {
    return getCharacterDataFromElement((Element)parent.getElements
ByTagName(label).item(0));
  }
  public static void main(String[] args) {
    RSSReader reader = RSSReader.getInstance();
    reader.writeNews();
  }
}

В следующих листингах содержатся тестовые конфигурации для агентов (узлы с адресами 10.0.0.1 и 10.0.0.2) и сборщика (узел с адресом 10.0.0.3). В конфигурационных файлах задана семантика для источников, каналов и получателей. Для каждого типа источника также необходимо определить тип, команду, а также опции и стандартные действия при возникновении ошибок. Для каждого канала необходимо указать его тип и емкость (максимальное количество хранимых событий), а также емкость транзакций (максимальное количество событий, принимаемых или отправляемых каналом в рамках одной транзакции). Для каждого получателя необходимо указать его тип, имя (IP-адрес получателя события) и порт. В случае с HDFS-получателем необходимо указать путь к NameNode-узлу HDFS.

В листинге 2 содержится конфигурация для узла 10.0.0.1.

Листинг 2. Конфигурация агента 1 (файл flume-conf.properties на узле 10.0.0.1)
# В конфигурационном файле необходимо указать источники,
# каналы и получателей.
# Источники, каналы и получатели указываются для каждого агента,
# в нашем случае он называется 'agent'.
agent.sources = reader
agent.channels = memoryChannel
agent.sinks = avro-forward-sink

# Указываем тип каждого источника.
agent.sources.reader.type = exec
agent.sources.reader.command = tail -f /var/log/flume-ng/source.txt
# Устройство stderr игнорируется, если не задано значение logStdErr=true. 
# Если по какой-либо причине процесс прекращается, то источник также 
# завершает работу и больше не генерирует никакие данные.
agent.sources.reader.logStdErr = true
agent.sources.reader.restart = true
 
# Указываем тип канала.
agent.sources.reader.channels = memoryChannel

# Необходимо указать тип каждого получателя.
agent.sinks.avro-forward-sink.type = avro
agent.sinks.avro-forward-sink.hostname = 10.0.0.3
agent.sinks.avro-forward-sink.port = 60000

# Указываем канал, который будет использоваться получателем.
agent.sinks.avro-forward-sink.channel = memoryChannel

# Указываем тип каждого канала.
agent.channels.memoryChannel.type = memory

# Для каждого типа канала (получатель или источник)
# можно задать дополнительные конфигурационные параметры.
# В нашем случае мы указываем емкость канала памяти.
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 100

В листинге 3 содержится конфигурация для узла 10.0.0.2.

Листинг 3. Конфигурация агента 2 (файл flume-conf.properties на узле 10.0.0.2)
agent.sources = reader
agent.channels = memoryChannel
agent.sinks = avro-forward-sink

# Указываем тип каждого источника.
agent.sources.reader.type = exec
agent.sources.reader.command = tail -f /var/log/flume-ng/source.txt
# Устройство stderr игнорируется, если не задано значение logStdErr=true. 
# Если по какой-либо причине процесс прекращается, то источник также 
# завершает работу и больше не генерирует никакие данные.
agent.sources.reader.logStdErr = true
agent.sources.reader.restart = true
 
# Указываем тип канала.
agent.sources.reader.channels = memoryChannel

# Необходимо указать тип каждого получателя.
agent.sinks.avro-forward-sink.type = avro
agent.sinks.avro-forward-sink.hostname = 10.0.0.3
agent.sinks.avro-forward-sink.port = 60000

# Указываем канал, который будет использоваться получателем.
agent.sinks.avro-forward-sink.channel = memoryChannel

# Указываем тип каждого канала.
agent.channels.memoryChannel.type = memory

# Для каждого типа канала (получатель или источник)
# можно задать дополнительные конфигурационные параметры.
# В нашем случае мы указываем емкость канала памяти.
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 100

В листинге 4 содержится конфигурация для узла 10.0.0.3.

Листинг 4. Конфигурация сборщика (файл flume-conf.properties на узле 10.0.0.3)
# В конфигурационном файле необходимо указать источники,
# каналы и получателей.
# Источники, каналы и получатели указываются для каждого агента,
# в нашем случае он называется 'agent'
agent.sources = avro-collection-source
agent.channels = memoryChannel
agent.sinks = hdfs-sink

# Указываем тип каждого источника.
agent.sources.avro-collection-source.type = avro
agent.sources.avro-collection-source.bind = 10.0.0.3
agent.sources.avro-collection-source.port = 60000

# Указываем тип канала.
agent.sources.avro-collection-source.channels = memoryChannel

# Необходимо указать тип каждого получателя.
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = hdfs://10.0.10.1:8020/flume

# Указываем канал, который будет использоваться получателем.
agent.sinks.hdfs-sink.channel = memoryChannel

# Указываем тип каждого канала.
agent.channels.memoryChannel.type = memory

# Для каждого типа канала (получатель или источник)
# можно задать дополнительные конфигурационные параметры.
# В нашем случае мы указываем емкость канала памяти.
agent.channels.memoryChannel.capacity = 10000

Дальнейшие действия

Теперь, когда у нас есть код приложения для чтения RSS-каналов и мы сконфигурировали агенты и сборщик Flume, можно переходить к настройке всей системы. Для этого нужно выполнить три действия.

Шаг 1

Скомпилированный Java-код следует запустить в фоновом режиме, чтобы он постоянно работал.

Листинг 5. Скомпилированный Java-код
$ javac RSSReader.java
$ java -cp /root/RSSReader RSSReader > /var/log/flume-ng/source.txt &

Шаг 2

Прежде чем запускать агенты, необходимо изменить конфигурационные файлы с помощью шаблона из директории $FLUME_HOME/conf/. После изменения конфигурационных файлов агенты можно запускать с помощью следующих команд.

В листинге 6 показан запуск агента на узле 1.

Листинг 6. Запуск агента на узле 1
Agent node 1 (on 10.0.0.1):
$ $FLUME_HOME/bin/flume-ng agent -n agent1 -c conf -f 
$FLUME_HOME/conf/flume-conf.properties

В листинге 7 показан запуск агента на узле 2.

Листинг 7. Запуск агента на узле 2
Agent node 2 (on 10.0.0.2):
$ $FLUME_HOME/bin/flume-ng agent -n agent2 -c conf -f 
$FLUME_HOME/conf/flume-conf.properties

Здесь переменная $FLUME_HOME определена в качестве переменной среды командного интерпретатора bash или .bashrc и содержит путь к домашней директории Flume (например, (/home/user/flume-1.4/).

Шаг 3

В листинге 8 показан запуск сборщика. Следует отметить, что конфигурационный файл определяет режим работы узла (в частности, является ли узел агентом или сборщиком).

Листинг 8. Запуск сборщика на узле 10.0.0.3
$ $FLUME_HOME/bin/flume-ng agent -n collector -c conf -f 
$FLUME_HOME/conf/flume-conf.properties

Заключение

В этой статье вы познакомились с Flume – распределенной и надежной службой высокой готовности для эффективного сбора больших объемов log-данных. Вы узнали, как можно использовать Flume для развертывания одноузловых и многоузловых потоков. Также был подробно рассмотрен пример, в котором мы создали многоузловой Web-сервис по сбору данных. В этом примере агенты Avro считывали данные из RSS-каналов, а сборщик HDFS сохранял их в файловой системе. Flume можно использовать для построения масштабируемых распределенных систем сбора больших объемов потоковых данных, получаемых из множества источников

Ресурсы

Научиться

  • Оригинал статьи: Deploying and managing scalable web services with Flume (EN).
  • Узнайте о настройке кластера Hadoop (EN).
  • Ознакомьтесь с руководством по Flume (EN).
  • Узнайте больше об Avro (EN).
  • Прочтите статью "Визуализация и анализ цепочек поставок" и узнайте о визуализации данных цепочек поставок.
  • Прочтите статью "Essentials, Part 1, Lesson 1: Compiling and Running a Simple Program" (EN).
  • Познакомьтесь с книгой Пита Уордена (Pete Warden) Big Data Glossary (EN) (издательство O'Reilly Media, ISBN: 1449314597, 2011 г.).
  • Узнайте больше о "больших данных", посетив раздел Big Data на сайте developerWorks (EN), в котором вы найдете техническую документацию, пошаговые инструкции и руководства, обучающие материалы, ссылки на загрузки, информацию о продуктах и многое другое.
  • В разделе InfoSphere BigInsights (EN) Web-сайта developerWorks вы найдете все необходимое для начала работы с этим продуктом. IBM InfoSphere BigInsights расширяет функциональность Open Source-проекта Hadoop, добавляя в него такие функции как Big SQL, текстовый анализ и BigSheets.
  • Просмотрите руководства для самостоятельного изучения (в формате PDF) (EN) и научитесь управлять средой больших данных, импортировать данные для анализа и анализировать их при помощи BigSheets. Узнайте, как создать ваше первое приложение для работы с большими данными, разработать запросы Big SQL для их анализа, а также создать модуль извлечения данных из текстовых документов с помощью InfoSphere BigInsights.
  • В разделе InfoSphere Streams (EN) Web-сайта developerWorks вы найдете все необходимое для начала работы с этим продуктом. IBM InfoSphere Streams – это мощная вычислительная платформа, позволяющая клиентским приложениям быстро получать, анализировать и сопоставлять информацию, поступающую в реальном времени из тысяч источников.
  • Следите на последними новостями на портале Web-трансляций и технических мероприятий developerWorks (EN).

Получить продукты и технологии

Обсудить

Комментарии

developerWorks: Войти

Обязательные поля отмечены звездочкой (*).


Нужен IBM ID?
Забыли Ваш IBM ID?


Забыли Ваш пароль?
Изменить пароль

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Профиль создается, когда вы первый раз заходите в developerWorks. Информация в вашем профиле (имя, страна / регион, название компании) отображается для всех пользователей и будет сопровождать любой опубликованный вами контент пока вы специально не укажите скрыть название вашей компании. Вы можете обновить ваш IBM аккаунт в любое время.

Вся введенная информация защищена.

Выберите имя, которое будет отображаться на экране



При первом входе в developerWorks для Вас будет создан профиль и Вам нужно будет выбрать Отображаемое имя. Оно будет выводиться рядом с контентом, опубликованным Вами в developerWorks.

Отображаемое имя должно иметь длину от 3 символов до 31 символа. Ваше Имя в системе должно быть уникальным. В качестве имени по соображениям приватности нельзя использовать контактный e-mail.

Обязательные поля отмечены звездочкой (*).

(Отображаемое имя должно иметь длину от 3 символов до 31 символа.)

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Вся введенная информация защищена.


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Open source, Information Management
ArticleID=952496
ArticleTitle=Развертывание масштабируемых Web-сервисов и их поддержка с помощью Flume
publish-date=11112013