This post describes the problem of small ORC and Parquet files in HDFS and how it affects Big SQL read performance. It explores possible solutions using existing tools to compact small files in larger ones with the goal of improving read performance.
Multiple Hadoop small files -defined as the ones significantly smaller than the HDFS block size (64 MB by default)- are well-known as a big problem in Hadoop Distributed File System (HDFS). HDFS is meant for storing big volume of data, ideally in the form of large files. Storing a substantial number of small sized files in HDFS instead of fewer larger ones, adds additional overhead to the NameNode in managing a much broad directory tree of files. Also, MapReduce and other jobs reading HDFS files will be negatively impacted since it would involve more communication with HDFS to get the files information.
The small files read performance issue is more acute for storage formats where additional metadata are embedded into the file to describe the complex content stored. Two common file storage formats used by IBM Db2 Big SQL are ORC and Parquet. These file formats store data in columnar format to optimize reading and filtering subset of columns. ORC and Parquet formats encode information about the columns and row groups into the file itself. As a result, the metadata need to be processed before the data in the file can be decompressed, deserialized and read. Due to this overhead, processing multiple small size files in these formats -which are logically tied together, such as files belonging to a Big SQL table or partition- poses significant cost and it degrades read performance in IBM Db2 Big SQL.
Proposed solution: Compaction
A good practice to avoid small files at storage level is to run compaction on the directories containing many small files that logically belong together. In Big SQL, files that are part of the same table are typically stored in the same directory. Inspect Files tool, presented in the article Inspect Files tooling for IBM Db2 Big SQL helps to identify problematic small files in HDSF and provides recommendations for files compaction. The merge of these files into larger ones will contribute to improve the Big SQL reads performance by minimizing the metadata to be processed and aligning file sizes to HDFS blocks more efficiently.
ORC and Parquet offer their own different tooling to handle file merging or compaction:
- ORC concatenate action using HIVE DDL
- Merge command using Parquet tools
Compacting ORC files
Using Hive Data Definition Language (Hive DDL), users can request an efficient merge of small ORC files into larger ones by issuing a concatenate action on their table or partition. The command looks like:
ALTER TABLE table_name [PARTITION (partition_key = 'partition_value')] CONCATENATE;
Hive will merge the ORC files at stripe level avoiding the overhead of compressing/decompressing and coding/decoding the data.
Compacting Parquet files
As part of the Apache Parquet project there is a set of java based command line tools called parquet-tools. The latest parquet-tools version includes a merge command to logically append small parquet files to larger ones. The command concatenates the parquet blocks in binary without serialization/deserialization, merges footers and modifies the path and the offset metadata.
To run the parquet-tools merge command in HDFS:
hadoop jar parquet-tools-1.9.0.jar merge <input> <output>
where, input is the source parquet files or directory and output is the destination parquet file merging the original content. This merge command does not remove or overwrite the original files. So, it requires a manual exercise of creating a temporary directory and replacing the original small files by the compacted ones to make it known to Big SQL or Apache Hive.
Alternatively, and regardless the files storage format, a solution to consider is to recreate the tables and compact them by performing and INSERT…SELECT.
Compact small files using INSERT…SELECT
Compacting the inefficiently split data by creating a new table as a copy of the original one is straight forward using INSERT… SELECT syntax. This process will reorganize the data into a relatively small number of larger files based on degree of parallelism for the insert.
This is an example of how to create a new table and then insert the data from the old one within Big SQL:
CREATE TABLE new_table LIKE old_table; INSERT INTO new_table select * from old_table;
This solution also allows you to combine files from a single partition by copying the data partition into a new table, dropping the original partition and inserting the new compacted one.
Finally, if you need to keep the small files for record, the recommendation is to archive them using Hadoop Archive Resources (har) and save your NameNode from the cost of managing a large number of resources and objects.
Internal tests show that the compaction of ORC and Parquet small files helps to improve the Big SQL read performance significantly.
The scenario tested for ORC and Parquet formats involves:
- 1 million rows table stored in two ways:
- 30 non-optimal small files in HDFS with different sizes.
- 2 compacted large files in HDFS result of the small files merged using parquet tools or Hive DDL depending of the file format.
Run a query computing the addition of a numerical column to force the stress of the whole table by visiting all the column content:
SELECT SUM(column_name) FROM table_name;
The results show:
- 2x time to run the query on non-compacted vs compacted table in ORC format.
- 1.6x time to run the query on non-compacted vs compacted table in parquet format.
This is the output of the compaction test for ORC file format, where SLS_SALES_FACT_ORC is the non-compacted table and SLS_SALES_FACT_COMPACTED_ORC is the table result of running the CONCATENATE command on the non-compacted table:
[host][bigsql] 1> SELECT COUNT(*) FROM GOSALESDW.SLS_SALES_FACT_ORC; +---------+ | 1 | +---------+ | 1000000 | +---------+ 1 row in results(first row: 0.393s; total: 0.395s) [host][bigsql] 1> SELECT SUM(GROSS_PROFIT) FROM GOSALESDW.SLS_SALES_FACT_ORC; WARN [State: 01003][Code: 0]: Statement processing was successful.. SQLCODE=0, SQLSTATE=01003, DRIVER=3.72.24 +------------------+ | 1 | +------------------+ | 4615167406.55999 | +------------------+ 1 row in results(total: 7.681s) [bigsql@host root]$ hdfs dfs -ls /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_orc Found 30 items drwxrwxrwx - bigsql hadoop 0 2017-12-14 17:54 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_orc/._biginsights_stats -rw-r--r-- 3 bigsql hadoop 443027 2017-12-14 17:53 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_orc/i_1512166475533_-307213540_201712140553882_1.1 -rw-r--r-- 3 bigsql hadoop 179411 2017-12-14 17:53 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_orc/i_1512166475533_-307213540_201712140553882_1.2 -rw-r--r-- 3 bigsql hadoop 897796 2017-12-14 17:49 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_orc/i_1512166475533_-412191103_201712140549330_1.1 -rw-r--r-- 3 bigsql hadoop 1176528 2017-12-14 17:49 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_orc/i_1512166475533_-412191103_201712140549330_1.2 -rw-r--r-- 3 bigsql hadoop 3356 2017-12-14 17:59 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_orc/i_1512166475533_-710448882_201712140559369_1.1 ... -rw-r--r-- 3 bigsql hadoop 52854 2017-12-14 18:09 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_orc/i_1512166475533_-451786036_201712140609603_2.2 -rw-r--r-- 3 bigsql hadoop 435203 2017-12-14 18:09 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_orc/i_1512166475533_215889957_201712140609092_1.1 -rw-r--r-- 3 bigsql hadoop 1218285 2017-12-14 18:09 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_orc/i_1512166475533_215889957_201712140609092_1.2 -rw-r--r-- 3 bigsql hadoop 2112684 2017-12-14 17:51 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_orc/i_1512166475533_866891040_201712140551232_2.2 [host][bigsql] 1> SELECT COUNT(*) FROM GOSALESDW.SLS_SALES_FACT_COMPACTED_ORC; +---------+ | 1 | +---------+ | 1000000 | +---------+ 1 row in results(first row: 0.379s; total: 0.381s) [host][bigsql] 1> SELECT SUM(GROSS_PROFIT) FROM GOSALESDW.SLS_SALES_FACT_COMPACTED_ORC; WARN [State: 01003][Code: 0]: Statement processing was successful.. SQLCODE=0, SQLSTATE=01003, DRIVER=3.72.24 +------------------+ | 1 | +------------------+ | 4615167406.55999 | +------------------+ 1 row in results(total: 3.673 s) [bigsql@host root]$ hdfs dfs -ls /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_compacted_orc Found 3 items drwxrwxrwx - bigsql hadoop 0 2017-12-14 18:14 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_compacted_orc/._biginsights_stats -rwxrwxrwx 3 bigsql hadoop 19602272 2017-12-15 12:19 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_compacted_orc/000000_0 -rwxrwxrwx 3 bigsql hadoop 3720403 2017-12-15 12:19 /apps/hive/warehouse/gosalesdw.db/sls_sales_fact_compacted_orc/000001_0
Note: these tests measure the performance of the compacted tables using a single data set. We encourage you to test your own benchmarks before running the files compaction and study the performance benefits as consequence of the operation.
Notes to solution
Modifications of files at storage level as proposed above are recommended to run offline. The real issue is how to write or remove files in such a way that it does not impact current running queries that are accessing the old files. Doing the compaction of files itself is not complicated, however the logistics to not impact running jobs using the files can become problematic.
Additionally, tables information is stored in Big SQL, as well as in the Hive Metastore, which contains details about the actual files associated with the tables. At the minimum, the Hive Metastore needs to be updated to reflect the new files when using the Parquet tool to compact files. Big SQL has logic to pick up changes in the Hive Metastore and propagate them into its own catalog.
Special acknowledgments to Tuong Truong for his work reviewing this post.