Анализ больших наборов данных с помощью Hive

Каждые 24 часа в отрасли больших данных собираются и записываются терабайты данных. Растет потребность найти полезную информацию в этом быстро увеличивающемся объеме данных. Общей задачей для всех (от руководителей высшего звена до инженеров) является формирование прогнозов и принятие решений на основе содержащейся этой информации. В статье рассказывается, как анализировать большие наборы данных с помощью Apache Hive, хранилища данных для распределенных приложений, работающих с большими объемами данных.

Зафар Гилани, стажер-исследователь, Telefonica Research

Zafar GilaniЗафар Гилани (Zafar Gilani) – стажер-исследователь в компании Telefonica Research. Занимается распределенными вычислениями и обработкой и анализом больших данных. Раньше Зафар занимался поддержкой Infiniband в MPJ Express и работал научным сотрудником Национальной ускорительной лаборатории SLAC.



Уль Хак, генеральный директор, TunaCode

Photo of Salman Ul HaqСалман Хак (Salman Haq) — один из основателей и генеральный директор компании TunaCode Inc., разрабатывающей высокопроизводительные вычислительные решения для визуализации данных с использованием собственной библиотеки CUVI GPU Imaging в таких отраслях, как производство, оборона, медицина и развлечения. Кроме того, компания является разработчиком решения gKrypt, обеспечивающего сверхскоростное шифрование в оборонной промышленности. Адрес электронной почты: salman@tunacode.com.



27.11.2013

Введение

Индустрия больших данных научилась собирать и записывать терабайты данных, но задача состоит в формировании прогнозов и принятии решений на основе содержащейся этой информации. Вот почему так важен Apache Hive. Он проецирует структуру на данные и обращается к этим данным посредством SQL-подобных запросов для выполнения задач Map (отображение) и Reduce (сокращение) на больших наборах данных.

Для тех же целей предназначена разработанная IBM закрытая система InfoSphere BigInsights. Она основана на проекте с открытым исходным кодом Apache Hadoop, но имеет дополнительную функциональность для использования в корпоративных решениях. Она упрощает использование Hadoop и создание приложений, работающих с большими наборами данных. Аналогично Hive, она предоставляет SQL-интерфейс для Hadoop, так что пользователь может обращаться к данным в BigInsights, не изучая новый язык программирования. Кроме того, она обеспечивает высокую готовность узла BigInsights NameNode (также известного как MasterNode) для плавного и прозрачного восстановления после сбоев, тем самым уменьшая время простоя системы.

В качестве примера анализа больших данных мы выполним при помощи Hive анализ записей о соединениях (Call Data Records – CDR). Термин CDR используется в телекоммуникационной сфере и относится к любому событию, которое может быть использовано для выставления счета абоненту. Примерами событий, которые записываются в хранилища данных и используются для выставления счетов абонентам, являются, например, начало, окончание и продолжительность соединения или объем переданных через смартфон интернет-данных.


Перед началом работы

Необходимо на базовом уровне знать используемые в статье технологии и концепции. В разделе Ресурсы приведены учебные руководства, охватывающие следующие аспекты:

  • Написание базовых SQL-сценариев (DDL и DML), таких как select, create и insert.
  • Написание и компиляция Java™-программ.
  • Пакетирование байт-кода Java в JAR-файл.

Использование CDR-записей в качестве примера

InfoSphere BigInsights Quick Start Edition

InfoSphere BigInsights Quick Start Edition представляет собой бесплатную загружаемую версию платформы IBM InfoSphere BigInsights. Quick Start Edition позволяет опробовать возможности, такие как Big SQL, анализ текста и BigSheets, созданные IBM в развитие проекта с открытым исходным кодом Hadoop. Управляемое обучение с помощью пошаговых руководств и видеоматериалов для самостоятельного изучения поможет приступить к работе с Hadoop. Вы сможете в свое свободное время экспериментировать с большими данными без ограничений по времени или по объему. Смотрите видеоматериалы, изучайте руководства (в формате PDF) и загружайте BigInsights Quick Start Edition.

Подробная запись о соединении (Call Detail Record – CDR), также известная как запись данных о соединении (Call Data Record) – это запись данных, выполняемая телефонной станцией или другой телекоммуникационной аппаратурой, документирующая подробности телефонного соединения, прошедшего через оборудование или устройство. CDR состоит из полей данных, которые описывают телекоммуникационные операции. Этими полями могут быть:

  • Номер телефона абонента.
  • Номер телефона адресата.
  • Время начала разговора.
  • Продолжительность соединения.
  • Номер телефона для выставления счета.
  • Идентификатор оборудования телефонной станции.
  • Идентификатор записи.
  • Результаты вызова (занято или сбой).
  • Маршрут, по которому вызов поступил на станцию.
  • Маршрут, по которому вызов покинул станцию.
  • Вид соединения (голос, SMS и т.д.).

В статье, наряду с CDR-записями, мы также будем использовать другой набор данных – так называемый сетевой журнал. В сетевом журнале фиксируется сетевая активность – звонок пользователя, работа в Интернете и с электронной почтой или просто переход на другую сотовую станцию. Запись сетевого журнала может состоять из следующих полей:

  • Время начала события.
  • IMSI (уникальный идентификатор сотовой сети).
  • IMEI (уникальный идентификатор мобильного телефона).
  • Вид соединения (голос, SMS и т.д.).
  • Тип соты (код типа сотовой станции, записывающей эту информацию).
  • Идентификатор соты (идентификатор сотовой станции, записывающей эту информацию).
  • Номер телефона абонента.
  • Широта (географическая координата станции).
  • Долгота (географическая координата станции).

Поставщиков телекоммуникационных услуг интересует оценка различных тенденций с целью планирования будущих модернизаций и развертываний на основании реальных данных. Например, типичный поставщик хочет знать, через какую сотовую станцию проходит большая часть соединений. Другая ценная информация – какие базовые станции перегружены в различных временных интервалах (особенно закономерности суточного трафика) и какие виды соединений выполняются чаще всего в различных направлениях. Мы прокоррелируем эти источники больших данных, имеющие, как правило, ежедневный объем порядка сотен гигабайт для сотовой сети среднего размера.

Следует отметить, что журналы соединений содержат данные о каждой операции в каждой сотовой сети. По последней информации компания Verizon предоставила Агентству национальной безопасности (NSA) беспрецедентное право получать информацию журналов соединений непосредственно с серверов Verizon. Были переданы миллионы записей журналов соединений, содержащих номера абонентов, продолжительность соединений и расположение конечных станций.


Использование Hive для анализа больших данных

Начнем с коррелирования двух наборов данных, описанных выше. Для этой цели Hive предоставляет SQL-подобную семантику соединения (join). Наиболее распространенной операцией соединения, используемой в приложениях, является внутреннее соединение, которое можно рассматривать как тип соединения по умолчанию. Внутреннее соединение объединяет значения столбцов двух таблиц (назовем их А для CDR-записей и В для записей сетевого журнала) в соответствии с предикатом join. Запрос внутреннего соединения сравнивает каждую строку таблицы А с каждой строкой таблицы B, находя все пары строк, которые удовлетворяют предикату join. Если условие предиката join выполняется, значения столбцов соответствующих записей таблиц А и В объединяются в новую запись. Внутреннее соединение можно рассматривать как декартово произведение двух таблиц, возвращающее записи, удовлетворяющие предикату join. Упрощенный запрос соединения показан ниже.

Запрос внутреннего соединения

В предикате join определяется условие on (см. запрос в листинге 1).

Листинг 1. Запрос внутреннего соединения
hive> insert overwrite table correlation            \
      partition(dt=20130101, hour=12)                  \
      select cdr.timestamp,      cdr.subscriberPhone,  \
      cdr.recipientPhone,        cdr.duration,         \
      cdr.billingPhone,          cdr.exchangeID,       \
      cdr.recordID,              cdr.callResult,       \ 
      cdr.entryRoute,            cdr.exitRoute,        \
      cdr.callType,                                    \
      net.dtstamp,               net.minute            \
     net.second,                 net.IMSI,             \
     net.IMEI,                   net.callType,         \
     net.cellType,               net.cellID,           \
     net.subscriberPhone,        net.cellLat,          \    
     net.cellLon                                       \
      from cdr join net                                \
on (cdr.subscriberPhone = net.subscriberPhone)         \
      where combine(net.dtstamp,                       \
            net.hour,                                  \ 
            net.minute,                                \
            net.second) <= cdr.timestamp            \
      and cdr.timestamp like '2013010112%              \
      and net.dtstamp like '20130101'                  \
      and net.hour = 12;

Рассмотрим запрос подробно. Часть запроса insert overwrite указывает интерпретатору Hive вставить выбранные поля в таблицу, имя которой (в данном случае correlation) следует за ключевым словом table. Ключевое слово join и оператор where строго соответствуют стандартному синтаксису SQL.

Таблицу correlation можно создать с помощью стандартной SQL-команды create table. В приведенном выше запросе ключевое слово partition определяет, в какой раздел таблицы correlation должны быть вставлены данные. Таблица может иметь один или несколько столбцов разделов и отдельный каталог данных для каждой отдельной комбинации значений в столбцах разделов. Листинг 2 является примером создания таблицы с разделами. В качестве примера мы используем таблицу correlation.

Листинг 2. Таблица correlation
hive> create table correlation(
timestamp string,     subscriberPhone string,         \
      recipientPhone string,     duration int,        \
      billingPhone string,     exchangeID string,     \
      recordID string,         callResult string,     \
      entryRoute string,     exitRoute string,        \
    callType string,         dtstamp string,          \
minute int,            second int,                    \
IMSI string,        IMEI string,                      \
callType string,        cellType string,              \
cellID string,        subscriberPhone string,         \
cellLat string,        cellLon string)                \
partitioned by (dt string, hour int)                  \
location '/mnt/user/hive/warehouse/correlation'

Мы также используем специальную определяемую пользователем функцию (UDF) Hive (см. раздел Ресурсы). UDF-функция пишется пользователем и может многократно использоваться в интерфейсе командной строки Hive (CLI). Это обобщающий термин и одновременно Java-класс. Наша UDF-функция combine() объединяет дату, часы, минуты и секунды в формат YYYYMMDDhhmmss. Код Java-класса DateCombiner UDF приведен ниже. Обратите внимание, что мы расширили Java-класс UDF. Также можно использовать Java-класс GenericUDF, который представляет собой новую реализацию семантики UDF. Он обеспечивает более высокую производительность за счет использования отложенных вычислений и укороченных логических операторов. Он также поддерживает непримитивные параметры и переменное число аргументов.

Java-класс UDFпроще в использовании, чем GenericUDF, и обеспечивает приемлемый уровень производительности. Он использует Reflection API, поэтому работает немного медленнее, чем GenericUDF. Он не принимает и не возвращает непримитивные параметры, такие как массивы, структуры или карты, а также не поддерживает переменное число аргументов. Java-класс GenericUDF можно использовать при работе со сложными типами данных, но в большинстве случаев достаточно Java-класса UDF.

При написании UDF-функции прежде всего определяется пакет Java. Затем в исходный код импортируется Java-класс UDF, чтобы компилятор Java знал, какой родительский Java-класс отвечает за методы, используемые наследующим классом. Декларация import является элементом исходного кода времени компиляции и не используется во время выполнения. Причина в том, что байт-код JVM всегда использует полные имена классов.

Затем объявляется класс, расширяющий родительский Java-класс UDF. Определяется метод evaluate(), содержащий логику программы. Короче говоря, UDF должна обладать двумя признаками:

  1. Она должна быть подклассом Java-классов UDF или GenericUDF.
  2. Она должна реализовывать по крайней мере один метод evaluate() (см. листинг 3).
Листинг 3. Определение метода evaluate()
package HiveUDF;
import org.apache.hadoop.hive.ql.exec.UDF;
public final class DateCombiner extends UDF (
public String evaluate(final String dt, final String hh, final String mm,
        final String ss) {
      it(dt==null) { return null; }
      String ho="", mi="", se="",
      if(hh.length() != 2) { ho = "0".concat(hh); } else { ho = hh; }
     if(mm.length() != 2) { mi = "0".concat(mm); } else { mi = mm; }
     if(ss.length() != 2) { se = "0".concat(ss); } else { se = ss; }
     return new String(dt + ho + mi + se);
  }

Есть несколько важных шагов, которые нужно сделать перед использованием UDF и выполнением приведенного выше запроса соединения. Во-первых, UDF должна быть скомпилирована и упакована в JAR-файл. Для компиляции необходимо добавить исполняемый JAR-файл Hive или включить его в classpath Java (см. листинг 4).

Листинг 4. Добавление исполняемого JAR-файла Hive
$javac -cp /apps/hive/lib/hive-exec-0.7.1-cdh3u3.jar DateCombiner.java
$ jar cf HiveUDF.jar HiveUDF/

Затем нужно загрузить JAR-файл UDF в интерфейсе командной строки Hive. Затем нужно создать временную функцию combine, которая соответствует Java-классу DateCombiner UDF (см. листинг 5).

Листинг 5. Функция combine
hive> add jar /home/verrami1/work/JARS/HiveUDF.jar;
hive> create temporary function combine as 'HiveUDF.DateCombiner';

Мы рассмотрели использование Java-класса UDF для внутреннего соединения большого набора данных (в нашем случае это два столбца базы данных CDR). Теперь перейдем к некоторым общим рекомендациям по Hive, касающимся анализа больших данных.


Оптимизация скорости и производительности заданий Hive

В этом разделе мы рассмотрим некоторые рекомендации по созданию и выполнению запросов в Hive. Хотя эти рекомендации не являются специфичными для нашего конкретного случая, они применимы и здесь в силу огромных объемов данных, обрабатываемых телекоммуникационными компаниями.

Разбиение данных на разделы

Правильно выбранные критерии создания разделов при определении таблицы способствуют увеличению скорости обработки заданий Hive. Тщательное создание разделов облегчает поиск и устранение неисправностей. Лучшим способом разделения таблиц является использование одной или нескольких метрик в зависимости от ожидаемого размера таблицы. В нашем примере анализа CDR и сетевых журналов мы ожидаем, что в систему будет записываться один гигабайт CDR-данных. Умножая на 24 часа, получаем дневной объем данных (по которым будет выполняться запрос на выборку) порядка 24 ГБ. Эти данные непрерывно поступают в базу данных и при ненадлежащем их делении на разделы может потребоваться огромный кластер для выполнения даже самых простых запросов. Листинг 6 демонстрирует правильный подход к разделению данных.

Листинг 6. Правильный подход к разделению данных
hive> create table correlation(                         \
     timestamp string,           subscriberPhone string,   \
     recipientPhone string,      duration int,             \
     billingPhone string,        exchangeID string,        \
     recordID string,            callResult string,        \
     entryRoute string,          exitRoute string,         \
     callType string,                                      \
     dtstamp string,              minute int,              \
     second int,                  IMSI string,             \
     IMEI string,                 callType int,            \
     cellType int,                cellID string,           \
     subscriberPhone string, cellLat string,               \
     cellLon string)                                       \
     partitioned by(dt string, hour int)                   \
     location '/mnt/data/correlation';

Каждая система имеет ограничение на физические ресурсы, такие как число ядер и память. При неправильном разбиении базы данных на разделы выполнение пакетного задания анализа данных может привести к внезапному сбою из-за переполнения или нехватки памяти. В результате вы получите множество ошибок, из которых нельзя будет понять, что виновником является именно переполнение памяти.

Детальное описание таблицы

В Hive есть очень хорошая функция, которая позволяет просматривать информацию о таблице, такую как столбцы, типы данных, место хранения, размер и т.д. Для просмотра этой информации используйте команду describe formatted с именем таблицы (см. листинг 7). Так как выводимые данные огромны, мы показали только поля.

Листинг 7. Команда describe formatted
hive> describe formatted correlation;
# col_name        data_type       comment
timestamp         string          None
subscriberPhone   string          None
recipientPhone    string          None
duration          string          None
..
# Partition Information
# col_name        data_type       comment
dt                string           None
hour              int              None
..
CreateTime: Mon, Jun 17 2013 PST
Location: /mnt/data/correlation

Соединение (таблица большего размера справа)

Как уже говорилось, операция соединения сначала вычисляет декартово произведение двух таблиц, а затем возвращает те строки (с объединенными столбцами), которые удовлетворяют предикату join. Процесс соединения требует большого объема вычислений и много памяти. Чтобы эффективно использовать соединение, необходимо располагать большие таблицы (в строках) с правой стороны операции join. Например, если таблица А имеет 1000000 строк, а таблица B имеет 200000 строк, запрос join должен выглядеть примерно так: hive>... B join A ..;.

Оператор join является реализацией операции соединения реляционной алгебры, хорошо описанной математиками. Если записать A join B, Hadoop-задания map и reduce нужно будет выполнить для 1000000 строк таблицы А, чтобы найти соответствующие строки в таблице B. Наоборот, если записать B join A, нужно будет перебрать 200000 строк таблицы В, чтобы найти соответствующие строки в таблице A. Это гораздо более эффективно, поскольку выполняется меньше итераций.

Операторы where (с диапазонами)

Также нужно сделать другую важную вещь – связать запросы с ограничениями, используя операторы where. Обратите внимание, что в нашем запросе join мы использовали четыре условия связывания. Оператор where предотвращает исполнение запроса "в лоб". Представляйте это как динамическое разделение таблицы, из которой извлекаются данные.

Использование стандартных функций, таких как collect_set()

Наряду с множеством полезных стандартных агрегирующих функций, таких как count(), sum(), min(), max() и т.д., Hive предоставляет функцию сбора данных collect_set(). Она возвращает набор объектов, удаляя повторяющиеся элементы. Например, чтобы найти пользователей, зарегистрированных сотовой станцией в полдень 1 января 2013 года, можно использовать запрос, приведенный в листинге 8.

Листинг 8. Функция collect_set()
hive> select cellID, collect_set(subscriberPhone) from correlation 
where dt=?20130101? and hour=12 group by cellID;

Приведение типов

Hive предоставляет функцию cast() для преобразования строки в целое число или число двойной точности и наоборот. Приведение типов является лучшим способом гарантировать, что сравнение будет выполнено так, как задумано. В листинге 9 демонстрируется использование функции cast(). Выражение и тип могут быть целым числом (integer), большим целым числом (bigint), числом с плавающей точкой (float), числом двойной точности (double) или строкой (string). В листинге 9 демонстрируется преобразование строки в целое число и числа двойной точности в строку.

Листинг 9. Функция cast()
cast(expression as <type>)
hive> cast('1' as int)
hive> cast(75.89 as string)

Заключение

В этой статье мы продемонстрировали применение Hive для анализа больших наборов данных с использованием Hadoop в качестве серверной части. Мы показали, как может выглядеть схема большого рабочего набора данных. Мы объяснили на примере, как соединить два больших набора данных и сформировать набор сопоставленных данных. Чтобы сохранить огромный объем сопоставленных данных, мы создали таблицу с разделами. Затем мы рассмотрели рекомендации по рациональному и эффективному использованию Hive.

Ресурсы

Научиться

Получить продукты и технологии

Обсудить

Комментарии

developerWorks: Войти

Обязательные поля отмечены звездочкой (*).


Нужен IBM ID?
Забыли Ваш IBM ID?


Забыли Ваш пароль?
Изменить пароль

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Профиль создается, когда вы первый раз заходите в developerWorks. Информация в вашем профиле (имя, страна / регион, название компании) отображается для всех пользователей и будет сопровождать любой опубликованный вами контент пока вы специально не укажите скрыть название вашей компании. Вы можете обновить ваш IBM аккаунт в любое время.

Вся введенная информация защищена.

Выберите имя, которое будет отображаться на экране



При первом входе в developerWorks для Вас будет создан профиль и Вам нужно будет выбрать Отображаемое имя. Оно будет выводиться рядом с контентом, опубликованным Вами в developerWorks.

Отображаемое имя должно иметь длину от 3 символов до 31 символа. Ваше Имя в системе должно быть уникальным. В качестве имени по соображениям приватности нельзя использовать контактный e-mail.

Обязательные поля отмечены звездочкой (*).

(Отображаемое имя должно иметь длину от 3 символов до 31 символа.)

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Вся введенная информация защищена.


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Information Management
ArticleID=954800
ArticleTitle=Анализ больших наборов данных с помощью Hive
publish-date=11272013