Содержание


Практическое применение Oozie, механизма управления потоками работ для Hadoop

Comments

Apache Oozie — это механизм планирования потоков работ для платформы Hadoop. Его архитектура, представленная на рисунке 1, упрощает координирование взаимосвязанных, повторяющихся заданий с использованием координатора Oozie, который может запускаться либо в запланированное время, либо исходя из доступности данных. Выполнять и поддерживать набор заданий Oozie Coordinator можно с помощью системы Oozie Bundle. В предлагаемом примере Oozie запускает задание Apache Sqoop, чтобы выполнить действие по импорту данных из СУБД MySQL и передать эти данные в файловую систему HDFS (Hadoop Distributed File System). Выполняется Sqoop-действие слияния с целью замены устаревшего набора данных импортированным набором данных. Действие UNIX Shell обеспечивает выборку метаданных, которые используются для выполнения Sqoop-задания в базе данных MySQL. Также выполняется Java-действие для обновления метаданных в базе данных MySQL, необходимого для Sqoop-задания.

Рисунок 1. Архитектура управления Oozie
MySQL inputs, Oozie processes, HDFS gets results
MySQL inputs, Oozie processes, HDFS gets results

Что необходимо установить

Чтобы в полной мере изучить рассматриваемые в этой статье примеры, вам понадобится следующее программное обеспечение:

В качестве кластера используется распределенный кластер с одним главным узлом имен, двумя основными узлами и восемью узлами задач.

Поток работ Oozie

Поток работ Oozie представляет собой коллекцию действий Oozie, организованную в виде ориентированного ациклического графа (directed acyclic graph, DAG) с зависимостью по управлению. Зависимость по управлению означает, что действие не запускается до тех пор, пока предыдущее действие не завершится успешно. Статья начинается с краткого обзора узлов управления потоком работ, а затем сосредоточивается главным образом на следующих узлах действий в потоке работ:

Узлы управления потоком работ

Узел управления запуском, как показано в листинге 1, — это начальная точка для задания потока работ. При запуске поток работ автоматически переводится на узел, указанный в начале.

Листинг 1. Управляющий узел запуска
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
    <start to="timeCheck"/>
</workflow-app>

Узел управления завершением, как показано в листинге 2, — это конечная точка для задания потока работ. Он указывает, что действия в потоке работ завершились успешно. Определение потока работ должно включать конечный узел.

Листинг 2. Управляющий узел завершения
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
    <end name="end"/>
</workflow-app>

Управляющий узел прерывания, как показано в листинге 3, обеспечивает остановку задания потока работ. Если при достижении узла прерывания выполняются одно или несколько действий, запущенных заданием потока работ, то все они останавливаются. Определение потока работ может не содержать узлов прерывания вообще или иметь любое количество таких узлов.

Листинг 3. Управляющий узел прерывания
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
   </kill>
</workflow-app>

Управляющий узел принятия решений, как показано в листинге 4, позволяет потоку работ принимать решение о выборе пути исполнения. Узел принятия решений работает как блок переключения, который имеет набор пар предикаты-переход, а также переход по умолчанию. Предикаты оцениваются по порядку следования, пока какой-нибудь из них не будет удовлетворен; тогда выполняется соответствующий переход. Если ни один из предикатов не удовлетворяется, выполняется переход по умолчанию.

Листинг 4. Управляющий узел принятия решений
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
    <decision name="master-decision">
       <switch>
         <case to="sqoopMerge1">
                 ${wf:actionData('hiveSwitch')['paramNum'] eq 1}
         </case>
         <default to="sqoopMerge2"/>
       </switch>
   </decision>
</workflow-app>

Узел ветвления разделяет один путь исполнения на множество параллельных путей. Узел соединения ожидает, пока все параллельные пути исполнения от предшествующего узла ветвления не достигнут узла соединения. Узлы ветвления и соединения используются попарно, как показано в листинге 5.

Листинг 5. Управляющий узел ветвления-соединения
<workflow-app xmlns="uri:oozie:workflow:0.2" name="ooziedemo-wf">
    <fork name="forking">
        <path start="sqoopMerge1"/>
        <path start="sqoopMerge2"/>
    </fork>
    <join name="joining" to="hiveSwitch"/>
</workflow-app>

Shell-действие Oozie

Shell-действие в потоке работ Oozie можно сконфигурировать для выполнения набора сценариев Shell в файле. Shell-действие Oozie может содержать элементы job-tracker, name-node и exec с необходимыми аргументами для выполнения задач, как показано в листинге 6. Можно сконфигурировать Shell-действие для создания или удаления файлов и каталогов в HDFS перед запуском Shell-заданий. В Shell-задание можно передать XML-файл с конфигурационными параметрами, используя элемент job-xml с конфигурационными элементами. Можно сконфигурировать дополнительные файлы или архивы, чтобы сделать их доступными для Shell-задания. Результат выполнения Shell-задания можно сделать доступным для задания workflow после завершения Shell-задания, однако необходимо выполнение следующих критериев:

  • Формат результата должен соответствовать формату файла свойств Java.
  • Размер результирующего файла не должен превышать 2 КБ.
Листинг 6. Сценарий Shell
host="XXX.XX.XX.XXX"
port="3306"
username="root"
password=""
database="zzz"
tableName="$1"

####################################
echo "Host: $host"
echo "Database: $database"
echo "Table: $tableName"
####################################

sqoopLstUpd=`mysql --host=$host --port=$port --user=$username --password=$password 
-N -e 'SELECT PARM_DATE_VAL from T_CONTROL_PARM where PARM_NM="SQOOP_INCR_LST_UPD"
 and PARM_GROUP_NM="'$tableName'"' $database`

echo "sqoopLstUpd=$sqoopLstUpd"
echo "tableName=$tableName"

В листинге 7 показано конфигурирование Shell-действия в файле workflow.xml.

Листинг 7. Shell-действие Oozie
<action name="timeCheck">
    <shell xmlns="uri:oozie:shell-action:0.1">
       <job-tracker>${jobTracker}</job-tracker>
       <name-node>${nameNode}</name-node>
       <configuration>
           <property>
               <name>mapred.job.queue.name</name>
               <value>${queueName}</value>
           </property>
       </configuration>
       <exec>${sqoopUpdTrack}</exec>
       <argument>${tableName}</argument>
       <file>${sqoopUpdTrackPath}#${sqoopUpdTrack}</file>
       <capture-output/>
    </shell>
    <ok to="sqoopIncrImport"/>
    <error to="fail"/>
</action>

Для доступа к результату Shell-действия можно использовать инкрементное задание Sqoop, показанное в листинге 8.

Листинг 8. Sqoop-действие Oozie для инкрементного импорта
<action name="sqoopIncrImport">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
           <job-tracker>${jobTracker}</job-tracker>
           <name-node>${nameNode}</name-node>
           <prepare>
               <delete path="${s3BucketLoc}/${tableName}/incr"/>
               <mkdir path="${s3BucketLoc}/${tableName}"/>
           </prepare>
           <configuration>
               <property>
                   <name>mapred.job.queue.name</name>
                   <value>${queueName}</value>
               </property>
           </configuration>
           <arg>import</arg>
           <arg>--connect</arg>
           <arg>${dbURL}</arg>
           <arg>--driver</arg>
           <arg>${mySqlDriver}</arg>
           <arg>--username</arg>
           <arg>${user}</arg>
           <arg>--table</arg>
           <arg>${wf:actionData('timeCheck')['tableName']}</arg>
           <arg>--target-dir</arg>
           <arg>${s3BucketLoc}/${tableName}/incr</arg>
           <arg>--check-column</arg>
           <arg>LAST_UPD</arg>
           <arg>--incremental</arg>
           <arg>lastmodified</arg>
           <arg>--last-value</arg>
           <arg>${wf:actionData('timeCheck')['sqoopLstUpd']}</arg>
           <arg>--m</arg>
           <arg>1</arg>
       </sqoop>
       <ok to="sqoopMetaUpdate"/>
       <error to="fail"/>
   </action>

Java-действие Oozie

Java-действие выполняет метод public static void main (String [] args)указанного основного класса Java. Java-приложения выполняются на кластере Hadoop как задания MapReduce с одной задачей mapper. Задание потока работ ожидает, пока Java-действие завершится, перед тем как переходить к следующему действию. Java-действие конфигурируется с использованием job-tracker, name-node, основного класса Java, опций JVM и входных аргументов, как показано в листинге 9. Можно использовать выражения Expression Language (EL) для назначения параметров значениям свойств. Все выходные параметры должны быть записаны в формате файла свойств Java.

Можно сконфигурировать Java-действие для очистки файлов и каталогов HDFS или для разделения Apache HCatalog перед запуском Java-приложения. Это позволит Oozie повторно попытаться выполнить Java-действие при временном или постоянном сбое.

Листинг 9. Java-действие Oozie
<action name="sqoopMetaUpdate">
          <java>
               <job-tracker>${jobTracker}</job-tracker>
               <name-node>${nameNode}</name-node>
               <configuration>
                   <property>
                      <name>mapred.job.queue.name</name>
                      <value>${queueName}</value>
                   </property>
               </configuration>
               <main-class>SqoopMetaUtil</main-class>
               <java-opts></java-opts>
               <arg>${tableName}</arg>
               <archive>${mySqlDriverPath}</archive>
          </java>
          <ok to="hiveSwitch"/>
          <error to="fail"/>
</action>

Java-действие можно сконфигурировать, используя capture-output для передачи значений следующему действию. Доступ к этим значениям можно получить с использованием EL-функции Hadoop. Можно записать значения в формате файла свойств Java в классе Java, как показано в листинге 10.

Листинг 10. Фрагмент кода Java для передачи значений
String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
       OutputStream os = null;
       if(oozieProp != null){
          File propFile = new File(oozieProp);
          Properties p = new Properties();
          p.setProperty("name", "Autodesk");
          p.setProperty("address", "Sun Rafael");
          try {
               os = new FileOutputStream(propFile);
               p.store(os, "");
          } catch (FileNotFoundException e) {
               System.err.println("<<< FileNotFoundException >>>"+e.getMessage());
          } catch (IOException e) {
               System.err.println("<<< IOException >>>"+e.getMessage());
          }
          finally{
               if(os != null)
               try {
                    os.close();
               } catch (IOException e) {
                    System.err.println("<<< IOException >>>"+e.getMessage());
               }
          }
       }
       else{
            throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES
                    + " System property not defined");
    }

Можно сконфигурировать действия в файле workflow.xml для обеспечения доступа к соответствующему набору значений в файле свойств, как показано в листинге 11.

Листинг 11. Java-действие Oozie для передачи значений
<action name="jProperties">
      <java>
           <job-tracker>${jobTracker}</job-tracker>
           <name-node>${nameNode}</name-node>
           <configuration>
               <property>
                  <name>mapred.job.queue.name</name>
                  <value>${queueName}</value>
               </property>
           </configuration>
           <main-class>PropertyExplorer</main-class>
           <java-opts></java-opts>
           <capture-output/>
      </java>
      <ok to="email"/>
      <error to="fail"/>
   </action>

   <action name="email">
         <email xmlns="uri:oozie:email-action:0.1">
            <to>surajit.paul@autodesk.com</to>
            <subject>Oozie workflow finished successfully!</subject>
            <body>${wf:actionData('jProperties')['name']} | 
            ${wf:actionData('jProperties')['address']}</body>
         </email>
         <ok to="end"/>
         <error to="fail"/>
   </action>

Sqoop-действие Oozie

Поток работ Oozie активизирует сценарий Sqoop, который запускает Sqoop-задание на кластере Hadoop. Sqoop-задание выполняет задачи, запуская задания MapReduce на кластере Hadoop. Задание MapReduce, запускаемое сценарием Sqoop, передает данные из реляционной СУБД в HDFS. Можно сконфигурировать Sqoop-действие, как показано в листинге 12, для удаления файлов и каталогов в HDFS перед запуском Sqoop-задания. Так же как и для других действий Oozie, можно сконфигурировать Sqoop-действие с дополнительными свойствами, используя элемент job-xml. Значения свойств, указанные в элементе configuration, заменяют свойства, указанные в элементе job-xml. Sqoop-заданию могут предоставляться дополнительные файлы и архивы.

Листинг 12. Sqoop-действие Oozie для выполнения слияния
<action name="sqoopMerge1">
         <sqoop xmlns="uri:oozie:sqoop-action:0.2">
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
             <prepare>
                 <delete path="${s3BucketLoc}/${tableName}/master1"/>
                 <mkdir path="${s3BucketLoc}/${tableName}"/>
             </prepare>
             <configuration>
                 <property>
                     <name>mapred.job.queue.name</name>
                     <value>${queueName}</value>
                 </property>
             </configuration>
             <arg>merge</arg>
             <arg>--new-data</arg>
             <arg>${s3incr}</arg>
             <arg>--onto</arg>
             <arg>${s3BucketLoc}/${tableName}/master2</arg>
             <arg>--target-dir</arg>
             <arg>${s3BucketLoc}/${tableName}/master1</arg>
             <arg>--jar-file</arg>
             <arg>${tableJarLoc}/${tableName}.jar</arg>
             <arg>--class-name</arg>
             <arg>${tableName}</arg>
             <arg>--merge-key</arg>
             <arg>ROW_ID</arg>
         </sqoop>
         <ok to="hive-master1"/>
         <error to="fail"/>
   </action>

Hive-действие Oozie

Как показано в листинге 13, можно сконфигурировать Hive-действие для исполнения любого сценария Hive с файлами и каталогами в HDFS. Действие запускает задание MapReduce для выполнения этих задач. Hive-действие конфигурируется в Oozie с использованием конфигурационного файла Hive hive-default.xml или hive-site.xml как элемента job-xml. Для этого требуется обеспечить доступ Oozie к среде Hive. Можно сконфигурировать Hive-действие для создания или удаления файлов и каталогов в HDFS перед запуском Hive-задания. Значения свойств, указанные в элементе configuration, заменяют значения, указанные в файле job-xml. Можно добавить дополнительные файлы или архивы, чтобы сделать их доступными для Hive-задания. Oozie исполняет сценарий Hive, указанный с использованием пути в элементе сценария. Можно назначать параметры сценарию Hive как входные параметры через поток работ Oozie.

Листинг 13. Hive-действие Oozie
<action name="hiveSwitch">
     <shell xmlns="uri:oozie:shell-action:0.1">
         <job-tracker>${jobTracker}</job-tracker>
         <name-node>${nameNode}</name-node>
          <configuration>
          <property>
              <name>mapred.job.queue.name</name>
              <value>${queueName}</value>
          </property>
          </configuration>
          <exec>${hiveSwitchScript}</exec>
          <argument>${tableName}</argument>
          <file>${hiveSwitchScriptPath}#${hiveSwitchScript}</file>
       <capture-output/>
       </shell>
       <ok to="master-decision"/>
       <error to="fail"/>
   </action>

Email-действие Oozie

Email-действие Oozie, как показано в листинге 14, обеспечивает возможность отправки электронного письма из потока работ. Email-действие должно содержать адреса «Кому» и (необязательно) «Копия», а также тему и текст сообщения. Можно отправить электронное письмо множеству получателей, указав электронные адреса через запятую. Email-действие выполняется синхронно, и задания потока работ ожидают, пока электронное письмо не будет отправлено, прежде чем запустить следующее действие. Для передачи параметров Email-действию можно использовать EL-выражения Hadoop.

Листинг 14. Email-действие Oozie
<action name="email">
  	<email xmlns="uri:oozie:email-action:0.1">
            <to>surajit.paul@autodesk.com</to>
            <subject>Oozie workflow finished successfully!</subject>
            <body>${wf:actionData('jProperties')['name']} | 
            ${wf:actionData('jProperties')['address']}</body>
        </email>
        <ok to="end"/>
        <error to="fail"/>
   </action>

Заключение

Поток работ Oozie превращается в конвейер обработки данных, объединяющий множество взаимозависимых заданий, через которые проходит поток данных. Технология управления потоками работ Apache Oozie упрощает проектирование логического потока данных, обработки ошибок, механизма восстановления и т. д. Для повышения эффективности управления потоками работ можно соответствующим образом сконфигурировать задание Oozie Coordinator или входящие в комплект приложения, однако обсуждение этих вопросов выходит за пределы данной статьи. Есть и другие эквивалентные механизмы управления потоками работ для Hadoop, такие как Amazon Data Pipeline, Simple Workflow Engine, Azkaban, Cascading и Hamake. Конфигурации Hamake и Oozie задаются в XML, Azkaban конфигурируется с использованием текстового файла, содержащего пары ключ-значение, а Cascading конфигурируется с использованием Java API.


Ресурсы для скачивания


Похожие темы


Комментарии

Войдите или зарегистрируйтесь для того чтобы оставлять комментарии или подписаться на них.

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Information Management
ArticleID=980934
ArticleTitle=Практическое применение Oozie, механизма управления потоками работ для Hadoop
publish-date=08182014