Содержание


Анализ больших наборов данных с помощью Hive

Comments

Индустрия больших данных научилась собирать и записывать терабайты данных, но задача состоит в формировании прогнозов и принятии решений на основе содержащейся этой информации. Вот почему так важен Apache Hive. Он проецирует структуру на данные и обращается к этим данным посредством SQL-подобных запросов для выполнения задач Map (отображение) и Reduce (сокращение) на больших наборах данных.

Для тех же целей предназначена разработанная IBM закрытая система InfoSphere BigInsights. Она основана на проекте с открытым исходным кодом Apache Hadoop, но имеет дополнительную функциональность для использования в корпоративных решениях. Она упрощает использование Hadoop и создание приложений, работающих с большими наборами данных. Аналогично Hive, она предоставляет SQL-интерфейс для Hadoop, так что пользователь может обращаться к данным в BigInsights, не изучая новый язык программирования. Кроме того, она обеспечивает высокую готовность узла BigInsights NameNode (также известного как MasterNode) для плавного и прозрачного восстановления после сбоев, тем самым уменьшая время простоя системы.

В качестве примера анализа больших данных мы выполним при помощи Hive анализ записей о соединениях (Call Data Records – CDR). Термин CDR используется в телекоммуникационной сфере и относится к любому событию, которое может быть использовано для выставления счета абоненту. Примерами событий, которые записываются в хранилища данных и используются для выставления счетов абонентам, являются, например, начало, окончание и продолжительность соединения или объем переданных через смартфон интернет-данных.

Перед началом работы

Необходимо на базовом уровне знать используемые в статье технологии и концепции. В разделе Ресурсы приведены учебные руководства, охватывающие следующие аспекты:

  • Написание базовых SQL-сценариев (DDL и DML), таких как select, create и insert.
  • Написание и компиляция Java™-программ.
  • Пакетирование байт-кода Java в JAR-файл.

Использование CDR-записей в качестве примера

Подробная запись о соединении (Call Detail Record – CDR), также известная как запись данных о соединении (Call Data Record) – это запись данных, выполняемая телефонной станцией или другой телекоммуникационной аппаратурой, документирующая подробности телефонного соединения, прошедшего через оборудование или устройство. CDR состоит из полей данных, которые описывают телекоммуникационные операции. Этими полями могут быть:

  • Номер телефона абонента.
  • Номер телефона адресата.
  • Время начала разговора.
  • Продолжительность соединения.
  • Номер телефона для выставления счета.
  • Идентификатор оборудования телефонной станции.
  • Идентификатор записи.
  • Результаты вызова (занято или сбой).
  • Маршрут, по которому вызов поступил на станцию.
  • Маршрут, по которому вызов покинул станцию.
  • Вид соединения (голос, SMS и т.д.).

В статье, наряду с CDR-записями, мы также будем использовать другой набор данных – так называемый сетевой журнал. В сетевом журнале фиксируется сетевая активность – звонок пользователя, работа в Интернете и с электронной почтой или просто переход на другую сотовую станцию. Запись сетевого журнала может состоять из следующих полей:

  • Время начала события.
  • IMSI (уникальный идентификатор сотовой сети).
  • IMEI (уникальный идентификатор мобильного телефона).
  • Вид соединения (голос, SMS и т.д.).
  • Тип соты (код типа сотовой станции, записывающей эту информацию).
  • Идентификатор соты (идентификатор сотовой станции, записывающей эту информацию).
  • Номер телефона абонента.
  • Широта (географическая координата станции).
  • Долгота (географическая координата станции).

Поставщиков телекоммуникационных услуг интересует оценка различных тенденций с целью планирования будущих модернизаций и развертываний на основании реальных данных. Например, типичный поставщик хочет знать, через какую сотовую станцию проходит большая часть соединений. Другая ценная информация – какие базовые станции перегружены в различных временных интервалах (особенно закономерности суточного трафика) и какие виды соединений выполняются чаще всего в различных направлениях. Мы прокоррелируем эти источники больших данных, имеющие, как правило, ежедневный объем порядка сотен гигабайт для сотовой сети среднего размера.

Следует отметить, что журналы соединений содержат данные о каждой операции в каждой сотовой сети. По последней информации компания Verizon предоставила Агентству национальной безопасности (NSA) беспрецедентное право получать информацию журналов соединений непосредственно с серверов Verizon. Были переданы миллионы записей журналов соединений, содержащих номера абонентов, продолжительность соединений и расположение конечных станций.

Использование Hive для анализа больших данных

Начнем с коррелирования двух наборов данных, описанных выше. Для этой цели Hive предоставляет SQL-подобную семантику соединения (join). Наиболее распространенной операцией соединения, используемой в приложениях, является внутреннее соединение, которое можно рассматривать как тип соединения по умолчанию. Внутреннее соединение объединяет значения столбцов двух таблиц (назовем их А для CDR-записей и В для записей сетевого журнала) в соответствии с предикатом join. Запрос внутреннего соединения сравнивает каждую строку таблицы А с каждой строкой таблицы B, находя все пары строк, которые удовлетворяют предикату join. Если условие предиката join выполняется, значения столбцов соответствующих записей таблиц А и В объединяются в новую запись. Внутреннее соединение можно рассматривать как декартово произведение двух таблиц, возвращающее записи, удовлетворяющие предикату join. Упрощенный запрос соединения показан ниже.

Запрос внутреннего соединения

В предикате join определяется условие on (см. запрос в листинге 1).

Листинг 1. Запрос внутреннего соединения
hive> insert overwrite table correlation            \
      partition(dt=20130101, hour=12)                  \
      select cdr.timestamp,      cdr.subscriberPhone,  \
      cdr.recipientPhone,        cdr.duration,         \
      cdr.billingPhone,          cdr.exchangeID,       \
      cdr.recordID,              cdr.callResult,       \ 
      cdr.entryRoute,            cdr.exitRoute,        \
      cdr.callType,                                    \
      net.dtstamp,               net.minute            \
     net.second,                 net.IMSI,             \
     net.IMEI,                   net.callType,         \
     net.cellType,               net.cellID,           \
     net.subscriberPhone,        net.cellLat,          \    
     net.cellLon                                       \
      from cdr join net                                \
on (cdr.subscriberPhone = net.subscriberPhone)         \
      where combine(net.dtstamp,                       \
            net.hour,                                  \ 
            net.minute,                                \
            net.second) <= cdr.timestamp            \
      and cdr.timestamp like '2013010112%              \
      and net.dtstamp like '20130101'                  \
      and net.hour = 12;

Рассмотрим запрос подробно. Часть запроса insert overwrite указывает интерпретатору Hive вставить выбранные поля в таблицу, имя которой (в данном случае correlation) следует за ключевым словом table. Ключевое слово join и оператор where строго соответствуют стандартному синтаксису SQL.

Таблицу correlation можно создать с помощью стандартной SQL-команды create table. В приведенном выше запросе ключевое слово partition определяет, в какой раздел таблицы correlation должны быть вставлены данные. Таблица может иметь один или несколько столбцов разделов и отдельный каталог данных для каждой отдельной комбинации значений в столбцах разделов. Листинг 2 является примером создания таблицы с разделами. В качестве примера мы используем таблицу correlation.

Листинг 2. Таблица correlation
hive> create table correlation(
timestamp string,     subscriberPhone string,         \
      recipientPhone string,     duration int,        \
      billingPhone string,     exchangeID string,     \
      recordID string,         callResult string,     \
      entryRoute string,     exitRoute string,        \
    callType string,         dtstamp string,          \
minute int,            second int,                    \
IMSI string,        IMEI string,                      \
callType string,        cellType string,              \
cellID string,        subscriberPhone string,         \
cellLat string,        cellLon string)                \
partitioned by (dt string, hour int)                  \
location '/mnt/user/hive/warehouse/correlation'

Мы также используем специальную определяемую пользователем функцию (UDF) Hive (см. раздел Ресурсы). UDF-функция пишется пользователем и может многократно использоваться в интерфейсе командной строки Hive (CLI). Это обобщающий термин и одновременно Java-класс. Наша UDF-функция combine() объединяет дату, часы, минуты и секунды в формат YYYYMMDDhhmmss. Код Java-класса DateCombiner UDF приведен ниже. Обратите внимание, что мы расширили Java-класс UDF. Также можно использовать Java-класс GenericUDF, который представляет собой новую реализацию семантики UDF. Он обеспечивает более высокую производительность за счет использования отложенных вычислений и укороченных логических операторов. Он также поддерживает непримитивные параметры и переменное число аргументов.

Java-класс UDFпроще в использовании, чем GenericUDF, и обеспечивает приемлемый уровень производительности. Он использует Reflection API, поэтому работает немного медленнее, чем GenericUDF. Он не принимает и не возвращает непримитивные параметры, такие как массивы, структуры или карты, а также не поддерживает переменное число аргументов. Java-класс GenericUDF можно использовать при работе со сложными типами данных, но в большинстве случаев достаточно Java-класса UDF.

При написании UDF-функции прежде всего определяется пакет Java. Затем в исходный код импортируется Java-класс UDF, чтобы компилятор Java знал, какой родительский Java-класс отвечает за методы, используемые наследующим классом. Декларация import является элементом исходного кода времени компиляции и не используется во время выполнения. Причина в том, что байт-код JVM всегда использует полные имена классов.

Затем объявляется класс, расширяющий родительский Java-класс UDF. Определяется метод evaluate(), содержащий логику программы. Короче говоря, UDF должна обладать двумя признаками:

  1. Она должна быть подклассом Java-классов UDF или GenericUDF.
  2. Она должна реализовывать по крайней мере один метод evaluate() (см. листинг 3).
Листинг 3. Определение метода evaluate()
package HiveUDF;
import org.apache.hadoop.hive.ql.exec.UDF;
public final class DateCombiner extends UDF (
public String evaluate(final String dt, final String hh, final String mm,
        final String ss) {
      it(dt==null) { return null; }
      String ho="", mi="", se="",
      if(hh.length() != 2) { ho = "0".concat(hh); } else { ho = hh; }
     if(mm.length() != 2) { mi = "0".concat(mm); } else { mi = mm; }
     if(ss.length() != 2) { se = "0".concat(ss); } else { se = ss; }
     return new String(dt + ho + mi + se);
  }

Есть несколько важных шагов, которые нужно сделать перед использованием UDF и выполнением приведенного выше запроса соединения. Во-первых, UDF должна быть скомпилирована и упакована в JAR-файл. Для компиляции необходимо добавить исполняемый JAR-файл Hive или включить его в classpath Java (см. листинг 4).

Листинг 4. Добавление исполняемого JAR-файла Hive
$javac -cp /apps/hive/lib/hive-exec-0.7.1-cdh3u3.jar DateCombiner.java
$ jar cf HiveUDF.jar HiveUDF/

Затем нужно загрузить JAR-файл UDF в интерфейсе командной строки Hive. Затем нужно создать временную функцию combine, которая соответствует Java-классу DateCombiner UDF (см. листинг 5).

Листинг 5. Функция combine
hive> add jar /home/verrami1/work/JARS/HiveUDF.jar;
hive> create temporary function combine as 'HiveUDF.DateCombiner';

Мы рассмотрели использование Java-класса UDF для внутреннего соединения большого набора данных (в нашем случае это два столбца базы данных CDR). Теперь перейдем к некоторым общим рекомендациям по Hive, касающимся анализа больших данных.

Оптимизация скорости и производительности заданий Hive

В этом разделе мы рассмотрим некоторые рекомендации по созданию и выполнению запросов в Hive. Хотя эти рекомендации не являются специфичными для нашего конкретного случая, они применимы и здесь в силу огромных объемов данных, обрабатываемых телекоммуникационными компаниями.

Разбиение данных на разделы

Правильно выбранные критерии создания разделов при определении таблицы способствуют увеличению скорости обработки заданий Hive. Тщательное создание разделов облегчает поиск и устранение неисправностей. Лучшим способом разделения таблиц является использование одной или нескольких метрик в зависимости от ожидаемого размера таблицы. В нашем примере анализа CDR и сетевых журналов мы ожидаем, что в систему будет записываться один гигабайт CDR-данных. Умножая на 24 часа, получаем дневной объем данных (по которым будет выполняться запрос на выборку) порядка 24 ГБ. Эти данные непрерывно поступают в базу данных и при ненадлежащем их делении на разделы может потребоваться огромный кластер для выполнения даже самых простых запросов. Листинг 6 демонстрирует правильный подход к разделению данных.

Листинг 6. Правильный подход к разделению данных
hive> create table correlation(                         \
     timestamp string,           subscriberPhone string,   \
     recipientPhone string,      duration int,             \
     billingPhone string,        exchangeID string,        \
     recordID string,            callResult string,        \
     entryRoute string,          exitRoute string,         \
     callType string,                                      \
     dtstamp string,              minute int,              \
     second int,                  IMSI string,             \
     IMEI string,                 callType int,            \
     cellType int,                cellID string,           \
     subscriberPhone string, cellLat string,               \
     cellLon string)                                       \
     partitioned by(dt string, hour int)                   \
     location '/mnt/data/correlation';

Каждая система имеет ограничение на физические ресурсы, такие как число ядер и память. При неправильном разбиении базы данных на разделы выполнение пакетного задания анализа данных может привести к внезапному сбою из-за переполнения или нехватки памяти. В результате вы получите множество ошибок, из которых нельзя будет понять, что виновником является именно переполнение памяти.

Детальное описание таблицы

В Hive есть очень хорошая функция, которая позволяет просматривать информацию о таблице, такую как столбцы, типы данных, место хранения, размер и т.д. Для просмотра этой информации используйте команду describe formatted с именем таблицы (см. листинг 7). Так как выводимые данные огромны, мы показали только поля.

Листинг 7. Команда describe formatted
hive> describe formatted correlation;
# col_name        data_type       comment
timestamp         string          None
subscriberPhone   string          None
recipientPhone    string          None
duration          string          None
..
# Partition Information
# col_name        data_type       comment
dt                string           None
hour              int              None
..
CreateTime: Mon, Jun 17 2013 PST
Location: /mnt/data/correlation

Соединение (таблица большего размера справа)

Как уже говорилось, операция соединения сначала вычисляет декартово произведение двух таблиц, а затем возвращает те строки (с объединенными столбцами), которые удовлетворяют предикату join. Процесс соединения требует большого объема вычислений и много памяти. Чтобы эффективно использовать соединение, необходимо располагать большие таблицы (в строках) с правой стороны операции join. Например, если таблица А имеет 1000000 строк, а таблица B имеет 200000 строк, запрос join должен выглядеть примерно так: hive>... B join A ..;.

Оператор join является реализацией операции соединения реляционной алгебры, хорошо описанной математиками. Если записать A join B, Hadoop-задания map и reduce нужно будет выполнить для 1000000 строк таблицы А, чтобы найти соответствующие строки в таблице B. Наоборот, если записать B join A, нужно будет перебрать 200000 строк таблицы В, чтобы найти соответствующие строки в таблице A. Это гораздо более эффективно, поскольку выполняется меньше итераций.

Операторы where (с диапазонами)

Также нужно сделать другую важную вещь – связать запросы с ограничениями, используя операторы where. Обратите внимание, что в нашем запросе join мы использовали четыре условия связывания. Оператор where предотвращает исполнение запроса "в лоб". Представляйте это как динамическое разделение таблицы, из которой извлекаются данные.

Использование стандартных функций, таких как collect_set()

Наряду с множеством полезных стандартных агрегирующих функций, таких как count(), sum(), min(), max() и т.д., Hive предоставляет функцию сбора данных collect_set(). Она возвращает набор объектов, удаляя повторяющиеся элементы. Например, чтобы найти пользователей, зарегистрированных сотовой станцией в полдень 1 января 2013 года, можно использовать запрос, приведенный в листинге 8.

Листинг 8. Функция collect_set()
hive> select cellID, collect_set(subscriberPhone) from correlation 
where dt=?20130101? and hour=12 group by cellID;

Приведение типов

Hive предоставляет функцию cast() для преобразования строки в целое число или число двойной точности и наоборот. Приведение типов является лучшим способом гарантировать, что сравнение будет выполнено так, как задумано. В листинге 9 демонстрируется использование функции cast(). Выражение и тип могут быть целым числом (integer), большим целым числом (bigint), числом с плавающей точкой (float), числом двойной точности (double) или строкой (string). В листинге 9 демонстрируется преобразование строки в целое число и числа двойной точности в строку.

Листинг 9. Функция cast()
cast(expression as <type>)
hive> cast('1' as int)
hive> cast(75.89 as string)

Заключение

В этой статье мы продемонстрировали применение Hive для анализа больших наборов данных с использованием Hadoop в качестве серверной части. Мы показали, как может выглядеть схема большого рабочего набора данных. Мы объяснили на примере, как соединить два больших набора данных и сформировать набор сопоставленных данных. Чтобы сохранить огромный объем сопоставленных данных, мы создали таблицу с разделами. Затем мы рассмотрели рекомендации по рациональному и эффективному использованию Hive.


Ресурсы для скачивания


Похожие темы


Комментарии

Войдите или зарегистрируйтесь для того чтобы оставлять комментарии или подписаться на них.

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Information Management
ArticleID=954800
ArticleTitle=Анализ больших наборов данных с помощью Hive
publish-date=11272013