How to Lay Out Big Data in IBM Cloud Object Storage for Spark SQL
When you have vast quantities of rectangular data, the way you lay it out in object storage systems like IBM Cloud Object Storage (COS) makes a big difference to both the cost and performance of SQL queries; however, this task is not as simple as it sounds. Here we survey some tricks of the trade.
A rectangular dataset can be thought of as a table with rows and columns, where each column is assigned a certain data type (integer, string, etc.). This blog is relevant for anyone using Apache Spark SQL (Spark SQL) on data in COS. In particular, it is relevant to users of:
Use coalesce and repartition commands to avoid small objects
Never compress large CSV or JSON objects with Gzip
Use Hive style partitioning
Month, date or hour is usually a good top level partitioning column
Use additional partitioning column(s) if they are queried frequently
Avoid using partitioning columns with cardinalities of 1000 or more
The GridPocket Use Case
GridPocket is a smart grid company developing energy management applications and cloud services for electricity, water, and gas utilities. We used their open source meter_gendata generator to generate meter readings, accompanied by timestamps, geospatial locations and additional information, as CSV data, and stored it in COS in a bucket called metergen. Then we used SQL Query to run queries against this dataset.
An example row in JSON format is shown here:
Detailed schema information is shown here:
To this schema, we added a derived column called “dt” to the meter_gen data, containing only the date portion of the timestamp. For the row above, this means we added the column "dt":"2017-09-16".
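A derived column like “dt” can be produced with Spark's built-in to_date function. The sketch below is one way to do it, assuming the timestamp is held in a string column with ISO-8601 values; the column name is passed in by the caller, and the "date" name in the usage comment is a hypothetical placeholder, not taken from the meter_gen schema.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, to_date}

object DeriveDt {
  // Returns df with an extra "dt" column holding only the date part of tsColumn.
  // tsColumn is an assumed column name containing ISO-8601 timestamps such as
  // "2017-09-16T08:00:00.000+02:00"; to_date keeps the leading yyyy-MM-dd.
  def withDt(df: DataFrame, tsColumn: String): DataFrame =
    df.withColumn("dt", to_date(col(tsColumn)))
}

// Usage in spark-shell (the "date" column name is hypothetical):
// val withDate = DeriveDt.withDt(spark.read.json("cos://metergen.gp/json/"), "date")
```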
Object Storage Background
Objects are similar to files, but there are important differences. Like files, objects have both data and metadata, although object metadata is much richer. Unlike files, object data is written once and never modified: you cannot append to or update an object, although you can overwrite it completely. Even though in-place updates are not possible, you can still read an object starting at one offset and finishing at another offset within the object (a byte-range read). There is no rename operation for objects, so renaming an object can only be done by rewriting the entire object under the new name and deleting the old one. Objects are accessed via a RESTful API with commands to PUT, GET, POST and DELETE objects. Unlike filesystems, object storage has a flat namespace, although directory hierarchies can be simulated by using a delimiter such as a forward slash ‘/’ in object names. Because of these properties, changing the data layout typically requires rewriting the dataset, so it is important to get it right the first time.
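To make the flat namespace concrete, the sketch below emulates what a listing with a prefix and a ‘/’ delimiter returns: the objects directly under the prefix, plus the “common prefixes” that play the role of folders. It is a plain Scala model of the idea, not the COS client API, and the object names are illustrative.

```scala
object FlatNamespace {
  // Emulates a prefix + delimiter listing over a flat set of object names.
  // Returns (objects directly under the prefix, virtual "folders" under it).
  def list(names: Seq[String], prefix: String, delimiter: String = "/"): (Seq[String], Seq[String]) = {
    val underPrefix = names.filter(_.startsWith(prefix))
    // Names with another delimiter after the prefix belong to a deeper virtual folder.
    val (nested, direct) = underPrefix.partition(n => n.indexOf(delimiter, prefix.length) >= 0)
    val folders = nested.map { n =>
      n.substring(0, n.indexOf(delimiter, prefix.length) + delimiter.length)
    }.distinct.sorted
    (direct.sorted, folders)
  }

  def main(args: Array[String]): Unit = {
    val names = Seq(
      "csv/3-I-0-40000.csv",
      "csv/dt=2017-08-21/3-I-0-40000.csv",
      "csv/dt=2017-08-22/3-I-0-40000.csv",
      "json/3-I-0-40000.json")
    println(list(names, ""))     // top level: no objects, only virtual folders
    println(list(names, "csv/")) // one object plus two dt= virtual folders
  }
}
```

Nothing here is a real directory: “csv/dt=2017-08-21/” exists only because some object names happen to share that prefix.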
Dataset and Object Sizes
We consider data in the 1 TB range or more to be Big Data, since at this scale the data layout can have a significant effect on performance. Typically, we expect such a dataset to comprise multiple objects in the same COS bucket, where each object contains some subset of the rows. For example, below is a partial object listing of the metergen bucket. Each object holds 128 MB of data in CSV format, and all the objects in the listing belong to the same logical dataset. Each object name follows a similar pattern; for instance, the first object has “metergen” as the bucket name followed by “csv/3-I-0-40000.csv” as the object name. The ‘/’ in object names allows virtual folder hierarchies to be represented, but the namespace is actually flat.
In general, analytics over data stored in an object storage system works best on equally sized objects where the objects are not too small. Objects should typically be larger than 10 MB, and for Hadoop/Spark analytics, 128 MB sized objects are usually closer to the sweet spot. When using object storage, there is a certain overhead for each REST request. Smaller object sizes inflate the number of REST requests required to analyze the data. If the user is charged per REST request, this can also inflate the cost to access the data. When Spark performs analytics against larger objects (> 128 MB), it can still assign different sections of the large object to different tasks. However, in many cases better performance is achieved by splitting larger objects into smaller ones since this enables more parallelism in the object store.
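The effect of object size on request counts is simple arithmetic. The sketch below assumes binary units (1 TiB = 2^40 bytes, 1 MiB = 2^20 bytes) and counts only the one GET that reading each object requires at minimum; real scans may issue more requests (listings, multiple range reads). For 1 TiB it gives 8,192 objects at 128 MiB, versus 104,858 at 10 MiB and 1,048,576 at 1 MiB.

```scala
object ObjectCount {
  val MiB: Long = 1L << 20
  val TiB: Long = 1L << 40

  // Objects needed to hold datasetBytes at roughly objectBytes each (rounded up).
  // Reading an object takes at least one GET request, so this is also a lower bound
  // on the GETs issued by a full scan (listing requests not included).
  def objectsNeeded(datasetBytes: Long, objectBytes: Long): Long =
    (datasetBytes + objectBytes - 1) / objectBytes

  def main(args: Array[String]): Unit =
    for (size <- Seq(1L, 10L, 128L))
      println(s"1 TiB in $size MiB objects: ${objectsNeeded(TiB, size * MiB)} objects")
}
```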
Aim for object sizes of around 128 MB
When using Apache Spark to generate or reorganize data, the maxRecordsPerFile option (supported in Spark 2.2 and later) can be used to cap the number of records written to each object, which indirectly limits object size.
Spark’s coalesce and repartition commands can be used to reduce the number of partitions before writing to storage, which reduces the number of objects and increases their size. Coalesce is usually preferable to repartition since it avoids a full shuffle.
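The tips above can be combined in a few lines. This is a sketch under stated assumptions: the caller supplies an estimate of the output size in bytes (Spark does not know the encoded size before writing), and the helper names are hypothetical. coalesce and the maxRecordsPerFile writer option are standard Spark APIs.

```scala
import org.apache.spark.sql.DataFrame

object SizeObjects {
  // Rough number of output partitions so that each object is about targetBytes.
  // estimatedBytes is an assumption supplied by the caller (e.g. from the input listing).
  def targetPartitions(estimatedBytes: Long, targetBytes: Long = 128L << 20): Int =
    math.max(1L, (estimatedBytes + targetBytes - 1) / targetBytes).toInt

  // coalesce merges existing partitions without a full shuffle (it can only reduce
  // their number); maxRecordsPerFile then splits any partition that is still too big.
  def write(df: DataFrame, outPath: String, partitions: Int, maxRecords: Long): Unit =
    df.coalesce(partitions)
      .write
      .option("maxRecordsPerFile", maxRecords)
      .parquet(outPath)
}
```

Because maxRecordsPerFile counts rows rather than bytes, a suitable value depends on the average encoded row size of the dataset, which has to be measured or estimated.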
SQL Query currently supports CSV, JSON, Parquet, and ORC formats, and new formats may be added in the future. Both Parquet and ORC are column based formats specialized for big data analytics and are well integrated into Spark. The Parquet integration in Spark is more mature, although ORC is catching up.
Column based formats physically store data column by column instead of row by row. This allows Spark to support column pruning, so that only columns requested by a query need to be retrieved from COS. They also support column-wise compression, which is typically more efficient than the compression of row based formats, while preserving the basic columnar structure of the data. This means that Spark tasks can access a particular compressed column without necessarily needing to decompress other columns.
Moreover, Parquet and ORC store specialized metadata that can be used to skip blocks of rows (row groups in Parquet, stripes in ORC) that cannot satisfy the query predicates. These formats typically store metadata in a footer at the end of the object, which contains the schema as well as column offsets and statistics such as minimum and maximum values. This provides another important benefit: no schema inference is required. Formats such as JSON and CSV require schema inference to deduce the schema automatically from the dataset, which in general means a full scan prior to query processing.
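The effect of footer statistics can be illustrated with a simplified model: if each block of rows records the minimum and maximum of a column, a reader can rule out blocks whose range cannot overlap the predicate. The sketch below is only a model of the idea, with made-up block names and values, not Parquet's or ORC's actual reader logic.

```scala
object MinMaxSkipping {
  // Simplified stand-in for the per-block statistics kept in a Parquet/ORC footer.
  final case class BlockStats(name: String, min: Long, max: Long)

  // For a predicate lo <= value <= hi, a block can be skipped when its [min, max]
  // range does not overlap [lo, hi]; only the returned blocks need to be read.
  def blocksToRead(blocks: Seq[BlockStats], lo: Long, hi: Long): Seq[String] =
    blocks.filter(b => b.max >= lo && b.min <= hi).map(_.name)

  def main(args: Array[String]): Unit = {
    val blocks = Seq(BlockStats("rg0", 0, 99), BlockStats("rg1", 100, 199), BlockStats("rg2", 200, 299))
    println(blocksToRead(blocks, 150, 160)) // only rg1 overlaps [150, 160]
  }
}
```

In Spark itself, calling explain() on a query over Parquet data lists the predicates handed to the reader under PushedFilters.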
In summary, Parquet can potentially provide more than an order of magnitude performance improvement over other formats such as CSV. Conversion to Parquet or ORC can provide significant benefit when the dataset will be queried extensively, because of:
Decreased storage costs by using column-wise compression
Reduced query cost and better query performance by scanning less data
Schema inference is avoided
Never compress large CSV or JSON objects with Gzip. Spark distributes work across multiple tasks, and ideally each task reads its own byte range of an object. However, Gzip is not a “splittable” compression algorithm: to read a byte range of a gzipped object, a task would have to decompress the object starting from byte 0. Therefore, in this case a single task decompresses the entire object and shares the result with the other tasks, which can cause out-of-memory conditions and requires a shuffle to repartition the result across workers.
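The sketch below, using only the JDK's java.util.zip classes, illustrates why Gzip output is not splittable: decompression works when it starts at byte 0, but a reader that starts anywhere else sees neither the Gzip header nor the decoder state built up from earlier bytes. The sample CSV text is made up.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.io.Source

object GzipSplit {
  // Compress text into a single Gzip stream, as a gzipped CSV object would be stored.
  def gzip(text: String): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(buffer)
    try out.write(text.getBytes(StandardCharsets.UTF_8))
    finally out.close()
    buffer.toByteArray
  }

  // Try to decompress starting at byteOffset, the way a task assigned a later
  // byte range would have to.
  def readFrom(compressed: Array[Byte], byteOffset: Int): Either[String, String] =
    try {
      val in = new GZIPInputStream(new ByteArrayInputStream(compressed.drop(byteOffset)))
      try Right(Source.fromInputStream(in, "UTF-8").mkString)
      finally in.close()
    } catch {
      case e: IOException => Left(s"cannot decompress from byte $byteOffset: ${e.getMessage}")
    }

  def main(args: Array[String]): Unit = {
    val csv = (1 to 10000).map(i => s"meter-$i,2017-08-21,$i").mkString("\n")
    val gz = gzip(csv)
    println(readFrom(gz, 0).map(_.length))      // starting at byte 0 recovers the whole text
    println(readFrom(gz, gz.length / 2).isLeft) // starting mid-object is expected to fail
  }
}
```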
CSV is preferable to JSON for big data because it is less verbose.
Spark SQL can be used to convert data from one format to another. For example, the commands below can be run in the Spark shell to convert a dataset from JSON to Parquet. Note: the Stocator connector is recommended.
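A minimal sketch of such a conversion, assuming a SparkSession named spark (as provided by spark-shell) and the Stocator cos:// paths used later in this post. Reading JSON triggers a schema-inference pass over the input, which is the cost that Parquet's self-describing footers avoid on later queries.

```scala
import org.apache.spark.sql.SparkSession

object JsonToParquet {
  // Reads a JSON-lines dataset (Spark infers its schema by scanning the input)
  // and rewrites it as Parquet, whose footers then carry the schema.
  def convert(spark: SparkSession, inPath: String, outPath: String): Unit =
    spark.read.json(inPath).write.parquet(outPath)
}

// In spark-shell, with Stocator configured for the bucket:
// JsonToParquet.convert(spark, "cos://metergen.gp/json/", "cos://metergen.gp/parquet/")
```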
Hive Style Partitioning
We use the term Hive style partitioning for a naming convention employed by Hive to store big datasets by mimicking virtual folder hierarchies and encoding information about object contents in object names. For example, the dataset below contains CSV objects, where the root of the hierarchy is “metergen/csv”. Here we partition according to a date column called “dt” so that each object contains rows with the same date, and the respective date is encoded in the object name. For example, all rows in object “metergen/csv/dt=2017-08-21/3-I-0-40000.csv” have the date value “2017-08-21” in the “dt” column. Using this convention, the dt column is omitted from the object contents. Spark SQL understands this convention and can filter the set of objects retrieved when the query predicates apply to partitioning columns. For our example below, if we query for data from August 22nd, the first 4 objects in the list can be skipped. This can significantly reduce the number of bytes scanned in COS. Spark can also do this for range queries.
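The convention is simple enough to model in a few lines. The sketch below parses column=value segments out of object names (any number of partitioning columns) and uses them to prune a listing, which is the idea behind Spark's partition pruning. Spark's real implementation also handles escaping and partition type inference, which this sketch deliberately skips.

```scala
object HivePartitions {
  // Extracts the column=value pairs encoded in an object name, e.g.
  // "metergen/csv/dt=2017-08-21/3-I-0-40000.csv" -> Map("dt" -> "2017-08-21").
  def partitionValues(objectName: String): Map[String, String] =
    objectName.split("/").toSeq
      .dropRight(1) // the last segment is the object's own name
      .filter(_.contains("="))
      .map { segment =>
        val eq = segment.indexOf('=')
        segment.substring(0, eq) -> segment.substring(eq + 1)
      }
      .toMap

  // Keeps only the objects whose value for `column` satisfies `keep`;
  // everything else can be skipped without reading it from COS.
  def prune(objects: Seq[String], column: String, keep: String => Boolean): Seq[String] =
    objects.filter(o => partitionValues(o).get(column).exists(keep))

  def main(args: Array[String]): Unit = {
    val objects = Seq(
      "metergen/csv/dt=2017-08-21/3-I-0-40000.csv",
      "metergen/csv/dt=2017-08-21/3-I-40000-80000.csv",
      "metergen/csv/dt=2017-08-22/3-I-0-40000.csv")
    println(prune(objects, "dt", _ == "2017-08-22"))                          // equality predicate
    println(prune(objects, "dt", d => d >= "2017-08-21" && d < "2017-08-22")) // range predicate
  }
}
```

The range predicate works on the raw strings here only because ISO dates sort lexicographically; Spark instead infers a date type for such a column.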
One can partition the data according to multiple columns, which results in multiple columns encoded into the object names, one after the other. For example, partitioning according to both "dt" and "city" could give the listing below:
Note that Hive style partitioning is orthogonal to data format – the above example could work equally well on Parquet, JSON or CSV data.
Hive style partitioning should be done on columns with discrete types such as integers (e.g. age), or strings (e.g. city). It does not apply to non-discrete types such as floating point numbers (e.g. temperature).
If data is periodically ingested into COS then it makes sense to add new partitions for each period. For example, partition by date or month columns, and set these as the top level partitioning columns in the hierarchy. This has the added benefit that ingestion failures for that period can be more easily cleaned up.
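One way to make a single period easy to redo (a technique not described in the original post) is Spark's dynamic partition overwrite mode, set through spark.sql.sources.partitionOverwriteMode and available from Spark 2.3: an overwrite then replaces only the dt partitions present in the new batch. A sketch, assuming each batch already carries the dt column; the helper name is hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object IngestPeriod {
  // Writes one period's batch into a dt-partitioned Parquet dataset. In dynamic mode,
  // only the dt partitions present in `batch` are replaced, so re-running a failed
  // day's ingestion leaves the other days untouched.
  // Note: this sets the mode for the whole session, not just this write.
  def ingest(spark: SparkSession, batch: DataFrame, outPath: String): Unit = {
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    batch.write.mode("overwrite").partitionBy("dt").parquet(outPath)
  }
}
```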
If there are frequently queried columns, then these are candidates for additional partitioning columns. Columns which are queried often and whose associated predicates provide high selectivity into the dataset are the best choice.
A good partitioning column will typically divide the dataset into hundreds of partitions. Avoid using a column with a cardinality of 1000 or more.
Some partitioning columns will introduce skew in the amount of data associated with each partition. If possible, it is preferable to use a column that partitions the dataset evenly.
If the hierarchy becomes too deep the resulting objects will become too small, which can decrease performance.
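The guidelines above can be turned into a quick check before committing to a layout. The sketch below is a hypothetical helper, not part of Spark: it counts rows per distinct value with a standard groupBy, then reports cardinality, skew (largest partition divided by the mean) and mean bytes per partition value. The thresholds (fewer than 1000 values, partitions not much smaller than the 10 MB object size mentioned earlier) are tunable assumptions.

```scala
import org.apache.spark.sql.DataFrame

object PartitionColumnCheck {
  final case class Report(cardinality: Int, skew: Double, avgBytesPerPartition: Double)

  // Rows per distinct value of `column`; null values are reported under "null".
  def valueCounts(df: DataFrame, column: String): Map[String, Long] =
    df.groupBy(column).count().collect()
      .map(r => Option(r.get(0)).fold("null")(_.toString) -> r.getLong(1))
      .toMap

  // skew = largest partition / mean partition, in rows (1.0 means perfectly even).
  // avgBytesPerPartition is simply datasetBytes divided by the cardinality.
  def report(counts: Map[String, Long], datasetBytes: Long): Report = {
    require(counts.nonEmpty, "no values to evaluate")
    val n = counts.size
    val meanRows = counts.values.sum.toDouble / n
    Report(n, counts.values.max / meanRows, datasetBytes.toDouble / n)
  }

  // Thresholds echo the guidelines in this post; treat them as assumptions to tune.
  def looksReasonable(r: Report, maxCardinality: Int = 1000, minBytes: Long = 10L << 20): Boolean =
    r.cardinality < maxCardinality && r.avgBytesPerPartition >= minBytes
}
```

A high skew value is a signal to look for a column that divides the data more evenly, as suggested above.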
Spark can be used to partition data using the partitionBy command. For example, the commands below can be run in the Spark shell to partition by the “dt” column when converting the dataset from JSON to Parquet:
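val in_path: String = "cos://metergen.gp/json/"
val out_path: String = "cos://metergen.gp/parquet/"
val df = spark.read.json(in_path)              // the JSON input must already contain the "dt" column
df.write.partitionBy("dt").parquet(out_path)   // writes one "dt=<date>/" folder per distinct date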
To partition according to two columns (e.g. dt and city), use partitionBy("dt", "city").
When partitioning according to a column that has NULL values, Spark uses “__HIVE_DEFAULT_PARTITION__” as the value in the object name. For example, if our source data is in JSON format and some of the rows are missing the “city” field, then after partitioning by dt and city, the rows without a city might end up in an object such as: metergen/csv/dt=2017-08-22/city=__HIVE_DEFAULT_PARTITION__/3-I-200000-240000.csv
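The naming rule for multiple partitioning columns, including the null placeholder, can be sketched as follows. This is a hypothetical helper that models the object-name layout only; Spark additionally escapes special characters in values, which the sketch skips.

```scala
object HivePaths {
  // Placeholder Spark uses in object names for a null partition value.
  val DefaultPartition = "__HIVE_DEFAULT_PARTITION__"

  // Builds the folder part of an object name from ordered (column, value) pairs;
  // the order of the pairs is the order of the columns given to partitionBy.
  def partitionDir(values: Seq[(String, Option[String])]): String =
    values.map { case (column, value) => s"$column=${value.getOrElse(DefaultPartition)}" }
      .mkString("/")
}
```

If a readable name is preferred over the placeholder, nulls can be replaced before writing, for example with df.na.fill("unknown", Seq("city")); the "unknown" label is an arbitrary choice, and queries must then filter on it instead of on IS NULL.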
The research leading to these results has received funding from the European Community’s Horizon 2020 research and innovation programme under grant agreements n°779747 and 644182