
Hadoop is not relational, or: How to perform range grouping in Hive with good performance

Preventive Service Planning


Abstract

This article details an approach to range grouping within Hive, using a simple Hive Java UDF so that equality joins can be used instead of a WHERE clause. Without equality joins, Hive will perform the equivalent of a Cartesian join, which can result in a huge amount of disk space utilisation and performance so poor that queries may not complete.

Content

Given the following sample set of data (a subset of the 96 rows used in this article):


    start_content_end;col2;col3;col4;col5
    START;ROW_1_col2;ROW_1_col3;ROW_1_start_data;
    MIDDLE;ROW_2_col2;ROW_2_col3;;
    END;ROW_3_col2;ROW_3_col3;;ROW_3_end_data
    START;ROW_4_col2;ROW_4_col3;ROW_4_start_data;
    MIDDLE;ROW_5_col2;ROW_5_col3;;
    MIDDLE;ROW_6_col2;ROW_6_col3;;
    END;ROW_7_col2;ROW_7_col3;;ROW_7_end_data;
    START;ROW_8_col2;ROW_8_col3;ROW_8_start_data;
    MIDDLE;ROW_9_col2;ROW_9_col3;;
    MIDDLE;ROW_10_col2;ROW_10_col3;;
    MIDDLE;ROW_11_col2;ROW_11_col3;;
    END;ROW_12_col2;ROW_12_col3;;ROW_12_end_data

What is required is to produce the following: add a group identifier for each "START;MIDDLE{N};END" group, place col4 from the group's "START" row into every row of the group, and place col5 from the group's "END" row into every row of the group:

    group_unique_id;unique_id;start_content_end;col2;col3;col4;col5
    1;1;START;ROW_1_col2;ROW_1_col3;ROW_1_start_data;ROW_3_end_data
    1;2;MIDDLE;ROW_2_col2;ROW_2_col3;ROW_1_start_data;ROW_3_end_data
    1;3;END;ROW_3_col2;ROW_3_col3;ROW_1_start_data;ROW_3_end_data
    2;4;START;ROW_4_col2;ROW_4_col3;ROW_4_start_data;ROW_7_end_data
    2;5;MIDDLE;ROW_5_col2;ROW_5_col3;ROW_4_start_data;ROW_7_end_data
    2;6;MIDDLE;ROW_6_col2;ROW_6_col3;ROW_4_start_data;ROW_7_end_data
    2;7;END;ROW_7_col2;ROW_7_col3;ROW_4_start_data;ROW_7_end_data
    3;8;START;ROW_8_col2;ROW_8_col3;ROW_8_start_data;ROW_12_end_data
    3;9;MIDDLE;ROW_9_col2;ROW_9_col3;ROW_8_start_data;ROW_12_end_data
    3;10;MIDDLE;ROW_10_col2;ROW_10_col3;ROW_8_start_data;ROW_12_end_data
    3;11;MIDDLE;ROW_11_col2;ROW_11_col3;ROW_8_start_data;ROW_12_end_data
    3;12;END;ROW_12_col2;ROW_12_col3;ROW_8_start_data;ROW_12_end_data

The following simple data generation script produces an arbitrary number of rows (in this case, 96), consisting of "START" ... "END" groups, each containing one "START" row, one, two or three "MIDDLE" rows, and one "END" row:

    $ cat create_data.sh
    let i=1
    while [[ $i -lt 96 ]]
    do
    # Create a group of 3 entries
       echo "START;ROW_"$i"_col2;ROW_"$i"_col3;ROW_"$i"_start_data;"
       let i="$i"+1
       echo "MIDDLE;ROW_"$i"_col2;ROW_"$i"_col3;;"
       let i="$i"+1
       echo "END;ROW_"$i"_col2;ROW_"$i"_col3;;ROW_"$i"_end_data"
       let i="$i"+1
    # Create a group of 4 entries
       echo "START;ROW_"$i"_col2;ROW_"$i"_col3;ROW_"$i"_start_data;"
       let i="$i"+1
       echo "MIDDLE;ROW_"$i"_col2;ROW_"$i"_col3;;"
       let i="$i"+1
       echo "MIDDLE;ROW_"$i"_col2;ROW_"$i"_col3;;"
       let i="$i"+1
       echo "END;ROW_"$i"_col2;ROW_"$i"_col3;;ROW_"$i"_end_data;"
       let i="$i"+1
    # Create a group of 5 entries
       echo "START;ROW_"$i"_col2;ROW_"$i"_col3;ROW_"$i"_start_data;"
       let i="$i"+1
       echo "MIDDLE;ROW_"$i"_col2;ROW_"$i"_col3;;"
       let i="$i"+1
       echo "MIDDLE;ROW_"$i"_col2;ROW_"$i"_col3;;"
       let i="$i"+1
       echo "MIDDLE;ROW_"$i"_col2;ROW_"$i"_col3;;"
       let i="$i"+1
       echo "END;ROW_"$i"_col2;ROW_"$i"_col3;;ROW_"$i"_end_data"
       let i="$i"+1
    done


The Relational Approach


Using a relational approach, this could be solved as follows:


    Take source table T1 and add unique_id, a sequential key, and produce T2
    Take T2 and extract the unique_id and start_content_end columns for rows containing "START" or "END", along with col4 from the "START" row and col5 from the "END" row, incrementing group_unique_id each time a "START" row is encountered, and produce T3
    Join T2, T3 (alias t3_start) and T3 (alias t3_end) using
    where t2.unique_id between t3_start.unique_id and t3_end.unique_id
    and t3_start.start_content_end = "START" and t3_end.start_content_end = "END"
    and t3_start.group_unique_id = t3_end.group_unique_id
    and produce T4 with group_unique_id;unique_id;start_content_end;col2;col3;col4;col5

Okay, let's take this approach:

    1. Create the "group_unique_key" generator java UDF and build script:

      $ cat AutoIncrementUDF.java
      import org.apache.hadoop.hive.ql.exec.UDF;
      import org.apache.hadoop.hive.ql.udf.UDFType;
      // stateful = true: the counter must keep its value from row to row
      @UDFType(stateful = true)
      public class AutoIncrementUDF extends UDF {
         int ctr;
         String check_start = "START";
         // Increment the group counter on every "START" row, so that all rows
         // up to and including the next "END" row receive the same group id
         public int evaluate(String input) {
             if (input.equals(check_start))
                 ctr++;
             return ctr;
         }
      }
      $ cat build.sh
      CLASSPATH=$CLASSPATH:/opt/ibm/biginsights/hive/lib/hive-exec-0.12.0.jar
      javac AutoIncrementUDF.java
      jar -cvf  AutoIncrementUDF.jar AutoIncrementUDF.class
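
    For clarity, the following hypothetical test harness (not part of the original article) exercises the UDF outside Hive: it feeds a "START"/"MIDDLE"/"END" sequence to evaluate() and prints the group id each row would receive. It assumes AutoIncrementUDF.class and the hive-exec jar used in build.sh are on the classpath when compiling and running:

      $ cat TestAutoIncrementUDF.java
      // Hypothetical harness: shows that the counter only advances on "START",
      // so every row of a START..END group gets the same group id.
      public class TestAutoIncrementUDF {
         public static void main(String[] args) {
             String[] rows = {"START", "MIDDLE", "END",
                              "START", "MIDDLE", "MIDDLE", "END"};
             AutoIncrementUDF udf = new AutoIncrementUDF();
             for (String row : rows) {
                 System.out.println(udf.evaluate(row) + ";" + row);
             }
             // Prints (one per line): 1;START 1;MIDDLE 1;END 2;START 2;MIDDLE 2;MIDDLE 2;END
         }
      }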


    2. Create the relational approach SQL, relational.sql:

      $ cat relational.sql
      CREATE DATABASE IF NOT EXISTS jj_hive_relational;
      USE jj_hive_relational;
      drop table t1;
      drop table t2;
      drop table t3;
      drop table t4;
      -- Add a simple UDFRowSequence temporary function for the unique_id
      ADD JAR /opt/ibm/biginsights/hive/lib/hive-contrib-0.12.0.jar;
      CREATE TEMPORARY FUNCTION row_sequence AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';
      -- Add a simple AutoIncrementUDF temporary function for the group_unique_id
      ADD JAR AutoIncrementUDF.jar;
      CREATE TEMPORARY FUNCTION group_increment as 'AutoIncrementUDF';
      CREATE EXTERNAL TABLE t1 (start_content_end string, col2 string, col3 string, col4 string, col5 string)
      ROW FORMAT DELIMITED FIELDS TERMINATED BY '\;' LOCATION '/home/biadmin/range_test';
      CREATE TABLE t2 AS SELECT row_sequence() unique_id, * FROM t1;
      CREATE TABLE t3 AS SELECT group_increment(start_content_end) group_unique_id, unique_id, start_content_end, col2, col3, col4, col5 FROM t2 where start_content_end = "START" or start_content_end = "END";
      CREATE TABLE t4 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\;' AS SELECT t3_start.group_unique_id, t2.unique_id, t2.start_content_end, t2.col2, t2.col3, t3_start.col4, t3_end.col5
      FROM t2 JOIN t3 t3_start JOIN t3 t3_end
      WHERE t2.unique_id between t3_start.unique_id and t3_end.unique_id
      and t3_start.group_unique_id = t3_end.group_unique_id
      and t3_start.start_content_end = "START" and t3_end.start_content_end = "END";

    3. Create a wrapper script to make it easy to run, and re-run:

      $ cat run_repro_relational.sh
      ./create_data.sh > test_data.csv
      hadoop fs -mkdir /home/biadmin/range_test
      hadoop fs -rm /home/biadmin/range_test/test_data.csv
      hadoop fs -copyFromLocal test_data.csv /home/biadmin/range_test/
      ./build.sh
      $HIVE_HOME/bin/hive -f relational.sql
      rm test_data_out.csv
      hadoop fs -copyToLocal /biginsights/hive/warehouse/jj_hive_relational.db/t4/000000_0 test_data_out.csv

    4. Run the wrapper (./run_repro_relational.sh) and review the results from test_data_out.csv:

      1;1;START;ROW_1_col2;ROW_1_col3;ROW_1_start_data;ROW_3_end_data
      1;2;MIDDLE;ROW_2_col2;ROW_2_col3;ROW_1_start_data;ROW_3_end_data
      1;3;END;ROW_3_col2;ROW_3_col3;ROW_1_start_data;ROW_3_end_data
      2;4;START;ROW_4_col2;ROW_4_col3;ROW_4_start_data;ROW_7_end_data
      2;5;MIDDLE;ROW_5_col2;ROW_5_col3;ROW_4_start_data;ROW_7_end_data
      2;6;MIDDLE;ROW_6_col2;ROW_6_col3;ROW_4_start_data;ROW_7_end_data
      2;7;END;ROW_7_col2;ROW_7_col3;ROW_4_start_data;ROW_7_end_data
      3;8;START;ROW_8_col2;ROW_8_col3;ROW_8_start_data;ROW_12_end_data
      3;9;MIDDLE;ROW_9_col2;ROW_9_col3;ROW_8_start_data;ROW_12_end_data
      3;10;MIDDLE;ROW_10_col2;ROW_10_col3;ROW_8_start_data;ROW_12_end_data
      3;11;MIDDLE;ROW_11_col2;ROW_11_col3;ROW_8_start_data;ROW_12_end_data
      3;12;END;ROW_12_col2;ROW_12_col3;ROW_8_start_data;ROW_12_end_data

Success! The output has been generated in the required format!


However, a review of the MapReduce task log (attempt_0) reveals the following:


    INFO org.apache.hadoop.hive.ql.exec.TableScanOperator: 6 forwarded 96 rows
    ...
    INFO org.apache.hadoop.hive.ql.exec.MapJoinOperator: 4 forwarded 2304 rows
    ...
    INFO org.apache.hadoop.hive.ql.exec.MapJoinOperator: 3 forwarded 55296 rows
    ...
    INFO org.apache.hadoop.hive.ql.exec.FilterOperator: 2 forwarded 96 rows
    INFO org.apache.hadoop.hive.ql.exec.FilterOperator: PASSED:96
    INFO org.apache.hadoop.hive.ql.exec.FilterOperator: FILTERED:55200
    INFO org.apache.hadoop.hive.ql.exec.FileSinkOperator: TABLE_ID_1_ROWCOUNT:96

Why did we end up with an intermediate result set of 55,296 rows, 55,200 of which were then filtered out?

    Starting with the join of T2 (96 rows) and T3 (24 "START" rows), we have 96 * 24 = 2,304 rows
    Joining this result set with T3 (24 "END" rows), we have 2,304 * 24 = 55,296 rows
    Then 55,200 rows are filtered out, leaving the remaining 96 rows

This is, in effect, a Cartesian join, which is completely unscalable; a small illustrative sketch follows the calculation below.
Consider 1,000,000 rows with 100,000 "START" and 100,000 "END" records ...

    1,000,000 * 100,000 * 100,000 = 10,000,000,000,000,000!
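
For illustration only, here is a minimal, hypothetical Java sketch (not part of the original article) that reproduces the row-count arithmetic of the relational plan: every T2 row is first paired with every "START" row, the result is then paired with every "END" row, and only afterwards does the filter throw almost everything away:

    // Hypothetical illustration of the relational plan's intermediate row counts,
    // using the figures from the test data: 96 rows, 24 START rows, 24 END rows.
    public class CartesianRowCount {
       public static void main(String[] args) {
           long t2Rows = 96, startRows = 24, endRows = 24;
           long afterFirstJoin = t2Rows * startRows;           // 2,304 rows
           long afterSecondJoin = afterFirstJoin * endRows;    // 55,296 rows
           long kept = t2Rows;                                 // 96 rows survive the BETWEEN filter
           System.out.println("after joining t3_start: " + afterFirstJoin);
           System.out.println("after joining t3_end:   " + afterSecondJoin);
           System.out.println("filtered out:           " + (afterSecondJoin - kept));
           // The same plan at scale: 1,000,000 rows with 100,000 START and 100,000 END rows
           System.out.println("scaled intermediate rows: " + (1_000_000L * 100_000L * 100_000L));
       }
    }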


The Hadoop Approach

What is needed are equality JOINs (Hive does not support range joins such as BETWEEN in a join condition).
So, consider the following, similar approach:


    Take source table T1 and add group_unique_id and unique_id to produce T2
    Take T2 and extract the group_unique_id, unique_id and start_content_end columns for rows containing "START" or "END", along with col4 from the "START" row and col5 from the "END" row, and produce T3
    Join T2, T3 (alias t3_start) and T3 (alias t3_end) using
    from t2 JOIN t3 t3_start ON (t2.group_unique_id = t3_start.group_unique_id) JOIN t3 t3_end ON (t2.group_unique_id = t3_end.group_unique_id)
    where t3_start.start_content_end = "START" and t3_end.start_content_end = "END"
    and produce T4 with group_unique_id;unique_id;start_content_end;col2;col3;col4;col5

Okay, let's take this approach:

    1. Create the "group_unique_key" generator java UDF and build script (as above).
    2. Create the hadoop approach sql, hadoop.sql:

      $ cat hadoop.sql
      CREATE DATABASE IF NOT EXISTS jj_hive_hadoop;
      USE jj_hive_hadoop;
      drop table t1;
      drop table t2;
      drop table t3;
      drop table t4;
      -- Add a simple UDFRowSequence temporary function for the unique_id
      ADD JAR /opt/ibm/biginsights/hive/lib/hive-contrib-0.12.0.jar;
      CREATE TEMPORARY FUNCTION row_sequence AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';
      -- Add a simple AutoIncrementUDF temporary function for the group_unique_id
      ADD JAR AutoIncrementUDF.jar;
      CREATE TEMPORARY FUNCTION group_increment as 'AutoIncrementUDF';
      CREATE EXTERNAL TABLE t1 (start_content_end string, col2 string, col3 string, col4 string, col5 string)
      ROW FORMAT DELIMITED FIELDS TERMINATED BY '\;' LOCATION '/home/biadmin/range_test';
      CREATE TABLE t2 AS SELECT group_increment(start_content_end) group_unique_id, row_sequence() unique_id, * FROM t1;
      CREATE TABLE t3 AS SELECT group_unique_id, unique_id, start_content_end, col2, col3, col4, col5 FROM t2 where start_content_end = "START" or start_content_end = "END";
      CREATE TABLE t4 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\;' AS SELECT t3_start.group_unique_id, t2.unique_id, t2.start_content_end, t2.col2, t2.col3, t3_start.col4, t3_end.col5
      from t2 JOIN t3 t3_start on (t2.group_unique_id = t3_start.group_unique_id) JOIN t3 t3_end ON (t2.group_unique_id = t3_end.group_unique_id)
      WHERE t3_start.start_content_end = "START" and t3_end.start_content_end = "END";

    3. Create a wrapper script to make it easy to run, and re-run:

      $ cat run_repro_hadoop.sh
      ./create_data.sh > test_data.csv
      hadoop fs -mkdir /home/biadmin/range_test
      hadoop fs -rm /home/biadmin/range_test/test_data.csv
      hadoop fs -copyFromLocal test_data.csv /home/biadmin/range_test/
      ./build.sh
      $HIVE_HOME/bin/hive -f hadoop.sql
      rm test_data_out.csv
      hadoop fs -copyToLocal /biginsights/hive/warehouse/jj_hive_hadoop.db/t4/000000_0 test_data_out.csv

    4. Run the wrapper (./run_repro_hadoop.sh) and review the results from test_data_out.csv:

      1;1;START;ROW_1_col2;ROW_1_col3;ROW_1_start_data;ROW_3_end_data
      1;2;MIDDLE;ROW_2_col2;ROW_2_col3;ROW_1_start_data;ROW_3_end_data
      1;3;END;ROW_3_col2;ROW_3_col3;ROW_1_start_data;ROW_3_end_data
      2;4;START;ROW_4_col2;ROW_4_col3;ROW_4_start_data;ROW_7_end_data
      2;5;MIDDLE;ROW_5_col2;ROW_5_col3;ROW_4_start_data;ROW_7_end_data
      2;6;MIDDLE;ROW_6_col2;ROW_6_col3;ROW_4_start_data;ROW_7_end_data
      2;7;END;ROW_7_col2;ROW_7_col3;ROW_4_start_data;ROW_7_end_data
      3;8;START;ROW_8_col2;ROW_8_col3;ROW_8_start_data;ROW_12_end_data
      3;9;MIDDLE;ROW_9_col2;ROW_9_col3;ROW_8_start_data;ROW_12_end_data
      3;10;MIDDLE;ROW_10_col2;ROW_10_col3;ROW_8_start_data;ROW_12_end_data
      3;11;MIDDLE;ROW_11_col2;ROW_11_col3;ROW_8_start_data;ROW_12_end_data
      3;12;END;ROW_12_col2;ROW_12_col3;ROW_8_start_data;ROW_12_end_data


Success! The output has been generated in the required format!


Reviewing the job output, we see the following:


      INFO org.apache.hadoop.hive.ql.exec.MapOperator: 6 forwarded 96 rows
      ...
      INFO org.apache.hadoop.hive.ql.exec.TableScanOperator: 3 forwarded 96 rows
      ...
      INFO org.apache.hadoop.hive.ql.exec.MapJoinOperator: 2 forwarded 96 rows
      ...
      INFO org.apache.hadoop.hive.ql.exec.FileSinkOperator: TABLE_ID_1_ROWCOUNT:96


Because of the equality JOINs, only 96 rows are forwarded from each JOIN!
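
For illustration only, here is a minimal, hypothetical Java sketch (again, not part of the original article) of why an equality join keeps the row count bounded: each T2 row is matched by its group_unique_id against exactly one "START" row and one "END" row, so the join never produces more rows than it reads:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in for a hash equi-join on group_unique_id:
    // 24 groups of 3, 4 or 5 rows (96 rows in total), each group having
    // exactly one START row and one END row, as in the generated test data.
    public class EqualityJoinRowCount {
       public static void main(String[] args) {
           Map<Integer, String> startCol4 = new HashMap<>();
           Map<Integer, String> endCol5 = new HashMap<>();
           for (int g = 1; g <= 24; g++) {
               startCol4.put(g, "start_data_" + g);   // t3_start: one row per group
               endCol5.put(g, "end_data_" + g);       // t3_end: one row per group
           }
           long forwarded = 0;
           for (int g = 1; g <= 24; g++) {
               int rowsInGroup = 3 + ((g - 1) % 3);   // group sizes cycle 3, 4, 5
               for (int r = 0; r < rowsInGroup; r++) {
                   // Each T2 row probes each hash table once; there is no
                   // Cartesian expansion before filtering.
                   if (startCol4.containsKey(g) && endCol5.containsKey(g)) {
                       forwarded++;
                   }
               }
           }
           System.out.println("rows forwarded: " + forwarded);   // 96
       }
    }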

Conclusion

Without equality JOINs there is significant potential for intermediate result sets that consume a large amount of storage and require a lot of scanning; this should be avoided as far as is practical. This article also demonstrates a simple approach to data generation, basic Hive UDF creation and usage, and a repeatable approach to testing using a basic wrapper script. There are many approaches to the problem presented here, and this is just one of them.

[{"Product":{"code":"SSCRJT","label":"IBM Db2 Big SQL"},"Business Unit":{"code":"BU059","label":"IBM Software w\/o TPS"},"Component":"Open Source Tools","Platform":[{"code":"PF016","label":"Linux"}],"Version":"3.0.0.2;4.0.0","Edition":"","Line of Business":{"code":"LOB10","label":"Data and AI"}}]

Document Information

Modified date:
18 July 2020

UID

swg21963475