How to Lay Out Big Data in IBM Cloud Object Storage for Spark SQL

When you have vast quantities of rectangular data, the way you lay it out in object storage systems like IBM Cloud Object Storage (COS) makes a big difference to both the cost and performance of SQL queries; however, this task is not as simple as it sounds. Here we survey some tricks of the trade.

A rectangular dataset can be thought of as a table with rows and columns, where each column is assigned a data type (integer, string, etc.). This blog is relevant for anyone using Apache Spark SQL (Spark SQL) on data in COS. In particular, it is relevant to users of:

Summary of main takeaways:

  1. Aim for object sizes of around 128 MB
    • Use maxRecordsPerFile to avoid large objects
    • Use coalesce and repartition commands to avoid small objects
  2. Use Parquet
  3. Never compress large CSV or JSON objects with Gzip
  4. Use Hive style partitioning
    • Month, date or hour is usually a good top level partitioning column
    • Use additional partitioning column(s) if they are queried frequently
    • Avoid using partitioning columns with cardinalities of 1000 or more

The GridPocket Use Case

GridPocket is a smart grid company developing energy management applications and cloud services for electricity, water, and gas utilities. We used their open source meter_gen data generator to generate meter readings, accompanied by timestamps, geospatial locations and additional information, as CSV data, and stored it in COS in a bucket called metergen. Then we used SQL Query to run queries against this dataset.

An example row in JSON format is shown here:

{"vid":"METER753724",
"date":"2017-09-16T21:00:00.000Z",
"index":8366.24,
"sumHC":1.22201,
"sumHP":7.144230000000001,
"type":"elec",
"size":70,
"temp":14.93,
"city":"Paris",
"region":75,
"lat":48.826252,
"lng":2.36259}

Detailed schema information is shown here:

Field Name | Unit                         | Data type | Description
date       | seconds                      | integer   | timestamp of the meter reading
index      | kilowatt hours               | float     | the electrical meter reading
sumHC      | kilowatt hours               | float     | total energy consumed since midnight during off peak hours
sumHP      | kilowatt hours               | float     | total energy consumed since midnight during peak hours
type       | enumerated                   | string    | meter type: electricity or gas
vid        | identifier                   | integer   | the meter id
size       | square meters                | integer   | residence size
temp       | Celsius                      | float     | temperature at meter location for specified date/timestamp
city       | N/A                          | string    | city where the meter is located
region     | a number in the range [1,95] | integer   | number of the French region for the meter location
latitude   | degrees                      | float     | latitude of the meter location
longitude  | degrees                      | float     | longitude of the meter location

 

We added a derived column called “dt” to the meter_gen data, containing only the date portion of the timestamp. For the row above, this means adding the column "dt":"2017-09-16".
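As a minimal sketch of how such a value could be derived, the snippet below extracts the date portion from an ISO-8601 timestamp like the one in the example row. The helper name deriveDt is hypothetical, and taking the date in UTC is an assumption; the post does not say how the column was actually computed.

```scala
import java.time.{Instant, ZoneOffset}

// Hypothetical helper: extract the calendar date (assumed to be in UTC)
// from an ISO-8601 timestamp string such as the "date" field above.
def deriveDt(timestamp: String): String =
  Instant.parse(timestamp).atZone(ZoneOffset.UTC).toLocalDate.toString

println(deriveDt("2017-09-16T21:00:00.000Z")) // 2017-09-16
```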

Object Storage Background

Objects are similar to files, but there are important differences. Like files, objects have both data and metadata, although object metadata is much richer. Unlike files, object data is written once and never modified; you cannot append to or update an object, although you can overwrite it completely. Even though update in place is not possible, one can still read a byte range of an object, starting at one offset and finishing at another. There is no rename operation for objects, so renaming can only be done by rewriting the entire object under a new name and deleting the old one. Objects are accessed via a RESTful API with commands to PUT, GET, POST and DELETE objects. Unlike filesystems, object storage has a flat namespace, although directory hierarchies can be simulated by using a delimiter such as the forward slash ‘/’ in object names. Note that changing the data layout will typically require rewriting the dataset, so it is important to get it right the first time.
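To illustrate how a directory hierarchy can be simulated over a flat namespace, here is a toy in-memory sketch in plain Scala. It is not the COS API: the function listLevel and the "json/" object name are made up for illustration, and a real object store performs this kind of prefix and delimiter listing on the server side.

```scala
// Object names in a flat namespace; the '/' has no special meaning to the store.
val names = Seq(
  "csv/3-I-0-40000.csv",
  "csv/3-I-40000-80000.csv",
  "json/3-I-0-40000.json" // hypothetical object, for illustration only
)

// Simulate a directory listing: keep names under `prefix` and cut each one
// at the next delimiter, so deeper levels collapse into one "folder" entry.
def listLevel(names: Seq[String], prefix: String, delimiter: Char = '/'): Seq[String] =
  names
    .filter(_.startsWith(prefix))
    .map { name =>
      val rest = name.drop(prefix.length)
      val cut = rest.indexOf(delimiter)
      if (cut < 0) name else prefix + rest.take(cut + 1)
    }
    .distinct

println(listLevel(names, ""))     // List(csv/, json/)
println(listLevel(names, "csv/")) // List(csv/3-I-0-40000.csv, csv/3-I-40000-80000.csv)
```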

Dataset and Object Sizes

We consider datasets of around 1 TB or more to be Big Data, since at this scale the data layout can have a significant effect on performance. Typically, we expect such a dataset to comprise multiple objects in the same COS bucket, where each object contains some subset of the rows. For example, below is a partial object listing of the metergen bucket. Each object has 128 MB of data in CSV format, and all the objects in the listing belong to the same logical dataset. Each entry in the listing follows a similar naming pattern; for instance, the first entry comprises “metergen” as the bucket name followed by “csv/3-I-0-40000.csv” as the object name. The ‘/’ in object names allows representing virtual folder hierarchies, but the namespace is actually flat.

2017-11-15 15:52:48  128.0 MiB metergen/csv/3-I-0-40000.csv
2017-11-15 15:52:51  128.0 MiB metergen/csv/3-I-120000-160000.csv
2017-11-15 15:52:51  128.0 MiB metergen/csv/3-I-160000-200000.csv
2017-11-15 15:52:52  128.0 MiB metergen/csv/3-I-200000-240000.csv
2017-11-15 15:52:54  128.0 MiB metergen/csv/3-I-240000-280000.csv
2017-11-15 15:52:56  128.0 MiB metergen/csv/3-I-280000-320000.csv
2017-11-15 15:52:58  128.0 MiB metergen/csv/3-I-320000-360000.csv
2017-11-15 15:52:58  128.0 MiB metergen/csv/3-I-360000-400000.csv
2017-11-15 15:53:00  128.0 MiB metergen/csv/3-I-40000-80000.csv

In general, analytics over data stored in an object storage system works best on equally sized objects that are not too small. Objects should typically be larger than 10 MB, and for Hadoop/Spark analytics, objects of around 128 MB are usually closer to the sweet spot. Each REST request to object storage carries a certain overhead, and smaller objects inflate the number of REST requests required to analyze the data. If the user is charged per REST request, this also inflates the cost of accessing the data. When Spark performs analytics against larger objects (> 128 MB), it can still assign different sections of a large object to different tasks. However, in many cases better performance is achieved by splitting larger objects into smaller ones, since this enables more parallelism in the object store.
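A back-of-the-envelope calculation shows how object size drives the number of requests. The sketch below counts the objects needed to hold a dataset of a given size, which is a lower bound on the GET requests for a full scan. It ignores listing and metadata requests, and the function name objectCount is hypothetical.

```scala
// Number of objects needed to hold a dataset of the given size, rounding up.
// A full scan needs at least one GET request per object.
def objectCount(datasetBytes: Long, objectBytes: Long): Long =
  (datasetBytes + objectBytes - 1) / objectBytes

val MiB = 1024L * 1024L
val TiB = 1024L * 1024L * MiB

println(objectCount(TiB, 128 * MiB)) // 8192
println(objectCount(TiB, 1 * MiB))   // 1048576
```

Under these assumptions, storing 1 TiB as 1 MiB objects instead of 128 MiB objects multiplies the minimum request count by 128.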

Tips:

  • Aim for object sizes of around 128 MB
  • When using Apache Spark to generate or reorganize data,
    1. the maxRecordsPerFile configuration parameter can be used to limit object sizes, and is supported in Spark 2.2 and beyond.
    2. Spark’s coalesce and repartition commands can be used to reduce the number of partitions before writing to storage, which can reduce the number of objects and increase object size. Coalesce is preferable to repartition since it doesn’t require a shuffle.
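The two tips above can be combined in a single write. The following is a sketch for the Spark shell, where spark is the predefined session. The partition count of 64 and the cap of 1,000,000 records per object are placeholder values, not recommendations from this post; suitable values depend on the row size and the total size of the dataset.

```scala
// Intended for the Spark shell, where `spark` is the predefined SparkSession.
val in_path: String = "cos://metergen.gp/json/"
val out_path: String = "cos://metergen.gp/parquet/"

val df = spark.read.format("json").load(in_path)

df.coalesce(64)                          // placeholder: fewer partitions give fewer, larger objects
  .write
  .option("maxRecordsPerFile", 1000000L) // placeholder: caps the rows written to any one object
  .format("parquet")
  .save(out_path)
```

Note that coalesce can only reduce the number of partitions; to increase it, repartition is needed.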

Storage Formats

SQL Query currently supports CSV, JSON, Parquet, and ORC formats, and new formats may be added in the future. Both Parquet and ORC are column based formats specialized for big data analytics and are well integrated into Spark. The Parquet integration in Spark is more mature, although ORC is catching up.

Column based formats physically store data column by column instead of row by row. This allows Spark to support column pruning, so that only columns requested by a query need to be retrieved from COS. They also support column-wise compression, which is typically more efficient than the compression of row based formats, while preserving the basic columnar structure of the data. This means that Spark tasks can access a particular compressed column without necessarily needing to decompress other columns.

Moreover, Parquet and ORC support specialized metadata that can be used to filter data according to query predicates. These formats typically store metadata in a footer at the end of the object, which contains the schema as well as column offsets and statistics. This provides another important benefit: no schema inference is required. Formats such as JSON and CSV require schema inference to deduce the schema automatically from the dataset, and in general this needs a full scan prior to query processing.
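When data has to stay in JSON or CSV, one way to avoid the inference scan is to supply the schema explicitly. The sketch below, for the Spark shell, does this for the JSON layout of the example row. The field types are assumptions read off that single row (for instance, "date" is kept as a string), not an authoritative schema for meter_gen.

```scala
import org.apache.spark.sql.types._

// Assumed schema, based on the example JSON row shown earlier.
val schema = StructType(Seq(
  StructField("vid", StringType),
  StructField("date", StringType),
  StructField("index", DoubleType),
  StructField("sumHC", DoubleType),
  StructField("sumHP", DoubleType),
  StructField("type", StringType),
  StructField("size", IntegerType),
  StructField("temp", DoubleType),
  StructField("city", StringType),
  StructField("region", IntegerType),
  StructField("lat", DoubleType),
  StructField("lng", DoubleType)
))

// With an explicit schema, Spark does not need to scan the data to infer one.
val df = spark.read.schema(schema).format("json").load("cos://metergen.gp/json/")
df.printSchema()
```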

In summary, Parquet can potentially provide more than an order of magnitude performance improvement over other formats such as CSV. Conversion to Parquet or ORC can provide significant benefit when the dataset will be extensively queried, because of:

  1. Decreased storage costs from column-wise compression
  2. Reduced query cost and better query performance from scanning less data
  3. No need for schema inference
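To see column pruning and predicate filtering at work, one can inspect the query plan in the Spark shell. The sketch below assumes the dataset has already been converted to Parquet under cos://metergen.gp/parquet/; the query itself (average temperature per city above an arbitrary threshold of 20 degrees) is only an illustration.

```scala
import org.apache.spark.sql.functions.{avg, col}

val readings = spark.read.format("parquet").load("cos://metergen.gp/parquet/")

// Only "city" and "temp" are referenced, so the other columns need not be read.
val avgTempByCity = readings
  .filter(col("temp") > 20.0) // arbitrary threshold, for illustration
  .groupBy("city")
  .agg(avg("temp").as("avg_temp"))

// The physical plan for the Parquet scan should show which columns are read
// and which filters were pushed down to the data source.
avgTempByCity.explain()
avgTempByCity.show()
```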

Tips:

  • Never compress large CSV or JSON objects with Gzip. Since Spark distributes work across multiple tasks, each task ideally reads some byte range of an object. However, Gzip is not a “splittable” compression algorithm. This means that in order to read a byte range of a gzipped object, each task will need to decompress the object starting from byte 0. Therefore, in this case a single task will decompress the entire object and share the result with other tasks, which can result in out of memory conditions and requires a shuffle to repartition the result to workers.
  • CSV is preferable to JSON for big data because it is less verbose.
  • Spark SQL can be used to convert data from one format to another. For example, the commands below can be run in the Spark shell to convert a dataset from JSON to Parquet. Note: the Stocator connector is recommended.
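A minimal sketch of such a conversion for the Spark shell, assuming the same cos://metergen.gp paths that are used in the partitioning example later in this post:

```scala
// Intended for the Spark shell, where `spark` is the predefined SparkSession.
val in_path: String = "cos://metergen.gp/json/"
val out_path: String = "cos://metergen.gp/parquet/"

// Read the JSON dataset and write it back out in Parquet format.
spark.read.format("json").load(in_path).write.format("parquet").save(out_path)
```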

Hive Style Partitioning

Hive style partitioning is a naming convention employed by Hive to store big datasets. It mimics virtual folder hierarchies and encodes information about object contents in object names. For example, the dataset below contains CSV objects, where the root of the hierarchy is “metergen/csv”. Here we partition according to a date column called “dt” so that each object contains rows with the same date, and the respective date is encoded in the object name. For example, all rows in object “metergen/csv/dt=2017-08-21/3-I-0-40000.csv” have the date value “2017-08-21” in the “dt” column. Using this convention, the dt column is omitted from the object contents. Spark SQL understands this convention and can filter the set of objects retrieved when the query predicates apply to partitioning columns. In the listing below, if we query for data from August 22nd, the first 4 objects can be skipped. This can significantly reduce the number of bytes scanned in COS. Spark can also do this for range queries.

metergen/csv/dt=2017-08-21/3-I-0-40000.csv
metergen/csv/dt=2017-08-21/3-I-120000-160000.csv
metergen/csv/dt=2017-08-21/3-I-160000-200000.csv
metergen/csv/dt=2017-08-21/3-I-200000-240000.csv
metergen/csv/dt=2017-08-22/3-I-0-40000.csv
metergen/csv/dt=2017-08-22/3-I-120000-160000.csv
metergen/csv/dt=2017-08-22/3-I-160000-200000.csv
metergen/csv/dt=2017-08-22/3-I-200000-240000.csv
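The pruning idea can be sketched in plain Scala: parse the key=value segments out of each object name and keep only the objects whose partition value matches the predicate. This is a simplified illustration, not Spark's actual partition discovery code, and the function names partitionValues and prune are hypothetical.

```scala
val objects = Seq(
  "metergen/csv/dt=2017-08-21/3-I-0-40000.csv",
  "metergen/csv/dt=2017-08-21/3-I-120000-160000.csv",
  "metergen/csv/dt=2017-08-21/3-I-160000-200000.csv",
  "metergen/csv/dt=2017-08-21/3-I-200000-240000.csv",
  "metergen/csv/dt=2017-08-22/3-I-0-40000.csv",
  "metergen/csv/dt=2017-08-22/3-I-120000-160000.csv",
  "metergen/csv/dt=2017-08-22/3-I-160000-200000.csv",
  "metergen/csv/dt=2017-08-22/3-I-200000-240000.csv"
)

// Parse "key=value" path segments into a map of partition column -> value.
def partitionValues(objectName: String): Map[String, String] =
  objectName.split('/').map(_.split("=", 2)).collect {
    case Array(key, value) => key -> value
  }.toMap

// Keep only the objects whose partition column has the wanted value.
def prune(objects: Seq[String], column: String, wanted: String): Seq[String] =
  objects.filter(name => partitionValues(name).get(column).contains(wanted))

// Only the four objects under dt=2017-08-22 remain; the rest are never read.
prune(objects, "dt", "2017-08-22").foreach(println)
```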

One can partition the data according to multiple columns, which results in multiple columns encoded into the object names, one after the other. For example, partitioning according to both "dt" and "city" could give the listing below:

metergen/csv/dt=2017-08-21/city=Paris/3-I-0-40000.csv
metergen/csv/dt=2017-08-21/city=Paris/3-I-120000-160000.csv
metergen/csv/dt=2017-08-22/city=Nice/3-I-0-40000.csv
metergen/csv/dt=2017-08-22/city=Nice/3-I-120000-160000.csv
metergen/csv/dt=2017-08-22/city=Nice/3-I-160000-200000.csv

Note that Hive style partitioning is orthogonal to data format – the above example could work equally well on Parquet, JSON or CSV data.

Tips:

  • Hive style partitioning should be done on columns with discrete types such as integers (e.g. age), or strings (e.g. city). It does not apply to non-discrete types such as floating point numbers (e.g. temperature).
  • If data is periodically ingested into COS then it makes sense to add new partitions for each period. For example, partition by date or month columns, and set these as the top level partitioning columns in the hierarchy. This has the added benefit that ingestion failures for that period can be more easily cleaned up.
  • If there are frequently queried columns, then these are candidates for additional partitioning columns. Columns which are queried often and whose associated predicates provide high selectivity into the dataset are the best choice.
  • A good partitioning column will typically divide the dataset into hundreds of partitions. Avoid using a column with a cardinality of 1000 or more.
  • Some partitioning columns will introduce skew in the amount of data associated with each partition. If possible, it is preferable to use a column that partitions the dataset evenly.
  • If the hierarchy becomes too deep the resulting objects will become too small, which can decrease performance.
  • Spark can be used to partition data using the partitionBy command. For example, the commands below can be run in the Spark shell to partition by the “dt” column when converting the dataset from JSON to Parquet:
    val in_path: String = "cos://metergen.gp/json/"
    val out_path: String = "cos://metergen.gp/parquet/"
    spark.read.format("json").load(in_path).write.partitionBy("dt").format("parquet").save(out_path)

    To partition according to 2 columns (e.g. dt and city), use partitionBy("dt", "city").
  • When partitioning according to a column that has NULL values, Spark will use “__HIVE_DEFAULT_PARTITION__” in the object name. For example, if our dataset is in JSON format and some of the rows are missing the “city” field, then after partitioning by dt and city we might get the following object name for an object comprising rows without a city:
    metergen/csv/dt=2017-08-22/city=__HIVE_DEFAULT_PARTITION__/3-I-200000-240000.csv
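Before choosing partitioning columns, it can help to check their cardinality and skew. The following Spark shell sketch does this for “dt” and “city”. It assumes the JSON dataset under cos://metergen.gp/json/ already includes the derived “dt” column.

```scala
import org.apache.spark.sql.functions.{approx_count_distinct, col, desc}

val df = spark.read.format("json").load("cos://metergen.gp/json/")

// Approximate number of distinct values for each candidate partitioning column.
df.select(
  approx_count_distinct(col("dt")).as("dt_cardinality"),
  approx_count_distinct(col("city")).as("city_cardinality")
).show()

// Rows per candidate partition value, largest first, to spot skew.
df.groupBy("city").count().orderBy(desc("count")).show(20)
```

A column whose cardinality approaches 1000, or whose largest values hold most of the rows, is a weaker candidate according to the tips above.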

 

The research leading to these results has received funding from the European Community’s Horizon 2020 research and innovation programme under grant agreements n°779747 and 644182
